Slowly Changing Dimension Example
Many ETL tools come with a built-in slowly changing dimension component, but they invariably also come with restrictions on how the source and target data must look, and on how change detection and data processing are performed, which might not be a good fit for your requirements. Furthermore, these components are usually not extensible.
actionETL instead makes it easy to create your own high performance and reusable SCD worker, tailored to fit your specific requirements. The implementation Composes Worker Children (rather than implementing everything from scratch), which makes the code small, high level, and easy to maintain, change and extend.
The custom SCD worker created in this example requires 23 times less C# code (9kB) vs. similar functionality implemented in DimensionMergeSCD (209kB) in one particular traditional ETL tool.
This implementation tracks Type1 and Type2 changes; the Further Ideas section below has suggestions for customizing and extending the approach.
High Level Design
- A LoadScdWorker custom worker is created, which configures and runs (out of box) children to perform the actual SCD processing for one dimension.
- To minimize configuration when using the worker, table and column names are derived based on conventions:
  - Source, staging, and dimension tables already exist, and have names matching the row type class, e.g.: row class = Customer, source = "Customer", staging = "Update_DimCustomer", dimension = "DimCustomer"
  - All dimension tables have the metadata columns "SurrogateKey", "ValidFrom", and "ValidTo", where ValidTo is null for the current record. The staging tables instead have "SurrogateKey", "ValidFrom", and "ScdType", i.e. the ScdMetadata properties.
  - Row type class inheritance describes which columns are SCD Type2, SCD Type1, or business key. The base class is ScdMetadata.
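To make the conventions above concrete, here is a minimal sketch that derives the three table names from the row class and classifies columns by inheritance level. It is an approximation under stated assumptions: the worker itself uses actionETL's TypeRowSchema and DisplayName(), whereas this sketch uses plain .NET reflection (BindingFlags.DeclaredOnly), and ScdConventions is a hypothetical helper name. It repeats a minimal ScdMetadata and a trimmed Customer hierarchy so that it compiles on its own.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Minimal stand-in for the article's ScdMetadata base class
public class ScdMetadata
{
    public int SurrogateKey { get; set; }
    public DateTime ValidFrom { get; set; }
    public int ScdType { get; set; }
}

// Trimmed Customer hierarchy: business key <- Type1 <- Type2
public class CustomerBusinessKey : ScdMetadata { public string CustomerName { get; set; } }
public class CustomerType1 : CustomerBusinessKey { public string Email { get; set; } }
public class Customer : CustomerType1 { public string Location { get; set; } }

// Hypothetical helper (not an actionETL API) applying the naming and inheritance conventions
public static class ScdConventions
{
    // Source, staging, and dimension table names derived from the row class name
    public static (string Source, string Staging, string Dimension) TableNames(Type rowType) =>
        (rowType.Name, "Update_Dim" + rowType.Name, "Dim" + rowType.Name);

    // Public instance properties declared directly on one level of the hierarchy.
    // Reflection does not guarantee property order, so treat the result as a set.
    private static string[] Declared(Type type) =>
        type.GetProperties(BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly)
            .Select(p => p.Name)
            .ToArray();

    // rowType declares the Type2 columns, its base class the Type1 columns,
    // and the next base class the business key.
    public static (string[] BusinessKey, string[] Type1, string[] Type2) Columns(Type rowType)
    {
        Type type1Type = rowType.BaseType;
        Type businessKeyType = type1Type?.BaseType;
        if (businessKeyType?.BaseType != typeof(ScdMetadata))
            throw new ArgumentException(
                "Expected Type2 : Type1 : BusinessKey : ScdMetadata", nameof(rowType));
        return (Declared(businessKeyType), Declared(type1Type), Declared(rowType));
    }
}
```

Reading each inheritance level separately gives the same split as the worker's set-difference approach (all columns, minus the ScdMetadata columns, minus the business key, minus the Type1 columns), and it can be unit tested without a database.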
Example Use
Define the row classes and their Type2, Type1, and business key columns for two dimensions:
```csharp
using actionETL;
using actionETL.Adb;
using System;
using System.Threading.Tasks;

// Contract row class ///////////////////////
public class ContractBusinessKey : ScdMetadata
{
    public string ContractReference { get; set; }
}

public class ContractType1 : ContractBusinessKey // Type1 columns
{
    public string Filename { get; set; }
}

public class Contract : ContractType1 // Type2 columns
{
    public string ContractDescription { get; set; }
}

/* Expects the tables:
CREATE TABLE Contract (
    ContractReference varchar(50) NOT NULL,
    ContractDescription varchar(200) NULL,
    Filename varchar(200) NULL
)
CREATE TABLE Update_DimContract (
    SurrogateKey int NOT NULL,
    ValidFrom date NOT NULL,
    ScdType int NOT NULL,
    ContractReference varchar(50) NOT NULL,
    ContractDescription varchar(200) NULL,
    Filename varchar(200) NULL
)
CREATE TABLE DimContract (
    SurrogateKey int IDENTITY(1,1) PRIMARY KEY,
    ValidFrom date NOT NULL,
    ValidTo date NULL,
    ContractReference varchar(50) NOT NULL,
    ContractDescription varchar(200) NULL,
    Filename varchar(200) NULL
)
*/

// Customer row class ///////////////////////
public class CustomerBusinessKey : ScdMetadata
{
    public string CustomerName { get; set; }
}

public class CustomerType1 : CustomerBusinessKey // Type1 columns
{
    public string Email { get; set; }
    // public string Phone { get; set; }
}

public class Customer : CustomerType1 // Type2 columns
{
    public string Location { get; set; }
    // public string CreditRating { get; set; }
}

/* Expects the tables:
CREATE TABLE Customer (
    CustomerName varchar(50) NOT NULL,
    Location varchar(50) NOT NULL,
    Email varchar(50) NULL,
    -- CreditRating varchar(50) NULL,
    -- Phone varchar(50) NULL
)
CREATE TABLE Update_DimCustomer (
    SurrogateKey int NOT NULL,
    ValidFrom date NOT NULL,
    ScdType int NOT NULL,
    CustomerName varchar(50) NOT NULL,
    Location varchar(50) NOT NULL,
    Email varchar(50) NULL,
    -- CreditRating varchar(50) NULL,
    -- Phone varchar(50) NULL
)
CREATE TABLE DimCustomer (
    SurrogateKey int IDENTITY(1,1) PRIMARY KEY,
    ValidFrom date NOT NULL,
    ValidTo date NULL,
    CustomerName varchar(50) NOT NULL,
    Location varchar(50) NOT NULL,
    Email varchar(50) NULL,
    -- CreditRating varchar(50) NULL,
    -- Phone varchar(50) NULL,
)
*/
```
Run LoadScdWorker for each dimension:
```csharp
// Set the 'date' variable to the current processing date.
// Set the adbConnectionString variable with the SQL server connection string.
var sos = await new WorkerSystem()
    .Root(ws =>
    {
        _ = new LoadScdWorker<Contract, string>(ws, null, adbConnectionString
            , date, row => row.ContractReference);
        _ = new LoadScdWorker<Customer, string>(ws, null, adbConnectionString
            , date, row => row.CustomerName);
        // Additional dimensions...
    })
    .StartAsync().ConfigureAwait(false);
```
Note
When using the custom worker, each column is only specified in a single place, despite being used in a number of queries and dataflows inside the worker. This makes it very easy to maintain the solution when source and dimension columns get changed or added, even with a large number of columns.
E.g., to add a Type1 "Phone" column and a Type2 "CreditRating" column to the Customer dimension processing, simply add (i.e. uncomment) those two columns in the SQL tables and Customer row class above.
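One way to see why a single column definition suffices: every query the worker issues is generated from the derived column-name lists rather than typed out per column. The sketch below builds three of the worker's SQL texts from such lists. ScdSql and its method names are hypothetical, and only the string construction is shown (no database access); the empty-list check is an added assumption, since an UPDATE with an empty SET clause would be invalid SQL.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: each SQL text is generated from column-name lists, so a column
// declared once in the row class appears in every query without further edits.
public static class ScdSql
{
    public static string SelectSource(string table, IEnumerable<string> sourceColumns) =>
        $"SELECT {string.Join(", ", sourceColumns)} FROM dbo.{table}";

    public static string SelectCurrentDimension(string table, IEnumerable<string> sourceColumns) =>
        $"SELECT SurrogateKey, {string.Join(", ", sourceColumns)} FROM dbo.Dim{table} WHERE ValidTo IS NULL";

    // Overwrites Type1 columns on current dimension records flagged as Type1 in staging
    public static string UpdateType1(string table, IEnumerable<string> type1Columns)
    {
        string setClause = string.Join(", ", type1Columns.Select(c => $"D.{c} = UD.{c}"));
        if (setClause.Length == 0)
            throw new ArgumentException("At least one Type1 column is required.", nameof(type1Columns));
        return $"UPDATE D SET {setClause} FROM dbo.Dim{table} D "
             + $"INNER JOIN dbo.Update_Dim{table} UD ON D.SurrogateKey = UD.SurrogateKey "
             + "WHERE ScdType = 1 AND D.ValidTo IS NULL";
    }
}
```

Adding "Phone" to the Type1 list, for example, changes both the SELECT lists and the SET clause with no other code edits.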
Data Example
For instance, enabling the columns CreditRating and Phone in the above example, and processing the following three sets of data (with incremental changes in bold), will produce six records in the DimCustomer table.
Processing Date 2018-01-01
CustomerName | Location | Email | CreditRating | Phone
---|---|---|---|---
Volvo | Gothenburg | info@volvo.com | B+ | +46 31 1144380
Tesla | Palo Alto | info@tesla.com | C+ | +1 845 528 0442
Processing Date 2018-01-02
CustomerName | Location | Email | CreditRating | Phone
---|---|---|---|---
Volvo | Gothenburg | **info@volvocars.com** | **A-** | +46 31 1144380
Tesla | **California** | info@tesla.com | **B+** | **+1 800 11335577**
**Jaguar** | **Coventry** | **info@jaguar.com** | **A** | **+44 39 22948760**
Processing Date 2018-01-03
CustomerName | Location | Email | CreditRating | Phone
---|---|---|---|---
Jaguar | **England** | **sales@jaguar.com** | A | **+44 39 55330000**
DimCustomer Table Result
SurrogateKey | ValidFrom | ValidTo | CustomerName | Location | Email | CreditRating | Phone
---|---|---|---|---|---|---|---
1 | 2018-01-01 | 2018-01-02 | Volvo | Gothenburg | info@volvo.com | B+ | +46 31 1144380
2 | 2018-01-01 | 2018-01-02 | Tesla | Palo Alto | info@tesla.com | C+ | +1 845 528 0442
3 | 2018-01-02 | 2018-01-03 | Jaguar | Coventry | info@jaguar.com | A | +44 39 22948760
4 | 2018-01-02 | NULL | Volvo | Gothenburg | info@volvocars.com | A- | +46 31 1144380
5 | 2018-01-02 | NULL | Tesla | California | info@tesla.com | B+ | +1 800 11335577
6 | 2018-01-03 | NULL | Jaguar | England | sales@jaguar.com | A | +44 39 55330000
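The following in-memory sketch replays the three processing dates above and ends up with the same six DimCustomer rows. It is not the actionETL worker: DimRow and InMemoryScd are hypothetical names, the Customer column roles are hard-coded, and surrogate keys are assigned like an IDENTITY column, with new rows inserted before the re-inserted Type2 rows, following the worker's step order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for one DimCustomer row (source rows reuse it)
public class DimRow
{
    public int SurrogateKey;
    public DateTime ValidFrom;
    public DateTime? ValidTo;              // null marks the current record
    public string CustomerName;            // business key
    public string Email, Phone;            // Type1 columns
    public string Location, CreditRating;  // Type2 columns
}

public static class InMemoryScd
{
    // Applies one processing date's source rows to 'dim', in the worker's step order
    public static void Load(List<DimRow> dim, IEnumerable<DimRow> source, DateTime date)
    {
        // Lookup: current dimension rows keyed on the business key
        var current = dim.Where(d => d.ValidTo == null).ToDictionary(d => d.CustomerName);
        var newRows = new List<DimRow>();
        var type1 = new List<(DimRow Current, DimRow Source)>();
        var type2 = new List<(DimRow Current, DimRow Source)>();

        foreach (var s in source)
        {
            if (!current.TryGetValue(s.CustomerName, out var c))
                newRows.Add(s);                                    // not found: new member
            else if (c.Location != s.Location || c.CreditRating != s.CreditRating)
                type2.Add((c, s));                                 // Type2 takes precedence
            else if (c.Email != s.Email || c.Phone != s.Phone)
                type1.Add((c, s));
            // else: no SCD changes, the row is discarded
        }

        // Surrogate keys behave like an IDENTITY column
        int nextKey = dim.Count == 0 ? 1 : dim.Max(d => d.SurrogateKey) + 1;
        DimRow NewCurrent(DimRow src) => new DimRow
        {
            SurrogateKey = nextKey++, ValidFrom = date, CustomerName = src.CustomerName,
            Email = src.Email, Phone = src.Phone, Location = src.Location, CreditRating = src.CreditRating
        };

        foreach (var s in newRows) dim.Add(NewCurrent(s));                        // INSERT New Rows
        foreach (var (c, s) in type1) { c.Email = s.Email; c.Phone = s.Phone; }   // UPDATE SCD1 Rows
        foreach (var (c, _) in type2) c.ValidTo = date;                           // UPDATE SCD2 Historical Rows
        foreach (var (_, s) in type2) dim.Add(NewCurrent(s));                     // INSERT SCD2 Rows
    }
}
```

Note that the closed Volvo and Tesla rows keep their old Type1 values (e.g. info@volvo.com): a row with both Type1 and Type2 changes is flagged as Type2, and Type1 updates are only applied to rows flagged as Type1.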
LoadScdWorker Implementation
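This custom worker configures and runs several (out of box) dataflow and non-dataflow children to do the actual processing:
- Truncate the staging table ("Update_Dim" + table name)
- Read the source table, and look up each row's business key among the current dimension records (where ValidTo is null)
- Discard unchanged rows, bulk insert new rows directly into the dimension table, and bulk insert changed rows, flagged as Type1 or Type2, into the staging table
- Update the Type1 columns of current dimension records that have Type1 changes
- Close the current dimension records that have Type2 changes, by setting ValidTo to the processing date
- Read the Type2 rows from the staging table, and bulk insert them into the dimension table as new current records
- Truncate the staging table again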
The row type class must inherit from ScdMetadata:
```csharp
using System;

public enum ScdType { None = 0, Type1 = 1, Type2 = 2 }

// Shared by all dimensions
public class ScdMetadata
{
    public int SurrogateKey { get; set; }

    // Not used by the lookup child worker:
    public DateTime ValidFrom { get; set; }
    public int ScdType { get; set; }
}
```
TRow is the row class. TBusinessKey is the type of the business key column.
The RunAsync() method derives table and column names, and configures child workers.
```csharp
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

public class LoadScdWorker<TRow, TBusinessKey>
    : WorkerBase<LoadScdWorker<TRow, TBusinessKey>>
    where TRow : ScdMetadata
{
    private readonly AdbConnectionString _adbConnectionString;
    private readonly DateTime _processingDate;
    private readonly Func<TRow, TBusinessKey> _selectRowKeyFunc;
    private static readonly string _tableName =
        typeof(TRow).DisplayName().Split('.').Last();

    public LoadScdWorker(
          WorkerParent workerParent
        , Func<bool> isStartableFunc
        , AdbConnectionString adbConnectionString
        , DateTime processingDate
        , Func<TRow, TBusinessKey> selectRowKeyFunc
        )
        : base(workerParent, "Load Dim" + _tableName, isStartableFunc)
    {
        _adbConnectionString = adbConnectionString
            ?? throw new ArgumentNullException(nameof(adbConnectionString));
        _processingDate = processingDate;
        _selectRowKeyFunc = selectRowKeyFunc
            ?? throw new ArgumentNullException(nameof(selectRowKeyFunc));
    }

    protected override Task<OutcomeStatus> RunAsync()
    {
        // Calculate metadata, Type1, and Type2 column names
        Type type2Type = typeof(TRow);
        var type2Tri = TypeRowSchema.Create(type2Type);
        var allNames = type2Tri.SchemaNodeList
            .Where(sn => sn.IsColumn).Select(sn => sn.NameSuffix);
        Type type1Type = type2Type.BaseType;
        var businessKeyNames = new[] {
            TypeRowSchema.Create(type1Type.BaseType)
                .SchemaNodeRoot.Children[0].NameSuffix
        };
        var metadataNames = TypeRowSchema.Create(typeof(ScdMetadata)).SchemaNodeList
            .Select(sn => sn.NameSuffix);
        var sourceColumns = allNames.Except(metadataNames);
        var sourceJoinedNames = string.Join(", ", sourceColumns);
        var type1Names = TypeRowSchema.Create(type1Type).SchemaNodeList
            .Where(sn => sn.IsColumn).Select(sn => sn.NameSuffix)
            .Except(metadataNames).Except(businessKeyNames);
        var type2Names = allNames.Except(metadataNames).Except(businessKeyNames)
            .Except(type1Names);

        // Create child workers
        AdbTableNonQueryWorker TruncateUpdateTable(string name, Func<bool> start) =>
            new AdbTableNonQueryWorker(this, name + " Truncate Update Table"
                , start, _adbConnectionString.CreateConnectionBuilder()
                , AdbTableNonQueryOperation.TruncateTable, "dbo.Update_Dim" + _tableName);

        var truncateUpdateTable = TruncateUpdateTable("Initial", null);

        var selectSource = new AdbDataReaderSource<TRow>(this, "SELECT Source Table"
            , () => truncateUpdateTable.IsSucceeded
            , _adbConnectionString.CreateConnectionBuilder()
            , $@"SELECT {sourceJoinedNames} FROM dbo." + _tableName
            , null);

        var selectDimLookup = new AdbDataReaderSource<TRow>(this
            , "SELECT Dimension Lookup"
            , null // Start constraint only required on one of the linked sources
            , _adbConnectionString.CreateConnectionBuilder()
            , $@"SELECT SurrogateKey, {sourceJoinedNames} FROM dbo.Dim{_tableName} WHERE ValidTo IS NULL"
            , null);

        // Create the row comparers once, rather than once per matched row
        var type2Comparer = new RowComparer<TRow>(rcc =>
        {
            foreach (var name in type2Names)
                rcc.Asc(name);
        });
        var type1Comparer = new RowComparer<TRow>(rcc =>
        {
            foreach (var name in type1Names)
                rcc.Asc(name);
        });

        var lookup = selectSource.Output.Link
            .DictionaryLookupSplitTransform(
                  "Check for Insert or Updates"
                , selectDimLookup.Output
                , row => new KeyValuePair<TBusinessKey, TRow>(_selectRowKeyFunc(row), row) // Select lookup (key,value)
                , _selectRowKeyFunc // Lookup key
                , (row, key, value) => // Match found
                {
                    // Type2 changes take precedence over Type1 changes
                    if (type2Comparer.Comparison(value, row) != 0)
                        row.ScdType = (int)ScdType.Type2;
                    else if (type1Comparer.Comparison(value, row) != 0)
                        row.ScdType = (int)ScdType.Type1;
                    else
                        return DictionaryLookupRowTreatment.Discard; // No SCD changes

                    row.ValidFrom = _processingDate;
                    row.SurrogateKey = value.SurrogateKey;
                    return DictionaryLookupRowTreatment.Found;
                }
                , (row, key) => // Match not found
                {
                    row.ValidFrom = _processingDate;
                    return DictionaryLookupRowTreatment.NotFound;
                }
            );

        var insertScd = lookup.FoundOutput.Link.AdbSqlClientBulkInsertTarget(
              "INSERT SCD Rows"
            , rcc => rcc.AutoName(type2Tri.ColumnCount)
            , _adbConnectionString.CreateConnectionBuilder()
            , SqlBulkCopyOptions.Default, "dbo.Update_Dim" + _tableName);

        var insertNew = lookup.NotFoundOutput.Link.AdbSqlClientBulkInsertTarget(
              "INSERT New Rows"
            , rcc => rcc.AutoName(type2Tri.ColumnCount - 1)
            , _adbConnectionString.CreateConnectionBuilder()
            , SqlBulkCopyOptions.Default, "dbo.Dim" + _tableName);

        var setType1Columns = string.Join(", "
            , type1Names.Select(name => $"D.{name} = UD.{name}"));

        var updateScd1 = new AdbExecuteNonQueryWorker(this, "UPDATE SCD1 Rows"
            , () => insertScd.IsSucceeded && insertNew.IsSucceeded
            , _adbConnectionString.CreateConnectionBuilder(), $@"
                UPDATE D
                SET
                    {setType1Columns}
                FROM dbo.Dim{_tableName} D
                INNER JOIN dbo.Update_Dim{_tableName} UD ON D.SurrogateKey = UD.SurrogateKey
                WHERE ScdType = 1 AND D.ValidTo IS NULL
                ");

        var updateScd2Historical = new AdbExecuteNonQueryWorker(this
            , "UPDATE SCD2 Historical Rows"
            , () => updateScd1.IsSucceeded
            , _adbConnectionString.CreateConnectionBuilder(), $@"
                UPDATE D
                SET
                    D.ValidTo = '{_processingDate:yyyy-MM-dd}'
                FROM dbo.Dim{_tableName} D
                INNER JOIN dbo.Update_Dim{_tableName} UD ON D.SurrogateKey = UD.SurrogateKey
                WHERE ScdType = 2 AND D.ValidTo IS NULL
                ");

        // Using dataflow to insert allows avoiding large transactions
        var selectScd2New = new AdbDataReaderSource<TRow>(this, "SELECT SCD2 Rows"
            , () => updateScd2Historical.IsSucceeded
            , _adbConnectionString.CreateConnectionBuilder()
            , $@"SELECT ValidFrom, {sourceJoinedNames} FROM dbo.Update_Dim{_tableName} WHERE ScdType = 2"
            , null);

        var insertScd2New = selectScd2New.Output.Link.AdbSqlClientBulkInsertTarget(
              "INSERT SCD2 Rows"
            , rcc => rcc.AutoName(type2Tri.ColumnCount - 1)
            , _adbConnectionString.CreateConnectionBuilder()
            , SqlBulkCopyOptions.Default, "dbo.Dim" + _tableName);

        TruncateUpdateTable("Final", () => insertScd2New.IsSucceeded);

        return OutcomeStatus.SucceededTask;
    }
}
```
Note
Each child worker is fully logged as per normal, simplifying troubleshooting of any processing or data issues.
Further Ideas
The following are some ideas for customizing and extending the above approach:
- Add another AdbConnectionString parameter or a property to support having the source data on a different server than the dimension table
- Read source data from a denormalized view that joins multiple source tables
- Allow supplying a source query, which can then join tables, rename source columns to match dimension columns etc.
- Use a ToTypeColumnMapper<TTo> to map unaligned source names
- Mark current record with a "ValidTo" max date (e.g. "9999-12-31"), and/or a separate boolean flag, instead of using null, which can simplify user queries
- Use hashes to reduce change detection overhead for very large dimensions
- Send SCD1 and SCD2 rows into two different temporary tables, to avoid using WHERE clauses in the UPDATE queries
- Add a constructor overload without the isStartableFunc parameter
- Add a boolean DebugChildren property that enables debugging on dataflow or all child workers
- Use non-SQL data sources, e.g. flat files
- Add public properties for the constructor parameters stored as private fields
- Use AdbTableIdentifier to support spaces etc. in table names
- Have optional steps, where child workers don't get created if not needed
- Support business keys with multiple columns
- Support SCD Type0 (retain original), Type3 (limited history), or Type4 (history tables)
- Use three struct partial schemas to specify business key, Type1, and Type2 columns (but keep inheritance of ScdMetadata to allow access to those columns). This can increase flexibility to more easily add other types of columns, e.g. Type0, Type3 etc.
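As a sketch of the "use hashes" idea, change detection can compare one hash over the Type2 columns and one over the Type1 columns, instead of comparing every column. The ScdHash helper below is hypothetical; it assumes string-typed column values and SHA-256, and it keeps the worker's rule that a Type2 difference takes precedence over a Type1 difference. Storing the hashes of current dimension records alongside them, so that only the hash columns need to be read back for the lookup, would be a further assumption beyond this sketch.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper illustrating hash-based change detection for SCD columns
public static class ScdHash
{
    // Length-prefixes each value so ("ab", "c") and ("a", "bc") hash differently;
    // "-1:" marks a null value, which no non-null value can produce.
    public static byte[] Hash(params string[] values)
    {
        var sb = new StringBuilder();
        foreach (var v in values)
            sb.Append(v == null ? "-1:" : v.Length + ":" + v);
        using (var sha = SHA256.Create())
            return sha.ComputeHash(Encoding.UTF8.GetBytes(sb.ToString()));
    }

    // Mirrors the worker's priority: returns 2 (Type2), 1 (Type1), or 0 (no change),
    // matching the ScdType enum values.
    public static int Detect(byte[] oldType2, byte[] newType2, byte[] oldType1, byte[] newType1)
    {
        if (!oldType2.SequenceEqual(newType2)) return 2;
        if (!oldType1.SequenceEqual(newType1)) return 1;
        return 0;
    }
}
```

For the Customer example, the Type2 hash would cover Location and CreditRating, and the Type1 hash Email and Phone.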