    Dataflow Lookups

    Performing lookups in the dataflow is a common requirement, e.g. when mapping between business keys and surrogate keys. actionETL has several dictionary-based lookup workers that perform very fast hash-based lookups:

    • DictionaryLookupTransform<TInputOutputError, TKey, TValue>
      • Without dictionary input port
      • Single data output port
    • DictionaryLookupSplitTransform<TInputOutputError, TKey, TValue>
      • Without dictionary input port
      • With second data output port for unmatched rows
    • DictionaryLookupTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
      • With dictionary input port
      • Single data output port
    • DictionaryLookupSplitTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
      • With dictionary input port
      • With second data output port for unmatched rows

    The lookup workers are very flexible and support many different use cases, as described below.

    Dictionary Type

    The dictionary most commonly used with these lookup workers is Dictionary<TKey,TValue>, which some lookup workers also create automatically if the user does not provide it. Furthermore, the workers support any IDictionary<TKey,TValue> or (for read-only) IReadOnlyDictionary<TKey,TValue> implementation. E.g.:

    • Set initial capacity (to reduce re-allocations) and/or a custom equality comparer (e.g. case insensitive) using an appropriate Dictionary<TKey,TValue> constructor
    • Use ConcurrentDictionary<TKey,TValue> to allow multiple threads (i.e. workers) to use and modify the dictionary simultaneously. This can save memory by sharing a single mutable dictionary copy; note however that it can run an order of magnitude slower than a non-concurrent dictionary.
    • Use a caching dictionary that discards seldom-used items, thereby limiting its memory use
    • Create an IReadOnlyDictionary<TKey,TValue> wrapper around an out-of-process web lookup service
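
    As a minimal sketch of the first two options above, the following uses only standard .NET types. The class and method names (DictionaryTypeSketch, CreateCaseInsensitive, CreateConcurrent) are hypothetical and chosen for illustration; they are not part of actionETL.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper class, for illustration only
public static class DictionaryTypeSketch
{
    // Pre-sized dictionary (reduces re-allocations) with a case-insensitive key comparer
    public static Dictionary<string, int> CreateCaseInsensitive(int expectedItems)
    {
        return new Dictionary<string, int>(expectedItems, StringComparer.OrdinalIgnoreCase);
    }

    // Thread-safe dictionary that multiple threads can read and modify simultaneously
    public static ConcurrentDictionary<string, int> CreateConcurrent()
    {
        return new ConcurrentDictionary<string, int>(StringComparer.OrdinalIgnoreCase);
    }
}
```

    Either instance could then be passed to a lookup worker in place of a default Dictionary<TKey,TValue>, since both implement IDictionary<TKey,TValue>.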

    Loading the Dictionary

    Important

    Note that multiple rows can often match the same lookup key and value. To avoid issues where modifying one row inadvertently also changes another row, best practice is to make the lookup value only consist of value types and/or immutable types.

    If the lookup value is, or contains, a mutable reference type, the user must ensure that either there are no lookup value references that are shared and modified across rows, or that the lookup value is cloned, so that each row gets its own unique instance.
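
    The aliasing issue described above can be sketched with plain .NET code. The ColorInfo and Row types and the Apply method are hypothetical, introduced only to show the difference between sharing and cloning a mutable lookup value; they do not use the actionETL workers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types, for illustration only
public static class LookupValueSketch
{
    // A mutable reference type used as the lookup value
    public sealed class ColorInfo
    {
        public int ColorId;
        public string Label;

        // Returns an independent copy of this instance
        public ColorInfo Clone() => new ColorInfo { ColorId = ColorId, Label = Label };
    }

    public sealed class Row
    {
        public string Color;
        public ColorInfo Info; // Populated by lookup
    }

    // Creates one row per matched color, either sharing or cloning the lookup value
    public static List<Row> Apply(
        IReadOnlyDictionary<string, ColorInfo> dictionary,
        IEnumerable<string> colors,
        bool cloneValue)
    {
        var rows = new List<Row>();
        foreach (var color in colors)
        {
            if (dictionary.TryGetValue(color, out var info))
                rows.Add(new Row { Color = color, Info = cloneValue ? info.Clone() : info });
        }
        return rows;
    }
}
```

    With cloneValue set to false, two rows matching the same key reference the same ColorInfo instance, so changing Label via one row also changes it for the other row and for the dictionary item. With cloneValue set to true, each row gets its own instance.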

    The dictionary can be loaded with keys and values ahead of time, and/or on the fly:

    Fully Cached Lookups

    With fully cached lookups, all dictionary items are added before any lookups are performed, which is also the easiest to configure:

    • Pre-populated via standard .NET code, see e.g. Dictionary Initializer and Dictionary<TKey,TValue>
    • Pre-populated from a dedicated dictionary input port, i.e. receiving rows from a dataflow upstream worker, e.g. loaded from a flat file or a database query. The key and value are specified with a callback.

    Partially Cached Lookups

    With partially cached lookups, dictionary items are added after lookups have started:

    • Loaded on the fly, typically for each lookup miss. This avoids loading dictionary items that will never be used, which can be advantageous when it is impractical to retrieve all keys and lookup values ahead of time.
    • Pre-populated as per fully cached lookups above, typically with commonly used items, and also loaded on the fly, as per above. This can be a good approach to maximize performance while minimizing memory use for the dictionary, but is also the most complex to configure.
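
    The second approach (pre-populate, then load on the fly on each miss) could look roughly like the following plain .NET sketch. PartialCacheSketch, TryLookup and the loadOnMiss delegate are hypothetical names; in practice loadOnMiss would stand in for a database or web service query, and the logic would live in the lookup worker callbacks.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only
public static class PartialCacheSketch
{
    // Looks up the key in the dictionary; on a miss, calls loadOnMiss
    // and caches the loaded item so later lookups of the same key are hits.
    public static bool TryLookup<TKey, TValue>(
        IDictionary<TKey, TValue> dictionary,
        TKey key,
        Func<TKey, (bool Found, TValue Value)> loadOnMiss,
        out TValue value)
    {
        if (dictionary.TryGetValue(key, out value))
            return true; // Cache hit, no external call needed

        var (found, loaded) = loadOnMiss(key);
        if (!found)
            return false; // Not available from the external source either

        dictionary.Add(key, loaded); // Cache for subsequent lookups
        value = loaded;
        return true;
    }
}
```

    Note that in this sketch, keys that the external source cannot resolve are not cached, so each repeated occurrence triggers another call to loadOnMiss.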

    Dictionary Maximum Capacity

    Every item in the lookup table consumes memory. For extremely large lookup tables, minimize the size of each lookup item and ensure you are running in 64-bit mode.

    A second constraint is that by default, .NET Framework maximum array size is 2GB, which in turn limits the Dictionary<TKey,TValue> size with a 64-bit application (using references as key and value) to a maximum of 47.9 million items. You can remove this limit by enabling support for larger arrays, as described in <gcAllowVeryLargeObjects> Element.

    .NET 6+ however supports >2GB arrays by default.

    Lookup Keys

    The user provides a callback function that takes the incoming row as a parameter, and returns the lookup key. This callback can be as simple as picking a single field:

    row => row.MyKey
    

    It can also perform calculations using multiple members, change the case of a string to match the case of the dictionary keys etc.:

    row => (row.Category + ";" + row.Product).ToUpperInvariant()
    

    If the same key calculation is used with multiple workers, avoid code duplication by implementing it in a reusable method (e.g. as a static method on the row type), and pass that method to the lookup worker.
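
    A minimal sketch of such a reusable key method follows. ProductRow, GetLookupKey and KeyFuncSketch are hypothetical names, and the sketch assumes the lookup worker accepts any delegate compatible with Func<TRow, TKey> for its key callback, as the lambda examples above suggest.

```csharp
using System;

// Hypothetical row type, for illustration only
public sealed class ProductRow
{
    public string Category;
    public string Product;

    // Single definition of the key calculation, reusable across workers
    public static string GetLookupKey(ProductRow row) =>
        (row.Category + ";" + row.Product).ToUpperInvariant();
}

public static class KeyFuncSketch
{
    // The method group converts to a delegate, so it can be passed
    // wherever a row-to-key callback is expected.
    public static readonly Func<ProductRow, string> KeyFunc = ProductRow.GetLookupKey;
}
```

    The method can then be passed by name (ProductRow.GetLookupKey) instead of repeating the lambda in each worker.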

    Lookup Action

    By default, incoming rows where the key is found are sent to the FoundOutput or Output port, while rows where the key is not found are sent to the NotFoundOutput or ErrorOutput port.

    The user can optionally provide callbacks to:

    • Modify the incoming rows before they are sent to a downstream worker, e.g. applying the lookup value to the row, and setting row default values on lookup misses
    • Redirect the incoming rows to any of the output or error output ports, e.g. redirecting a lookup miss to the FoundOutput or Output port after having set default values
    • Add new dictionary items, typically on lookup misses, e.g. from querying a web lookup service or a database. Explicitly supply a dictionary instance, which can then be modified in the notFoundKeyFunc callback.

    Sharing the Dictionary

    If the same lookup items are needed multiple times during a run, consider reusing the dictionary to save memory, and to avoid loading the dictionary multiple times. Do take care to adhere to the threading requirements of the dictionary used, e.g.:

    • The default Dictionary<TKey,TValue> must be fully populated by a single thread (e.g. a single worker), but can then be used for lookups by multiple threads (or workers). Either pre-populate the dictionary before any of the lookup workers run, or have one lookup worker load it, and use start constraints, and/or grouping of workers, and/or fully blocking dataflow workers to ensure the dictionary has completed loading before subsequent lookup workers start using it.
    • See above for using a ConcurrentDictionary
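
    The threading rule for the default Dictionary<TKey,TValue> can be illustrated outside of actionETL with the following plain .NET sketch: the dictionary is fully populated on one thread, and only then read from several threads. SharedDictionarySketch and LoadThenLookupInParallel are hypothetical names; in a worker system, the ordering would instead be enforced with start constraints, grouping, or blocking workers as described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only
public static class SharedDictionarySketch
{
    public static int[] LoadThenLookupInParallel(string[] keys)
    {
        // Step 1: populate the dictionary completely on a single thread
        var dictionary = new Dictionary<string, int>
        {
            { "White", 0 }, { "Grey", 1 }, { "Gray", 1 }, { "Black", 2 },
        };

        // Step 2: share it read-only; no thread modifies it from here on
        IReadOnlyDictionary<string, int> readOnly = dictionary;

        var results = new int[keys.Length];
        Parallel.For(0, keys.Length, i =>
        {
            // Concurrent reads are safe as long as nothing writes to the dictionary
            results[i] = readOnly.TryGetValue(keys[i], out var id) ? id : -1;
        });
        return results;
    }
}
```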

    Fully Cached - Dictionary Provided Example

    This example pre-populates the dictionary using standard .NET code, to provide a fully cached lookup. It also sets a default value on any unmatched rows.

    using actionETL;
    using System.Collections.Generic;
    
    public partial class FullyCachedDictionaryLookup
    {
        private sealed class Row
        {
            public string Color;
            public int Metric;
    
            public int ColorId; // Populated by lookup
        }
    
        private readonly Row[] _sampleData = new Row[]
        {
              new Row { Color = "Grey", Metric = 8 }
            , new Row { Color = "Blue", Metric = 3 }
            , new Row { Color = "Gray", Metric = 5 }
        };
    
        private readonly Dictionary<string, int> _dictionary = new Dictionary<string, int>
        {
            { "White", 0},
            { "Grey", 1}, // Two keys mapped to the same value
            { "Gray", 1}, // Two keys mapped to the same value
            { "Black", 2},
        };
    
        private const int _unknownColorId = -1;
    
        public WorkerSystem RunExample()
        {
            var workerSystem = new WorkerSystem()
                .Root(ws =>
                {
                    // Data rows source, typically from files or database query
                    new EnumerableSource<Row>(ws, "Source", _sampleData)
    
                    // Lookup transform, fluent coding style
                    .Output.Link.DictionaryLookupTransform(
                          "Set ColorId"
                        , _dictionary
                        , row => row.Color // Select dictionary key
                        , (row, key, value) => row.ColorId = value // Update matched rows
                        , (row, key) => // Update unmatched rows
                        {
                            row.ColorId = _unknownColorId;
                            return DictionaryLookupRowTreatment.Found; // Send unmatched to Output
                        }
                        )
    
                    // Collection target, fluent coding style, receives three rows:
                    // { Color = "Grey", Metric = 8, ColorId = 1 }
                    // { Color = "Blue", Metric = 3, ColorId = -1 }
                    // { Color = "Gray", Metric = 5, ColorId = 1 }
                    .Output.Link.CollectionTarget();
                });
            workerSystem.Start().ThrowOnFailure();
            return workerSystem;
        }
    }
    

    Fully Cached - Database Query Example

    In this example the dictionary items are read from a database query, and the lookup worker pre-populates the dictionary from its DictionaryInput port, to provide a fully cached lookup. Any unmatched rows will be sent to the (here unlinked) ErrorOutput port.

    The DictionaryInput rows can be of any type; here the MutableKeyValue<TKey, TValue> helper class is used.

    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using System.Collections.Generic;
    
    public partial class FullyCachedAdbDictionaryLookup
    {
        private sealed class Row
        {
            public string Color;
            public int Metric;
    
            public int ColorId; // Populated by lookup
        }
    
        private readonly Row[] _sampleData = new Row[]
        {
              new Row { Color = "Grey", Metric = 8 }
            , new Row { Color = "Black", Metric = 7 }
            , new Row { Color = "Gray", Metric = 5 }
        };
    
        public WorkerSystem RunExample()
        {
            var workerSystem = new WorkerSystem()
                .Root(ws =>
                {
                    // Dictionary items source query, typically from tables, views, or sprocs
                    var dictionary = new AdbDataReaderSource<MutableKeyValue<string, int>>(ws
                        , "dictionary"
                        , AdbSqlClientProvider.Get()
                            .CreateConnectionBuilder(ws.Config["SqlServer"])
                        , @"SELECT 'White' as [Key], 0 as [Value]
                            UNION ALL SELECT 'Grey', 1
                            UNION ALL SELECT 'Gray', 1
                            UNION ALL SELECT 'Black', 2
                            "
                        , rcc => rcc.AutoName()
                        );
    
                    // Data rows source, typically from files or database query
                    new EnumerableSource<Row>(ws, "Source", _sampleData)
    
                    // Lookup transform, fluent coding style
                    .Output.Link.DictionaryLookupTransform(
                          "Set ColorId"
                        , dictionary.Output
                        , dictRow => new KeyValuePair<string,int>(dictRow.Key, dictRow.Value)
                        , row => row.Color // Select dictionary key
                        , (row, key, value) => row.ColorId = value // Update matched rows
                        )
    
                    // Collection target, fluent coding style, receives three rows:
                    // { Color = "Grey", Metric = 8, ColorId = 1 }
                    // { Color = "Black", Metric = 7, ColorId = 2 }
                    // { Color = "Gray", Metric = 5, ColorId = 1 }
                    .Output.Link.CollectionTarget();
                });
            workerSystem.Start().ThrowOnFailure();
            return workerSystem;
        }
    }
    

    Partially Cached - Database Lookup Example

    In this example the dictionary items are read from a database on the fly, once for each unmatched lookup, thereby only loading items that are actually used. This is beneficial if the number of lookup items is very large, but only a smaller number of unique keys are actually looked up on each run.

    Do note that each unmatched key will result in a database round trip, which can be prohibitively slow if the number of unique keys being looked up is very large.

    Note

    In this example:

    • Any keys not available from the database will result in a database round trip each time they appear in the incoming rows. If this is an issue, consider also storing these unavailable keys in the dictionary, together with an "empty" value, and use this to avoid the database round trip for previously seen unavailable keys.
    • OpenCloseIfRequired<TResult>(Func<IAdbConnection, TResult>) is used to open and close the connection for each database lookup, which is prudent since the time between database queries is normally unpredictable.
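
    The first point, also caching keys that are unavailable in the database, could be sketched as follows with plain .NET code. NegativeCachingLookup, TryGetColorId and the queryDatabase delegate are hypothetical; queryDatabase stands in for the database round trip and is assumed to return null when the key does not exist.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only
public sealed class NegativeCachingLookup
{
    // A null value is the "empty" marker for keys known to be unavailable
    private readonly Dictionary<string, int?> _dictionary = new Dictionary<string, int?>();
    private readonly Func<string, int?> _queryDatabase;

    public NegativeCachingLookup(Func<string, int?> queryDatabase)
    {
        _queryDatabase = queryDatabase;
    }

    public bool TryGetColorId(string key, out int colorId)
    {
        if (!_dictionary.TryGetValue(key, out int? cached))
        {
            cached = _queryDatabase(key);  // At most one round trip per unique key
            _dictionary.Add(key, cached);  // Cache both available and unavailable keys
        }
        colorId = cached.GetValueOrDefault();
        return cached.HasValue;
    }
}
```

    The trade-off is that keys added to the database later in the same run will not be picked up, since the cached "empty" value suppresses further queries for that key.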

    If performing similar database lookups in several places, consider using a helper method to create the database command, the dictionary, and the transform, and only pass in the parameters that change between invocations.

    Any unmatched rows will be sent to the (here unlinked) ErrorOutput port.

    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using System.Collections.Generic;
    using System.Data;
    
    public partial class PartiallyCachedAdbDictionaryLookup
    {
        private sealed class Row
        {
            public string Color;
            public int Metric;
    
            public int ColorId; // Populated by lookup
        }
    
        private readonly Row[] _sampleData = new Row[]
        {
              new Row { Color = "Grey", Metric = 8 }
            , new Row { Color = "Black", Metric = 7 }
            , new Row { Color = "Black", Metric = 15 }
        };
    
        public WorkerSystem RunExample()
        {
            var workerSystem = new WorkerSystem()
                .Root(ws =>
                {
                    // Create database command to retrieve ColorId on unmatched lookups
                    var cb = AdbSqlClientProvider.Get()
                        .CreateCommandBuilder(ws.Config["SqlServer"]);
                    cb.CommandText = "SELECT ColorId FROM Colors WHERE Color = @Color";
                    var parameter = cb.Parameters.Add("@Color").SetDbType(DbType.String);
                    var cmd = ws.DisposeOnFinished(cb.Create()); // Guarantee disposal
    
                    // Dictionary to populate on the fly
                    var dictionary = new Dictionary<string, int>();
    
    
                    // Data rows source, typically from files or database query
                    new EnumerableSource<Row>(ws, "Source", _sampleData)
    
                    // Lookup transform queries database for missing lookup items
                    .Output.Link.DictionaryLookupTransform(
                          "Set ColorId"
                        , dictionary
                        , row => row.Color // Select dictionary key
                        , (row, key, value) => row.ColorId = value // Update matched rows
                        , (row, lookupKey) => // Update unmatched rows
                        {
                            parameter.DbValue = lookupKey; // Set key to lookup in database
                            var result = cmd.Connection
                                .OpenCloseIfRequired(conn => cmd.ExecuteScalar());
                            if (result == null)
                                return DictionaryLookupRowTreatment.NotFound;
                            row.ColorId = (int)result; // Update row
                            dictionary.Add(lookupKey, row.ColorId); // Update dictionary
                            return DictionaryLookupRowTreatment.Found;
                        }
                        )
    
                    // Collection target, receives three rows:
                    // { Color = "Grey", Metric = 8, ColorId = 1 }
                    // { Color = "Black", Metric = 7, ColorId = 2 }
                    // { Color = "Black", Metric = 15, ColorId = 2 }
                    .Output.Link.CollectionTarget();
                });
            workerSystem.Start().ThrowOnFailure();
            return workerSystem;
    
            /* The example assumes the following table and data already exist:
            CREATE TABLE Colors
            (
                Color nvarchar(50),
                ColorId int
            );
    
            INSERT INTO Colors (Color, ColorId) VALUES 
                ('White', 0),
                ('Grey', 1),
                ('Gray', 1),
                ('Black', 2)
            */
        }
    }
    

    See Also

    • Dataflow
    • API
      • MutableKeyValue<TKey, TValue>
      • DictionaryLookupTransform<TInputOutputError, TKey, TValue>
      • DictionaryLookupTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
      • DictionaryLookupSplitTransform<TInputOutputError, TKey, TValue>
      • DictionaryLookupSplitTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
      • DictionaryTarget<TInput, TKey, TValue>
    Copyright © 2023 Envobi Ltd