    Custom Workers

    This article describes how to implement custom functionality, ranging from configuring existing workers, to creating new worker types.

    Configure Existing Worker

    Existing workers are highly configurable, allowing custom functionality to be added to them, often obviating the need to add additional workers or create new types of workers.

    • Pros:
      • Straightforward to implement
      • Code is (normally) placed inline with the worker creation, making it very easy to see what the worker system has implemented
    • Cons:
      • Cannot add members (properties, methods...) to, or remove members from the worker
      • Good for one-offs, but leads to code duplication if the same custom functionality is needed in several places. This can be partly mitigated by creating a method or delegate that implements the custom functionality, and supplying it to multiple workers.

    Examples include:

    • Customizable workers, such as SortTransform<TInputOutput> and the join transforms, which also allow specifying the sort condition as a custom comparer function
    • All *Action* Workers, where the library user provides the bulk of the logic via a callback (often Action or Func<T>), e.g. ActionWorker
    • Execution callbacks common to all workers, see Worker Execution

    Instantiate Workers via Helper Method

    A common and simple alternative to creating either a single complex worker, or a new worker that uses worker children to implement its functionality, is to use a helper method that creates multiple workers when called.

    • Pros:
      • Straightforward to implement
      • Reduces code duplication
      • Can create (but not fully encapsulate) any number of child workers - the user only calls the helper method
    • Cons:
      • Cannot add members to, or remove members from the workers

    In this example, a helper method instantiates and links a transform and a target worker (upstream transforms and source not shown). Note how the helper method only takes the few parameters that change between invocations:

    // Combine and insert data and corrected error rows:
    CreateUnionAndInsert("Product Data", "dbo.Product"
        , trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
    
    // Combine and insert error rows:
    CreateUnionAndInsert("Product Errors", "dbo.ProductError"
        , trx1.ErrorOutput, trx2CheckB.ErrorOutput);
    
    
    // Local helper function to union and insert rows into a database table
    void CreateUnionAndInsert(string name, string tableName,
        params OutputPortBase<Product>[] inputsFrom)
    {
        if (inputsFrom.Length == 0)
            return;
    
        inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
        .UnionAllTransform("Union " + name, inputsFrom)
        
        .Output.Link
        .AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
    }
    

    Define New Worker Type

    A new worker type is defined by inheriting, directly or indirectly, from WorkerBase<TDerived>. In general, inherit from the worker that is most similar to your desired final functionality, without having any extra unwanted functionality.

    For instance, if your new worker will execute a user supplied delegate (and you will add additional functionality on top of that), consider deriving from ActionWorkerBase<TDerived>. On the other hand, if your new worker does not execute a user supplied delegate, you might derive directly from WorkerBase<TDerived>.

    Note

    All Workers with names ending in *Base cannot be instantiated directly, and are specifically meant to be inherited from. Other (unsealed) workers can also be inherited from, but that is a less common use case.

    • Pros:
      • Can add members to the worker, and can avoid inheriting unwanted members
      • Very easy to reuse, and reduces code duplication
      • Provides a single worker to check for completion status, set start constraints etc.
      • Can encapsulate any number of child workers - only the outermost worker needs to be configured and instantiated by the user
    • Cons:
      • Takes slightly more code to implement vs. using callbacks

    Best Practices

    Worker Constructors

    • All worker constructors SHOULD start with the parameters WorkerParent parentWorker, string workerName
    • All workers SHOULD have at least one overload that accepts a Func<bool> isStartableFunc parameter (as the third parameter directly after string workerName), except dataflow transform and target workers which SHOULD NOT have this parameter in any overload
    • SHOULD avoid large resource allocations or heavy CPU usage in the constructor, and instead perform them when the worker execution is started. This lowers overall resource usage since a created worker might not be started for quite a while, or at all.
    • Worker values that must be set SHOULD be present in constructor overloads, so that the worker is always in a consistent state.
    • Do validate all parameters and properties: in the constructor if a value cannot be set later, or when the worker starts running if a property can be set after worker instantiation.
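
    The constructor guidelines above can be illustrated without any library types. The following is a minimal sketch, assuming a hypothetical stand-in class SketchBufferWorker (it does not derive from any actionETL type, and all of its member names are invented for illustration). It validates a constructor-only value immediately, validates a settable property when the work starts, and defers a large allocation until then:

```csharp
using System;

// Hypothetical stand-in class for illustration; it does not derive from any actionETL type.
public sealed class SketchBufferWorker
{
    private readonly int _bufferSize; // Cannot be set later: validated in the constructor
    private byte[] _buffer;           // Large allocation: deferred until Run()

    public string WorkerName { get; }
    public string FileName { get; set; } // Can be set later: validated in Run()
    public bool IsBufferAllocated => _buffer != null;

    public SketchBufferWorker(string workerName, int bufferSize)
    {
        WorkerName = workerName ?? throw new ArgumentNullException(nameof(workerName));
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize), "Must be positive.");
        _bufferSize = bufferSize; // Only store the size; no allocation yet
    }

    public void Run()
    {
        if (string.IsNullOrEmpty(FileName))
            throw new InvalidOperationException(nameof(FileName) + " must be set before running.");

        _buffer = new byte[_bufferSize]; // Allocate only when the work actually starts
        // ... use _buffer to process FileName ...
    }
}

public static class ConstructorSketch
{
    public static void Main()
    {
        var worker = new SketchBufferWorker("Load file", 64 * 1024 * 1024);
        Console.WriteLine(worker.IsBufferAllocated); // False: nothing allocated yet

        worker.FileName = "data.csv";
        worker.Run();
        Console.WriteLine(worker.IsBufferAllocated); // True: allocated when started
    }
}
```

    In a real worker the deferred work would presumably live in the overridden RunAsync() method rather than in a Run() method as in this sketch.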

    Public Properties etc.

    Public properties (and less commonly, public methods) can be added to allow sibling and ancestor workers to:

    • Set values after instantiation:
      • Most commonly before the worker starts running, to provide values calculated by worker siblings, or to provide optional values not included in the constructors
      • Sometimes also after worker completion, to e.g. clear large memory instances
    • Get values after instantiation:
      • Most commonly after the worker completes successfully (and sometimes also with error), to provide a worker result value (beyond the default Status)
      • Sometimes at any time, even when the worker runs, if the value never changes after the worker starts running

    Important

    You MUST use SafeWorkerParentValue<T> to get and set publicly accessible values that can change after instantiation, including references where the pointed to instance can change. This ensures thread-safety, and throws if the worker is in an unexpected state, as well as when getting an unpopulated value.

    The FileExistsInheritWorker example under Single Worker Implementation below allows setting a file name property after worker instantiation (but before the worker runs), and in these three dataflow examples the property can also be set after the worker has completed.

    Error Handling

    Worker Error Handling covers error handling when using workers.

    In a worker implementation, when an error condition (e.g. incorrect parameter, IOException etc.) occurs, the worker has four options for dealing with it:

    1. Handle the error (e.g. use a fall-back value, retry etc.), continue processing, and return a success status (e.g. Succeeded) when finished.
      • Normally, also log a Warn or lower message with details about the error that was handled.
    2. Stop processing and return an error status, e.g. via one of the Error(String) overloads, including any failure details. This error status will be visible to its parent and sibling workers, and allows them to react accordingly, e.g. by running fall-back or cleanup workers, or by halting processing.

      Do not explicitly log any message; the system will log the OutcomeStatus details.

    3. Stop processing and return a fatal status, e.g. via one of the Fatal(String) overloads. This fatal status will force all ancestors to also get the Fatal status, and will halt the whole worker system as soon as possible.

      This option is appropriate when using less severe options 1. or 2. could lead to an unknown or corrupted application state.

      Except for programming errors (i.e. bugs), this option is strongly preferable to option 4., since it allows adding explanatory log categories and messages to the (fatal) error, and shows that it is a known possible error that has been coded for appropriately.

      Do not explicitly log any message; the system will log the OutcomeStatus details.

    4. Allow an unhandled exception to escape from the worker. The worker system will force the worker and all its ancestors to have a Fatal status, and the whole worker system is halted as in option 3.

      This option is appropriate for programming errors (i.e. bugs), e.g. when receiving incorrect values in method parameters and properties. Workers SHOULD handle all predictable exceptions that are not bugs using options 1., 2., or 3.

      Do include relevant troubleshooting information in the exception, e.g. in its Message property. Do explicitly log a failure message if there is additional information that can't be included in the exception.

    Important

    Do include sufficient details with the failure to allow troubleshooting the issue.

    Also see Logging Messages regarding logging levels etc.
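
    To make the four options more concrete, here is a minimal sketch of the decision logic only. It assumes hypothetical stand-in types (SketchStatus, SketchOutcome) and a hypothetical ReadCheckpoint method, none of which are part of actionETL; a real worker would return OutcomeStatus values and use the logging facilities instead of Console:

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical stand-ins for illustration only; NOT the actionETL OutcomeStatus API.
public enum SketchStatus { Succeeded, Error, Fatal }

public sealed class SketchOutcome
{
    public SketchStatus Status { get; }
    public string Message { get; }

    public SketchOutcome(SketchStatus status, string message = null)
    {
        Status = status;
        Message = message;
    }
}

public static class ErrorOptionsSketch
{
    // Reads the id of the last loaded row from a checkpoint file.
    public static SketchOutcome ReadCheckpoint(string path, out long lastRowId)
    {
        lastRowId = 0; // Fall-back value: start from the beginning

        // Option 4: a null path is a bug in the caller, so let the exception escape.
        if (path == null)
            throw new ArgumentNullException(nameof(path));

        // Option 1: handle the error, log a warning, and continue successfully.
        if (!File.Exists(path))
        {
            Console.WriteLine($"WARN: Checkpoint '{path}' not found, starting from row 0.");
            return new SketchOutcome(SketchStatus.Succeeded);
        }

        string text = File.ReadAllText(path).Trim();

        // Option 2: stop with an error status (and details) that others can react to.
        if (!long.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture,
                out long parsed))
            return new SketchOutcome(SketchStatus.Error,
                $"Expected an integer in '{path}', but found '{text}'.");

        // Option 3: continuing from a negative id could skip or reload rows,
        // i.e. leave the application in an unknown state, so report a fatal status.
        if (parsed < 0)
            return new SketchOutcome(SketchStatus.Fatal,
                $"Checkpoint '{path}' contains the negative row id {parsed}.");

        lastRowId = parsed;
        return new SketchOutcome(SketchStatus.Succeeded);
    }

    public static void Main()
    {
        string path = Path.Combine(Path.GetTempPath(), Guid.NewGuid() + ".checkpoint");
        Console.WriteLine(ReadCheckpoint(path, out _).Status); // File is missing

        File.WriteAllText(path, "abc");
        Console.WriteLine(ReadCheckpoint(path, out _).Status); // Not an integer

        File.WriteAllText(path, "-1");
        Console.WriteLine(ReadCheckpoint(path, out _).Status); // Negative id
        File.Delete(path);
    }
}
```

    Which condition maps to which option is a judgment call for each worker; the mapping in this sketch is only one plausible choice.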

    Single Worker Implementation

    Here we define the new worker type FileExistsInheritWorker (similar to the existing FileExistsWorker), using inheritance. The worker will fail the worker system if the specified file doesn't exist:

    using actionETL;
    using actionETL.Logging;
    using System;
    using System.IO;
    using System.Threading.Tasks;
    
    public class FileExistsInheritWorker
       : WorkerBase<FileExistsInheritWorker>
    {
        // For all workers except transforms and targets, best practice is to provide
        // overloads both with and without an 'isStartableFunc' parameter.
        public FileExistsInheritWorker(
              WorkerParent workerParent
            , string workerName
            , Func<bool> isStartableFunc
            , string fileName
            )
        : base(workerParent, workerName, isStartableFunc)
        {
            _fileName = new SafeWorkerParentValue<string>(this, fileName);
        }
    
        public FileExistsInheritWorker(
              WorkerParent workerParent
            , string workerName
            , string fileName
            )
        : this(workerParent, workerName, null, fileName)
        { }
    
        // Allow setting filename after worker creation (but before worker runs).
        public string FileName 
        {
            get => _fileName.GetValueUnchecked();
            set => _fileName.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<string> _fileName;
    
        // Executed when worker runs.
        protected override Task<OutcomeStatus> RunAsync()
        {
            if (FileName == null)
                throw new InvalidOperationException(nameof(FileName) + " cannot be null.");
    
            // Since the method doesn't use the 'async' keyword, we return success/failure
            // wrapped in a Task.
            return File.Exists(FileName)
                ? OutcomeStatus.SucceededTask
                : OutcomeStatus.Error(ALogCategory.FileOperationFailed,
                    $"Expected file '{FileName}' to exist, but it did not."
                    ).ToTask();
        }
    }
    

    The worker can now be used exactly like any of the out-of-box workers:

    new WorkerSystem()
        .Root(ws =>
        {
            var feiw = new FileExistsInheritWorker(ws, "File exists"
                , ws.Config["TriggerFile"]);
    
            // Create other workers...
        })
        .Start().ThrowOnFailure();
    

    Also see the more extensive Process Incoming Files Example.

    CRTP Inheritance Pattern

    In the above example, the custom worker inherited from WorkerBase<TDerived>, and its name was also specified as the type parameter to WorkerBase<TDerived>:

    public class FileExistsInheritWorker
       : WorkerBase<FileExistsInheritWorker>
    {
    

    This construct is used on all the out-of-box *Base<TDerived> worker classes that you inherit from, and is called the Curiously Recurring Template Pattern (CRTP).

    The advantage this construct brings is that members in the base class can use the type of the derived worker, see e.g. AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) which allows you to use the derived worker type without using a cast.
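
    The mechanics of CRTP can be shown in plain C#. This is a minimal sketch using hypothetical stand-in classes (SketchWorkerBase<TDerived> and SketchFileWorker are invented for illustration and are not actionETL types; the AddStartingCallback signature here is simplified compared to the real one). The base class casts to TDerived once, so the callback can access members of the derived type without a cast:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a CRTP base class; NOT the actionETL WorkerBase<TDerived>.
public abstract class SketchWorkerBase<TDerived>
    where TDerived : SketchWorkerBase<TDerived>
{
    private readonly List<Action<TDerived>> _startingCallbacks =
        new List<Action<TDerived>>();

    public string Name { get; }

    protected SketchWorkerBase(string name) => Name = name;

    // The callback receives the derived type, and so does the caller of this method.
    public TDerived AddStartingCallback(Action<TDerived> callback)
    {
        _startingCallbacks.Add(callback);
        return (TDerived)this;
    }

    public void Run()
    {
        foreach (var callback in _startingCallbacks)
            callback((TDerived)this); // The only cast lives in the base class
    }
}

// The derived class passes itself as the type parameter.
public sealed class SketchFileWorker : SketchWorkerBase<SketchFileWorker>
{
    public string FileName { get; set; }

    public SketchFileWorker(string name, string fileName)
        : base(name) => FileName = fileName;
}

public static class CrtpSketch
{
    public static void Main()
    {
        // 'w' is typed as SketchFileWorker, so FileName is available without a cast.
        var worker = new SketchFileWorker("File exists", "trigger.txt")
            .AddStartingCallback(w =>
                Console.WriteLine($"Starting '{w.Name}' for '{w.FileName}'"));

        worker.Run(); // Prints: Starting 'File exists' for 'trigger.txt'
    }
}
```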

    Compose Worker Children

    In this example we define the new worker type LoadFlatFileToTableWorker<T>, using inheritance, and add two worker children to implement the desired functionality of loading CSV files into a database table. Note that:

    • LoadFlatFileToTableWorker<T> is not hard-coded for any particular schema. Instead the schema (Category and Product below) is provided by the caller, so the same worker type can be used to load different files/rows/tables. A row class is a reusable schema (and a struct is a reusable partial schema), see Dataflow Columns.
    • We create the AdbConnectionBuilder outside the worker, and pass it in, to increase flexibility
    • The file name can also be set in a thread-safe manner via the FileName property, before the worker is started (guaranteed by SafeWorkerParentValue<T>). This allows it to be set from sibling workers that calculate the value on the fly.

    Define the worker:

    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using actionETL.FileHelper;
    using FileHelpers;
    using System;
    using System.Threading.Tasks;
    
    public class LoadFlatFileToTableWorker<T>
        : WorkerBase<LoadFlatFileToTableWorker<T>>
        where T : class // Dataflow rows must have the 'class' constraint
    {
        private readonly IAdbConnectionBuilder _connectionBuilder;
        private readonly string _tableName;
    
        public string FileName // Make thread-safe and ensure worker status
        {
            get => _fileName.GetValueUnchecked();
            set => _fileName.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<string> _fileName;
    
        public LoadFlatFileToTableWorker(
              WorkerParent workerParent
            , string workerName
            , Func<bool> isStartableFunc
            , IAdbConnectionBuilder connectionBuilder
            , string fileName
            , string tableName
            )
            : base(workerParent, workerName, isStartableFunc)
        {
            _connectionBuilder = connectionBuilder;
            _fileName = new SafeWorkerParentValue<string>(this, fileName);
            _tableName = tableName;
        }
    
        protected override Task<OutcomeStatus> RunAsync()
        {
            new FileHelperFileSource<T>(this, "Read CSV", FileName)
    
            .Output.Link.AdbInsertTarget("Insert", _connectionBuilder, _tableName);
    
            return OutcomeStatus.SucceededTask;
        }
    }
    

    The worker can now be used exactly like any of the out-of-box workers:

    [DelimitedRecord(",")]
    public class Category
    {
        public int CategoryId { get; set; }
        public string CategoryName { get; set; }
    }
    
    [DelimitedRecord(",")]
    public class Product
    {
        public int ProductId { get; set; }
        public string ProductName { get; set; }
    }
    
    public static SystemOutcomeStatus RunExample()
    {
        var workerSystem = new WorkerSystem().Root(ws =>
        {
            var acs = AdbSqlClientProvider.Get()
                .CreateConnectionString(ws.Config["SqlServer"]);
    
            _ = new LoadFlatFileToTableWorker<Category>(ws, "Load Category", null
                , acs.CreateConnectionBuilder()
                , @"Src/CreateWorkerViaInheritance/Category.csv"
                , "dbo.Category"
                );
    
            _ = new LoadFlatFileToTableWorker<Product>(ws, "Load Product", null
                , acs.CreateConnectionBuilder()
                , @"Src/CreateWorkerViaInheritance/Product.csv"
                , "dbo.Product");
        }
        );
    
        return workerSystem.Start();
    }
    
    /* The example assumes the following tables already exist:
        CREATE TABLE Category
        (
            CategoryId int,
            CategoryName varchar(50)
        )
    
        CREATE TABLE Product
        (
            ProductId int,
            ProductName varchar(50)
        )
     */
    

    Key Points

    • It is very straightforward and only takes a few lines to define a new worker. Furthermore, there are a number of different base classes (see Workers) whose only purpose is to assist with defining new kinds of workers.
    • The more a piece of code is reused, the bigger the benefit from creating a new worker type. A well chosen worker type name 'explains' what the worker does, and is often preferable to having a code snippet that adds existing workers, but can't change their type names.
    • Functionality is added by overriding the method RunAsync(), which the system calls when the worker is 'started'.
      • Some workers already override RunAsync(), and provide a further refined API for derived workers to use. E.g. with RowTransformBase<TDerived, TInputOutputError> the OnInputRow(TInputOutputError) method must be overridden instead; always check the API documentation of the worker you derive from.
    • Using inheritance allows adding custom members and encapsulating contained workers (e.g. the new parent worker would normally automatically configure its child workers)
    • The above examples inherit from the least derived WorkerBase<TDerived> worker - the same approach is used when inheriting from more derived workers

    Also see Slowly Changing Dimension Example.

    Asynchronous Programming

    actionETL makes heavy use of and widely supports async and await, a facility introduced with .NET Framework 4.5 that makes asynchronous programming quite straightforward to implement.

    This is valuable, since file, network, database and other interactions are inherently asynchronous, and supporting asynchronous programming well is one of the enablers for building larger ETL systems that perform well.

    actionETL API methods that support asynchrony have a name ending in *Async, and an awaitable return type, typically Task, Task<TResult>, or ValueTask<TResult>. This means their implementations can use the async keyword, but are not required to do so:

    • DO use async on a method if you also need to use the await keyword in the method.
      • If the return type is Task<int>, then you return an integer from the method, which automatically gets wrapped in a task.
      • Using async adds a (fairly small) performance overhead, so only use it if it's needed.
    • DON'T use async on a method if you don't need the await keyword in the method.
      • If the return type is Task<int>, then you must return an integer wrapped in a task, typically with Task.FromResult().
      • As the last two examples showed, OutcomeStatus (and several other status classes) have members providing wrapped versions of the status, e.g. SucceededTask, which you can use and avoid creating your own.
    • Some workers have both synchronous and asynchronous APIs, allowing you to call the most appropriate one.
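
    The two cases above can be compared side by side using only standard .NET types. In this minimal sketch, the class and method names (AsyncSketch, CountCharsAsync, CountCharsCachedAsync) are hypothetical and chosen for illustration:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class AsyncSketch
{
    // Needs 'await', so it uses 'async'; the returned int is automatically
    // wrapped in a Task<int>.
    public static async Task<int> CountCharsAsync(string path)
    {
        using (var reader = new StreamReader(path))
        {
            string text = await reader.ReadToEndAsync().ConfigureAwait(false);
            return text.Length;
        }
    }

    // Needs no 'await', so it avoids 'async' and wraps the result explicitly.
    public static Task<int> CountCharsCachedAsync(string cachedText)
    {
        return Task.FromResult(cachedText.Length);
    }

    public static async Task Main()
    {
        string path = Path.GetTempFileName();
        try
        {
            File.WriteAllText(path, "hello");

            // Callers await both methods in exactly the same way.
            Console.WriteLine(await CountCharsAsync(path));          // 5
            Console.WriteLine(await CountCharsCachedAsync("hello")); // 5
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

    From the caller's point of view the two methods are indistinguishable, which is why the choice can be made purely on whether the method body needs await.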

    See Also

    • Worker System
      • Logging
        • Logging Messages
    • Workers
      • Custom Workers
        • Process Incoming Files Example
        • Slowly Changing Dimension Example
    • Dataflow
      • Custom Dataflow Workers
    • SQL Database Access
      • Custom Adb Development
    • Data Formats
    • Transfer Protocols
    Copyright © 2020 Envobi Ltd