Working with SCD Type 2 in PySpark
This post describes a basic approach to handling Slowly Changing Dimensions (SCD). Its scope is limited to handling changes in a Type 2 fashion using PySpark.
Concept
Let’s start with some basics. SCD stands for slowly changing dimensions: dimensions whose attributes change slowly and unpredictably over time. An example makes this clearer. Consider a customer dimension table and a sales fact table. The sales table holds a foreign key pointing to the customer table. Customer information such as mobile number, email, and zip code changes over time; such dimensions are called SCDs. These changes affect the results of business analysis. If a customer moves to a different city and updates their zip code, the new zip code should be used for subsequent orders, but old orders should still refer to the old zip code.
To handle these changes, different methodologies are used, named Type 0, Type 1, and so on up to Type 6. Type 0 refers to dimensions that are static, e.g. a date table. In Type 1, the previous value is overwritten by the new value.
Implementing Type 2 is fairly complex. In Type 2, a new record is inserted with the latest values and the previous record is marked as no longer current. To keep track of record validity, three additional columns are used: effective_date, expiration_date, and current_flag.
When a new record is inserted, effective_date is set to current_date, expiration_date to '9999-12-31', and current_flag to True. If a record is deleted in the source data, its expiration_date is set to current_date and its current_flag to False. If a record is updated, the row holding the old values gets expiration_date = current_date and current_flag = False, and a new row is inserted with effective_date = current_date, expiration_date = '9999-12-31', and current_flag = True. It is possible to handle SCD Type 2 with only two columns, effective_date and expiration_date, but filtering on a boolean column is easier, so I included current_flag. Because the natural key is no longer unique once a record has several versions, a surrogate key is created as the unique key column. Fact tables use it as their foreign key, so it plays an important role in maintaining the link between the fact and dimension tables.
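To make the role of the validity columns concrete, here is a minimal plain-Python sketch (no Spark needed) of looking up the version of a customer that was valid on a given day. The row values, the helper names version_on and current_version, and the choice to treat expiration_date as exclusive are my assumptions for illustration, not part of the pipeline described in this post.

```python
from datetime import date

EOW_DATE = date(9999, 12, 31)  # open-ended expiration date for current rows

# Two versions of the same customer (illustrative values only).
history = [
    {"sk_customer_id": 2, "customerid": 2, "zipcode": "10001",
     "effective_date": date(2022, 9, 21), "expiration_date": date(2022, 9, 22),
     "current_flag": False},
    {"sk_customer_id": 12, "customerid": 2, "zipcode": "94105",
     "effective_date": date(2022, 9, 22), "expiration_date": EOW_DATE,
     "current_flag": True},
]


def version_on(rows, customer_id, day):
    """Return the row valid on `day`, using effective_date <= day < expiration_date.

    Assumption: expiration_date is exclusive, because the old row expires on
    the same day the new row becomes effective.
    """
    for row in rows:
        if (row["customerid"] == customer_id
                and row["effective_date"] <= day < row["expiration_date"]):
            return row
    return None


def current_version(rows, customer_id):
    """Return the open row for a customer by filtering on the boolean flag."""
    for row in rows:
        if row["customerid"] == customer_id and row["current_flag"]:
            return row
    return None
```

An order placed on 2022-09-21 would resolve to the row with the old zip code, while the flag-based lookup returns the latest row without any date comparison.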
Implementation
For the implementation, let’s consider a customer dimension table as an example. I will walk through the code snippets and their output.
Source columns: CustomerID, Title, FirstName, LastName, CompanyName, EmailAddress, Phone, ZipCode
Destination columns: CustomerID, Title, FirstName, LastName, CompanyName, EmailAddress, Phone, ZipCode, sk_customer_id, effective_date, expiration_date, current_flag
Let’s start by defining some variables; you could also keep these in a config file. SOURCE_PATH and DEST_PATH point to the source and destination data. key_list holds the natural key column of the table, type2_cols holds the list of columns for which the Type 2 methodology will be implemented, and scd2_cols are the columns used for SCD Type 2 handling.
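As a rough sketch, the variables could look like the following. The two paths are placeholders I made up, and treating every non-key source column as a Type 2 column is my assumption; adjust both to your own data.

```python
# Hypothetical locations; replace with your own source and sink.
SOURCE_PATH = "data/source/customer.csv"
DEST_PATH = "data/dest/dim_customer"

# Natural key of the customer table.
key_list = ["customerid"]

# Columns tracked in a Type 2 fashion (assumption: all non-key source columns).
type2_cols = [
    "title", "firstname", "lastname", "companyname",
    "emailaddress", "phone", "zipcode",
]

# Housekeeping columns added for SCD Type 2 handling.
scd2_cols = ["effective_date", "expiration_date", "current_flag"]

# Expiration date used for open (current) records.
EOW_DATE = "9999-12-31"
```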
Next, create a Spark session. Its configuration depends on your project requirements, so change it accordingly.
I have created two reusable functions.
column_renamer(): renames columns based on a flag. If the flag is True, it appends the suffix to each column name; otherwise it removes the suffix.
get_hash(): generates a hash value for a list of columns.
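Here is one possible shape for these two helpers; this is a sketch, not necessarily the exact code behind this post. column_renamer only relies on the columns attribute and the toDF() method, both of which a PySpark DataFrame provides. row_hash is a hypothetical row-level stand-in for get_hash() written with hashlib so it runs without Spark; on a DataFrame the same idea is commonly expressed with sha2 and concat_ws from pyspark.sql.functions.

```python
import hashlib


def column_renamer(df, suffix, append):
    """Append `suffix` to every column name (append=True) or strip it (append=False).

    Works on any object exposing `.columns` and `.toDF(*names)`,
    such as a PySpark DataFrame.
    """
    if append:
        new_names = [name + suffix for name in df.columns]
    else:
        new_names = [
            name[: -len(suffix)] if suffix and name.endswith(suffix) else name
            for name in df.columns
        ]
    return df.toDF(*new_names)


def row_hash(row, cols, sep="|"):
    """Hash the values of `cols` in a dict-like row (illustrative stand-in for get_hash).

    None is mapped to an empty string, and a separator keeps
    ("ab", "c") distinct from ("a", "bc").
    """
    joined = sep.join("" if row[c] is None else str(row[c]) for c in cols)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
```

With a real DataFrame you would call something like column_renamer(df_history_open, "_history", True) before the join and pass False afterwards to strip the suffix again.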
During the first run, all records are loaded with effective_date = current_date(), expiration_date = "9999-12-31", and current_flag = True, as there is no previous-day data. sk_customer_id is the surrogate key for the records. There are multiple ways to create a surrogate key for the table. I used the window function row_number ordered by the column customerid. This is an acceptable way to do it for a limited number of records, but it hurts performance when the record count is large, because a window without a partition moves all data to a single partition. You will probably see a warning like this: 22/09/22 09:16:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition can cause serious performance degradation.
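The first load can be pictured with a small plain-Python sketch that mimics row_number over rows ordered by customerid. The function name initial_load and the list-of-dicts representation are mine, chosen only so the example runs without Spark.

```python
from datetime import date

EOW_DATE = date(9999, 12, 31)


def initial_load(rows, today, key="customerid"):
    """Add SCD2 columns and a 1-based sequential surrogate key ordered by `key`."""
    ordered = sorted(rows, key=lambda r: r[key])
    return [
        {
            **row,
            "sk_customer_id": position,   # same numbering as row_number()
            "effective_date": today,
            "expiration_date": EOW_DATE,
            "current_flag": True,
        }
        for position, row in enumerate(ordered, start=1)
    ]
```

Sorting everything in one place is exactly what the unpartitioned window does in Spark, which is why the warning above appears for large tables.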
Another approach is to build the surrogate key by combining the effective_date and customerid columns, with effective_date reformatted to yyyyMMdd. This works well but has the slight downside that the surrogate key values are large and not sequential. It also does not work correctly if a record is updated more than once in a day; in that case you need to switch from a date to a timestamp. To keep things simple, I decided to stay with the row_number approach. You can choose a different method as per your requirements.
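For comparison, the date-based alternative could be sketched like this in plain Python. Both function names are hypothetical, and the yyyyMMddHHmmss layout for the timestamp variant is my assumption about how one might extend the idea.

```python
from datetime import date, datetime


def date_based_sk(effective_date, customer_id):
    """Concatenate yyyyMMdd and the customer id into one integer key."""
    return int(f"{effective_date:%Y%m%d}{customer_id}")


def timestamp_based_sk(effective_ts, customer_id):
    """Same idea with second-level precision, for several updates per day."""
    return int(f"{effective_ts:%Y%m%d%H%M%S}{customer_id}")


# Two updates to customer 11 on the same day collide with the date-based key...
first = date_based_sk(date(2022, 9, 22), 11)
second = date_based_sk(date(2022, 9, 22), 11)

# ...but not with the timestamp-based key.
first_ts = timestamp_based_sk(datetime(2022, 9, 22, 9, 0, 0), 11)
second_ts = timestamp_based_sk(datetime(2022, 9, 22, 17, 30, 0), 11)
```

Note how quickly the values grow: the timestamp variant already needs a 64-bit integer column.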
After the first run, the current data needs to be compared with the historical data. There are four possible outcomes of this comparison: a record can be inserted, deleted, updated, or left unchanged. For better understanding, I modified the source data to cover all scenarios: customerid 6 was deleted, a new customer, Rahul Jain, was added with customerid 11, and customerid 2 changed his company and zipcode.
First, let’s fetch the data from SOURCE_PATH and DEST_PATH. This part will differ depending on your source and sink systems.
We need to keep track of the maximum surrogate key in the history table as it will be used to create a surrogate key for newly inserted and updated records.
max_sk = df_history.agg({"sk_customer_id_history": "max"}).collect()[0][0]
Next, filter the open records from df_history; closed records don't need any changes. Then generate a hash over the type2 columns and rename the columns with _history and _current as suffixes. Finally, apply a full outer join between the df_history_open and df_current dataframes, and create a new column, action, that will be used to flag records.
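The flagging rule behind the action column can be sketched without Spark as a full outer join on the natural key followed by a hash comparison. The function name classify_actions, the "hash" field name, and the INSERT label are my assumptions; in PySpark the equivalent is typically a full_outer join followed by a when/otherwise expression.

```python
def classify_actions(history_open, current, key, hash_col="hash"):
    """Return {key value: action} for the union of keys on both sides.

    INSERT   : key only in current data
    DELETE   : key only in open history
    UPDATE   : key on both sides, hashes differ
    NOCHANGE : key on both sides, hashes equal
    """
    hist = {row[key]: row for row in history_open}
    curr = {row[key]: row for row in current}
    actions = {}
    for k in hist.keys() | curr.keys():   # full outer join on the key
        if k not in hist:
            actions[k] = "INSERT"
        elif k not in curr:
            actions[k] = "DELETE"
        elif hist[k][hash_col] != curr[k][hash_col]:
            actions[k] = "UPDATE"
        else:
            actions[k] = "NOCHANGE"
    return actions
```

Applied to the modified source data above, customerid 6 would be flagged DELETE, 11 INSERT, 2 UPDATE, and the remaining customers NOCHANGE.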
Create separate datasets based on the action column. For records with action NOCHANGE, remove the suffix _history from the column names and select the same columns as df_history_open.
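For newly inserted records (df_insert), the surrogate keys continue from max_sk. The max_sk value is then updated using the max value from df_insert (note that the aggregate returns None when df_insert is empty, so keep the previous max_sk in that case):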
max_sk = df_insert.agg({"sk_customer_id": "max"}).collect()[0][0]
For records with action DELETE, remove the suffix _history from the column names and select the same columns as df_history_open. Set expiration_date to current_date and current_flag to False.
Records with action UPDATE are split into two parts. For the old version, remove the suffix _history from the column names, select the same columns as df_history_open, and set expiration_date to current_date and current_flag to False. For the new version, select the same columns as df_current and set effective_date to current_date, expiration_date to EOW_DATE, and current_flag to True. Use the same max_sk-based logic to create sequential surrogate keys for the updated records, then union both parts into one dataframe.
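The two halves of the update step can be sketched in plain Python as follows. apply_updates is a hypothetical helper operating on lists of dicts, and numbering the new versions in customerid order is my assumption; the point is only to show how max_sk keeps the surrogate keys sequential.

```python
from datetime import date

EOW_DATE = date(9999, 12, 31)


def apply_updates(old_rows, new_rows, max_sk, today, key="customerid"):
    """Close the old versions, open the new ones, and return (rows, new max_sk)."""
    # Old versions: keep their surrogate key, mark them as expired.
    closed = [
        {**row, "expiration_date": today, "current_flag": False}
        for row in old_rows
    ]
    # New versions: continue the surrogate key sequence after max_sk.
    ordered = sorted(new_rows, key=lambda r: r[key])
    opened = [
        {
            **row,
            "sk_customer_id": max_sk + offset,
            "effective_date": today,
            "expiration_date": EOW_DATE,
            "current_flag": True,
        }
        for offset, row in enumerate(ordered, start=1)
    ]
    return closed + opened, max_sk + len(opened)
```

Returning the new maximum alongside the rows avoids a second pass over the data and stays valid even when there are no updated records.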
Finally, create a union of all the dataframes using unionByName() and write the data to DEST_PATH.