    Custom Dataflow Transform

    This article describes how to implement custom dataflow transform workers, building on what is covered in Custom Workers and Custom Dataflow Workers. Also see the list of dataflow Transforms.

    Note

    A transform is simply the combination of a source and a target in a single worker. Not only does adding input(s) and output(s) to a regular worker create a transform, but so does adding input(s) to a source, and adding output(s) to a target. Pick a suitable worker as a starting point, and add any missing ports.

    Single Transform Worker

    These examples all implement the same logic, populating the fields of a custom EtlMetadata class, which is then used as a base class for dataflow row types. These metadata fields can, for example, be stored with each row in a database, to simplify linking different sets of user data to each other, assist troubleshooting, and link user data to logging information:

    using actionETL;
    using actionETL.Logging;
    using System;
    using System.Threading.Tasks;
    
    public class EtlMetadata // Could also use an interface
    {
        public Guid CreationGuid { get; set; }
        public NodaTime.Instant InstantCreated { get; set; }
    }
    
    public class MyBudget : EtlMetadata
    {
        public decimal Budget { get; set; }
    }
    
    Note

    There are many useful variations on this approach, e.g.:

    • Use an IEtlMetadata interface, so that user row types can have different base classes
    • Add additional metadata members
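
    As a sketch of the interface variation, the IEtlMetadata members below simply mirror the EtlMetadata class above; the SomeOtherBaseClass name is purely hypothetical, standing in for whatever base class the user row type already requires:

    ```csharp
    // Interface variation of EtlMetadata, with the same two members:
    public interface IEtlMetadata
    {
        Guid CreationGuid { get; set; }
        NodaTime.Instant InstantCreated { get; set; }
    }

    // User row types can now keep their own base class
    // (SomeOtherBaseClass is hypothetical):
    public class MyBudget : SomeOtherBaseClass, IEtlMetadata
    {
        public Guid CreationGuid { get; set; }
        public NodaTime.Instant InstantCreated { get; set; }
        public decimal Budget { get; set; }
    }
    ```

    With this variation, the generic constraints in the later examples would use `where TInputOutputError : IEtlMetadata` instead of the class constraint.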

    The examples below use different techniques, ranging from ad-hoc to reusable, and from simple but limited to verbose and flexible; use the approach that best matches your requirements.

    Ad-hoc RowActionTransform

    Instantiate RowActionTransform<TInputOutputError> (input and output are the same type) or RowActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes each incoming row; there are also overloads that allow you to reject rows to the error output, or discard them. This worker is the ad-hoc version of RowTransformBase.

    • Pros:
      • Row-by-row synchronous row processing
      • Very simple implementation, avoids inheriting and checking for available input rows or output demand
    • Cons:
      • Does not create a reusable worker
      • Does not support:
        • Adding members to the worker
        • Asynchronous processing
        • Multiple input or output ports
        • Buffer row processing
    var transform0 = source0.Output.Link.RowActionTransform1(
          "RowActionTransform"
        , row => // Provide callback to process each incoming row
        {
            row.CreationGuid = root.CreationGuid;
            row.InstantCreated = root.InstantCreated;
        }
        );
    

    Ad-hoc RowsActionTransform

    Instantiate RowsActionTransform<TInputOutputError> (input and output are the same type) or RowsActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes available incoming rows and outputs rows (there is at least BufferCapacity row demand on each invocation). This worker is the ad-hoc version of RowsTransformBase.

    Note

    For RowsActionTransform, ActionTransform, and similar workers:

    • Inside the callback, the worker is accessible as an arbitrarily named parameter (here rat, giving rat.Input etc.)
    • Variables outside the callback are accessed as captured variables
    • Pros:
      • Allows buffer row processing
      • Allows asynchronous processing
      • Very simple implementation, avoids inheriting and checking for available input rows or output demand
    • Cons:
      • Does not create a reusable worker
      • Does not support:
        • Adding members to the worker
        • Multiple input or output ports
    var transform1 = source1.Output.Link.RowsActionTransform1(
          "RowsActionTransform"
        , rat => // Provide callback to process available incoming rows
        {
            foreach (var row in rat.Input.TakeBuffer())
            {
                row.CreationGuid = root.CreationGuid;
                row.InstantCreated = root.InstantCreated;
                rat.Output.SendRow(row);
            }
        }
        );
    

    Ad-hoc ActionTransform

    Instantiate ActionTransform<TInputOutputError> (input and output are the same type) or ActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes all incoming rows. This worker is the ad-hoc version of TransformBase.

    • Pros:
      • Allows buffer row processing
      • Less code than inheriting when only used a single time
      • Supports multiple input and output ports (but they must include Input and Output)
    • Cons:
      • Does not create a reusable worker
      • Cannot add members to the worker
      • Callback must be asynchronous
    Note

    There is also a similar ActionTwoInputTransform<TLeftInput, TRightInput, TOutput> worker, which instead has two inputs.

    Although we could also have used an enumerable and row-by-row processing with ActionTransform, here we instead demonstrate array buffer processing, which has lower overhead than row-by-row processing. The performance difference is, however, only noticeable with many millions of rows and very brief per-row work, as in this case:

    var transform2 = source2.Output.Link.ActionTransform1(
          "ActionTransform"
        , async at => // Provide callback to process all incoming rows
        {
            var buffer = new MyBudget[at.Input.BufferCapacity];
            while (true)
            {
                var countTask = at.Input.TakeBufferAsync(buffer);
                // Avoid await if successfully completed synchronously
                if (!countTask.IsCompletedSuccessfully)
                    await countTask.ConfigureAwait(false);
                int count = countTask.Result; // Result also observes any task exception
                if (count == 0)
                {
                    at.Output.SendSucceeded();
                    // No more rows, return input port outcome
                    return at.Input.Status.ToOutcomeStatus(false);
                }
    
                for (int i = 0; i < count; i++)
                {
                    buffer[i].CreationGuid = root.CreationGuid;
                    buffer[i].InstantCreated = root.InstantCreated;
                }
    
                var succeededTask = at.Output.SendRowsAsync(buffer, count);
                // Avoid await if successfully completed synchronously
                if (succeededTask.Status != TaskStatus.RanToCompletion)
                    await succeededTask.ConfigureAwait(false);
                if (!succeededTask.Result) // Result also observes any task exception
                    return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
            }
        });
    

    Inherit from RowTransformBase

    Inherit from RowTransformBase<TDerived, TInputOutputError> (input and output are the same type) or RowTransformBase<TDerived, TInputError, TOutput> (input and output are different types), both with an error output port. They both have one input and one data output port, and you must override OnInputRow(TInputOutputError) to process each incoming row. These workers are the reusable versions of RowActionTransform.

    • Pros:
      • Creates reusable worker
      • Can add members to the worker
      • Row-by-row synchronous row processing
      • Simple implementation, avoids checking for available input rows or output demand
    • Cons:
      • Does not support:
        • Asynchronous processing
        • Multiple input or output ports
        • Buffer row processing
    public class AddMetadata1Transform<TInputOutputError>
        : RowTransformBase<AddMetadata1Transform<TInputOutputError>, TInputOutputError>
        where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
    {
        internal AddMetadata1Transform(
              in DownstreamFactory<TInputOutputError> downstreamFactory
            , string workerName
            )
            : base(downstreamFactory, workerName, true)
        {
        }
    
        // Override this method to process each incoming row
        protected override TransformRowTreatment OnInputRow(TInputOutputError inputRow)
        {
            inputRow.CreationGuid = WorkerSystem.CreationGuid;
            inputRow.InstantCreated = WorkerSystem.InstantCreated;
    
            return TransformRowTreatment.Send;
        }
    }
    
    public static class AddMetadata1TransformFactory // Define factory method(s)
    {
        public static AddMetadata1Transform<TInputOutputError>
            AddMetadata1Transform<TInputOutputError>(
                  in this DownstreamFactory<TInputOutputError> downstreamFactory
                , string workerName
                )
            where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
        {
            return new AddMetadata1Transform<TInputOutputError>(downstreamFactory, workerName);
        }
    }
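
    With the factory extension method in place, the reusable worker can be linked in the same way as the ad-hoc workers above. A minimal usage sketch, where source3 is an assumed upstream source whose row type derives from EtlMetadata:

    ```csharp
    // Link the reusable transform via its factory extension method.
    // 'source3' is a hypothetical upstream source whose row type
    // derives from EtlMetadata (e.g. MyBudget):
    var transform3 = source3.Output.Link.AddMetadata1Transform("AddMetadata1");
    ```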
    

    Inherit from RowsTransformBase

    Inherit from RowsTransformBase<TDerived, TInput, TOutput, TError> (with error output port) or RowsTransformBase<TDerived, TInput, TOutput> (without error output port). They both have one input and one data output port, and you must override OnRowsAndDemandAsync() to process incoming rows. These workers are the reusable versions of RowsActionTransform.

    • Pros:
      • Creates reusable worker
      • Can add members to the worker
      • Allows buffer row processing
      • Allows asynchronous processing
      • Simple implementation, avoids checking for available input rows or output demand
    • Cons:
      • Does not support multiple input or output ports
    public class AddMetadata2Transform<TInputOutput>
        : RowsTransformBase<AddMetadata2Transform<TInputOutput>
            , TInputOutput, TInputOutput>
        where TInputOutput : EtlMetadata // Row is-an EtlMetadata
    {
        internal AddMetadata2Transform(
              in DownstreamFactory<TInputOutput> downstreamFactory
            , string workerName
            )
            : base(downstreamFactory, workerName, true)
        {
        }
    
        // Override this method to process incoming rows
        protected override Task<ProgressStatus> OnRowsAndDemandAsync()
        {
            foreach (var row in Input.TakeBuffer())
            {
                row.CreationGuid = WorkerSystem.CreationGuid;
                row.InstantCreated = WorkerSystem.InstantCreated;
                Output.SendRow(row);
            }
            return ProgressStatus.NotCompletedTask;
        }
    }
    
    public static class AddMetadata2TransformFactory // Define factory method(s)
    {
        public static AddMetadata2Transform<TInputOutput>
            AddMetadata2Transform<TInputOutput>(
                  in this DownstreamFactory<TInputOutput> downstreamFactory
                , string workerName
                )
            where TInputOutput : EtlMetadata // Row is-an EtlMetadata
        {
            return new AddMetadata2Transform<TInputOutput>(downstreamFactory, workerName);
        }
    }
    

    Inherit from TransformBase or WorkerBase

    Inherit from TransformBase<TDerived, TInput, TOutput>, which has one input and one data output port (add an error output port if needed). This worker is the reusable version of ActionTransform. Alternatively, inherit from WorkerBase<TDerived>, and add required ports.

    You must override RunAsync() to process all incoming rows.

    • Pros:
      • Creates reusable worker
      • Full flexibility
        • Can add members to the worker
        • Allows buffer row processing
        • Allows asynchronous processing
        • Supports multiple input and output ports
    • Cons:
      • Requires checking for available input rows and output demand
    Note

    There is also a similar TwoInputTransformBase<TDerived, TLeftInput, TRightInput, TOutput> worker, which instead has two inputs.

    Although we could also have used an enumerable and row-by-row processing with TransformBase, here we instead demonstrate array buffer processing, which has lower overhead than row-by-row processing. The performance difference is, however, only noticeable with many millions of rows and very brief per-row work, as in this case:

    public class AddMetadata3Transform<TInputOutput>
        : TransformBase<AddMetadata3Transform<TInputOutput>
            , TInputOutput, TInputOutput>
        where TInputOutput : EtlMetadata // Row is-an EtlMetadata
    {
        internal AddMetadata3Transform(
              in DownstreamFactory<TInputOutput> downstreamFactory
            , string workerName
            )
            : base(downstreamFactory, workerName)
        {
        }
    
        // Override this method to process all incoming rows
        protected async override Task<OutcomeStatus> RunAsync()
        {
            var buffer = new TInputOutput[Input.BufferCapacity];
            while (true)
            {
                int count;
                var countTask = Input.TakeBufferAsync(buffer);
                // Avoid await if successfully completed synchronously
                if (!countTask.IsCompletedSuccessfully)
                    count = await countTask.ConfigureAwait(false);
                else
                    count = countTask.Result; // Result also observes any task exception
                if (count == 0)
                {
                    Output.SendSucceeded();
                    // No more rows, return input port outcome
                    return Input.Status.ToOutcomeStatus(false);
                }
    
                for (int i = 0; i < count; i++)
                {
                    buffer[i].CreationGuid = WorkerSystem.CreationGuid;
                    buffer[i].InstantCreated = WorkerSystem.InstantCreated;
                }
    
                var succeededTask = Output.SendRowsAsync(buffer, count);
                // Avoid await if successfully completed synchronously
                if (succeededTask.Status != TaskStatus.RanToCompletion)
                    await succeededTask.ConfigureAwait(false);
                if (!succeededTask.Result) // Result also observes any task exception
                    return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
            }
        }
    }
    
    public static class AddMetadata3TransformFactory // Define factory method(s)
    {
        public static AddMetadata3Transform<TInputOutput>
            AddMetadata3Transform<TInputOutput>(
                  in this DownstreamFactory<TInputOutput> downstreamFactory
                , string workerName
                )
            where TInputOutput : EtlMetadata // Row is-an EtlMetadata
        {
            return new AddMetadata3Transform<TInputOutput>(downstreamFactory, workerName);
        }
    }
    

    Compose Transform with Pass-through

    Also see Custom Dataflow Pass-through Workers, which provides an overview of the dataflow pass-through facilities.

    You can create a custom transform using pass-through by implementing both Compose Source with Pass-through and Compose Target with Pass-through in the same parent dataflow worker:

    [Diagram: Dataflow Pass-through Transform]

    This provides excellent encapsulation, since an arbitrarily complex set of regular and dataflow (child) workers can be presented to the user as a single dataflow (parent) transform worker.

    • Pros:
      • Creates reusable worker
      • Can add members (e.g. properties and methods) to the worker
      • Encapsulates and hides the implementation
      • Reuses the implementation of other workers
      • Supports an arbitrary number of input, output, and error ports
    • Cons:
      • If only used once, requires somewhat more code than using the required workers without encapsulation
    Note
    • Pass-through is a Pro feature, see Licensing for details
    • The parent worker and child worker ports are not 'linked' per se, which avoids the requirement that linked ports share the same parent worker. Instead, the parent and child dataflows interact as if the other were an (extremely fast) external data source.

    See Also

    • Dataflow
      • Custom Dataflow Workers
        • Custom Dataflow Source
        • Custom Dataflow Target
        • Custom Dataflow Pass-through Workers
        • Custom Dataflow Column Mapping
        • Slowly Changing Dimension Example
    Copyright © 2020 Envobi Ltd