    Custom Dataflow Workers

    Since a dataflow worker is simply a normal worker with ports added, all the techniques described in Custom Workers also apply to dataflow workers. This and related articles will mainly cover the aspects that are unique to custom dataflow workers.

    Design Decisions

    The following is a checklist of design decisions to consider specifically when implementing custom dataflow workers.

    Ports

    • Source, transform, or target worker?
      • Which input ports, if any?
        • What are the ports' logical blocking requirements, and how can they be satisfied in the implementation (see Dataflow Blocking and Row Buffering)?
          • Does the Default policy pick the correct port buffering?
            • Set Limited port buffering?
            • Set Full port buffering?
            • Set Limited port buffering with worker internal buffering?
      • Which output ports, if any?
      • Which error output ports, if any?
        • What error output port types?
          • Same as an input port?
          • StringRowErrors?
          • Custom type?
        • Provide only a worker version with the error output port, and/or a version without it?
      • Transforms:
        • Can the output row type(s) be different from the input row type(s)?
          • If yes, also provide a version where the types are the same, if that makes the worker easier for the user to instantiate?
    • What to name any added port properties (see Ports)?
    • Implement with Custom Dataflow Pass-through Workers (i.e. compose existing child workers and connect their ports to the parent ports)?
    • Implement from scratch (i.e. without pass-through workers)?
      • Synchronous or asynchronous input/output row processing?
        • Synchronous can be slightly simpler to implement
          • A number of helper workers exist to allow your code to still be synchronous. Internally they are still wholly or partly asynchronous, which avoids blocking the thread.
        • Asynchronous is required e.g. if accessing external data sources asynchronously, or waiting on multiple input ports, or waiting on multiple output ports.
      • Row-by-row, or row buffer-by-row buffer processing?
        • Workers can send or take port rows either one by one, or multiple rows at a time, the latter having slightly lower overhead. The difference is small enough that worker implementations should generally use the approach that provides the least complex code.
      • If you have multiple similar input ports, or multiple similar output ports, avoid blocking or starving some ports. E.g. check/take/send rows or row buffers in a round robin order.
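
    The round robin advice above can be illustrated with a small library-independent sketch. It deliberately avoids the port API: plain Queue<T> instances stand in for input ports, and the RoundRobinSketch class and its Drain method are hypothetical names introduced only for this illustration. A real worker would check, take, or send rows through its ports instead.

```csharp
using System.Collections.Generic;

public static class RoundRobinSketch
{
    // Takes at most one row from each non-empty input per pass, so that
    // no single input is drained completely while the others wait.
    // Assumption: Queue<T> is only a stand-in for an input port.
    public static List<T> Drain<T>(IReadOnlyList<Queue<T>> inputs)
    {
        var result = new List<T>();
        bool tookAny;
        do
        {
            tookAny = false;
            foreach (var input in inputs)
            {
                if (input.Count > 0)
                {
                    result.Add(input.Dequeue());
                    tookAny = true;
                }
            }
        } while (tookAny); // Stop after a full pass that found no rows
        return result;
    }
}
```

    With three inputs holding (1, 2, 3), (10), and (20, 21), the rows are taken in the order 1, 10, 20, 2, 21, 3, i.e. each pass visits every input that still has rows.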

    Rows and Columns

    • Do rows and/or columns need to be copied or cloned (see Dataflow Column Mapping and Custom Dataflow Column Mapping)?
    • Do rows and/or columns need to be compared or sorted (see Compare Dataflow Columns and Custom Dataflow Column Mapping)?
    • Consider using Expression Trees to generate high performance code to process each row, e.g. when using FromTypeRowMapper<TFrom>, FromTypeColumnMapper<TFrom>, ToTypeRowMapper<TTo>, or ToTypeColumnMapper<TTo>.
      • GetMemberAccessExpression(Expression) helps with creating expressions to access individual columns or column schemas.
      • ConvertExpressionInfo applies implicit conversions between data types.
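
    As a rough illustration of the expression tree approach, the sketch below compiles a delegate that copies named columns from one row type to another. It uses only the standard System.Linq.Expressions API, not the GetMemberAccessExpression or ConvertExpressionInfo helpers mentioned above (whose signatures are not shown in this article). SourceRow, TargetRow, and ColumnCopierSketch are hypothetical names, and Expression.Convert is used as a simple stand-in for the library's conversion handling.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical row types for illustration only
public sealed class SourceRow { public int Id { get; set; } public string Name { get; set; } }
public sealed class TargetRow { public long Id { get; set; } public string Name { get; set; } }

public static class ColumnCopierSketch
{
    // Builds once, then runs at compiled-code speed for every row:
    // (from, to) => { to.Col1 = (T1)from.Col1; to.Col2 = from.Col2; ... }
    public static Action<TFrom, TTo> Build<TFrom, TTo>(params string[] columnNames)
    {
        if (columnNames == null || columnNames.Length == 0)
            throw new ArgumentException("At least one column is required.", nameof(columnNames));

        var from = Expression.Parameter(typeof(TFrom), "from");
        var to = Expression.Parameter(typeof(TTo), "to");
        var assignments = new Expression[columnNames.Length];

        for (int i = 0; i < columnNames.Length; i++)
        {
            var source = Expression.PropertyOrField(from, columnNames[i]);
            var target = Expression.PropertyOrField(to, columnNames[i]);
            // Insert a conversion only when the column types differ
            Expression value = source.Type == target.Type
                ? (Expression)source
                : Expression.Convert(source, target.Type);
            assignments[i] = Expression.Assign(target, value);
        }

        var body = Expression.Block(assignments);
        return Expression.Lambda<Action<TFrom, TTo>>(body, from, to).Compile();
    }
}
```

    The delegate would typically be built once (e.g. in the worker constructor) and then invoked for each row, e.g. var copy = ColumnCopierSketch.Build<SourceRow, TargetRow>("Id", "Name"); followed by copy(sourceRow, targetRow); inside the row loop.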

    Constructors

    When defining a new dataflow worker, for the constructor parameters:

    • All public worker constructors SHOULD start with the parameters WorkerParent parentWorker, string workerName
    • A source SHOULD have at least one constructor with Func<bool> isStartableFunc as its third parameter
    • For transforms and targets:
      • The constructor SHOULD start with the parameters in DownstreamFactory<TInput> downstreamFactory, string workerName
      • The constructor SHOULD NOT have any Func<bool> isStartableFunc parameter in any overload
      • If it has a fixed number of input ports, at least some overloads SHOULD have parameters directly after the workerName parameter that link the input ports to upstream ports, since input port parameters are almost always specified at worker creation. The worker constructors SHOULD however also accept a null port value, to allow the user to link ports after worker creation. As one example, see TargetBase<TDerived, TInput>.
    • Parameters for linking output ports to downstream workers SHOULD NOT be added to the worker constructors, since output port parameters would almost never be specified at worker creation.

    Also see Custom Workers.

    Transform and Target Factory Methods

    While dataflow sources and non-dataflow workers are instantiated via constructors, all out-of-box transforms and targets are instead instantiated via factory methods, implemented as extension methods on DownstreamFactory<TOutput>, which is most commonly accessed via the Link property on output and error output ports.

    This approach reduces typing and code maintenance effort by allowing the library user to in most cases:

    • Avoid specifying the parent when creating a transform or target
    • Only specify each dataflow row type once, usually on the source worker, and have the downstream transforms and targets automatically resolve the row types (at compile time)
    • Use a fluent coding style when creating dataflow workers

    For seamless integration, custom transforms and targets SHOULD be instantiated via factory methods, which are simple to add since they are just a thin wrapper around the constructor, e.g.:

    public static class AddMetadata1TransformFactory // Define factory method(s)
    {
        public static AddMetadata1Transform<TInputOutputError>
            AddMetadata1Transform<TInputOutputError>(
                  in this DownstreamFactory<TInputOutputError> downstreamFactory
                , string workerName
                )
            where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
        {
            return new AddMetadata1Transform<TInputOutputError>(downstreamFactory, workerName);
        }
    }
    

    Use the following pattern:

    • In the transform/target class (e.g. MyTransform), implement a minimal number of constructor variations, and 'hide' them by making them internal (which works particularly well if custom workers are defined in a separate utility project)
    • Define a static class (e.g. public static class MyTransformFactory), and add the factory methods, each of which should:
      • Extend the DownstreamFactory<TOutput> readonly struct, and:
        • Use its WorkerParent property to set the transform/target worker parent
        • Use its UpstreamPort property (which can be null) as the transform/target "first" input port:
          • Pick input ports with regular data rows (e.g. Input) before other types (e.g. DictionaryInput)
          • Pick the 'first' input port, e.g. pick LeftInput before RightInput
          • Pick input ports with dedicated properties (e.g. Input, if available) before input ports without any dedicated property
      • Return a new instance of the transform/target worker
    • In both the worker class and the factory class, define DownstreamFactory as an in parameter (see examples)

    See Worker Instantiation, and e.g. the implementation examples Inherit from RowTransformBase and Inherit from RowsTargetBase.

    Adding Ports

    In this example we define a new target worker and show:

    • The common case of adding a port and its property, in this case an error output port
    • The less common case of adding ports (here zero or more input ports) whose number isn't known ahead of time, so no dedicated property is added for each port. They can instead be accessed (without a row type) from the Inputs property and, as per Ports, we add a TypedInputs property to easily access them with their row type.
    using actionETL;
    using System.Threading.Tasks;
    
    public class InputsErrorTarget<TInputError>
        : WorkerBase<InputsErrorTarget<TInputError>>
        where TInputError : class // Dataflow rows must have 'class' constraint
    {
        // Add port property for ease of access:
        public ErrorOutputPort<TInputError> ErrorOutput { get; }
    
        internal InputsErrorTarget(
              in DownstreamFactory<TInputError> downstreamFactory
            , string workerName
            , int totalInputCount
            )
            : base(downstreamFactory.WorkerParent, workerName, null)
        {
            // Create error output port with dedicated property:
            ErrorOutput = ErrorOutputs.Create<TInputError>(nameof(ErrorOutput));
    
            // Create input ports all with the same row type:
            TypedInputs = new InputPortCollection<TInputError>(this);
            for (int i = 0; i < totalInputCount; i++)
                Inputs.Create<TInputError>("Input" + i, null);
            // Link the downstreamFactory port, if present:
            if (downstreamFactory.UpstreamPort != null && totalInputCount > 0)
                TypedInputs[0].LinkFrom(downstreamFactory.UpstreamPort);
        }
    
        protected async override Task<OutcomeStatus> RunAsync()
        {
            // Use a typed input port (guarding against zero input ports)
            var row0 = Inputs.Count > 0 ? TypedInputs[0].TryTakeRow() : null;
    
            // Use the error output port
            if (row0 != null)
            {
                var progressStatus = ErrorOutput.SendErrorRow(row0, "My error message.");
                if (progressStatus.IsFailed)
                    return progressStatus.ToOutcomeStatus(false);
            }
    
            // Use inputs by order added (here we're only emptying them).
            // Ports can also be indexed by name.
            for (int i = 0; i < Inputs.Count; i++)
                await Inputs[i].SkipAllRowsAsync().ConfigureAwait(false);
    
            return OutcomeStatus.Succeeded;
        }
    
        public InputPortCollection<TInputError> TypedInputs { get; private set; }
    }
    
    public static class InputsErrorTargetFactory
    {
        public static InputsErrorTarget<TInputError> InputsErrorTarget<TInputError>(
              in this DownstreamFactory<TInputError> downstreamFactory
            , string workerName
            , int totalInputCount
            )
            where TInputError : class
        {
            return new InputsErrorTarget<TInputError>(
                  downstreamFactory
                , workerName
                , totalInputCount
                );
        }
    }
    

    The custom target worker can now be used:

    new WorkerSystem().Root(ws =>
    {
        var source0 = new EnumerableSource<SmallClass>(ws, "Source0", SmallClass.GetRange(10));
        var source1 = new EnumerableSource<SmallClass>(ws, "Source1", SmallClass.GetRange(20));
    
        var target = source0.Output.Link.InputsErrorTarget("Target", 2);
        source1.Output.LinkTo(target.TypedInputs[1]);
    
        var errorTarget = target.ErrorOutput.Link.CollectionTarget();
    }
    ).Start().ThrowOnFailure();
    
    Note

    Only add input ports and data output ports to workers that either:

    • Have no built-in row processing (WorkerBase<TDerived>, SourceBase<TDerived, TOutput>, ...), or
    • Have built-in row processing, but are designed to accept additional ports (UnionAllTransform<TInputOutput>, MergeSortedTransform<TInputOutput>, ...)

    Breaking these rules by adding ports to e.g. SortTransform<TInputOutput> would leave the added ports without suitable row processing logic, which would normally hang the dataflow.

    Row Processing

    Workers with ports must follow these rules:

    • For each input port, the worker MUST either take or skip over all rows until the port signals it's completed, or cancel the port (immediately or after taking some rows).
    • For each data output port, the worker (after sending any desired rows) MUST complete the port.

    Breaking the above rules results in the port, the worker, and the worker system getting a Fatal status. Note however that a few out-of-box workers (e.g. RowSourceBase<TDerived, TOutput>) and some callbacks (e.g. AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)) can complete ports automatically on behalf of the library user; see the API documentation for details.

    • Error output ports on the other hand do not need to be completed.

    Also see the example above, and the Custom Dataflow Source, Custom Dataflow Transform, and Custom Dataflow Target examples.

    Error Handling

    Beyond what applies to all workers, on detecting an error that will fail the worker, dataflow workers:

    • MUST cancel all uncompleted input ports by calling InputPortCollection.TryCancel(), or InputPort.TryCancel() on each port
    • MUST complete all uncompleted data output ports by calling an OutputPortBaseCollection.TrySendError() overload, or an OutputPortBase.SendError() overload on each port
      • Note: Call TrySendError/SendError (as opposed to TrySendSucceeded/SendSucceeded) even if the error was caused by an upstream worker. That way the downstream worker knows that the sending of rows was unsuccessful.
    • SHOULD fail the worker with details appropriate to the failure. E.g. a transform without any error port that receives an incorrect row might return a failure status such as:
    return OutcomeStatus.Fatal(ALogCategory.RowErrorRowFailure, exception
        , "User callback threw an exception");
    
    Note

    The above approach allows the workers to fail with an error message that is easy to troubleshoot. Any input or data output ports left uncompleted when the worker completes, and any uncaught exception, will still generate additional (and always fatal) error messages, although these will be slightly harder to troubleshoot.

    See Also

    • Dataflow
      • Custom Workers
      • Custom Dataflow Workers
        • Custom Dataflow Source
        • Custom Dataflow Transform
        • Custom Dataflow Target
        • Custom Dataflow Pass-through Workers
        • Custom Dataflow Column Mapping
        • Slowly Changing Dimension Example
    Copyright © 2023 Envobi Ltd