Custom Dataflow Target
This article describes how to implement a custom dataflow target worker, on top of what's covered in Custom Workers and Custom Dataflow Workers. Please also see the list of dataflow Target Workers.
Single Target Worker
These examples all implement the same simple logic: adding all incoming rows to a collection.
The examples use different techniques, ranging from ad-hoc to reusable, and from simple but limited to verbose and flexible; use the approach that best matches your requirements.
Ad-hoc RowActionTarget
Instantiate RowActionTarget<TInput>, which has one data input port (add an error output port if needed). You must supply a callback that processes each incoming row. This worker is the ad-hoc version of RowTargetBase.
- Pros:
  - Synchronous row-by-row processing
  - Very simple implementation, with no need to inherit or to check whether input rows are available
- Cons:
  - Does not create a reusable worker
  - Does not support:
    - Adding members to the worker
    - Asynchronous processing
    - Multiple input ports
    - Buffer row processing
using actionETL;
using System.Collections.Generic;
using System.Threading.Tasks;
// source0 is an upstream dataflow worker whose output port carries MyData rows
var list0 = new List<MyData>();
var target0 = source0.Output.Link.RowActionTarget(
"RowActionTarget"
, row => // Provide callback to process one incoming row
list0.Add(row)
);
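Conceptually, the row-by-row callback is invoked once for each incoming row. The following minimal, library-free sketch illustrates only that calling pattern; `RowCallbackTarget<T>` and `Consume` are hypothetical names, not actionETL types, and the sketch omits everything actionETL adds (port linking, buffering, completion and error handling).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a row-by-row target; NOT actionETL's RowActionTarget.
public sealed class RowCallbackTarget<T>
{
    private readonly Action<T> _onRow;

    public RowCallbackTarget(Action<T> onRow) =>
        _onRow = onRow ?? throw new ArgumentNullException(nameof(onRow));

    // Calls the callback synchronously, once per row, in arrival order.
    // Returns the number of rows processed.
    public int Consume(IEnumerable<T> rows)
    {
        int count = 0;
        foreach (var row in rows)
        {
            _onRow(row);
            count++;
        }
        return count;
    }
}

public static class RowCallbackDemo
{
    public static List<int> Run()
    {
        var list0 = new List<int>();
        // Like the RowActionTarget callback: process one row, return nothing.
        var target = new RowCallbackTarget<int>(row => list0.Add(row));
        target.Consume(new[] { 1, 2, 3 });
        return list0;
    }
}
```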
Ad-hoc RowsActionTarget
Instantiate RowsActionTarget<TInput>, which has one data input port (add an error output port if needed). You must supply a callback that processes the incoming rows. This worker is the ad-hoc version of RowsTargetBase.
Note
For RowsActionTarget, ActionTarget, and similar workers:
- Inside the callback, the worker is accessible through an arbitrarily named parameter (here rat, as in rat.Input)
- Variables outside the callback are accessed as captured variables
- Pros:
  - Allows buffer row processing
  - Allows asynchronous processing
  - Very simple implementation, with no need to inherit or to check whether input rows are available
- Cons:
  - Does not create a reusable worker
  - Does not support:
    - Adding members to the worker
    - Multiple input ports
var list1 = new List<MyData>();
var target1 = source1.Output.Link.RowsActionTarget(
"RowsActionTarget"
, rat => // Provide callback to process available incoming rows
list1.AddRange(rat.Input.TakeBuffer())
);
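In contrast to the row-by-row callback, a buffer-based callback receives whatever rows are currently available as a batch, and can be asynchronous. This minimal, library-free sketch shows that difference; `BufferCallbackTarget<T>`, `ConsumeAsync` and the fixed buffer size are hypothetical choices for illustration, and do not describe how actionETL sizes or delivers its buffers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a buffer-based target; NOT actionETL's RowsActionTarget.
public sealed class BufferCallbackTarget<T>
{
    private readonly Func<IReadOnlyList<T>, Task> _onRows;
    private readonly int _bufferSize;

    public BufferCallbackTarget(Func<IReadOnlyList<T>, Task> onRows, int bufferSize)
    {
        if (bufferSize < 1) throw new ArgumentOutOfRangeException(nameof(bufferSize));
        _onRows = onRows ?? throw new ArgumentNullException(nameof(onRows));
        _bufferSize = bufferSize;
    }

    // Groups rows into buffers and awaits the callback once per buffer.
    // Returns the number of callback invocations.
    public async Task<int> ConsumeAsync(IEnumerable<T> rows)
    {
        int calls = 0;
        var buffer = new List<T>(_bufferSize);
        foreach (var row in rows)
        {
            buffer.Add(row);
            if (buffer.Count == _bufferSize)
            {
                // Pass a copy, since the buffer list is reused
                await _onRows(buffer.ToArray()).ConfigureAwait(false);
                calls++;
                buffer.Clear();
            }
        }
        if (buffer.Count > 0) // deliver the final, partial buffer
        {
            await _onRows(buffer.ToArray()).ConfigureAwait(false);
            calls++;
        }
        return calls;
    }
}

public static class BufferCallbackDemo
{
    public static async Task<(int Calls, int Rows)> RunAsync()
    {
        var list1 = new List<int>();
        var target = new BufferCallbackTarget<int>(rows =>
        {
            list1.AddRange(rows);      // process a whole buffer at once
            return Task.CompletedTask; // synchronous work, so return a completed task
        }, bufferSize: 4);
        int calls = await target.ConsumeAsync(Enumerable.Range(1, 10)).ConfigureAwait(false);
        return (calls, list1.Count);
    }
}
```

With 10 rows and a buffer size of 4, the callback runs three times (with 4, 4 and 2 rows).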
Ad-hoc ActionTarget
Instantiate ActionTarget<TInput, TError> (with error output port) or ActionTarget<TInput> (without error output port). They both have one data input port, and you must supply a callback that processes all incoming rows. The workers are the ad-hoc versions of TargetBase.
- Pros:
  - Allows buffer row processing
  - Simple implementation, avoids inheriting
  - Supports multiple input ports (but one must be Input)
- Cons:
  - Does not create a reusable worker
  - Cannot add members to the worker
  - Callback must be asynchronous
var list2 = new List<MyData>();
var target2 = source2.Output.Link.ActionTarget(
"ActionTarget"
, async at => // Provide callback to process all incoming rows
{
while (at.Input.HasRow || await at.Input.HasRowAsync().ConfigureAwait(false))
{
list2.AddRange(at.Input.TakeBuffer());
}
// No more rows, return input port outcome
return at.Input.Status.ToOutcomeStatus(false);
});
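The loop in the ActionTarget callback follows a common .NET pattern: consume what is already buffered without awaiting, and only await when nothing is available. As an analogy only (actionETL ports are not channels, and this is not the actionETL implementation), the same pattern with System.Threading.Channels looks like this. `WaitToReadAsync` completes synchronously when items are already buffered, so it covers both the `HasRow` and `HasRowAsync()` cases, while the inner `TryRead` loop plays the role of `TakeBuffer()`:

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDrainDemo
{
    // Consumes every item until the writer completes; analogous to
    // "while (HasRow || await HasRowAsync()) list.AddRange(TakeBuffer());"
    public static async Task<List<int>> DrainAsync(ChannelReader<int> reader)
    {
        var list2 = new List<int>();
        // Returns false once the channel is completed and empty.
        while (await reader.WaitToReadAsync().ConfigureAwait(false))
        {
            // Take everything currently buffered without awaiting.
            while (reader.TryRead(out var item))
                list2.Add(item);
        }
        return list2;
    }

    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        for (int i = 0; i < 5; i++)
            channel.Writer.TryWrite(i);
        channel.Writer.Complete(); // signal that no more items will arrive
        return await DrainAsync(channel.Reader).ConfigureAwait(false);
    }
}
```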
Inherit from RowTargetBase or RowWithErrorTargetBase
Inherit from RowWithErrorTargetBase<TDerived, TInputError> (with error output port) or RowTargetBase<TDerived, TInput> (without error output port). They both have one data input port, and you must override OnInputRow(TInput) to process each incoming row. These workers are the reusable versions of RowActionTarget.
- Pros:
  - Creates reusable worker
  - Can add members to the worker (see Rows property below)
  - Synchronous row-by-row processing
  - Simple implementation, avoids checking for available input rows
- Cons:
  - Does not support:
    - Asynchronous processing
    - Multiple input ports
    - Buffer row processing
public class Collection1Target<TInput>
: RowTargetBase<Collection1Target<TInput>, TInput>
where TInput : class // Dataflow rows must have 'class' constraint
{
internal Collection1Target(
in DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows
)
: base(downstreamFactory, workerName)
{
// Create collection if not provided by user
_rows = new SafeWorkerParentValue<ICollection<TInput>>(
this, rows ?? new List<TInput>());
}
// Override this method to process each incoming row
protected override ProgressStatus OnInputRow(TInput inputRow)
{
_rows.GetValueUnchecked().Add(inputRow);
return ProgressStatus.NotCompleted;
}
public ICollection<TInput> Rows
{
get => _rows.GetValueWhenNotRunning();
}
private readonly SafeWorkerParentValue<ICollection<TInput>> _rows;
}
public static class Collection1TargetFactory // Define factory method(s)
{
public static Collection1Target<TInput> Collection1Target<TInput>(
in this DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows = null
)
where TInput : class
{
return new Collection1Target<TInput>(downstreamFactory, workerName, rows);
}
}
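Each factory method is an extension method whose first parameter is declared `in this`, so the DownstreamFactory<TInput> struct is passed by readonly reference rather than copied, and the call `Output.Link.Collection1Target(...)` reads like an instance method. A minimal, library-free sketch of that C# pattern (C# 7.2 or later), where the hypothetical types `Link<T>` and `ListSink<T>` stand in for DownstreamFactory<TInput> and a real worker:

```csharp
using System.Collections.Generic;

// Hypothetical factory struct playing the role of DownstreamFactory<TInput>.
public readonly struct Link<T>
{
    public Link(string upstreamName) => UpstreamName = upstreamName;
    public string UpstreamName { get; }
}

// Hypothetical "worker" created by the factory method.
public sealed class ListSink<T>
{
    // Internal constructor: callers must go through the factory method.
    internal ListSink(in Link<T> link, string workerName, ICollection<T> rows)
    {
        Name = link.UpstreamName + " -> " + workerName;
        Rows = rows ?? new List<T>(); // create collection if not provided
    }

    public string Name { get; }
    public ICollection<T> Rows { get; }
}

public static class ListSinkFactory
{
    // 'in this' passes the struct by readonly reference instead of copying it.
    public static ListSink<T> ListSink<T>(
        in this Link<T> link, string workerName, ICollection<T> rows = null)
        => new ListSink<T>(link, workerName, rows);
}

public static class FactoryDemo
{
    public static ListSink<int> Run()
    {
        var link = new Link<int>("Source");
        return link.ListSink("Target"); // reads like an instance method
    }
}
```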
Inherit from RowsTargetBase
Inherit from RowsTargetBase<TDerived, TInput, TError> (with error output port) or RowsTargetBase<TDerived, TInput> (without error output port). They both have one data input port, and you must override OnInputRowsAsync() to process incoming rows. These workers are the reusable versions of RowsActionTarget.
- Pros:
  - Creates reusable worker
  - Can add members to the worker (see Rows property below)
  - Allows buffer row processing
  - Allows asynchronous processing
  - Simple implementation, avoids checking for available input rows
- Cons:
  - Does not support multiple input ports
public class Collection2Target<TInput>
: RowsTargetBase<Collection2Target<TInput>,TInput>
where TInput : class // Dataflow rows must have 'class' constraint
{
internal Collection2Target(
in DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows
)
: base(downstreamFactory, workerName)
{
// Create collection if not provided by user
_rows = new SafeWorkerParentValue<ICollection<TInput>>(
this, rows ?? new List<TInput>());
}
// Override this method to process incoming rows
protected override Task<ProgressStatus> OnInputRowsAsync()
{
var rows = _rows.GetValueUnchecked();
foreach (var row in Input.TakeBuffer())
rows.Add(row);
return ProgressStatus.NotCompletedTask;
}
public ICollection<TInput> Rows
{
get => _rows.GetValueWhenNotRunning();
}
private readonly SafeWorkerParentValue<ICollection<TInput>> _rows;
}
public static class Collection2TargetFactory // Define factory method(s)
{
public static Collection2Target<TInput> Collection2Target<TInput>(
in this DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows = null
)
where TInput : class
{
return new Collection2Target<TInput>(downstreamFactory, workerName, rows);
}
}
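OnInputRowsAsync above is not declared async: it does its work synchronously and returns ProgressStatus.NotCompletedTask, which (judging by its name and use) is an already-completed Task<ProgressStatus>. Returning a cached completed task avoids the overhead of an async state machine. A minimal, library-free sketch of that .NET idiom, with a hypothetical `Status` enum standing in for ProgressStatus:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical status type; actionETL's ProgressStatus is richer than this.
public enum Status { NotCompleted, Completed }

public static class CachedTaskDemo
{
    // Created once, so returning it allocates no new Task.
    private static readonly Task<Status> s_notCompletedTask =
        Task.FromResult(Status.NotCompleted);

    // Synchronous work and no 'async' keyword: return the cached completed task.
    public static Task<Status> OnRowsAsync(List<int> sink, IEnumerable<int> buffer)
    {
        sink.AddRange(buffer);
        return s_notCompletedTask;
    }
}
```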
Inherit from TargetBase or WorkerBase
Inherit from TargetBase<TDerived, TInput, TError> (with error output port) or TargetBase<TDerived, TInput> (without error output port). They both have one data input port. These workers are the reusable versions of ActionTarget. Alternatively, inherit from WorkerBase<TDerived>, and add required ports.
You must override RunAsync() to process all incoming rows.
- Pros:
  - Creates reusable worker
  - Full flexibility
  - Can add members to the worker (see Rows property below)
  - Allows buffer row processing
  - Allows asynchronous processing
  - Supports multiple input ports
- Cons:
  - Requires checking for available input rows
public class Collection3Target<TInput>
: TargetBase<Collection3Target<TInput>,TInput>
where TInput : class // Dataflow rows must have 'class' constraint
{
internal Collection3Target(
in DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows
)
: base(downstreamFactory, workerName)
{
// Create collection if not provided by user
_rows = new SafeWorkerParentValue<ICollection<TInput>>(
this, rows ?? new List<TInput>());
}
// Override this method to process all incoming rows
protected async override Task<OutcomeStatus> RunAsync()
{
TInput row;
var rows = _rows.GetValueUnchecked();
while ((row = Input.TryTakeRow()
?? await Input.TakeRowAsync().ConfigureAwait(false)) != null)
{
rows.Add(row);
}
// No more rows, return input port outcome
return Input.Status.ToOutcomeStatus(false);
}
public ICollection<TInput> Rows
{
get => _rows.GetValueWhenNotRunning();
}
private readonly SafeWorkerParentValue<ICollection<TInput>> _rows;
}
public static class Collection3TargetFactory // Define factory method(s)
{
public static Collection3Target<TInput> Collection3Target<TInput>(
in this DownstreamFactory<TInput> downstreamFactory
, string workerName
, ICollection<TInput> rows = null
)
where TInput : class
{
return new Collection3Target<TInput>(downstreamFactory, workerName, rows);
}
}
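RunAsync above combines a synchronous fast path, Input.TryTakeRow(), with an awaited fallback, Input.TakeRowAsync(), through the ?? operator, and stops when it gets null, meaning no more rows. A minimal, library-free sketch of the same loop shape, using a hypothetical `RowPort<T>` built on System.Threading.Channels; its member names mirror the example for readability, but it is not actionETL's input port:

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical input port over a Channel; NOT actionETL's input port.
public sealed class RowPort<T> where T : class
{
    private readonly ChannelReader<T> _reader;
    public RowPort(ChannelReader<T> reader) => _reader = reader;

    // Returns a row if one is already buffered, else null, without waiting.
    public T TryTakeRow() => _reader.TryRead(out var row) ? row : null;

    // Waits for the next row; returns null once the input is completed and empty.
    public async Task<T> TakeRowAsync()
    {
        while (await _reader.WaitToReadAsync().ConfigureAwait(false))
        {
            if (_reader.TryRead(out var row))
                return row;
        }
        return null;
    }
}

public static class RowPortDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        foreach (var s in new[] { "a", "b", "c" })
            channel.Writer.TryWrite(s);
        channel.Writer.Complete();

        var input = new RowPort<string>(channel.Reader);
        var rows = new List<string>();
        string row;
        // Same shape as Collection3Target.RunAsync: null signals no more rows.
        while ((row = input.TryTakeRow()
            ?? await input.TakeRowAsync().ConfigureAwait(false)) != null)
        {
            rows.Add(row);
        }
        return rows;
    }
}
```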
Compose Target with Pass-through
Also see Custom Dataflow Pass-through Workers, which provides an overview of the dataflow pass-through facilities.
By using PortPassThroughSource<TOutput> as one of the child workers, you can connect a parent input port to the children's dataflow, so that the child workers provide the implementation for the parent input port.
This provides excellent encapsulation, since an arbitrarily complex set of regular and dataflow (child) workers can be presented to the user as a single dataflow (parent) target worker.
- Pros:
  - Creates reusable worker
  - Can add members to the worker (see Rows property below)
  - Encapsulates and hides the implementation
  - Supports multiple input ports (although this is more commonly implemented as a transform)
  - Reuses the implementation of other workers
- Cons:
  - If the worker is only used once, requires somewhat more code than using the child workers directly without encapsulation
Note
- Pass-through is a Pro feature, see Licensing for details
- The parent worker and child worker ports are not 'linked' per se, since they do not share the same parent worker, which linking would otherwise require. Instead, the parent and child dataflows interact as if the other is an (extremely fast) external data source.
The example below defines a new SamplingCollectionTarget dataflow worker that samples the incoming rows and stores them in a collection. It encapsulates multiple child workers as building blocks to perform the processing:
using actionETL;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
// A dataflow target worker that samples the incoming rows before storing them in a
// collection.
public class SamplingCollectionTarget<TInput>
// Inherit an input port
: TargetBase<SamplingCollectionTarget<TInput>,TInput>
where TInput : class
{
private readonly Random _random;
private readonly double _samplingFraction;
private readonly CollectionTarget<TInput> _target;
// Add properties as needed, either completely new ones, or as a pass-through
// to properties of the encapsulated workers.
public ICollection<TInput> Rows { get { return _target.Rows; } }
protected async override Task<OutcomeStatus> RunAsync()
{
// Must explicitly run the children when using port pass-through.
return await RunChildrenAsync().ConfigureAwait(false);
}
// Use only the constructors and parameters required to support the factory methods.
internal SamplingCollectionTarget(
in DownstreamFactory<TInput> downstreamFactory
, string workerName
, double samplingFraction
)
: base(downstreamFactory, workerName)
{
_samplingFraction = samplingFraction;
_random = new Random();
// Replace the transform and target below with the worker(s) needed to implement
// your processing. In this example, children are created in the constructor to
// make _target.Rows available immediately.
var source = new PortPassThroughSource<TInput>(this, "Inner source", Input);
var transform = source.Output.Link.RowActionTransform1("Inner transform"
, row => _random.NextDouble() < _samplingFraction
? TransformRowTreatment.Send : TransformRowTreatment.Discard);
_target = transform.Output.Link.CollectionTarget("Inner target");
}
}
public static class SamplingCollectionTargetFactory // Define factory method(s)
{
// Add parameters and additional overloads as needed to correctly configure
// the worker and to make it easy to use.
public static SamplingCollectionTarget<TInput> SamplingCollectionTarget<TInput>(
in this DownstreamFactory<TInput> downstreamFactory
, string workerName
, double samplingFraction
)
where TInput : class
{
return new SamplingCollectionTarget<TInput>(
downstreamFactory
, workerName
, samplingFraction
);
}
}
// ******************************************************************************
// Use the worker defined above
public static class SamplingCollectionTargetExample
{
public static void RunExample()
{
new WorkerSystem(nameof(SamplingCollectionTargetExample))
.Root(ws =>
{
new EnumerableSource<SmallClass>(ws, "Source"
, SmallClass.GetRange(100000))
.Output.Link.SamplingCollectionTarget("Target", 0.1);
})
.Start()
.ThrowOnFailure();
}
}
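The inner transform keeps each row independently with probability samplingFraction (Bernoulli sampling), so the number of collected rows varies between runs: with 100,000 input rows and a fraction of 0.1, expect roughly 10,000 rows rather than exactly 10,000. The library-free sketch below applies the same `NextDouble() < fraction` test; the `BernoulliSampling` class and its optional `seed` parameter are hypothetical additions for illustration, whereas the worker above uses an unseeded `new Random()`:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BernoulliSampling
{
    // Keeps each item independently with probability 'fraction',
    // using the same test as the worker: NextDouble() < fraction.
    // A fixed seed (hypothetical option) makes the sample repeatable.
    public static List<T> Sample<T>(IEnumerable<T> source, double fraction, int? seed = null)
    {
        // Also rejects NaN, since every comparison with NaN is false
        if (!(fraction >= 0.0 && fraction <= 1.0))
            throw new ArgumentOutOfRangeException(nameof(fraction));
        var random = seed.HasValue ? new Random(seed.Value) : new Random();
        // NextDouble() returns [0, 1), so 0.0 keeps nothing and 1.0 keeps everything
        return source.Where(_ => random.NextDouble() < fraction).ToList();
    }
}
```

Seeding trades variation across runs for repeatable results, which can be handy in tests.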