Custom Dataflow Workers
Since a dataflow worker is simply a normal worker with ports added, all the techniques described in Custom Workers also apply to dataflow workers. This and related articles will mainly cover the aspects that are unique to custom dataflow workers.
Design Decisions
The following is a checklist of design decisions to consider specifically when implementing custom dataflow workers.
Ports
- Source, transform, or target worker?
- Which input ports, if any?
- What are the ports' logical blocking requirements, and how to satisfy them in the implementation (see Dataflow Blocking and Row Buffering)?
- Which output ports, if any?
- Which error output ports, if any?
- What error output port types?
- Same as an input port?
- StringRowErrors?
- Custom type?
- Only provide a version of the worker with an error output port, and/or a version without it?
- Transforms:
- Can the output row type(s) be different from the input row type(s)?
- If yes, also provide version where the types are the same, if that makes it easier to instantiate for the user?
- What to name any added port properties (see Ports)?
- Implement with Custom Dataflow Pass-through Workers (i.e. compose existing child workers and connect their ports to the parent ports)?
- Implement from scratch (i.e. without pass-through workers)?
- Synchronous or asynchronous input/output row processing?
- Synchronous can be slightly simpler to implement
- A number of helper workers exist to allow your code to still be synchronous. Internally they are still wholly or partly asynchronous, which avoids blocking the thread.
- Asynchronous is required e.g. if accessing external data sources asynchronously, or waiting on multiple input ports, or waiting on multiple output ports.
- Row-by-row, or row buffer-by-row buffer processing?
- Workers can send or take port rows either one by one, or multiple rows at a time, the latter having slightly lower overhead. The difference is small enough that worker implementations should generally use the approach that provides the least complex code.
- If you have multiple similar input ports, or multiple similar output ports, avoid blocking or starving some of the ports, e.g. by checking/taking/sending rows or row buffers in round-robin order.
Rows and Columns
- Do rows and/or columns need to be copied or cloned (see Dataflow Column Mapping and Custom Dataflow Column Mapping)?
- Do rows and/or columns need to be compared or sorted (see Compare Dataflow Columns and Custom Dataflow Column Mapping)?
- Consider using Expression Trees to generate high performance code to process each row, e.g. when using FromTypeRowMapper<TFrom>, FromTypeColumnMapper<TFrom>, ToTypeRowMapper<TTo>, or ToTypeColumnMapper<TTo>.
- GetMemberAccessExpression(Expression) helps with creating expressions to access individual columns or column schemas.
- ConvertExpressionInfo applies implicit conversions between data types.
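As a generic illustration of the expression tree technique (plain .NET, not the actionETL helpers themselves; MyRow and CreateGetter are illustrative names), the following sketch compiles a reusable column getter once, which then avoids per-row reflection cost:

```csharp
using System;
using System.Linq.Expressions;

public class MyRow
{
    public int Id;
    public string Name;
}

public static class ColumnAccessorExample
{
    // Build and compile 'row => row.<columnName>' once, then reuse the
    // resulting delegate for every row processed.
    public static Func<TRow, TColumn> CreateGetter<TRow, TColumn>(string columnName)
    {
        var rowParameter = Expression.Parameter(typeof(TRow), "row");
        var memberAccess = Expression.PropertyOrField(rowParameter, columnName);
        return Expression.Lambda<Func<TRow, TColumn>>(memberAccess, rowParameter)
            .Compile();
    }
}
```

For example, `ColumnAccessorExample.CreateGetter<MyRow, string>("Name")` returns a `Func<MyRow, string>` delegate that reads the Name column from each row.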
Constructors
When defining a new dataflow worker, for the constructor parameters:
- All public worker constructors SHOULD start with the parameters WorkerParent parentWorker, string workerName
- A source SHOULD have at least one constructor with Func<bool> isStartableFunc as its third parameter
- For transforms and targets:
  - The constructor SHOULD start with the parameters in DownstreamFactory<TInput> downstreamFactory, string workerName
  - The constructor SHOULD NOT have any Func<bool> isStartableFunc parameter in any overload
  - If it has a fixed number of input ports, there SHOULD be parameters added directly after the workerName parameter to at least some overloads, to link them to upstream ports, since input port parameters are almost always specified at worker creation. The worker constructors SHOULD however also accept a null port value, to allow the user to also link ports after worker creation. As one example, see TargetBase<TDerived, TInput>.
- Parameters for linking output ports to downstream workers SHOULD NOT be added to the worker constructors, since output port parameters would almost never be specified at worker creation.
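As a sketch of these constructor conventions (MySource and MyTransform are illustrative names, and the exact base class constructor signatures are assumptions based on this article's examples):

```csharp
// Source: public constructor starts with parent and name, with at least
// one overload taking Func<bool> isStartableFunc as the third parameter:
public MySource(WorkerParent parentWorker, string workerName
    , Func<bool> isStartableFunc)
    : base(parentWorker, workerName, isStartableFunc) { }

// Transform/target: constructor starts with the downstream factory and the
// worker name, and has no isStartableFunc parameter in any overload. A null
// upstream port is accepted, so ports can also be linked after creation:
internal MyTransform(
    in DownstreamFactory<TInput> downstreamFactory
    , string workerName
    )
    : base(downstreamFactory, workerName) { }
```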
Also see Custom Workers.
Transform and Target Factory Methods
While dataflow sources and non-dataflow workers are instantiated via constructors, all out-of-box transforms and targets are instead instantiated via factory methods, implemented as extension methods on DownstreamFactory<TOutput>, which is most commonly accessed via the Link property on output and error output ports.
This approach reduces typing and code maintenance effort by allowing the library user to, in most cases:
- Avoid specifying the parent when creating a transform or target
- Only specify each dataflow row type once, usually on the source worker, and have the downstream transforms and targets automatically resolve the row types (at compile time)
- Use a fluent coding style when creating dataflow workers
For seamless integration, custom transforms and targets SHOULD be instantiated via factory methods, which are simple to add since they are just a thin wrapper around the constructor, e.g.:
public static class AddMetadata1TransformFactory // Define factory method(s)
{
    public static AddMetadata1Transform<TInputOutputError>
        AddMetadata1Transform<TInputOutputError>(
            in this DownstreamFactory<TInputOutputError> downstreamFactory
            , string workerName
            )
        where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
    {
        return new AddMetadata1Transform<TInputOutputError>(downstreamFactory, workerName);
    }
}
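With the factory method in place, the transform can then be created fluently from an upstream output port (a sketch; source is an illustrative upstream worker whose rows derive from EtlMetadata):

```csharp
// The Link property returns a DownstreamFactory, on which the
// AddMetadata1Transform extension method above is then invoked:
var transform = source.Output.Link.AddMetadata1Transform("AddMetadata1");
```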
Use the following pattern:
- In the transform/target class (e.g. MyTransform), implement a minimal number of constructor variations, and 'hide' them by making them internal (which works particularly well if custom workers are defined in a separate utility project)
- Define a class (i.e. public static class MyTransformFactory), and add the factory methods, each of which should:
  - Extend the DownstreamFactory<TOutput> readonly struct, and:
    - Use its WorkerParent property to set the transform/target worker parent
    - Use its UpstreamPort property (which can be null) as the transform/target "first" input port:
      - Pick input ports with regular data rows (e.g. Input) before other types (e.g. DictionaryInput)
      - Pick the 'first' input port, e.g. pick LeftInput before RightInput
      - Pick input ports with dedicated properties (e.g. Input, if available) before input ports without any dedicated property
  - Return a new instance of the transform/target worker
- In both the worker class and the factory class, define DownstreamFactory as an in parameter (see examples)
See Worker Instantiation, and e.g. the implementation examples Inherit from RowTransformBase and Inherit from RowsTargetBase.
Adding Ports
In this example we define a new target worker and show:
- The common case of adding a port and its property, in this case an error output port
- The more uncommon case of adding ports (in this case zero or more input ports) where how many to add isn't known ahead of time, and therefore not adding any dedicated property for each port. They can instead be accessed (without a row type) from the Inputs property, and, as per Ports, we add a TypedInputs property to easily access them with their row type.
using actionETL;
using System.Threading.Tasks;

public class InputsErrorTarget<TInputError>
    : WorkerBase<InputsErrorTarget<TInputError>>
    where TInputError : class // Dataflow rows must have 'class' constraint
{
    // Add port property for ease of access:
    public ErrorOutputPort<TInputError> ErrorOutput { get; }

    public InputPortCollection<TInputError> TypedInputs { get; private set; }

    internal InputsErrorTarget(
        in DownstreamFactory<TInputError> downstreamFactory
        , string workerName
        , int totalInputCount
        )
        : base(downstreamFactory.WorkerParent, workerName, null)
    {
        // Create error output port with dedicated property:
        ErrorOutput = ErrorOutputs.Create<TInputError>(nameof(ErrorOutput));

        // Create input ports, all with the same row type:
        TypedInputs = new InputPortCollection<TInputError>(this);
        for (int i = 0; i < totalInputCount; i++)
            Inputs.Create<TInputError>("Input" + i, null);

        // Link the downstreamFactory port, if present:
        if (downstreamFactory.UpstreamPort != null && totalInputCount > 0)
            TypedInputs[0].LinkFrom(downstreamFactory.UpstreamPort);
    }

    protected override async Task<OutcomeStatus> RunAsync()
    {
        // Use a typed input port
        var row0 = TypedInputs[0].TryTakeRow();

        // Use the error output port
        if (row0 != null)
        {
            var progressStatus = ErrorOutput.SendErrorRow(row0, "My error message.");
            if (progressStatus.IsFailed)
                return progressStatus.ToOutcomeStatus(false);
        }

        // Use inputs by order added (here we're only emptying them).
        // Ports can also be indexed by name.
        for (int i = 0; i < Inputs.Count; i++)
            await Inputs[i].SkipAllRowsAsync().ConfigureAwait(false);

        return OutcomeStatus.Succeeded;
    }
}
public static class InputsErrorTargetFactory
{
    public static InputsErrorTarget<TInputError> InputsErrorTarget<TInputError>(
        in this DownstreamFactory<TInputError> downstreamFactory
        , string workerName
        , int totalInputCount
        )
        where TInputError : class
    {
        return new InputsErrorTarget<TInputError>(
            downstreamFactory
            , workerName
            , totalInputCount
            );
    }
}
The custom target worker can now be used:
new WorkerSystem().Root(ws =>
{
    var source0 = new EnumerableSource<SmallClass>(ws, "Source0", SmallClass.GetRange(10));
    var source1 = new EnumerableSource<SmallClass>(ws, "Source1", SmallClass.GetRange(20));
    var target = source0.Output.Link.InputsErrorTarget("Target", 2);
    source1.Output.LinkTo(target.TypedInputs[1]);
    var errorTarget = target.ErrorOutput.Link.CollectionTarget();
}
).Start().ThrowOnFailure();
Note
Only add input ports and data output ports to workers that either:
- Have no built-in row processing (WorkerBase<TDerived>, SourceBase<TDerived, TOutput>, ...), or
- Have built-in row processing, but are designed to accept additional ports (UnionAllTransform<TInputOutput>, MergeSortedTransform<TInputOutput>, ...)
Breaking these rules by adding ports to e.g. SortTransform<TInputOutput> would leave the added ports without suitable row processing logic, which would normally hang the dataflow.
Row Processing
Workers with ports must follow these rules:
- For each input port, the worker MUST either take or skip over all rows until the port signals it's completed, or cancel the port (immediately or after taking some rows).
- For each data output port, the worker (after sending any desired rows) MUST complete the port.
- Error output ports on the other hand do not need to be completed.

Breaking the above rules results in the port, the worker, and the worker system getting a Fatal status. Note however that a few out-of-box workers (e.g. RowSourceBase<TDerived, TOutput>) and some callbacks (e.g. AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)) can complete ports automatically on behalf of the library user; see the API documentation for details.
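As a minimal sketch of satisfying these rules in a target's RunAsync override (assuming a single typed input port exposed as an Input property; a target has no data output ports, so only the input port rule applies):

```csharp
protected override async Task<OutcomeStatus> RunAsync()
{
    // Take or skip all rows until the input port signals it's completed;
    // here we simply discard every row.
    await Input.SkipAllRowsAsync().ConfigureAwait(false);

    // No data output ports to complete in a target.
    return OutcomeStatus.Succeeded;
}
```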
Please also see the above, Custom Dataflow Source, Custom Dataflow Transform, and Custom Dataflow Target examples.
Error Handling
Beyond what applies to all workers, on detecting an error that will fail the worker, dataflow workers:
- MUST cancel all uncompleted input ports by calling InputPortCollection.TryCancel(), or InputPort.TryCancel() on each port
- MUST complete all uncompleted data output ports by calling an OutputPortBaseCollection.TrySendError() overload, or an OutputPortBase.SendError() overload on each port
  - Note: Call TrySendError/SendError (as opposed to TrySendSucceeded/SendSucceeded) even if the error was caused by an upstream worker. That way the downstream worker knows that the sending of rows was unsuccessful.
- SHOULD fail the worker with details appropriate to the failure. E.g. a transform without any error output port that receives an incorrect row might return a failure status such as:
return OutcomeStatus.Fatal(ALogCategory.RowErrorRowFailure, exception
    , "User callback threw an exception");
Note
The above approach allows the workers to fail with an easy-to-troubleshoot error message. Any uncompleted input or data output ports when the worker completes, and any uncaught exception, will still generate additional (and always fatal) error messages, although these will be slightly harder to troubleshoot.