Adding and Grouping Workers
An arbitrary number of descendant workers and levels can be added to the worker system, limited only by available memory, to perform any required work. E.g.:
- A minimal system would run a callback to do the required work, without adding any actual workers. This can still be useful, taking advantage of configuration and logging facilities.
- A small system could have a single worker or a few workers
- A medium-sized ETL application might create a hierarchy a few levels deep, with a few dozen workers
- A large ETL application might create a hierarchy with 5 to 15 levels, and with several hundred workers
- Where a large amount of work has been divided into many small pieces, a worker system can even have millions of simultaneous workers
- Having thousands of levels in the worker hierarchy is also perfectly possible, although this is likely a poor design choice since some parts of the logged information would be correspondingly long.
Adding child workers to a parent worker is also a way of grouping workers, since it affects worker start order, error escalation, and logging information.
Worker Instantiation
This example shows the most common way of creating and grouping workers. Note that we often don't need to store the worker instances in variables.
new WorkerSystem()
    .Root(root =>
    {
        var group = new ActionWorker(root, "Group", aw =>
        {
            var moveFile = new MoveFileWorker(aw, "Move Source File"
                , originalFileName, extractFilename);
            _ = new FileHelperFileSource<Category>(aw, "Read CSV"
                    , () => moveFile.IsSucceeded, extractFilename)
                .Output.Link.AggregateTransform1("Last Row", RowAggregationFunction.LastOrNone)
                .Output.Link.AdbInsertTarget("Insert"
                    , provider.CreateConnectionBuilder(root.Config["SqlServer"])
                    , "dbo.Category");
        });
        // Other workers, optionally referencing the group worker...
    })
    .Start()
    .ThrowOnFailure();
The following rules govern when and how workers are instantiated (i.e. allocated and initialized, ready to potentially run):
- Non-dataflow workers and dataflow source workers are always instantiated by invoking one of their constructors, e.g. new ExecuteProcessWorker(parent, "MyName1" /*, ...*/) or new FileHelperFileSource<MyClass>(parent, "MyName2" /*, ...*/)
  - The worker constructor always takes:
    - Its parent as the first parameter, which the system uses to immediately place the worker into the worker hierarchy
    - Its name as the second parameter, which must be unique among the direct children of its parent
    - If present, the isStartable callback as its third parameter
- Dataflow transform and target workers are always instantiated by invoking one of their factory methods (which for them reduces typing and allows a fluent coding style vs. using constructors)
  - The factory method always takes the worker name as the first parameter, which must be unique among the direct children of its parent
  - Access the factory method via either of:
    - An upstream output port, which is the most common way, and will also link the ports, e.g. source.Output.Link.AggregateTransform1("MyName3" /*, ...*/) or source.Output.Link.AdbInsertTarget("MyName4" /*, ...*/). Also see e.g. the AdbConnectionBuilder Example.
    - The parent, when the upstream port is not yet known, e.g. parent.GetDownstreamFactory<MyClass>().CollectionTarget("MyName5" /*, ...*/), and then link the port explicitly. See the below example.
- All workers (and the worker system) have the inherent ability to contain child workers
  - Child workers can only be added when the parent:
    - Has the Created WorkerParentStatus (i.e. before the parent is run), or
    - Has the Running WorkerParentStatus, and contains no currently running or previously run children
  - A worker can remove its child workers by calling RemoveChildren() (e.g. in each iteration when implementing looping)
This also means that to add a worker to the worker hierarchy, all its ancestors must already have been added.
Note
- Always give workers meaningful names since they will appear in the logging output
  - Set to a prefix plus a trailing "/" (e.g. "MyPrefix /") to generate a unique name from the prefix plus an increasing number starting at 1
  - While less useful, set to null, whitespace or "/" to generate a unique name from the worker type plus an increasing number starting at 1
  - The name cannot otherwise contain "/", and cannot start with double underscore "__"
- Workers are instantiated and (optionally) executed on the fly
- There is no predefined meta-data describing which workers or hierarchies to create; only the code itself decides whether or not to add additional workers. This approach has several benefits, including a very low overhead for creating workers (roughly 1 kB per worker), the ability to add and execute hundreds of thousands of workers per second, and even billions of workers during an ETL run.
- actionETL uses static typing in most cases (especially the dataflow) which allows the IDE and compiler to automatically check that correct, matching data schemas are being used
- After a worker has been added, it automatically resides in the worker hierarchy, and can be discovered, its status queried etc. By default, the worker then goes out of scope and becomes inaccessible (and its memory available for garbage collection) when its parent completes, although this can be changed with the KeepChildrenLevels property.
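The naming rules in the note above can be sketched as follows. This is a minimal illustration that only uses the ActionWorker constructor form shown earlier on this page; the worker names "Load Categories" and "Process /" are hypothetical. The exact format of the generated names is deliberately not shown, since the text only states that they combine the prefix with an increasing number starting at 1.

```csharp
using actionETL;

new WorkerSystem()
    .Root(root =>
    {
        // Explicit, meaningful name; must be unique among root's direct children.
        _ = new ActionWorker(root, "Load Categories", aw => { /* ... */ });

        // A trailing "/" asks the system to generate a unique name from the
        // prefix plus an increasing number, so the same string can be reused
        // for every sibling created in the loop.
        for (int i = 0; i < 3; i++)
            _ = new ActionWorker(root, "Process /", aw => { /* ... */ });
    })
    .Start()
    .ThrowOnFailure();
```

Generated names are convenient when workers are created in a loop, but an explicit name usually reads better in the logging output when there are only a few workers.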
Ways to Add Workers
Workers can be added to the worker system in several different ways, catering to different scenarios.
Adding via Callback
The most common way to add workers is to use a callback, which can include adding child workers. This provides a very compact way of adding and configuring workers, making it easy to view and understand the overall structure of the worker system.
The callback can be provided by:
- Calling a WorkerSystem.Root() overload
- Passing a callback to one of the workers with *Action* in the name, e.g. ActionWorker. They all have overloads that accept callbacks as parameters to specify what the worker should do
- Adding a callback using WorkerBase<TDerived>.AddStartingCallback(), WorkerBase.AddStartingCallback(), WorkerBase<TDerived>.AddRanCallback() or WorkerBase.AddRanCallback(), which can be useful when none of the other approaches are available.
When the callback is only used in one place in your program, an inline code snippet, a.k.a. anonymous function or lambda, is usually the preferred approach. This is demonstrated in the following example by the root WorkerSystem and the "Parent" worker, which both use a lambda callback to add a child worker:
// using actionETL;
new WorkerSystem()
    .Root(ws =>
    {
        var parent = new ActionWorker(ws, "Parent", awParent =>
        {
            var child = new ActionWorker(awParent, "Child", awChild => { /* ... */ });
        });
    })
    .Start()
    .ThrowOnFailure();
The above code creates a three-level worker hierarchy: the root worker system contains the "Parent" worker, which in turn contains the "Child" worker.
When the callback is reused in multiple places, it is instead usually better to:
- Create a regular method, and use that as the callback, as demonstrated in the Pass Callback to Root() example, or
- Create a new worker type, as per below
Note
The worker system, workers, and ports all have a Locator property that describes where in the worker system hierarchy they reside. This is used both in logging output, and for setting configuration values on specific workers etc. In the above example, for instance, the "Child" worker would have the Locator string "/Root/Parent/Child". Also see Port Locator.
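As a minimal sketch of the Locator property described in the note, the following rebuilds the same "Parent"/"Child" hierarchy and captures the child's Locator from inside its callback. The variable childLocator is a hypothetical name introduced for illustration, and the snippet assumes that Locator can be assigned to a string, as the note's wording suggests.

```csharp
using System;
using actionETL;

string childLocator = "";

new WorkerSystem()
    .Root(ws =>
    {
        _ = new ActionWorker(ws, "Parent", awParent =>
        {
            _ = new ActionWorker(awParent, "Child", awChild =>
            {
                // Locator describes where this worker sits in the hierarchy.
                childLocator = awChild.Locator;
            });
        });
    })
    .Start()
    .ThrowOnFailure();

// Per the note above, this is expected to be "/Root/Parent/Child".
Console.WriteLine(childLocator);
```

The same string shows up in the logging output, which is one reason to keep worker names short and meaningful.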
Creating the Hierarchy Ahead of Time
This example creates the same hierarchy as above, but does it before the worker system starts:
// using actionETL;
var workerSystem = new WorkerSystem();
var parent = new ActionWorker(workerSystem, "Parent", awParent => { /* ... */ });
var child = new ActionWorker(parent, "Child", awChild => { /* ... */ });
workerSystem
    .Start()
    .ThrowOnFailure();
This can e.g. be used to ensure the worker hierarchy creation succeeds, before actually starting the worker system, and can be used for all or part of the worker hierarchy.
Note
This approach should generally not be used after the worker system has started, to avoid the added complexity of synchronizing workers with events outside the worker system.
Adding via Worker Implementation
A new worker type can be created that instantiates and adds workers as part of its implementation, normally during its Running phase. The new worker would usually add child workers to itself, to help provide its required functionality. A new worker type allows adding custom constructors, properties etc., which can be used to fully encapsulate its child workers - see Custom Workers for details.
Note
- While child workers can be added in the constructor, this means they will be created even if the worker does not run, which increases resource usage and is normally undesirable.
- Technically, the new worker type could also add child workers to some other parent, but this is usually better done with a callback or a regular method instead.
Creating a new worker type is often a good idea when it will be reused in different parts of your program, or across different programs.
Adding Children to a Different Parent
In most cases, child workers are added by their direct parent. It is however also possible for a worker to add children to some other worker. Potential reasons for doing this include:
- Adding children to workers that can't create them themselves (e.g. CopyFileWorker), to benefit from the default start and error roll-up behavior (the same effect can be accomplished by adding an ActionWorker for grouping and an IsStartable constraint, but that takes noticeably more code to do):
  - A child will by default start after the parent has completed executing its RunAsync() method successfully, akin to an IsSucceeded start constraint
  - A failure in the child will by default roll up and also fail the parent
- Building a (sub-)hierarchy of workers ahead of time, before allowing the root of the (sub-)hierarchy to start, e.g.:
  - Easier to do data driven worker instantiation in a single place
  - Extremely low latency when starting all the workers in the (sub-)hierarchy due to worker creation already having been done. Note though that even without this approach, actionETL can typically create hundreds of thousands of (simple) workers per second.
Note
Workers that perform looping (i.e. WhileActionWorker and ForEachActionWorker) remove all their children after each iteration, so the children a non-parent worker added from the outside would only be present on the first iteration. This usually makes the looping workers a poor fit for this approach.
This example again creates the same hierarchy as above, except that here it's the grandparent (in this case the worker system callback) that creates the child, and passes a reference to the intended parent (parent) to the child constructor:
// using actionETL;
new WorkerSystem()
    .Root(ws =>
    {
        var parent = new ActionWorker(ws, "Parent", awParent => { /* ... */ });
        var child = new ActionWorker(parent, "Child", awChild => { /* ... */ });
    })
    .Start()
    .ThrowOnFailure();
Instantiating Workers via Helper Method
If the same several workers (or single worker with complex configuration) are instantiated in multiple places, it is often a good idea to use a helper method to instantiate them, avoiding code duplication and reducing code size.
Here our helper method instantiates two workers to drop a table if it exists, and then (re)create it (for those databases that can't do this in a single query).
Note
The method returns a tuple containing both created workers so you can set start constraints to and from them, as well as other properties.
Alternatively, create these workers as child workers of a grouping ActionWorker, and only return the parent grouping worker from the helper method.
public static (AdbTableNonQueryWorker ifExistsDropTable, AdbExecuteNonQueryWorker createTable)
    GenerateIfExistsDropCreateTable(WorkerParent workerParent
        , AdbConnectionString adbConnectionString, string compositeTableName
        , string queryText)
{
    var atnqw = new AdbTableNonQueryWorker(workerParent
        , adbConnectionString?.CreateConnectionBuilder()
        , AdbTableNonQueryOperation.IfExistsDropTable, compositeTableName);
    var aenqw = new AdbExecuteNonQueryWorker(workerParent
        , "Create table " + compositeTableName, () => atnqw.IsSucceeded
        , adbConnectionString.CreateConnectionBuilder(), queryText);
    return (atnqw, aenqw);
}
Here's how you can use the method in the worker system:
var cs = AdbSqlClientProvider.Get()
    .CreateConnectionString(root.Config["SqlServer"]);
var (ifExistsDropTable, createTable) = GenerateIfExistsDropCreateTable(
    root, cs, _tableName, _createTableQueryText);
// Add other workers using the table, set any start constraints...
In this second example (from a larger dataflow scenario), a helper method instantiates and links a transform and a target worker (upstream transforms and source not shown). Note how the helper method only takes the few parameters that change between invocations:
// Combine and insert data and corrected error rows:
CreateUnionAndInsert("Product Data", "dbo.Product"
    , trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
// Combine and insert error rows:
CreateUnionAndInsert("Product Errors", "dbo.ProductError"
    , trx1.ErrorOutput, trx2CheckB.ErrorOutput);

// Local helper function to union and insert rows into a database table
void CreateUnionAndInsert(string name, string tableName,
    params OutputPortBase<Product>[] inputsFrom)
{
    if (inputsFrom.Length == 0)
        return;
    inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
        .UnionAllTransform("Union " + name, inputsFrom)
        .Output.Link
        .AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
}
Instantiating Workers via Looping
If a type of worker (or a set of workers) is instantiated multiple times under the same parent, each with slightly different configuration, it is often a good idea to use looping to instantiate them, avoiding code duplication and reducing code size.
If a dedicated parent worker is not needed (e.g. to group child workers), and all child workers can be created before any child worker is started, use a regular for, foreach or while statement to perform the work, e.g.:
new WorkerSystem("Generate Workers")
    .Root(root =>
    {
        string[] strings = { "Shipping", "Inventory", "Sales", "Returns" };
        foreach (string s in strings)
            // Create one or more workers, with unique names
            _ = new ActionWorker(root, "Process " + s, aw =>
            {
                // Process using string 's'...
            });
        // Optionally limit work, could also use start constraints
        root.MaxRunningChildren = 2;
    })
    .Start()
    .ThrowOnFailure();
Note that you can also add start constraints between the generated workers, to more finely control starting order.
If the above direct approach is not possible, use one of the looping workers: ForEachActionWorker<TItem> or WhileActionWorker<T>.
Grouping Workers
It is often useful to group several workers under a single parent worker, as with the ActionWorker below:
This allows the three grouped workers to be treated as a unit with regards to:
- Avoiding name collisions between sibling workers
- Execution logs
- Constraints and errors
The most common workers used for grouping are ActionWorker, Worker, and (for custom workers) WorkerBase<TDerived>. Note however that all workers have the inherent ability to have children, giving all workers the ability to group other workers.
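As a minimal sketch of grouping, the following places three workers under one ActionWorker and lets a sibling worker depend on the group as a whole. All worker names are hypothetical. The snippet also assumes that ActionWorker has an overload taking the isStartable callback as its third parameter, which is consistent with the constructor rules described earlier but is not shown for ActionWorker on this page.

```csharp
using actionETL;

new WorkerSystem()
    .Root(root =>
    {
        // Group three related steps under a single parent worker.
        var extract = new ActionWorker(root, "Extract", awExtract =>
        {
            // Names only need to be unique among the children of "Extract".
            var download = new ActionWorker(awExtract, "Download"
                , aw => { /* ... */ });
            var unzip = new ActionWorker(awExtract, "Unzip"
                , () => download.IsSucceeded, aw => { /* ... */ });
            _ = new ActionWorker(awExtract, "Validate"
                , () => unzip.IsSucceeded, aw => { /* ... */ });
        });

        // Other workers can treat the group as a unit: by default a failure
        // in any grouped child rolls up and fails "Extract", so this start
        // constraint only needs to reference the parent.
        _ = new ActionWorker(root, "Load"
            , () => extract.IsSucceeded, aw => { /* ... */ });
    })
    .Start()
    .ThrowOnFailure();
```

Referencing only the group from "Load" keeps the constraint stable even if workers are later added to or removed from "Extract".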