Custom Dataflow Transform
This article describes how to implement custom dataflow transform workers, on top of what's covered in Custom Workers and Custom Dataflow Workers. Please also see the list of dataflow Transforms.
Note
A transform is simply the combination of a source and a target in a single worker. Not only does adding input(s) and output(s) to a regular worker create a transform, but so does adding input(s) to a source, and adding output(s) to a target. Pick a suitable worker as a starting point, and add any missing ports.
Single Transform Worker
These examples all implement the same logic, populating the fields of a custom EtlMetadata
class, which is then used as a base class for dataflow row types. These metadata fields
can e.g. be stored with the row in databases etc., to simplify linking different sets of
user data to each other, to assist troubleshooting, and to link user data to logging information:
using actionETL;
using actionETL.Logging;
using System;
using System.Threading.Tasks;
public class EtlMetadata // Could also use an interface
{
public Guid CreationGuid { get; set; }
public NodaTime.Instant InstantCreated { get; set; }
}
public class MyBudget : EtlMetadata
{
public decimal Budget { get; set; }
}
Note
There are many useful variations on this approach, e.g.:
- Use an IEtlMetadata interface, so that user row types can have different base classes
- Add additional metadata members
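The interface variation mentioned above could e.g. look like the following sketch. Note that IEtlMetadata, MyOrder, and SomeOtherBaseClass are hypothetical names used for illustration; the members mirror the EtlMetadata class above:

```csharp
// Hypothetical interface variant of EtlMetadata, which lets user row
// types keep their own base classes:
public interface IEtlMetadata
{
    Guid CreationGuid { get; set; }
    NodaTime.Instant InstantCreated { get; set; }
}

// A user row type that keeps its own (hypothetical) base class:
public class MyOrder : SomeOtherBaseClass, IEtlMetadata
{
    public Guid CreationGuid { get; set; }
    public NodaTime.Instant InstantCreated { get; set; }
    public decimal Amount { get; set; }
}
```

Custom workers would then use a `where TInputOutput : IEtlMetadata` constraint instead of the `where TInputOutput : EtlMetadata` constraint used in the examples below.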
The examples below use different techniques, ranging from ad-hoc to reusable, and from simple but limited to verbose and flexible; use the approach that best matches your requirements.
Ad-hoc RowActionTransform
Instantiate RowActionTransform<TInputOutputError> (input and output are the same type) or RowActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes each incoming row; there are also overloads that allow you to reject rows to the error output, or discard them. This worker is the ad-hoc version of RowTransformBase.
- Pros:
- Row-by-row synchronous row processing
- Very simple implementation, avoids inheriting and checking for available input rows or output demand
- Cons:
- Does not create a reusable worker
- Does not support:
- Adding members to the worker
- Asynchronous processing
- Multiple input or output ports
- Buffer row processing
var transform0 = source0.Output.Link.RowActionTransform1(
"RowActionTransform"
, row => // Provide callback to process each incoming row
{
row.CreationGuid = root.CreationGuid;
row.InstantCreated = root.InstantCreated;
}
);
Ad-hoc RowsActionTransform
Instantiate RowsActionTransform<TInputOutputError> (input and output are the same type) or RowsActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes available incoming rows and outputs rows (there is at least BufferCapacity row demand on each invocation). This worker is the ad-hoc version of RowsTransformBase.
Note
For RowsActionTransform, ActionTransform, and similar workers:
- Inside the callback, the worker is accessible as an arbitrarily named parameter (here rat, rat.Input etc.)
- Variables outside the callback are accessed as captured variables
- Pros:
- Allows buffer row processing
- Allows asynchronous processing
- Very simple implementation, avoids inheriting and checking for available input rows or output demand
- Cons:
- Does not create a reusable worker
- Does not support:
- Adding members to the worker
- Multiple input or output ports
var transform1 = source1.Output.Link.RowsActionTransform1(
"RowsActionTransform"
, rat => // Provide callback to process available incoming rows
{
foreach (var row in rat.Input.TakeBuffer())
{
row.CreationGuid = root.CreationGuid;
row.InstantCreated = root.InstantCreated;
rat.Output.SendRow(row);
}
}
);
Ad-hoc ActionTransform
Instantiate ActionTransform<TInputOutputError> (input and output are the same type) or ActionTransform<TInputError, TOutput> (input and output are different types), both with an error output port. You must supply a callback that processes all incoming rows. This worker is the ad-hoc version of TransformBase.
- Pros:
- Allows buffer row processing
- Less code than inheriting when only used a single time
- Supports multiple input and output ports (but they must include Input and Output)
- Cons:
- Does not create a reusable worker
- Cannot add members to the worker
- Callback must be asynchronous
Note
There is also a similar ActionTwoInputTransform<TLeftInput, TRightInput, TOutput> worker that instead has two inputs.
Although we could have used an enumerable and row-by-row processing also with ActionTransform, here we instead demonstrate array buffer processing, which has lower overhead than row-by-row processing. The performance difference is however only noticeable with many millions of rows, when the per-row work is very brief, as in this case:
var transform2 = source2.Output.Link.ActionTransform1(
"ActionTransform"
, async at => // Provide callback to process all incoming rows
{
var buffer = new MyBudget[at.Input.BufferCapacity];
while (true)
{
var countTask = at.Input.TakeBufferAsync(buffer);
// Avoid await if successfully completed synchronously
if (!countTask.IsCompletedSuccessfully)
await countTask.ConfigureAwait(false);
int count = countTask.Result; // Result also observes any task exception
if (count == 0)
{
at.Output.SendSucceeded();
// No more rows, return input port outcome
return at.Input.Status.ToOutcomeStatus(false);
}
for (int i = 0; i < count; i++)
{
buffer[i].CreationGuid = root.CreationGuid;
buffer[i].InstantCreated = root.InstantCreated;
}
var succeededTask = at.Output.SendRowsAsync(buffer, count);
// Avoid await if successfully completed synchronously
if (succeededTask.Status != TaskStatus.RanToCompletion)
await succeededTask.ConfigureAwait(false);
if (!succeededTask.Result) // Result also observes any task exception
return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
}
});
Inherit from RowTransformBase
Inherit from RowTransformBase<TDerived, TInputOutputError> (input and output are the same type) or RowTransformBase<TDerived, TInputError, TOutput> (input and output are different types), both with an error output port. They both have one input and one data output port, and you must override OnInputRow(TInputOutputError) to process each incoming row. These workers are the reusable versions of RowActionTransform.
- Pros:
- Creates reusable worker
- Can add members to the worker
- Row-by-row synchronous row processing
- Simple implementation, avoids checking for available input rows or output demand
- Cons:
- Does not support:
- Asynchronous processing
- Multiple input or output ports
- Buffer row processing
public class AddMetadata1Transform<TInputOutputError>
: RowTransformBase<AddMetadata1Transform<TInputOutputError>, TInputOutputError>
where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
{
internal AddMetadata1Transform(
in DownstreamFactory<TInputOutputError> downstreamFactory
, string workerName
)
: base(downstreamFactory, workerName, true)
{
}
// Override this method to process each incoming row
protected override TransformRowTreatment OnInputRow(TInputOutputError inputRow)
{
inputRow.CreationGuid = WorkerSystem.CreationGuid;
inputRow.InstantCreated = WorkerSystem.InstantCreated;
return TransformRowTreatment.Send;
}
}
public static class AddMetadata1TransformFactory // Define factory method(s)
{
public static AddMetadata1Transform<TInputOutputError>
AddMetadata1Transform<TInputOutputError>(
in this DownstreamFactory<TInputOutputError> downstreamFactory
, string workerName
)
where TInputOutputError : EtlMetadata // Row is-an EtlMetadata
{
return new AddMetadata1Transform<TInputOutputError>(downstreamFactory, workerName);
}
}
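With the factory method in place, the reusable worker is linked in the same way as the ad-hoc workers above. A sketch, where source3 is a hypothetical upstream worker whose Output port carries rows deriving from EtlMetadata (e.g. MyBudget):

```csharp
// 'source3' is a hypothetical upstream worker; its output rows must
// satisfy the EtlMetadata constraint of AddMetadata1Transform:
var transform3 = source3.Output.Link.AddMetadata1Transform("AddMetadata1");
```

The AddMetadata2Transform and AddMetadata3Transform factory methods below are used the same way.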
Inherit from RowsTransformBase
Inherit from RowsTransformBase<TDerived, TInput, TOutput, TError> (with error output port) or RowsTransformBase<TDerived, TInput, TOutput> (without error output port). They both have one input and one data output port, and you must override OnRowsAndDemandAsync() to process incoming rows. These workers are the reusable versions of RowsActionTransform.
- Pros:
- Creates reusable worker
- Can add members to the worker
- Allows buffer row processing
- Allows asynchronous processing
- Simple implementation, avoids checking for available input rows or output demand
- Cons:
- Does not support multiple input or output ports
public class AddMetadata2Transform<TInputOutput>
: RowsTransformBase<AddMetadata2Transform<TInputOutput>
, TInputOutput, TInputOutput>
where TInputOutput : EtlMetadata // Row is-an EtlMetadata
{
internal AddMetadata2Transform(
in DownstreamFactory<TInputOutput> downstreamFactory
, string workerName
)
: base(downstreamFactory, workerName, true)
{
}
// Override this method to process incoming rows
protected override Task<ProgressStatus> OnRowsAndDemandAsync()
{
foreach (var row in Input.TakeBuffer())
{
row.CreationGuid = WorkerSystem.CreationGuid;
row.InstantCreated = WorkerSystem.InstantCreated;
Output.SendRow(row);
}
return ProgressStatus.NotCompletedTask;
}
}
public static class AddMetadata2TransformFactory // Define factory method(s)
{
public static AddMetadata2Transform<TInputOutput>
AddMetadata2Transform<TInputOutput>(
in this DownstreamFactory<TInputOutput> downstreamFactory
, string workerName
)
where TInputOutput : EtlMetadata // Row is-an EtlMetadata
{
return new AddMetadata2Transform<TInputOutput>(downstreamFactory, workerName);
}
}
Inherit from TransformBase or WorkerBase
Inherit from TransformBase<TDerived, TInput, TOutput>, which has one input and one data output port (add an error output port if needed). This worker is the reusable version of ActionTransform. Alternatively, inherit from WorkerBase<TDerived>, and add required ports.
You must override RunAsync() to process all incoming rows.
- Pros:
- Creates reusable worker
- Full flexibility
- Can add members to the worker
- Allows buffer row processing
- Allows asynchronous processing
- Supports multiple input and output ports
- Cons:
- Requires checking for available input rows and output demand
Note
There is also a similar TwoInputTransformBase<TDerived, TLeftInput, TRightInput, TOutput> worker that instead has two inputs.
Although we could have used an enumerable and row-by-row processing also with TransformBase, here we instead demonstrate array buffer processing, which has lower overhead than row-by-row processing. The performance difference is however only noticeable with many millions of rows, when the per-row work is very brief, as in this case:
public class AddMetadata3Transform<TInputOutput>
: TransformBase<AddMetadata3Transform<TInputOutput>
, TInputOutput, TInputOutput>
where TInputOutput : EtlMetadata // Row is-an EtlMetadata
{
internal AddMetadata3Transform(
in DownstreamFactory<TInputOutput> downstreamFactory
, string workerName
)
: base(downstreamFactory, workerName)
{
}
// Override this method to process all incoming rows
protected override async Task<OutcomeStatus> RunAsync()
{
var buffer = new TInputOutput[Input.BufferCapacity];
while (true)
{
int count;
var countTask = Input.TakeBufferAsync(buffer);
// Avoid await if successfully completed synchronously
if (!countTask.IsCompletedSuccessfully)
count = await countTask.ConfigureAwait(false);
else
count = countTask.Result; // Result also observes any task exception
if (count == 0)
{
Output.SendSucceeded();
// No more rows, return input port outcome
return Input.Status.ToOutcomeStatus(false);
}
for (int i = 0; i < count; i++)
{
buffer[i].CreationGuid = WorkerSystem.CreationGuid;
buffer[i].InstantCreated = WorkerSystem.InstantCreated;
}
var succeededTask = Output.SendRowsAsync(buffer, count);
// Avoid await if successfully completed synchronously
if (succeededTask.Status != TaskStatus.RanToCompletion)
await succeededTask.ConfigureAwait(false);
if (!succeededTask.Result) // Result also observes any task exception
return OutcomeStatus.Error(ALogCategory.OutputPortUnexpectedNoDemand);
}
}
}
public static class AddMetadata3TransformFactory // Define factory method(s)
{
public static AddMetadata3Transform<TInputOutput>
AddMetadata3Transform<TInputOutput>(
in this DownstreamFactory<TInputOutput> downstreamFactory
, string workerName
)
where TInputOutput : EtlMetadata // Row is-an EtlMetadata
{
return new AddMetadata3Transform<TInputOutput>(downstreamFactory, workerName);
}
}
Compose Transform with Pass-through
Also see Custom Dataflow Pass-through Workers, which provides an overview of the dataflow pass-through facilities.
You can create a custom transform using pass-through by implementing both Compose Source with Pass-through and Compose Target with Pass-through in the same parent dataflow worker:
This provides excellent encapsulation, since an arbitrarily complex set of regular and dataflow (child) workers can be presented to the user as a single dataflow (parent) transform worker.
- Pros:
- Creates reusable worker
- Can add members (e.g. properties and methods) to the worker
- Encapsulates and hides the implementation
- Reuses the implementation of other workers
- Supports an arbitrary number of input, output, and error ports
- Cons:
- If only using it once, requires somewhat more code compared to not encapsulating the required workers
Note
- Pass-through is a Pro feature, see Licensing for details
- The parent worker and child worker ports are not 'linked' per se, and do not have the same parent worker which would otherwise be required. Instead, the parent and child dataflows interact as if the other is an (extremely fast) external data source.