Dataflow Lookups
Performing lookups in the dataflow is a common requirement, e.g. when mapping between business keys and surrogate keys. actionETL has several dictionary-based lookup workers that perform very fast hash-based lookups:
- DictionaryLookupTransform<TInputOutputError, TKey, TValue>
  - Without dictionary input port
  - Single data output port
- DictionaryLookupSplitTransform<TInputOutputError, TKey, TValue>
  - Without dictionary input port
  - With second data output port for unmatched rows
- DictionaryLookupTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
  - With dictionary input port
  - Single data output port
- DictionaryLookupSplitTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
  - With dictionary input port
  - With second data output port for unmatched rows
The lookup workers are very flexible and support many different use cases, as described below.
Dictionary Type
The dictionary most commonly used with these lookup workers is Dictionary<TKey,TValue>, which some lookup workers also create automatically if the user does not provide it. Furthermore, the workers support any IDictionary<TKey,TValue> or (for read-only) IReadOnlyDictionary<TKey,TValue> implementation. E.g.:
- Set initial capacity (to reduce re-allocations) and/or a custom equality comparer (e.g. case insensitive) using an appropriate Dictionary<TKey,TValue> constructor
- Use ConcurrentDictionary<TKey,TValue> to allow multiple threads (i.e. workers) to use and modify the dictionary simultaneously. This can save memory by sharing a single mutable dictionary copy; note however that it can run an order of magnitude slower than a non-concurrent dictionary.
- Use a caching dictionary that discards seldom used items, thereby limiting its memory use
- Create an IReadOnlyDictionary<TKey,TValue> wrapper around an out of process web lookup service
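As an illustration of the first item in the list above, here is a minimal sketch using only standard .NET types. The class name, method name, and sample keys are hypothetical; the resulting dictionary is an ordinary Dictionary<TKey,TValue> that could then be supplied to a lookup worker.

```csharp
using System;
using System.Collections.Generic;

public static class ColorDictionaries
{
    // Builds a lookup dictionary that ignores key casing and pre-allocates
    // room for the expected number of items (all names are hypothetical).
    public static Dictionary<string, int> CreateCaseInsensitive(int expectedItems)
    {
        var dictionary = new Dictionary<string, int>(
            expectedItems,                     // Initial capacity, reduces re-allocations
            StringComparer.OrdinalIgnoreCase); // "grey", "Grey" and "GREY" are the same key

        dictionary.Add("White", 0);
        dictionary.Add("Grey", 1);
        dictionary.Add("Black", 2);
        return dictionary;
    }
}
```

With this comparer, the key callback no longer needs to normalize the case of incoming values.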
Loading the Dictionary
Important
Note that multiple rows can often match the same lookup key and value. To avoid issues where modifying one row inadvertently also changes another row, best practice is to make the lookup value only consist of value types and/or immutable types.
If the lookup value is, or contains, a mutable reference type, the user must ensure that either there are no lookup value references that are shared and modified across rows, or that the lookup value is cloned, so that each row gets its own unique instance.
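The following sketch, using plain .NET code and hypothetical types, shows the problem described above: two rows that match the same key receive the same mutable instance unless the value is cloned.

```csharp
using System.Collections.Generic;

public static class LookupValueSharing
{
    // Hypothetical mutable lookup value (a reference type).
    public sealed class ColorInfo
    {
        public int ColorId;
        public string Note;

        // Shallow copy is sufficient here since all members are
        // value types or immutable strings.
        public ColorInfo Clone() => (ColorInfo)MemberwiseClone();
    }

    public sealed class Row
    {
        public string Color;
        public ColorInfo Info; // Populated by lookup
    }

    // Returns true if modifying the first row also changed the second row.
    public static bool ModificationLeaks(bool cloneValue)
    {
        var dictionary = new Dictionary<string, ColorInfo>
        {
            { "Grey", new ColorInfo { ColorId = 1, Note = "" } }
        };
        var first = new Row { Color = "Grey" };
        var second = new Row { Color = "Grey" };

        foreach (var row in new[] { first, second })
        {
            var value = dictionary[row.Color];
            row.Info = cloneValue ? value.Clone() : value;
        }

        first.Info.Note = "changed on first row";
        return second.Info.Note == "changed on first row";
    }
}
```

Without cloning, ModificationLeaks(false) reports that the change leaked to the second row; with cloning, each row has its own instance and the rows stay independent.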
The dictionary can be loaded with keys and values ahead of time, and/or on the fly:
Fully Cached Lookups
With fully cached lookups, all dictionary items are added before any lookups are performed, which is also the easiest to configure:
- Pre-populated via standard .NET code, see e.g. Dictionary Initializer and Dictionary<TKey,TValue>
- Pre-populated from a dedicated dictionary input port, i.e. receiving rows from a dataflow upstream worker, e.g. loaded from a flat file or a database query. The key and value are specified with a callback.
Partially Cached Lookups
With partially cached lookups, dictionary items are added after lookups have started:
- Loaded on the fly, typically for each lookup miss. This avoids loading dictionary items that will never be used, which can be advantageous when it is impractical to retrieve all keys and lookup values ahead of time.
- Pre-populated as per fully cached lookups above, typically with commonly used items, and also loaded on the fly, as per above. This can be a good approach to maximize performance while minimizing memory use for the dictionary, but is also the most complex to configure.
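The combined approach in the last item can be sketched independently of actionETL. The helper class below is hypothetical and not part of the library; it only illustrates the caching logic: start with commonly used items, and call a (typically slow) loader the first time any other key is requested.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of actionETL: a dictionary that starts out
// pre-populated and loads further items the first time they are requested.
public sealed class PreloadedLoadOnMissLookup<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> _items;
    private readonly Func<TKey, (bool Found, TValue Value)> _loadItem;

    // Number of times the loader was invoked.
    public int LoaderCalls { get; private set; }

    public PreloadedLoadOnMissLookup(
        IDictionary<TKey, TValue> commonItems,
        Func<TKey, (bool Found, TValue Value)> loadItem)
    {
        _items = new Dictionary<TKey, TValue>(commonItems); // Copy pre-populated items
        _loadItem = loadItem;
    }

    public bool TryGetValue(TKey key, out TValue value)
    {
        if (_items.TryGetValue(key, out value))
            return true;                      // Served from the cache

        LoaderCalls++;
        var (found, loaded) = _loadItem(key); // E.g. a database or web service call
        if (!found)
            return false;                     // value keeps its default

        _items.Add(key, loaded);              // Cache for later lookups
        value = loaded;
        return true;
    }
}
```

In an actual dataflow, the same logic would normally live in the lookup worker's callbacks, as in the partially cached example later on this page.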
Dictionary Maximum Capacity
Every item in the lookup table consumes memory. For extremely large lookup tables, minimize the size of each lookup item and ensure you are running in 64-bit mode.
A second constraint is that by default, the .NET Framework maximum array size is 2GB, which in turn limits the Dictionary<TKey,TValue> size in a 64-bit application (using references as key and value) to a maximum of 47.9 million items. You can remove this limit by enabling support for larger arrays, as described in <gcAllowVeryLargeObjects> Element.
.NET Core and .NET 5+, however, support arrays larger than 2GB by default.
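For a .NET Framework application, the setting is placed in the application configuration file. A minimal fragment, assuming a standard app.config and a 64-bit process (the setting has no effect in 32-bit processes):

```xml
<configuration>
  <runtime>
    <!-- Allow arrays larger than 2GB in total size on 64-bit platforms -->
    <gcAllowVeryLargeObjects enabled="true" />
  </runtime>
</configuration>
```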
Lookup Keys
The user provides a callback function that takes the incoming row as a parameter, and returns the lookup key. This callback can be as simple as picking a single field:
row => row.MyKey
It can also perform calculations using multiple members, change the case of a string to match the case of the dictionary keys etc.:
row => (row.Category + ";" + row.Product).ToUpperInvariant()
If the same key calculation is used with multiple workers, avoid code duplication by implementing it in a reusable method (e.g. as a static method on the row type), and pass that method to the lookup worker.
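A minimal sketch of such a reusable key method follows. The row type and member names are hypothetical, and it assumes that the key callback parameter of the lookup worker accepts a delegate compatible with Func<ProductRow, string>, which is consistent with the lambdas shown above.

```csharp
using System;

public sealed class ProductRow
{
    public string Category;
    public string Product;

    // Single place where the lookup key is calculated.
    public static string GetLookupKey(ProductRow row) =>
        (row.Category + ";" + row.Product).ToUpperInvariant();
}

public static class KeySelectors
{
    // A method group converts to the delegate type, so the method can be
    // passed wherever a lambda such as "row => row.MyKey" would be used.
    public static readonly Func<ProductRow, string> ProductKey =
        ProductRow.GetLookupKey;
}
```

Each lookup worker can then be given ProductRow.GetLookupKey instead of repeating the lambda, so a change to the key format is made in one place only.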
Lookup Action
By default, incoming rows where the key is found are sent to the FoundOutput or Output port, while rows where the key is not found are sent to the NotFoundOutput or ErrorOutput port.
The user can optionally provide callbacks to:
- Modify the incoming rows before they are sent to a downstream worker, e.g. applying the lookup value to the row, and setting row default values on lookup misses
- Redirect the incoming rows to any of the output or error output ports, e.g. redirecting a lookup miss to the FoundOutput or Output port after having set default values
- Add new dictionary items, typically on lookup misses, e.g. from querying a web lookup service or a database. Explicitly supply a dictionary instance, which can then be modified in the notFoundKeyFunc callback.
Sharing the Dictionary
If the same lookup items are needed multiple times during a run, consider reusing the dictionary to save memory, and to avoid loading the dictionary multiple times. Do take care to adhere to the threading requirements of the dictionary used, e.g.:
- The default Dictionary<TKey,TValue> must be fully populated by a single thread (e.g. a single worker), but can then be used for lookups by multiple threads (or workers). Either pre-populate the dictionary before any of the lookup workers run, or have one lookup worker load it, and use start constraints, and/or grouping of workers, and/or fully blocking dataflow workers to ensure the dictionary has completed loading before subsequent lookup workers start using it.
- See above for using a ConcurrentDictionary
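To illustrate the concurrent alternative, the sketch below uses only standard .NET types, with plain threads standing in for workers (the class and method names are hypothetical). Many threads add to and read from a single ConcurrentDictionary<TKey,TValue> without any external locking.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class SharedConcurrentLookup
{
    // Fills one shared dictionary from many threads and returns the item count.
    public static int FillFromManyThreads()
    {
        var dictionary = new ConcurrentDictionary<string, int>();

        Parallel.For(0, 1000, i =>
        {
            string key = "Key" + (i % 10); // Only 10 distinct keys

            // GetOrAdd is safe to call concurrently; the value factory may
            // run more than once for a key, but only one value is stored.
            dictionary.GetOrAdd(key, k => int.Parse(k.Substring(3)));
        });

        return dictionary.Count;
    }
}
```

Since the value factory can run more than once for the same key under contention, it should be free of side effects, or the caller should tolerate an occasional duplicate load.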
Fully Cached - Dictionary Provided Example
This example pre-populates the dictionary using standard .NET code, to provide a fully cached lookup. It also sets a default value on any unmatched rows.
using actionETL;
using System.Collections.Generic;
public partial class FullyCachedDictionaryLookup
{
    private class Row
    {
        public string Color;
        public int Metric;
        public int ColorId; // Populated by lookup
    }
    private readonly Row[] _sampleData = new Row[]
    {
        new Row { Color = "Grey", Metric = 8 }
        , new Row { Color = "Blue", Metric = 3 }
        , new Row { Color = "Gray", Metric = 5 }
    };
    private readonly Dictionary<string, int> _dictionary = new Dictionary<string, int>
    {
        { "White", 0},
        { "Grey", 1}, // Two keys mapped to the same value
        { "Gray", 1}, // Two keys mapped to the same value
        { "Black", 2},
    };
    private const int _unknownColorId = -1;
    public WorkerSystem RunExample()
    {
        var workerSystem = new WorkerSystem()
            .Root(ws =>
            {
                // Data rows source, typically from files or database query
                new EnumerableSource<Row>(ws, "Source", _sampleData)
                // Lookup transform, fluent coding style
                .Output.Link.DictionaryLookupTransform(
                    "Set ColorId"
                    , _dictionary
                    , row => row.Color // Select dictionary key
                    , (row, key, value) => row.ColorId = value // Update matched rows
                    , (row, key) => // Update unmatched rows
                    {
                        row.ColorId = _unknownColorId;
                        return DictionaryLookupRowTreatment.Found; // Send unmatched to Output
                    }
                    )
                // Collection target, fluent coding style, receives three rows:
                // { Color = "Grey", Metric = 8, ColorId = 1 }
                // { Color = "Blue", Metric = 3, ColorId = -1 }
                // { Color = "Gray", Metric = 5, ColorId = 1 }
                .Output.Link.CollectionTarget();
            });
        workerSystem.Start().ThrowOnFailure();
        return workerSystem;
    }
}
Fully Cached - Database Query Example
In this example the dictionary items are read from a database query, and the lookup worker pre-populates the dictionary from its DictionaryInput port, to provide a fully cached lookup. Any unmatched rows will be sent to the (here unlinked) ErrorOutput port.
The DictionaryInput rows can be of any type; here the MutableKeyValue<TKey, TValue> helper class is used.
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using System.Collections.Generic;
public partial class FullyCachedAdbDictionaryLookup
{
    private class Row
    {
        public string Color;
        public int Metric;
        public int ColorId; // Populated by lookup
    }
    private readonly Row[] _sampleData = new Row[]
    {
        new Row { Color = "Grey", Metric = 8 }
        , new Row { Color = "Black", Metric = 7 }
        , new Row { Color = "Gray", Metric = 5 }
    };
    public WorkerSystem RunExample()
    {
        var workerSystem = new WorkerSystem()
            .Root(ws =>
            {
                // Dictionary items source query, typically from tables, views, or sprocs
                var dictionary = new AdbDataReaderSource<MutableKeyValue<string, int>>(ws
                    , "dictionary"
                    , AdbSqlClientProvider.Get()
                        .CreateConnectionBuilder(ws.Config["SqlServer"])
                    , @"SELECT 'White' as [Key], 0 as [Value]
                        UNION ALL SELECT 'Grey', 1
                        UNION ALL SELECT 'Gray', 1
                        UNION ALL SELECT 'Black', 2
                        "
                    , rcc => rcc.AutoName()
                    );
                // Data rows source, typically from files or database query
                new EnumerableSource<Row>(ws, "Source", _sampleData)
                // Lookup transform, fluent coding style
                .Output.Link.DictionaryLookupTransform(
                    "Set ColorId"
                    , dictionary.Output
                    , dictRow => new KeyValuePair<string,int>(dictRow.Key, dictRow.Value)
                    , row => row.Color // Select dictionary key
                    , (row, key, value) => row.ColorId = value // Update matched rows
                    )
                // Collection target, fluent coding style, receives three rows:
                // { Color = "Grey", Metric = 8, ColorId = 1 }
                // { Color = "Black", Metric = 7, ColorId = 2 }
                // { Color = "Gray", Metric = 5, ColorId = 1 }
                .Output.Link.CollectionTarget();
            });
        workerSystem.Start().ThrowOnFailure();
        return workerSystem;
    }
}
Partially Cached - Database Lookup Example
In this example the dictionary items are read from a database on the fly, once for each unmatched lookup, thereby only loading items that are actually used. This is beneficial if the number of lookup items is very large, but only a smaller number of unique keys are actually looked up on each run.
Do note that each unmatched key will result in a database round trip, which can be prohibitively slow if the number of unique keys being looked up is very large.
Note
In this example:
- Any keys not available from the database will result in a database round trip each time they appear in the incoming rows. If this is an issue, consider also storing these unavailable keys in the dictionary, together with an "empty" value, and use this to avoid the database round trip for previously seen unavailable keys.
- OpenCloseIfRequired<TResult>(Func<IAdbConnection, TResult>) is used to open and close the connection for each database lookup, which is prudent since the time between database queries is normally unpredictable.
If performing similar database lookups in several places, consider using a helper method to create the database command, the dictionary, and the transform, and only pass in the parameters that change between invocations.
Any unmatched rows will be sent to the (here unlinked) ErrorOutput port.
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using System.Collections.Generic;
using System.Data;
public partial class PartiallyCachedAdbDictionaryLookup
{
    private class Row
    {
        public string Color;
        public int Metric;
        public int ColorId; // Populated by lookup
    }
    private readonly Row[] _sampleData = new Row[]
    {
        new Row { Color = "Grey", Metric = 8 }
        , new Row { Color = "Black", Metric = 7 }
        , new Row { Color = "Black", Metric = 15 }
    };
    public WorkerSystem RunExample()
    {
        var workerSystem = new WorkerSystem()
            .Root(ws =>
            {
                // Create database command to retrieve ColorId on unmatched lookups
                var cb = AdbSqlClientProvider.Get()
                    .CreateCommandBuilder(ws.Config["SqlServer"]);
                cb.CommandText = "SELECT ColorId FROM Colors WHERE Color = @Color";
                var parameter = cb.Parameters.Add("@Color").SetDbType(DbType.String);
                var cmd = ws.DisposeOnFinished(cb.Create()); // Guarantee disposal
                // Dictionary to populate on the fly
                var dictionary = new Dictionary<string, int>();
                // Data rows source, typically from files or database query
                new EnumerableSource<Row>(ws, "Source", _sampleData)
                // Lookup transform queries database for missing lookup items
                .Output.Link.DictionaryLookupTransform(
                    "Set ColorId"
                    , dictionary
                    , row => row.Color // Select dictionary key
                    , (row, key, value) => row.ColorId = value // Update matched rows
                    , (row, lookupKey) => // Update unmatched rows
                    {
                        parameter.DbValue = lookupKey; // Set key to lookup in database
                        var result = cmd.Connection
                            .OpenCloseIfRequired(conn => cmd.ExecuteScalar());
                        if (result == null)
                            return DictionaryLookupRowTreatment.NotFound;
                        row.ColorId = (int)result; // Update row
                        dictionary.Add(lookupKey, row.ColorId); // Update dictionary
                        return DictionaryLookupRowTreatment.Found;
                    }
                    )
                // Collection target, receives three rows:
                // { Color = "Grey", Metric = 8, ColorId = 1 }
                // { Color = "Black", Metric = 7, ColorId = 2 }
                // { Color = "Black", Metric = 15, ColorId = 2 }
                .Output.Link.CollectionTarget();
            });
        workerSystem.Start().ThrowOnFailure();
        return workerSystem;
        /* The example assumes the following table and data already exist:
        CREATE TABLE Colors
        (
            Color nvarchar(50),
            ColorId int
        );
        INSERT INTO Colors (Color, ColorId) VALUES
            ('White', 0),
            ('Grey', 1),
            ('Gray', 1),
            ('Black', 2)
        */
    }
}
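The note before this example suggests also storing unavailable keys in the dictionary, together with an "empty" value. One way to sketch that idea, independent of actionETL, is to use a nullable value type, where null marks a key already known to be missing from the database. The class below and its queryDatabase delegate are hypothetical stand-ins for the command execution in the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers both found and missing keys, so each
// distinct key causes at most one database round trip.
public sealed class MissRememberingLookup
{
    private readonly Dictionary<string, int?> _dictionary =
        new Dictionary<string, int?>();
    private readonly Func<string, int?> _queryDatabase;

    // Number of simulated database round trips.
    public int DatabaseCalls { get; private set; }

    public MissRememberingLookup(Func<string, int?> queryDatabase)
    {
        _queryDatabase = queryDatabase;
    }

    public bool TryGetColorId(string color, out int colorId)
    {
        if (!_dictionary.TryGetValue(color, out int? cached))
        {
            DatabaseCalls++;
            cached = _queryDatabase(color); // null means "not in the database"
            _dictionary.Add(color, cached); // Remember hits and misses alike
        }
        colorId = cached.GetValueOrDefault();
        return cached.HasValue;
    }
}
```

In the transform above, the equivalent change would be to declare the dictionary with an int? value type and check for null in both callbacks; whether that fits depends on how the matched-row callback should treat remembered misses.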
See Also
- Dataflow
- API
  - MutableKeyValue<TKey, TValue>
  - DictionaryLookupTransform<TInputOutputError, TKey, TValue>
  - DictionaryLookupTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
  - DictionaryLookupSplitTransform<TInputOutputError, TKey, TValue>
  - DictionaryLookupSplitTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
  - DictionaryTarget<TInput, TKey, TValue>