Overview
This guide provides an example of how to fit Civis’s Identity Resolution (IDR) tool into a larger workflow that preprocesses data, runs IDR, and then joins IDR output back to tables with data to be used for downstream analyses.
In this example, the IDR job has only one source table, but the workflow should be broadly similar for IDR pipelines with multiple sources.
Step 1: Set up the input data
The input data to an IDR pipeline might be created by a SQL script, an import job, etc.
For this example, below is a simple Container script that will generate some fake data and create a table. Feel free to ignore the details of this step if you are just interested in seeing the structure of an IDR workflow.
The fake data created by the script below includes several PII fields, including address information, as well as a couple of non-PII fields that could be relevant for downstream analytics applications (e.g., membership type). See this page for how to run such a script in Platform. Note that the faker package must be installed for the script to run, which is why this example uses a Container script rather than a Python script. The lines of code between the EOF markers can also be run in a Python Notebook, with faker added as a Custom Package.
pip install faker
FILE_NAME="generate_data.py"
# Create a temporary file with python code to generate sample data
cat <<EOF > $FILE_NAME
import civis
import faker # This is a package for generating fake data.
import numpy as np
database_name = 'redshift-acme' # REPLACE THIS WITH YOUR DATABASE NAME
member_table = 'scratch.acme_members' # REPLACE THIS WITH A DIFFERENT SCHEMA AND TABLE
# Generate some fake records.
rs = np.random.RandomState(42)
fk = faker.Faker()
fk.seed_instance(42) # Seed this Faker instance so the fake data is reproducible.
membership_types = ['1 - Bronze', '2 - Silver', '3 - Gold']
fake_records = [(
    i, fk.first_name(), fk.last_name(), fk.street_address(), fk.city(), fk.state_abbr(), fk.zipcode(), fk.email(),
    fk.word(ext_word_list=membership_types),
    # Cast to a plain int so the value renders as a bare number in the SQL below.
    int(rs.randint(low=100, high=1000))
) for i in range(100)]
# Add in a few duplicates.
fake_records += [(fake[0] + 100,) + fake[1:] for fake in fake_records[:10]]
# Insert the fake records into a table.
stmt = f"""
drop table if exists {member_table};
create table {member_table} (
member_id int,
first_name varchar,
last_name varchar,
street_address varchar,
city varchar,
state_code varchar(2),
zip varchar(5),
email varchar,
membership_type varchar,
donation_total integer
);
comment on table {member_table} is 'fake data generated by the faker package';
insert into {member_table} values {', '.join([str(r) for r in fake_records])};
"""
print("executing...", stmt)
result = civis.io.query_civis(stmt, database_name).result()
print("Result is: ", result)
if result.state == "succeeded":
    print("Query was successful.")
EOF
echo "Generating data..."
# Run code in file
python $FILE_NAME
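One fragile spot in the script above is the insert statement, which relies on Python's str() of each tuple to produce SQL literals. A value containing an apostrophe (for example, a last name such as O'Hara) would not come out as a valid single-quoted SQL string that way. The following is a minimal sketch of building the values clause explicitly instead. The helper names sql_literal and values_clause are hypothetical and are not part of civis or faker, and the sample records are made up for illustration.

```python
import numbers


def sql_literal(value):
    """Render one Python value as a SQL literal (hypothetical helper)."""
    if value is None:
        return "null"
    if isinstance(value, numbers.Integral):
        # Covers plain ints and other integer types registered as Integral.
        return str(int(value))
    # Treat everything else as text: wrap it in single quotes and double
    # any embedded single quote, which is the standard SQL escape.
    return "'" + str(value).replace("'", "''") + "'"


def values_clause(records):
    """Render a list of tuples as the body of an INSERT ... VALUES clause."""
    rows = (
        "(" + ", ".join(sql_literal(v) for v in record) + ")"
        for record in records
    )
    return ", ".join(rows)


# Made-up sample records: (member_id, first_name, last_name, donation_total).
sample = [(1, "Pat", "O'Hara", 250), (2, "Sam", "Lee", 100)]
print(values_clause(sample))
```

In the script above, the result of values_clause(fake_records) could take the place of the joined str(r) values in the insert statement.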
Step 2a: Create a CASS/NCOA job
This step (and also 2b) is only relevant if you have address data and want to standardize it (the CASS part) or update mailing addresses (the NCOA part).
Create a CASS/NCOA job in Platform here. See this help page for more details.
Step 2b: Join the original input to the CASS/NCOA output
Next, create a view with a SQL Script that will bring together the updated address information with the non-address input fields. The view will include the original address columns as well as the CASS/NCOA ones, but that's OK since IDR will only use the ones you tell it to use.
create or replace view scratch.acme_members_with_cass_ncoa as
select a.*, b.fnl_priadr as address_cassncoa, b.fnl_city as city_cassncoa, b.fnl_state as state_cassncoa, b.fnl_zip as zip_cassncoa
from scratch.acme_members a
join scratch.acme_members_ncoa_output b using(member_id);
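Because the view uses an inner join, any member without a row in the CASS/NCOA output would drop out of the view and therefore never reach IDR. Whether that can happen depends on how the CASS/NCOA job writes its output, which this guide does not specify. The sketch below only illustrates the join behavior, using Python's built-in sqlite3 module as a stand-in for the warehouse and made-up table contents.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table acme_members (member_id int, first_name text);
    create table acme_members_ncoa_output (member_id int, fnl_priadr text);
    insert into acme_members values (1, 'Pat'), (2, 'Sam'), (3, 'Lee');
    -- Member 3 is deliberately missing from the (made-up) CASS/NCOA output.
    insert into acme_members_ncoa_output values (1, '1 MAIN ST'), (2, '22 OAK AVE');
""")

# Same join shape as the view above: members without output rows are dropped.
inner_rows = conn.execute("""
    select a.member_id, b.fnl_priadr as address_cassncoa
    from acme_members a
    join acme_members_ncoa_output b using (member_id)
    order by a.member_id
""").fetchall()

# A left join keeps every member and leaves the CASS/NCOA columns null.
left_rows = conn.execute("""
    select a.member_id, b.fnl_priadr as address_cassncoa
    from acme_members a
    left join acme_members_ncoa_output b using (member_id)
    order by a.member_id
""").fetchall()

print(len(inner_rows), "rows with inner join;", len(left_rows), "rows with left join")
```

If the two counts differ on real data, a left join may be the better choice, so that records without standardized addresses can still be matched on their other PII fields.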
Step 3: Set up the IDR pipeline
Go to TOOLS -> IDENTITY RESOLUTION in the top navigation bar (or directly to this link), click "New Pipeline", etc. to set up a new IDR pipeline. See this help page for more details.
When mapping PII fields, remember to use the CASS/NCOA columns (e.g., `address_cassncoa`) for the address-related fields.
Also, make sure to specify locations for the customer graph and golden table outputs.
The link scores output is not important for this example but in some situations can be useful for understanding why records get grouped together.
Step 4: Join the golden table to non-PII fields
This is probably the most complicated step, and the details will depend on your data and your application. The goal is to join the golden table output from IDR, which contains one record per individual and the best PII available from your input data sources for that individual, with other tables that have additional information about that person (e.g., from transaction records or the Civis Data modeling files).
One challenge worth highlighting here is that the golden table from IDR has one record per individual, resulting from combining potentially multiple records within and across the source tables. However, the source tables may include other information that would need to be aggregated in SQL (e.g., if multiple customer records are grouped together by IDR, then transaction dollar values may need to be summed up). The code below provides some examples for such aggregations.
Create a SQL script with this example code.
-- To make things easier below, create a temporary customer graph table with
-- info for just the most recent IDR pipeline run.
create table #customer_graph as select * from scratch.acme_customer_graph
where run_id = (select max(run_id) from scratch.acme_customer_graph);
-- Since this example uses fake data, none of the records will match to Civis
-- Data. If you want to add an arbitrary match to Civis Data, uncomment the
-- following lines.
-- insert into #customer_graph (resolved_id, source, source_primary_key)
-- values ((select resolved_id from #customer_graph limit 1), 'Civis Data',
-- (select voterbase_id from ts.modeling_commercial limit 1));
-- Now, join the golden table to other tables to get non-PII fields.
drop table if exists scratch.acme_golden_table_joined;
create table scratch.acme_golden_table_joined as
select * from scratch.acme_golden_table gt
-- Join to the original input table to get important customer attributes.
left join
(
-- There could be multiple records per resolved ID, so we need to group by
-- resolved_id and aggregate somehow. The aggregation method will depend on
-- the nature of the data.
select resolved_id, sum(donation_total) donation_total,
max(membership_type) membership_type
from #customer_graph cg
join scratch.acme_members m on cg.source_primary_key = m.member_id
-- For this subquery, we restrict to just rows in the customer graph table
-- that pertain to the input members table.
where cg.source = 'members'
group by resolved_id
) member_attributes
using(resolved_id)
-- Join to the Civis Data modeling file to get additional modeling features.
left join
(
select resolved_id, ts_married, ts_unmarried, vehicleowner,
age_pr_18_34_commercial, age_pr_35_49_commercial, age_pr_50_64_commercial,
age_pr_65_plus_commercial
from
-- It's possible, though unlikely, that a resolved ID may be associated with
-- multiple Civis Data records. Here, we'll arbitrarily pick the one with the
-- lowest ID if that happens.
-- Note that in this example, none of the fake records match to the basefile,
-- so the left join above will just add null values for the modeling fields.
(
select resolved_id, min(source_primary_key) as voterbase_id
from #customer_graph cg
where source = 'Civis Data' group by resolved_id
) civis_data_ids
join ts.modeling_commercial m
using (voterbase_id)
) modeling_features
using(resolved_id);
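To make the first subquery's aggregation concrete, here is a small pure-Python sketch of the same logic: group the source records by resolved ID, sum the donation totals, and take the maximum membership type. The rows and the aggregate_members function are hypothetical and exist only for illustration. Note that taking the maximum of the membership type works as "highest tier" only because the values start with a sortable number ('1 - Bronze', '2 - Silver', '3 - Gold').

```python
from collections import defaultdict

# Hypothetical customer graph rows: (resolved_id, source, source_primary_key).
customer_graph = [
    ("r1", "members", 0),
    ("r1", "members", 100),  # a second member record that IDR grouped with member 0
    ("r2", "members", 1),
    ("r2", "Civis Data", 999),  # ignored below: not from the members source
]

# Hypothetical member attributes keyed by member_id.
members = {
    0: {"membership_type": "1 - Bronze", "donation_total": 250},
    100: {"membership_type": "3 - Gold", "donation_total": 400},
    1: {"membership_type": "2 - Silver", "donation_total": 120},
}


def aggregate_members(graph, member_rows, source="members"):
    """Collapse member attributes to one row per resolved ID."""
    grouped = defaultdict(list)
    for resolved_id, row_source, key in graph:
        # Mirror the SQL: restrict to one source, then inner join on member_id.
        if row_source == source and key in member_rows:
            grouped[resolved_id].append(member_rows[key])
    return {
        resolved_id: {
            "donation_total": sum(r["donation_total"] for r in rows),
            "membership_type": max(r["membership_type"] for r in rows),
        }
        for resolved_id, rows in grouped.items()
    }


result = aggregate_members(customer_graph, members)
print(result)
```

Whether summing is appropriate depends on the data. If grouped records are exact copies of one another, as the duplicates in this guide's fake data are, a sum would count the same donations twice, and a maximum or a de-duplicated sum may fit better.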
Step 5: Put it all in a workflow
Get the job IDs for the SQL, CASS/NCOA, and IDR jobs above and put them in a Workflow like the one below.
You can then schedule the Workflow, set up success/failure notification emails, etc.
version: '2.0'
workflow:
  type: direct
  tasks:
    # You might want to add an import step here.
    run_CASSNCOA:
      action: civis.run_job
      input:
        job_id: 101806157 # CASS/NCOA Job ID. See step 2a.
      on-success:
        - join_CASSNCOA_output
    join_CASSNCOA_output:
      action: civis.run_job
      input:
        job_id: 101895675 # SQL Script ID. See step 2b.
      on-success:
        - run_IDR
    run_IDR:
      action: civis.run_job
      input:
        job_id: 101895566 # IDR Job ID. See step 3.
      on-success:
        - join_golden_table
    join_golden_table:
      action: civis.run_job
      input:
        job_id: 101896303 # SQL Script ID. See step 4.
    # You might want to add an export step here.
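Since the workflow is a simple chain in which each task runs the next one on success, the YAML can also be generated from an ordered list of job IDs. The sketch below is one hedged way to do that. The build_workflow_yaml function is hypothetical, and the job IDs are the example ones from this guide, so replace them with your own.

```python
def build_workflow_yaml(steps):
    """Build workflow YAML for a linear chain of jobs.

    steps is an ordered list of (task_name, job_id) pairs. Each task runs
    the next one on success, and the last task has no on-success block.
    """
    lines = ["version: '2.0'", "workflow:", "  type: direct", "  tasks:"]
    for index, (name, job_id) in enumerate(steps):
        lines += [
            f"    {name}:",
            "      action: civis.run_job",
            "      input:",
            f"        job_id: {job_id}",
        ]
        if index + 1 < len(steps):
            next_name = steps[index + 1][0]
            lines += ["      on-success:", f"        - {next_name}"]
    return "\n".join(lines) + "\n"


# Example job IDs from this guide; replace them with your own.
steps = [
    ("run_CASSNCOA", 101806157),
    ("join_CASSNCOA_output", 101895675),
    ("run_IDR", 101895566),
    ("join_golden_table", 101896303),
]
workflow_yaml = build_workflow_yaml(steps)
print(workflow_yaml)
```

The printed text can be pasted into the Workflow definition. Generating it this way mainly helps keep the indentation consistent, which matters because YAML nesting is determined by indentation.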