Many data pipelines reprocess all of their data on each run for simplicity and consistency. Reprocessing every row through external services, however, can be expensive in both time and cost. We see this regularly with enhancement jobs like geocoder, CASS, and Taxonomer, but the same principle applies to any data pipeline where only newly updated rows need to be processed.
The most dramatic way to speed up such a pipeline is to process only new rows from the original source table and incrementally update your destination tables. Unfortunately, fully incremental updates can become complicated and error-prone. Caching previous results and rerunning only new records can dramatically improve the speed of your pipeline without introducing as much complexity or risk.
There are two ways to key your cache: on row IDs and timestamps (for invalidation), or on the input values themselves. Keying on row IDs may lead to quicker joins in SQL, while keying on input values can be more efficient when the same values are repeated across your table.
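If you're not sure which approach fits your data, a quick profile of the input column can help you decide. This sketch assumes the records table and response field used in the examples below:
-- If distinct_responses is much smaller than total_rows, an input value
-- cache will avoid repeated calls for the same value.
SELECT
    COUNT(*) AS total_rows,
    COUNT(DISTINCT response) AS distinct_responses
FROM records;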
Row ID Cache
Step 1: Create the cache table
For the purposes of this example, we'll assume that records is your preprocessed table and you are looking to create a records_enhanced table with the new columns.
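For reference, the examples in this section assume a source table shaped roughly like the following; this is illustrative only, and your actual columns will differ:
CREATE TABLE records (
    id integer,
    updated_at timestamp,
    response varchar(1000)
    -- ...plus whatever other columns your table has
);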
You'll first create a cache table to store previous results. We'll assume that you're using Civis AI Taxonomer, with a response field as the input being analyzed and a category field as the output. We also assume that you have an updated_at field and will reprocess a record whenever it changes. Adjust your column sizes as appropriate!
CREATE TABLE records_enhancement_cache (
    id integer,
    updated_at timestamp,
    category varchar(25)
)
DISTKEY(id)
SORTKEY(id);
Step 2: Create a view of new records to process
Create a view that contains only uncached records. Creating it WITH NO SCHEMA BINDING lets you keep the view around even if the records table is dropped and recreated.
CREATE VIEW records_enhancement_new AS
SELECT
    r.id, r.response
FROM
    records r
LEFT JOIN records_enhancement_cache rec
    ON r.id = rec.id AND r.updated_at = rec.updated_at
WHERE
    rec.category IS NULL
    AND r.response IS NOT NULL
WITH NO SCHEMA BINDING;
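At any point, you can check how much work remains by counting the rows in the view:
SELECT COUNT(*) AS rows_to_process
FROM records_enhancement_new;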
Step 3: Run your pipeline
Configure the pipeline to run on the records_enhancement_new table, outputting to a records_enhancement_stg table.
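If you want to dry-run the cache flow before wiring up the real job, a hypothetical stand-in like the following mimics the staging output (the 'uncategorized' value is a placeholder, not real Taxonomer output):
-- Hypothetical stand-in for the enhancement job, for testing only.
CREATE TABLE records_enhancement_stg AS
SELECT
    id,
    'uncategorized'::varchar(25) AS category
FROM records_enhancement_new;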
Step 4: Update your cache and create your final results table
DELETE FROM records_enhancement_cache
USING records_enhancement_stg stg
WHERE records_enhancement_cache.id = stg.id;

INSERT INTO records_enhancement_cache
SELECT
    r.id, r.updated_at, stg.category
FROM
    records_enhancement_stg stg, records r
WHERE
    r.id = stg.id;
DROP TABLE IF EXISTS records_enhanced;
CREATE TABLE records_enhanced AS
SELECT
    r.*, cache.category AS category
FROM
    records r
LEFT JOIN records_enhancement_cache cache
    ON r.id = cache.id;
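As a quick sanity check, you can look at how categories are distributed in the final table:
SELECT category, COUNT(*) AS n
FROM records_enhanced
GROUP BY category
ORDER BY n DESC;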
Step 5: Drop staging table
DROP TABLE records_enhancement_stg;
Going forward
Repeat steps 3-5 when your data is refreshed. If something fundamental changes and you need to update all records, you can simply run TRUNCATE records_enhancement_cache to clear the cache.
Input Value Cache
Step 1: Create the cache table
For the purposes of this example, we'll again assume that records is your preprocessed table and you are looking to create a records_enhanced table with additional columns. This version keys the cache on the input field, which is helpful when its values repeat across rows; you can also key it on the table's primary key, as in the previous section.
You'll first create a cache table to store previous results. As before, we'll assume that you're using Civis AI Taxonomer, with a response field as the input being analyzed and a category field as the output. Adjust your column sizes as appropriate!
CREATE TABLE records_enhancement_cache (
    response varchar(1000),
    category varchar(25)
)
DISTSTYLE ALL
SORTKEY(response);
Step 2: Create your view of new records to process
CREATE VIEW records_enhancement_new AS
SELECT
    r.id, r.response
FROM
    records r
LEFT JOIN records_enhancement_cache rec
    ON r.response = rec.response
WHERE
    rec.category IS NULL
    AND r.response IS NOT NULL
WITH NO SCHEMA BINDING;
Step 3: Run your pipeline
Configure the pipeline to run on the records_enhancement_new table, outputting to a records_enhancement_stg table.
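As in the row ID example, you can dry-run the cache flow with a hypothetical stand-in before wiring up the real job ('uncategorized' is a placeholder value):
-- Hypothetical stand-in for the enhancement job, for testing only.
CREATE TABLE records_enhancement_stg AS
SELECT DISTINCT
    response,
    'uncategorized'::varchar(25) AS category
FROM records_enhancement_new;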
Step 4: Update your cache and create your final results table
INSERT INTO records_enhancement_cache
-- DISTINCT prevents repeated input values from creating duplicate cache rows,
-- which would fan out the join in the next step.
SELECT DISTINCT
    response, category
FROM
    records_enhancement_stg;
DROP TABLE IF EXISTS records_enhanced;
CREATE TABLE records_enhanced AS
SELECT
    r.*, cache.category AS category
FROM
    records r
LEFT JOIN records_enhancement_cache cache
    ON r.response = cache.response;
Step 5: Drop staging table
DROP TABLE records_enhancement_stg;
Going forward
Repeat steps 3-5 when your data is refreshed. If something about the processing changes and you need to update all records, you can simply run TRUNCATE records_enhancement_cache to clear the cache.