Custom Dataflow Source
This article describes how to implement a custom dataflow source worker, on top of what's covered in Custom Workers and Custom Dataflow Workers. Please also see the list of dataflow Source Workers.
Single Source Worker
These examples all implement the same simple logic, generating 1000 MyData rows:
using actionETL;
using actionETL.Logging;
using System.Threading.Tasks;

public class MyData
{
    public string Data { get; set; } = "Hello world";
}
The examples use different techniques, ranging from ad-hoc to reusable, and from simple but limited to verbose and flexible; use the approach that best matches your requirements.
Ad-hoc RowsActionSource
Instantiate RowsActionSource<TOutput, TError> (with error output port) or RowsActionSource<TOutput> (without error output port). They both have one data output port, and you must supply a callback that generates rows (each invocation has a demand of at least BufferCapacity rows). The workers are the ad-hoc versions of RowsSourceBase.
Note
For RowsActionSource, ActionSource, and similar workers:
- Inside the callback, the worker is accessible as an arbitrarily named parameter (here ras, as in ras.Output)
- Variables outside the callback are accessed as captured variables
- Pros:
- Allows buffer row processing
- Allows asynchronous processing
- Very simple implementation, avoids inheriting and checking for output demand
- Cons:
- Does not create a reusable worker
- Does not support:
- Adding members to the worker
- Multiple output ports
private const int _rowCount = 1000;

long sent0 = 0; // Declared outside the callback, so the count persists across invocations
var source0 = new RowsActionSource<MyData>(root, "RowsActionSource"
    , ras => // Provide callback to generate source rows
    {
        while (sent0 < _rowCount)
        {
            if (ras.Output.TrySendRow(new MyData()))
                sent0++;
            else
                return ProgressStatus.NotCompletedTask; // No demand left; invoked again later
        }
        return ProgressStatus.SucceededTask; // All rows sent
    });
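The demand-driven contract of RowsActionSource can be illustrated without actionETL. The plain C# sketch below is a conceptual model only: it assumes a bounded buffer of 256 rows and a simple driver loop, and the local functions TrySendRow() and GenerateRows() are hypothetical stand-ins for Output.TrySendRow() and the callback, not library APIs. It shows why the sent counter lives outside the callback: the callback returns as soon as the buffer is full and is later invoked again, resuming from the saved count.

```csharp
using System;
using System.Collections.Generic;

// Conceptual model only; none of these names are actionETL APIs.
const int BufferCapacity = 256; // Assumed buffer size, for illustration
const int RowCount = 1000;

var buffer = new Queue<string>();
long sent = 0;        // Lives outside the callback, so progress survives re-invocation
int invocations = 0;
long received = 0;

// Hypothetical stand-in for Output.TrySendRow(): fails when the buffer is full.
bool TrySendRow(string row)
{
    if (buffer.Count >= BufferCapacity)
        return false;
    buffer.Enqueue(row);
    return true;
}

// Same shape as the callback above: true = succeeded, false = not completed yet.
bool GenerateRows()
{
    invocations++;
    while (sent < RowCount)
    {
        if (TrySendRow("Hello world"))
            sent++;
        else
            return false; // No demand right now; wait to be invoked again
    }
    return true; // All rows sent
}

// Hypothetical driver: invoke the callback, then let downstream drain the buffer.
bool completed = false;
while (!completed)
{
    completed = GenerateRows();
    while (buffer.Count > 0)
    {
        buffer.Dequeue();
        received++;
    }
}

Console.WriteLine($"rows={received}, invocations={invocations}");
```

Under these assumptions the program prints rows=1000, invocations=4: each invocation starts with an empty buffer, so it sends up to 256 rows before returning.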
Ad-hoc ActionSource
Instantiate ActionSource<TOutput, TError> (with error output port) or ActionSource<TOutput> (without error output port). They both have one data output port, and you must supply a callback that generates rows. The workers are the ad-hoc versions of SourceBase.
- Pros:
- Allows buffer row processing
- Simple implementation, avoids inheriting
- Supports multiple output ports (but one must be Output)
- Cons:
- Does not create a reusable worker
- Cannot add members to the worker
- Callback must be asynchronous
var source1 = new ActionSource<MyData>(root, "ActionSource"
    , async s => // Provide asynchronous callback to generate source rows
    {
        long sent = 0;
        while (sent < _rowCount)
        {
            var row = new MyData();
            if (s.Output.TrySendRow(row)
                || await s.Output.SendRowAsync(row).ConfigureAwait(false))
                sent++;
            else
                return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
        }
        s.Output.SendSucceeded(); // No more rows, successful outcome
        return OutcomeStatus.Succeeded;
    });
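The TrySendRow() || await SendRowAsync() idiom above tries a cheap synchronous send first and only awaits when the output has no immediate demand. As a rough analogy outside actionETL, System.Threading.Channels offers the same two-speed shape through ChannelWriter.TryWrite() and WriteAsync(). The sketch below is that analogy, not the actionETL implementation; the channel capacity of 256 is an arbitrary assumption.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Conceptual model only: a bounded channel stands in for the actionETL output port.
const int RowCount = 1000;
var channel = Channel.CreateBounded<string>(256); // Assumed capacity

long fastPath = 0; // Rows accepted synchronously (like TrySendRow succeeding)
long slowPath = 0; // Rows that had to wait for demand (like awaiting SendRowAsync)

async Task ProduceAsync()
{
    long sent = 0;
    while (sent < RowCount)
    {
        var row = "Hello world";
        if (channel.Writer.TryWrite(row))
        {
            fastPath++;
        }
        else
        {
            // Wait asynchronously until the consumer frees space. Unlike SendRowAsync,
            // WriteAsync throws ChannelClosedException instead of returning false.
            await channel.Writer.WriteAsync(row).ConfigureAwait(false);
            slowPath++;
        }
        sent++;
    }
    channel.Writer.Complete(); // Like Output.SendSucceeded(): no more rows
}

async Task<long> ConsumeAsync()
{
    long count = 0;
    await foreach (var row in channel.Reader.ReadAllAsync())
        count++;
    return count;
}

Task<long> consumer = ConsumeAsync();
await ProduceAsync();
long received = await consumer;
Console.WriteLine($"received={received}, fast={fastPath}, slow={slowPath}");
```

How the rows split between the fast and slow paths depends on thread scheduling; only the total of 1000 rows is deterministic.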
Inherit from RowSourceBase
Inherit from RowSourceBase<TDerived, TOutput>, which has one data output port, and you must override OnOutputRowDemand() to generate each row.
- Pros:
- Creates reusable worker
- Can add members to the worker (see RowCount property below)
- Row-by-row synchronous row processing
- Simple implementation, avoids checking for output demand
- Cons:
- Does not support:
- Asynchronous processing
- Multiple output ports
- Buffer row processing
public class RowSourceBaseSource<TOutput>
    : RowSourceBase<RowSourceBaseSource<TOutput>, TOutput>
    where TOutput : class, new() // Dataflow rows must have 'class' constraint
{
    // public enables siblings and ancestors to change it
    public long RowCount
    {
        get => _rowCount.GetValueUnchecked();
        set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
    }
    private readonly SafeWorkerParentValue<long> _rowCount;

    public RowSourceBaseSource(
        WorkerParent workerParent
        , string workerName
        , long rowCount
        )
        : base(workerParent, workerName, null)
    {
        _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
    }

    // Override this method to generate source rows
    protected override ProgressStatusResult<TOutput> OnOutputRowDemand()
    {
        if (_sent++ < RowCount)
            return ProgressStatusResult.NotCompleted(new TOutput());
        return ProgressStatusResult.Succeeded<TOutput>(null);
    }
    private long _sent;
}
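With RowSourceBase, the override produces one row per call and reports completion by returning Succeeded with a null row. A minimal plain C# model of that pull loop is shown below, assuming the base class keeps calling OnOutputRowDemand() until it reports success; the tuple return type and the driver loop are hypothetical stand-ins for ProgressStatusResult<TOutput> and the base class, not actionETL code.

```csharp
#nullable enable
using System;

// Conceptual model only; the driver loop plays the role of the base class.
const long RowCount = 1000;
long sent = 0;

// Returns (false, row) while rows remain, like ProgressStatusResult.NotCompleted(row),
// and (true, null) when done, like ProgressStatusResult.Succeeded<TOutput>(null).
(bool Completed, string? Row) OnOutputRowDemand()
{
    if (sent++ < RowCount)
        return (false, "Hello world");
    return (true, null);
}

long received = 0;
int calls = 0;
while (true)
{
    calls++;
    var (completed, _) = OnOutputRowDemand();
    if (completed)
        break;
    received++;
}

Console.WriteLine($"rows={received}, calls={calls}");
```

Under this model the override is called RowCount + 1 times (1001 here): the final call carries no row and only reports success.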
Inherit from RowsSourceBase
Inherit from RowsSourceBase<TDerived, TOutput, TError> (with error output port) or
RowsSourceBase<TDerived, TOutput> (without error output port). They both have one data output
port, and you must override OnOutputRowsDemandAsync()
to generate rows (there is at least BufferCapacity
row demand on each invocation).
The workers are the reusable versions of RowsActionSource.
- Pros:
- Creates reusable worker
- Can add members to the worker (see RowCount property below)
- Allows buffer row processing
- Allows asynchronous processing
- Simple implementation, avoids checking for output demand
- Cons:
- Does not support multiple output ports
public class RowsSourceBaseSource<TOutput>
    : RowsSourceBase<RowsSourceBaseSource<TOutput>, TOutput>
    where TOutput : class, new() // Dataflow rows must have 'class' constraint
{
    // public enables siblings and ancestors to change it
    public long RowCount
    {
        get => _rowCount.GetValueUnchecked();
        set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
    }
    private readonly SafeWorkerParentValue<long> _rowCount;

    public RowsSourceBaseSource(
        WorkerParent workerParent
        , string workerName
        , long rowCount
        )
        : base(workerParent, workerName, null)
    {
        _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
    }

    // Override this method to generate source rows
    protected override Task<ProgressStatus> OnOutputRowsDemandAsync()
    {
        while (_sent < RowCount)
        {
            if (Output.TrySendRow(new TOutput()))
                _sent++;
            else
                return ProgressStatus.NotCompletedTask;
        }
        return ProgressStatus.SucceededTask;
    }
    private long _sent;
}
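The RowCount property in these reusable workers is backed by SafeWorkerParentValue<long>, and its setter calls SetValueChecked(value, WorkerParentState.Created), which suggests that assignments are only accepted while the worker is still in the Created state. The plain C# sketch below models that guard under stated assumptions: the string-based state and the InvalidOperationException are hypothetical choices for illustration, and the actual actionETL behavior on a rejected assignment may differ.

```csharp
using System;

// Conceptual model only; not the SafeWorkerParentValue implementation.
string state = "Created"; // Hypothetical lifecycle state of the worker
long rowCount = 1000;

// Setter guard: only accept changes while the worker has not started.
void SetRowCount(long value)
{
    if (state != "Created")
        throw new InvalidOperationException($"RowCount cannot be changed in state '{state}'.");
    rowCount = value;
}

SetRowCount(500);  // Allowed: worker still in the Created state
state = "Running"; // Worker starts; the configured value is now fixed

bool rejected = false;
try
{
    SetRowCount(2000); // Rejected: worker already running
}
catch (InvalidOperationException)
{
    rejected = true;
}

Console.WriteLine($"rowCount={rowCount}, rejected={rejected}");
```

Under this model a running worker can rely on the value staying stable, so caching it in a local variable while running, as SourceBaseSource does, is safe.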
Inherit from SourceBase or WorkerBase
Inherit from SourceBase<TDerived, TOutput, TError> (with error output port) or SourceBase<TDerived, TOutput> (without error output port). They both have one data output port. The workers are the reusable versions of ActionSource. Alternatively, inherit from WorkerBase<TDerived> and add the required ports.
You must override RunAsync() to generate rows.
- Pros:
- Creates reusable worker
- Full flexibility
- Can add members to the worker (see RowCount property below)
- Allows buffer row processing
- Allows asynchronous processing
- Supports multiple output ports
- Cons:
- Requires checking for output demand
public class SourceBaseSource<TOutput>
    : SourceBase<SourceBaseSource<TOutput>, TOutput>
    where TOutput : class, new() // Dataflow rows must have 'class' constraint
{
    // public enables siblings and ancestors to change it
    public long RowCount
    {
        get => _rowCount.GetValueUnchecked();
        set => _rowCount.SetValueChecked(value, WorkerParentState.Created);
    }
    private readonly SafeWorkerParentValue<long> _rowCount;

    public SourceBaseSource(
        WorkerParent workerParent
        , string workerName
        , long rowCount
        )
        : base(workerParent, workerName, null)
    {
        _rowCount = new SafeWorkerParentValue<long>(this, rowCount);
    }

    // Override this method to generate source rows
    protected async override Task<OutcomeStatus> RunAsync()
    {
        long sent = 0;
        long rowCount = RowCount; // Cache value while running
        while (sent < rowCount)
        {
            var row = new TOutput();
            if (Output.TrySendRow(row)
                || await Output.SendRowAsync(row).ConfigureAwait(false))
                sent++;
            else
                return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
        }
        Output.SendSucceeded(); // No more rows, successful outcome
        return OutcomeStatus.Succeeded;
    }
}
Compose Source with Pass-through
Also see Custom Dataflow Pass-through Workers, which provides an overview of the dataflow pass-through facilities.
By using PortPassThroughTarget<TInputOutput> as one of the child workers, one can connect an output port in the worker children dataflow to a parent output (or error output) port, thereby making the children provide the implementation for the parent output port.
This provides excellent encapsulation, since an arbitrarily complex set of regular and dataflow (child) workers can be presented to the user as a single dataflow (parent) source worker.
- Pros:
- Creates reusable worker
- Can add members to the worker (see filename properties below)
- Encapsulates and hides the implementation
- Supports multiple output ports
- Reuses the implementation of other workers
- Cons:
- If only using it once, requires somewhat more code compared to not encapsulating the required workers
Note
- Pass-through is a Pro feature, see Licensing for details
- The parent worker and child worker ports are not 'linked' per se, and do not have the same parent worker, which linking ports would otherwise require. Instead, the parent and child dataflows interact as if the other is an (extremely fast) external data source.
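The note above describes the parent and child dataflows as interacting like external data sources. A rough analogy in plain C# uses two System.Threading.Channels channels: a child source writes to its own channel, and a forwarding loop (standing in for PortPassThroughTarget) re-sends each row on the parent's output channel, which is all that downstream consumers see. This is a model for intuition under assumed channel capacities and row counts, not how actionETL implements pass-through.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Conceptual model only; capacities and row count are arbitrary assumptions.
var childOutput = Channel.CreateBounded<string>(64);  // Port inside the parent worker
var parentOutput = Channel.CreateBounded<string>(64); // Port the parent's users see

// Child worker: produces rows into the child dataflow.
async Task ChildSourceAsync()
{
    for (int i = 0; i < 100; i++)
        await childOutput.Writer.WriteAsync($"row {i}");
    childOutput.Writer.Complete();
}

// Pass-through target: consumes child rows and re-sends them on the parent port.
async Task PassThroughAsync()
{
    await foreach (var row in childOutput.Reader.ReadAllAsync())
        await parentOutput.Writer.WriteAsync(row);
    parentOutput.Writer.Complete();
}

// Downstream of the parent worker: only ever reads the parent port.
async Task<int> DownstreamAsync()
{
    int count = 0;
    await foreach (var row in parentOutput.Reader.ReadAllAsync())
        count++;
    return count;
}

Task<int> downstream = DownstreamAsync();
await Task.WhenAll(ChildSourceAsync(), PassThroughAsync());
int received = await downstream;
Console.WriteLine($"received={received}");
```

Downstream code never references childOutput, which mirrors the encapsulation benefit: the children can be replaced without affecting users of the parent port.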
The example below defines a new MoveReadFlatFileSource dataflow source worker that moves a flat file before reading its rows. It encapsulates multiple child workers as building blocks to perform the processing:
using actionETL;
using actionETL.FileHelper;
using FileHelpers;
using System;
using System.Threading.Tasks;

// A dataflow source worker that moves a flat file before reading its rows.
public class MoveReadFlatFileSource<TOutput>
    : SourceBase<MoveReadFlatFileSource<TOutput>, TOutput> // Inherit an output port
    where TOutput : class
{
    // Add parameters and additional overloads as needed to correctly configure
    // the worker and to make it easy to use.
    public MoveReadFlatFileSource(
        WorkerParent workerParent
        , string workerName
        , Func<bool> isStartableFunc = null
        , string sourceFileName = null
        , string targetFileName = null
        )
        : base(
            workerParent
            , workerName
            , isStartableFunc
            )
    {
        _sourceFileName = new SafeWorkerParentValue<string>(this, sourceFileName);
        _targetFileName = new SafeWorkerParentValue<string>(this, targetFileName);
    }

    // Replace the child workers below with the worker(s) needed to implement
    // your processing.
    protected async override Task<OutcomeStatus> RunAsync()
    {
        var moveFile = new MoveFileWorker(this, "Inner move file"
            , SourceFileName, TargetFileName, true);

        new FileHelperFileSource<TOutput>(this, "Inner source"
            , () => moveFile.IsSucceeded, TargetFileName)
            .Output.Link.PortPassThroughTarget("Inner target", this.Output);

        // Must explicitly run the children when using port pass-through.
        return await RunChildrenAsync().ConfigureAwait(false);
    }

    // Add properties as needed, either completely new ones, or as a pass-through
    // to properties of the encapsulated workers.
    public string SourceFileName
    {
        get => _sourceFileName.GetValueUnchecked();
        set => _sourceFileName.SetValueChecked(value, WorkerParentState.Created);
    }
    private readonly SafeWorkerParentValue<string> _sourceFileName;

    public string TargetFileName
    {
        get => _targetFileName.GetValueUnchecked();
        set => _targetFileName.SetValueChecked(value, WorkerParentState.Created);
    }
    private readonly SafeWorkerParentValue<string> _targetFileName;
}

// ******************************************************************************
// Use the worker defined above
public static class MoveReadFlatFileSourceExample
{
    [DelimitedRecord(",")]
    private class Category
    {
        public int CategoryId;
        public string CategoryName;
    }

    public static void Run()
    {
        new WorkerSystem(nameof(MoveReadFlatFileSourceExample))
            .Root(ws =>
            {
                new MoveReadFlatFileSource<Category>(ws, "Source", null
                    , @"Src/DataflowPassThrough/Incoming.csv"
                    , @"Src/DataflowPassThrough/Processing.csv"
                    )
                    .Output.Link.CollectionTarget();
            })
            .Start()
            .ThrowOnFailure();
    }
}
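For comparison, the sketch below performs the same two steps directly with System.IO, without actionETL: move the file, then read comma-delimited rows from its new location. The temporary folder, the sample rows, and the naive comma split (standing in for FileHelpers parsing, with no support for quoted fields) are all assumptions, and the three-argument File.Move overload requires .NET Core 3.0 or later. Unlike the worker version, it offers no dataflow output port, status handling, or reusability as a worker.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Plain .NET sketch (no actionETL); sample data and paths are assumptions.
string dir = Path.Combine(Path.GetTempPath(), "MoveReadSketch_" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);
string incoming = Path.Combine(dir, "Incoming.csv");
string processing = Path.Combine(dir, "Processing.csv");
File.WriteAllLines(incoming, new[] { "1,Beverages", "2,Condiments" });

List<(int CategoryId, string CategoryName)> MoveThenRead(string sourceFileName, string targetFileName)
{
    // Step 1: move the file. File.Move throws on failure, so step 2 only runs after
    // a successful move (the worker version uses () => moveFile.IsSucceeded instead).
    File.Move(sourceFileName, targetFileName, overwrite: true);

    // Step 2: read and parse the rows from the new location
    return File.ReadLines(targetFileName)
        .Select(line => line.Split(','))
        .Select(fields => (int.Parse(fields[0]), fields[1]))
        .ToList();
}

var categories = MoveThenRead(incoming, processing);
bool movedAway = !File.Exists(incoming);
bool arrived = File.Exists(processing);
foreach (var (id, name) in categories)
    Console.WriteLine($"{id}: {name}");

Directory.Delete(dir, recursive: true); // Clean up the temporary folder
```

Wrapping these steps in MoveReadFlatFileSource instead lets other workers link to the resulting rows like any other dataflow source.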