    Custom Dataflow Source

    This article describes how to implement a custom dataflow source worker, building on what is covered in Custom Workers and Custom Dataflow Workers. Also see the list of dataflow Source Workers.

    Single Source Worker

    All of the examples below implement the same simple logic, generating 1000 MyData rows, and they share these using directives and this row class:

    using actionETL;
    using actionETL.Logging;
    using System.Threading.Tasks;
    
    public class MyData
    {
        public string Data { get; set; } = "Hello world";
    }
    

    The examples use different techniques, ranging from ad-hoc to reusable, and from simple but limited to verbose and flexible; use the approach that best matches your requirements.

    Ad-hoc RowsActionSource

    Instantiate RowsActionSource<TOutput, TError> (with error output port) or RowsActionSource<TOutput> (without error output port). Both have one data output port, and you must supply a callback that generates rows (each invocation has a demand of at least BufferCapacity rows). These workers are the ad-hoc versions of RowsSourceBase.

    Note

    For RowsActionSource, ActionSource, and similar workers:

    • Inside the callback, the worker is accessible as an arbitrarily named parameter (here ras, as in ras.Output)
    • Variables outside the callback are accessed as captured variables

    • Pros:
      • Allows buffer row processing
      • Allows asynchronous processing
      • Very simple implementation, avoids inheriting and checking for output demand
    • Cons:
      • Does not create a reusable worker
      • Does not support:
        • Adding members to the worker
        • Multiple output ports
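    private const int _rowCount = 1000; // Class-level constant, also used by the ActionSource example
    
    // Declared outside the callback: the callback can return NotCompleted and be
    // invoked again, so the count must survive between invocations
    long sent0 = 0;
    var source0 = new RowsActionSource<MyData>(root, "RowsActionSource"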
        , ras => // Provide callback to generate source rows
        {
            while (sent0 < _rowCount)
            {
                if (ras.Output.TrySendRow(new MyData()))
                    sent0++;
                else
                    return ProgressStatus.NotCompletedTask;
            }
    
            return ProgressStatus.SucceededTask;
        });
    

    Ad-hoc ActionSource

    Instantiate ActionSource<TOutput, TError> (with error output port) or ActionSource<TOutput> (without error output port). They both have one data output port, and you must supply a callback that generates rows. The workers are the ad-hoc versions of SourceBase.

    • Pros:
      • Allows buffer row processing
      • Simple implementation, avoids inheriting
      • Supports multiple output ports (but one must be Output)
    • Cons:
      • Does not create a reusable worker
      • Cannot add members to the worker
      • Callback must be asynchronous
    var source1 = new ActionSource<MyData>(root, "ActionSource"
        , async s => // Provide asynchronous callback to generate source rows
        {
            long sent = 0;
            while (sent < _rowCount)
            {
                var row = new MyData();
                if (s.Output.TrySendRow(row)
                        || await s.Output.SendRowAsync(row).ConfigureAwait(false))
                    sent++;
                else
                    return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
            }
    
            s.Output.SendSucceeded(); // No more rows, successful outcome
            return OutcomeStatus.Succeeded;
        });
    

    Inherit from RowSourceBase

    Inherit from RowSourceBase<TDerived, TOutput>, which has one data output port, and you must override OnOutputRowDemand() to generate each row.

    • Pros:
      • Creates reusable worker
      • Can add members to the worker (see RowCount property below)
      • Synchronous, row-by-row processing
      • Simple implementation, avoids checking for output demand
    • Cons:
      • Does not support:
        • Asynchronous processing
        • Multiple output ports
        • Buffer row processing
    public class RowSourceBaseSource<TOutput>
        : RowSourceBase<RowSourceBaseSource<TOutput>,TOutput>
        where TOutput : class, new() // Dataflow rows must have 'class' constraint
    {
        // public enables siblings and ancestors to change it
        public long RowCount
        {
            get => _rowCount.GetValueUnchecked();
            set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<long> _rowCount;
    
        public RowSourceBaseSource(
            WorkerParent workerParent
            , string workerName
            , long rowCount
            )
            : base(workerParent, workerName, null)
        {
            _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
        }
    
        // Override this method to generate source rows
        protected override ProgressStatusResult<TOutput> OnOutputRowDemand()
        {
            if (_sent++ < RowCount)
                return ProgressStatusResult.NotCompleted(new TOutput());
    
            return ProgressStatusResult.Succeeded<TOutput>(null);
        }
    
        private long _sent;
    }
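    The worker above is defined but not run in this article. As a minimal sketch, assuming the MyData class from the start of the article and reusing only the WorkerSystem, Root(), Output.Link.CollectionTarget() and ThrowOnFailure() calls shown in the pass-through example further down, it could be run as follows (the RowSourceBaseSourceExample class name is hypothetical):

    ```csharp
    using actionETL;

    // Hypothetical example class; assumes MyData and RowSourceBaseSource<TOutput>
    // are defined as shown above
    public static class RowSourceBaseSourceExample
    {
        public static void Run()
        {
            new WorkerSystem(nameof(RowSourceBaseSourceExample))
                .Root(ws =>
                {
                    // Generate 1000 MyData rows and collect them in memory
                    new RowSourceBaseSource<MyData>(ws, "Source", 1000)
                        .Output.Link.CollectionTarget();
                })
                .Start()
                .ThrowOnFailure(); // Throws if the worker system did not succeed
        }
    }
    ```

    Since the worker is a regular class, it can be instantiated wherever a worker parent is available, which is what makes it reusable compared to RowsActionSource.
    
    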
    

    Inherit from RowsSourceBase

    Inherit from RowsSourceBase<TDerived, TOutput, TError> (with error output port) or RowsSourceBase<TDerived, TOutput> (without error output port). Both have one data output port, and you must override OnOutputRowsDemandAsync() to generate rows (each invocation has a demand of at least BufferCapacity rows). These workers are the reusable versions of RowsActionSource.

    • Pros:
      • Creates reusable worker
      • Can add members to the worker (see RowCount property below)
      • Allows buffer row processing
      • Allows asynchronous processing
      • Simple implementation, avoids checking for output demand
    • Cons:
      • Does not support multiple output ports
    public class RowsSourceBaseSource<TOutput>
        : RowsSourceBase<RowsSourceBaseSource<TOutput>,TOutput>
        where TOutput : class, new() // Dataflow rows must have 'class' constraint
    {
        // public enables siblings and ancestors to change it
        public long RowCount
        {
            get => _rowCount.GetValueUnchecked();
            set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<long> _rowCount;
    
        public RowsSourceBaseSource(
            WorkerParent workerParent
            , string workerName
            , long rowCount
            )
            : base(workerParent, workerName, null)
        {
            _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
        }
    
        // Override this method to generate source rows
        protected override Task<ProgressStatus> OnOutputRowsDemandAsync()
        {
            while (_sent < RowCount)
            {
                if (Output.TrySendRow(new TOutput()))
                    _sent++;
                else
                    return ProgressStatus.NotCompletedTask;
            }
    
            return ProgressStatus.SucceededTask;
        }
    
        private long _sent;
    }
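    Because RowCount is a public property, code that creates the worker can also change it after construction. The following is a sketch, assuming (as the setter's SetValueChecked(value, WorkerParentState.Created) call suggests) that the value may be changed until the worker starts, and reusing only the WorkerSystem calls shown in the pass-through example; the RowsSourceBaseSourceExample class name is hypothetical:

    ```csharp
    using actionETL;

    // Hypothetical example class; assumes MyData and RowsSourceBaseSource<TOutput>
    // are defined as shown above
    public static class RowsSourceBaseSourceExample
    {
        public static void Run()
        {
            new WorkerSystem(nameof(RowsSourceBaseSourceExample))
                .Root(ws =>
                {
                    var source = new RowsSourceBaseSource<MyData>(ws, "Source", 1000);

                    // Override the constructor value before the worker starts
                    source.RowCount = 500;

                    source.Output.Link.CollectionTarget();
                })
                .Start()
                .ThrowOnFailure(); // Throws if the worker system did not succeed
        }
    }
    ```
    
    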
    

    Inherit from SourceBase or WorkerBase

    Inherit from SourceBase<TDerived, TOutput, TError> (with error output port) or SourceBase<TDerived, TOutput> (without error output port). Both have one data output port. These workers are the reusable versions of ActionSource. Alternatively, inherit from WorkerBase<TDerived> and add the required ports yourself.

    You must override RunAsync() to generate rows.

    • Pros:
      • Creates reusable worker
      • Full flexibility
        • Can add members to the worker (see RowCount property below)
        • Allows buffer row processing
        • Allows asynchronous processing
        • Supports multiple output ports
    • Cons:
      • Requires checking for output demand
    public class SourceBaseSource<TOutput>
        : SourceBase<SourceBaseSource<TOutput>,TOutput>
        where TOutput : class, new() // Dataflow rows must have 'class' constraint
    {
        // public enables siblings and ancestors to change it
        public long RowCount
        {
            get => _rowCount.GetValueUnchecked();
            set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<long> _rowCount;
    
        public SourceBaseSource(
            WorkerParent workerParent
            , string workerName
            , long rowCount
            )
            : base(workerParent, workerName, null)
        {
            _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
        }
    
        // Override this method to generate source rows
        protected async override Task<OutcomeStatus> RunAsync()
        {
            long sent = 0;
            long rowCount = RowCount; // Cache value while running
            while (sent < rowCount)
            {
                var row = new TOutput();
                if (Output.TrySendRow(row)
                        || await Output.SendRowAsync(row).ConfigureAwait(false))
                    sent++;
                else
                    return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
            }
    
            Output.SendSucceeded(); // No more rows, successful outcome
            return OutcomeStatus.Succeeded;
        }
    }
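    The worker is generic over its row type, so the same class can generate any row type that satisfies the 'class, new()' constraint. A minimal sketch, where the Product row class and the SourceBaseSourceExample class are hypothetical, and only the WorkerSystem calls from the pass-through example are used:

    ```csharp
    using actionETL;

    // Hypothetical row class, used only to show reuse with a different row type
    public class Product
    {
        public int ProductId { get; set; }
        public string Name { get; set; } = "Unnamed";
    }

    // Hypothetical example class; assumes SourceBaseSource<TOutput> is defined as shown above
    public static class SourceBaseSourceExample
    {
        public static void Run()
        {
            new WorkerSystem(nameof(SourceBaseSourceExample))
                .Root(ws =>
                {
                    // Product has a parameterless constructor, satisfying 'new()'
                    new SourceBaseSource<Product>(ws, "Source", 1000)
                        .Output.Link.CollectionTarget();
                })
                .Start()
                .ThrowOnFailure(); // Throws if the worker system did not succeed
        }
    }
    ```
    
    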
    

    Compose Source with Pass-through

    Also see Custom Dataflow Pass-through Workers, which provides an overview of the dataflow pass-through facilities.

    By adding a PortPassThroughTarget<TInputOutput> as one of the child workers, you can connect an output port in the children's dataflow to an output (or error output) port on the parent worker, so that the children provide the implementation of the parent output port.

    This provides excellent encapsulation, since an arbitrarily complex set of regular and dataflow (child) workers can be presented to the user as a single dataflow (parent) source worker.

    • Pros:
      • Creates reusable worker
      • Can add members to the worker (see filename properties below)
      • Encapsulates and hides the implementation
      • Supports multiple output ports
      • Reuses the implementation of other workers
    • Cons:
      • If the worker is only used once, encapsulation requires somewhat more code than using the child workers directly
    Note
    • Pass-through is a Pro feature, see Licensing for details
    • The parent and child worker ports are not 'linked' per se, and they do not share the same parent worker, which linking would otherwise require. Instead, the parent and child dataflows interact as if the other were an (extremely fast) external data source.

    The example below defines a new MoveReadFlatFileSource dataflow source worker that moves a flat file before reading its rows. It encapsulates multiple child workers as building blocks to perform the processing:

    Dataflow Pass-through Source

    using actionETL;
    using actionETL.FileHelper;
    using FileHelpers;
    using System;
    using System.Threading.Tasks;
    
    // A dataflow source worker that moves a flat file before reading its rows.
    public class MoveReadFlatFileSource<TOutput>
        : SourceBase<MoveReadFlatFileSource<TOutput>,TOutput> // Inherit an output port
        where TOutput : class
    {
        // Add parameters and additional overloads as needed to correctly configure
        // the worker and to make it easy to use.
        public MoveReadFlatFileSource(
              WorkerParent workerParent
            , string workerName
            , Func<bool> isStartableFunc = null
            , string sourceFileName = null
            , string targetFileName = null
            )
        : base(
              workerParent
            , workerName
            , isStartableFunc
            )
        {
            _sourceFileName = new SafeWorkerParentValue<string>(this, sourceFileName);
            _targetFileName = new SafeWorkerParentValue<string>(this, targetFileName);
        }
    
        // Replace the source and transform below with the worker(s) needed to implement
        // your processing.
        protected async override Task<OutcomeStatus> RunAsync()
        {
            var moveFile = new MoveFileWorker(this, "Inner move file"
                , SourceFileName, TargetFileName, true);
    
    
            new FileHelperFileSource<TOutput>(this, "Inner source"
                , () => moveFile.IsSucceeded, TargetFileName)
    
            .Output.Link.PortPassThroughTarget("Inner target", this.Output);
    
    
            // Must explicitly run the children when using port pass-through.
            return await RunChildrenAsync().ConfigureAwait(false);
        }
    
        // Add properties as needed, either completely new ones, or as a pass-through
        // to properties of the encapsulated workers.
        public string SourceFileName
        {
            get => _sourceFileName.GetValueUnchecked();
            set => _sourceFileName.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<string> _sourceFileName;
    
        public string TargetFileName
        {
            get => _targetFileName.GetValueUnchecked();
            set => _targetFileName.SetValueChecked(value, WorkerParentState.Created);
        }
        private readonly SafeWorkerParentValue<string> _targetFileName;
    }
    
    // ******************************************************************************
    // Use the worker defined above
    
    public static class MoveReadFlatFileSourceExample
    {
        [DelimitedRecord(",")]
        private class Category
        {
            public int CategoryId;
            public string CategoryName;
        }
    
        public static void Run()
        {
            new WorkerSystem(nameof(MoveReadFlatFileSourceExample))
                .Root(ws =>
                {
                    new MoveReadFlatFileSource<Category>(ws, "Source", null
                        , @"Src/DataflowPassThrough/Incoming.csv"
                        , @"Src/DataflowPassThrough/Processing.csv"
                        )
    
                    .Output.Link.CollectionTarget();
                })
                .Start()
                .ThrowOnFailure();
        }
    }
    

    See Also

    • Dataflow
      • Custom Dataflow Workers
        • Custom Dataflow Transform
        • Custom Dataflow Target
        • Custom Dataflow Pass-through Workers
        • Custom Dataflow Column Mapping
        • Slowly Changing Dimension Example
    Copyright © 2020 Envobi Ltd