Custom Workers
This article describes how to implement custom functionality, ranging from configuring existing workers to creating new worker types.
Configure Existing Worker
Existing workers are highly configurable, allowing custom functionality to be added to them, often obviating the need to add additional workers or create new types of workers.
- Pros:
  - Straightforward to implement
  - Code is (normally) placed inline with the worker creation, making it very easy to see what the worker system has implemented
- Cons:
  - Cannot add members (properties, methods...) to, or remove members from, the worker
  - Good for one-offs, but leads to code duplication if the same custom functionality is needed in several places. This can be partly mitigated by creating a method or delegate that implements the custom functionality, and supplying it to multiple workers.
Examples include:
- Customizable workers, such as SortTransform<TInputOutput> and the join transforms, which allow specifying the sort condition as a custom comparer function
- All *Action* workers, where the library user provides the bulk of the logic via a callback (often an Action or Func<TResult>, e.g. ActionWorker)
- Execution callbacks common to all workers, see Worker Execution
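The delegate-reuse mitigation mentioned in the cons above can be sketched in plain C#, without any actionETL types: one comparison delegate is defined once and supplied to two consumers. SortStep<T> and SharedCallbacks are hypothetical names standing in for a configurable worker such as SortTransform<TInputOutput>; this is not the actionETL API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a configurable worker; NOT an actionETL type.
public sealed class SortStep<T>
{
    private readonly Comparison<T> _comparison;

    public SortStep(string name, Comparison<T> comparison)
    {
        Name = name ?? throw new ArgumentNullException(nameof(name));
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));
    }

    public string Name { get; }

    // Returns a sorted copy, ordered by the custom comparison supplied by the caller.
    public List<T> Process(IEnumerable<T> rows)
    {
        var result = new List<T>(rows);
        result.Sort(_comparison);
        return result;
    }
}

public static class SharedCallbacks
{
    // Defined once, and reused by every step that needs case-insensitive ordering.
    public static readonly Comparison<string> CaseInsensitive =
        (a, b) => string.Compare(a, b, StringComparison.OrdinalIgnoreCase);
}
```

Both `new SortStep<string>("Sort A", SharedCallbacks.CaseInsensitive)` and `new SortStep<string>("Sort B", SharedCallbacks.CaseInsensitive)` then share the same ordering logic, so a change to the comparison only needs to be made in one place.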
Instantiate Workers via Helper Method
A common and simple alternative to creating either a single complex worker, or a new worker that uses child workers to implement its functionality, is to use a helper method that creates multiple workers when called.
- Pros:
  - Straightforward to implement
  - Reduces code duplication
  - Can create (but not fully encapsulate) any number of child workers - the user only calls the helper method
- Cons:
  - Cannot add members to, or remove members from, the workers
Here our helper method instantiates two workers to drop a table if it exists, and then (re)create it (for those databases that can't do this in a single query).
Note
The method returns a tuple containing both created workers, so you can set start constraints to and from them, as well as other properties.
Alternatively, create these workers as child workers of a grouping ActionWorker, and only return the parent grouping worker from the helper method.
public static (AdbTableNonQueryWorker ifExistsDropTable, AdbExecuteNonQueryWorker createTable)
GenerateIfExistsDropCreateTable(WorkerParent workerParent
, AdbConnectionString adbConnectionString, string compositeTableName
, string queryText)
{
var atnqw = new AdbTableNonQueryWorker(workerParent
, adbConnectionString?.CreateConnectionBuilder()
, AdbTableNonQueryOperation.IfExistsDropTable, compositeTableName);
var aenqw = new AdbExecuteNonQueryWorker(workerParent
, "Create table " + compositeTableName, () => atnqw.IsSucceeded
, adbConnectionString.CreateConnectionBuilder(), queryText);
return (atnqw, aenqw);
}
Here's how you can use the method in the worker system:
var cs = AdbSqlClientProvider.Get()
.CreateConnectionString(root.Config["SqlServer"]);
var (ifExistsDropTable, createTable) = GenerateIfExistsDropCreateTable(
root, cs, _tableName, _createTableQueryText);
// Add other workers using the table, set any start constraints...
In this second example (from a larger dataflow scenario), a helper method instantiates and links a transform and a target worker (upstream transforms and source not shown). Note how the helper method only takes the few parameters that change between invocations:
// Combine and insert data and corrected error rows:
CreateUnionAndInsert("Product Data", "dbo.Product"
, trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
// Combine and insert error rows:
CreateUnionAndInsert("Product Errors", "dbo.ProductError"
, trx1.ErrorOutput, trx2CheckB.ErrorOutput);
// Local helper function to union and insert rows into a database table
void CreateUnionAndInsert(string name, string tableName,
params OutputPortBase<Product>[] inputsFrom)
{
if (inputsFrom.Length == 0)
return;
inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
.UnionAllTransform("Union " + name, inputsFrom)
.Output.Link
.AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
}
Define New Worker Type
A new worker type is defined by inheriting, directly or indirectly, from WorkerBase<TDerived>. In general, inherit from the worker that is most similar to your desired final functionality, without having any extra unwanted functionality.
For instance, if your new worker will execute a user supplied delegate (and you will add additional functionality on top of that), consider deriving from ActionWorkerBase<TDerived>. On the other hand, if your new worker does not execute a user supplied delegate, you might derive directly from WorkerBase<TDerived>.
Note
All workers with names ending in *Base cannot be instantiated directly, and are specifically meant to be inherited from. Other (unsealed) workers can also be inherited from, but that is a less common use case.
- Pros:
  - Can add members to the worker, and can avoid inheriting unwanted members
  - Very easy to reuse, and reduces code duplication
  - Provides a single worker to check for completion status, set start constraints etc.
  - Can encapsulate any number of child workers - only the outermost worker needs to be configured and instantiated by the user
- Cons:
  - Takes slightly more code to implement vs. using callbacks
Best Practices
Worker Constructors
- All worker constructors SHOULD start with the parameters WorkerParent workerParent, string workerName
- All workers SHOULD have at least one overload that accepts a Func<bool> isStartableFunc parameter (as the third parameter, directly after string workerName), except dataflow transform and target workers, which SHOULD NOT have this parameter in any overload
- Constructors SHOULD avoid large resource allocations or heavy CPU usage, and instead perform them when the worker execution is started. This lowers overall resource usage, since a created worker might not be started for quite a while, or at all.
- Worker values that must be set SHOULD be present in constructor overloads, so that the worker is always in a consistent state.
- Do validate all parameters and properties: in the constructor for values that cannot be set later, or when the worker starts running for properties that can be set after worker instantiation.
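As a rough illustration of these constructor guidelines, the following plain C# sketch (not an actionETL worker) validates its required values in the constructor, validates a settable property only when it starts, and defers its buffer allocation until then. ExportWorkerSketch, BufferCapacity, HasAllocated and Start() are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class illustrating the constructor guidelines; NOT an actionETL worker.
public sealed class ExportWorkerSketch
{
    private List<string> _buffer; // Allocated lazily when the worker starts.

    // The required value (fileName) is a constructor parameter, and is validated
    // immediately since it cannot be changed later.
    public ExportWorkerSketch(string workerName, string fileName)
    {
        if (string.IsNullOrWhiteSpace(workerName))
            throw new ArgumentException("Worker name is required.", nameof(workerName));
        if (string.IsNullOrWhiteSpace(fileName))
            throw new ArgumentException("File name is required.", nameof(fileName));
        WorkerName = workerName;
        FileName = fileName;
    }

    public string WorkerName { get; }
    public string FileName { get; }

    // Optional value that can be set after construction, so it is validated at start.
    public int? BufferCapacity { get; set; }

    public bool HasAllocated => _buffer != null;

    public void Start()
    {
        if (BufferCapacity is int capacity && capacity <= 0)
            throw new InvalidOperationException(nameof(BufferCapacity) + " must be positive.");

        // Defer the (potentially large) allocation until the worker actually runs.
        _buffer = new List<string>(BufferCapacity ?? 1024);
    }
}
```

A created but never started instance therefore holds no buffer at all, which is the resource saving the guideline above describes.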
Public Properties etc.
Public properties (and less commonly, public methods) can be added to allow sibling and ancestor workers to:
- Set values after instantiation:
  - Most commonly before the worker starts running, to provide values calculated by worker siblings, or to provide optional values not included in the constructors
  - Sometimes also after worker completion, to e.g. release large in-memory objects
- Get values after instantiation:
  - Most commonly after the worker completes successfully (and sometimes also with an error), to provide a worker result value (beyond the default Status)
  - Sometimes at any time, even while the worker runs, if the value never changes after the worker starts running
Important
You MUST use SafeWorkerParentValue<T> to get and set publicly accessible values that can change after instantiation, including references where the pointed-to instance can change. This ensures thread-safety, and throws if the worker is in an unexpected state, as well as when getting an unpopulated value.
The FileExistsInheritWorker example in Single Worker Implementation below allows setting a file name property after worker instantiation, and in these three dataflow examples the property can also be set after the worker has completed.
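To make the guarantees of SafeWorkerParentValue<T> more concrete, here is a minimal plain C# sketch of a lock-protected, state-checked value. StateCheckedValue<T>, SketchState, SetIfInState and Get are hypothetical; they only model the behavior described above (setting only in an expected owner state, throwing on an unpopulated get) and are not how actionETL implements it.

```csharp
using System;

// Hypothetical lifecycle states; actionETL's WorkerParentState is a separate, richer type.
public enum SketchState { Created, Running, Completed }

// Minimal sketch of a lock-protected, state-checked value. It only illustrates
// the idea behind SafeWorkerParentValue<T>; it is NOT its implementation.
public sealed class StateCheckedValue<T>
{
    private readonly object _lock = new object();
    private readonly Func<SketchState> _getOwnerState;
    private T _value;
    private bool _isPopulated;

    public StateCheckedValue(Func<SketchState> getOwnerState)
    {
        _getOwnerState = getOwnerState ?? throw new ArgumentNullException(nameof(getOwnerState));
    }

    // Setting is only allowed while the owner is in the required state.
    public void SetIfInState(T value, SketchState requiredState)
    {
        lock (_lock)
        {
            var state = _getOwnerState();
            if (state != requiredState)
                throw new InvalidOperationException(
                    $"Cannot set value in state {state}; expected {requiredState}.");
            _value = value;
            _isPopulated = true;
        }
    }

    // Getting an unpopulated value throws, instead of silently returning default(T).
    public T Get()
    {
        lock (_lock)
        {
            if (!_isPopulated)
                throw new InvalidOperationException("Value has not been populated.");
            return _value;
        }
    }
}
```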
Error Handling
Worker Error Handling covers error handling when using workers.
In worker implementations, when an error condition (e.g. an incorrect parameter, an IOException etc.) occurs, the worker has four options for dealing with it:
1. Handle the error (e.g. use a fall-back value, retry etc.), continue processing, and return a success status (e.g. Succeeded) when finished.
   - Normally, also log a Warn or lower message with details about the error that was handled.
2. Stop processing and return an error status, e.g. via one of the Error(String) overloads, including any failure details. This error status will be visible to its parent and sibling workers, and allows them to react accordingly, e.g. by running fall-back or cleanup workers, or by halting processing.
   - Do not explicitly log any message; the system will log the OutcomeStatus details.
3. Stop processing and return a fatal status, e.g. via one of the Fatal(String) overloads. This fatal status will force all ancestors to also get the Fatal status, and will halt the whole worker system as soon as possible.
   - This option is appropriate when using the less severe options 1. or 2. could lead to an unknown or corrupted application state.
   - Except for programming errors (i.e. bugs), this option is strongly preferable to option 4., since it allows adding explanatory log categories and messages to the (fatal) error, and shows that it is a known possible error that has been handled appropriately.
   - Do not explicitly log any message; the system will log the OutcomeStatus details.
4. Allow an unhandled exception to escape from the worker. The worker system will force the worker and all its ancestors to have a Fatal status, and the whole worker system is halted as in option 3.
   - This option is appropriate for programming errors (i.e. bugs), e.g. when receiving incorrect values in method parameters and properties. Workers SHOULD handle all predictable exceptions that are not bugs using options 1., 2., or 3.
   - Do include relevant troubleshooting information in the exception, e.g. in its Message property. Do explicitly log a failure message if there is additional information that can't be included in the exception.
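The following plain C# sketch maps the four options onto a single method. SketchOutcome and SketchOutcomeKind are hypothetical stand-ins for OutcomeStatus, ErrorHandlingSketch.Run is a hypothetical method, and the file existence check is injected as a delegate; a real worker would instead return OutcomeStatus values from RunAsync().

```csharp
using System;

// Hypothetical outcome type; actionETL's OutcomeStatus plays this role for real workers.
public enum SketchOutcomeKind { Succeeded, Error, Fatal }

public sealed class SketchOutcome
{
    public SketchOutcome(SketchOutcomeKind kind, string message = null)
    {
        Kind = kind;
        Message = message;
    }

    public SketchOutcomeKind Kind { get; }
    public string Message { get; }
}

public static class ErrorHandlingSketch
{
    public static SketchOutcome Run(string fileName, Func<string, bool> fileExists,
        bool isStateCorrupted, string fallbackFileName = null)
    {
        // Option 4: null arguments are programming errors (bugs), so throw.
        if (fileName == null)
            throw new ArgumentNullException(nameof(fileName));
        if (fileExists == null)
            throw new ArgumentNullException(nameof(fileExists));

        // Option 3: continuing could corrupt the application state, so return fatal.
        if (isStateCorrupted)
            return new SketchOutcome(SketchOutcomeKind.Fatal,
                "Application state is inconsistent; halting.");

        if (fileExists(fileName))
            return new SketchOutcome(SketchOutcomeKind.Succeeded);

        // Option 1: handle the error with a fall-back value and still succeed
        // (a real worker would normally also log a Warn message here).
        if (fallbackFileName != null && fileExists(fallbackFileName))
            return new SketchOutcome(SketchOutcomeKind.Succeeded,
                $"Used fall-back file '{fallbackFileName}'.");

        // Option 2: stop and return an error, with troubleshooting details.
        return new SketchOutcome(SketchOutcomeKind.Error,
            $"Expected file '{fileName}' to exist, but it did not.");
    }
}
```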
Important
Do include sufficient details with the failure to allow troubleshooting the issue.
Also see Logging Messages regarding logging levels etc.
Single Worker Implementation
Here we define the new worker type FileExistsInheritWorker (similar to the existing FileExistsWorker), using inheritance. The worker will fail the worker system if the specified file doesn't exist:
using actionETL; // WorkerBase, OutcomeStatus, SafeWorkerParentValue etc.
using actionETL.Logging; // ALogCategory
using System;
using System.IO;
using System.Threading.Tasks;
public class FileExistsInheritWorker
: WorkerBase<FileExistsInheritWorker>
{
// For all workers except transforms and targets, best practice is to provide
// overloads both with and without an 'isStartableFunc' parameter.
public FileExistsInheritWorker(
WorkerParent workerParent
, string workerName
, Func<bool> isStartableFunc
, string fileName
)
: base(workerParent, workerName, isStartableFunc)
{
_fileName = new SafeWorkerParentValue<string>(this, fileName);
}
public FileExistsInheritWorker(
WorkerParent workerParent
, string workerName
, string fileName
)
: this(workerParent, workerName, null, fileName)
{ }
// Allow setting filename after worker creation (but before worker runs).
public string FileName
{
get => _fileName.GetValueUnchecked();
set => _fileName.SetValueChecked(value, WorkerParentState.Created);
}
private readonly SafeWorkerParentValue<string> _fileName;
// Executed when worker runs.
protected override Task<OutcomeStatus> RunAsync()
{
if (FileName == null)
throw new InvalidOperationException(nameof(FileName) + " cannot be null.");
// Since the method doesn't use the 'async' keyword, we return success/failure
// wrapped in a Task.
return File.Exists(FileName)
? OutcomeStatus.SucceededTask
: OutcomeStatus.Error(ALogCategory.FileOperationFailed,
$"Expected file '{FileName}' to exist, but it did not."
).ToTask();
}
}
The worker can now be used exactly like any of the out-of-box workers:
new WorkerSystem()
.Root(ws =>
{
var feiw = new FileExistsInheritWorker(ws, "File exists"
, ws.Config["TriggerFile"]);
// Create other workers...
})
.Start().ThrowOnFailure();
Also see the more extensive Process Incoming Files Example.
CRTP Inheritance Pattern
In the above example, the custom worker inherited from WorkerBase<TDerived>, and its own name was also specified as the type parameter to WorkerBase<TDerived>:
public class FileExistsInheritWorker
: WorkerBase<FileExistsInheritWorker>
{
This construct is used on all the out-of-box *Base<TDerived> worker classes that you inherit from, and is called the Curiously Recurring Template Pattern (CRTP).
The advantage this construct brings is that members in the base class can use the type of the derived worker, see e.g. AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) which allows you to use the derived worker type without using a cast.
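The same pattern can be shown outside actionETL. In this plain C# sketch, the hypothetical StepBase<TDerived> exposes an AddStartingCallback method whose callback receives the derived type, so the caller can access FileStep members without a cast; the single cast is done inside the base class. None of these names are actionETL APIs, and the real AddStartingCallback takes an asynchronous callback instead.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical CRTP base class; names are illustrative, not actionETL APIs.
public abstract class StepBase<TDerived>
    where TDerived : StepBase<TDerived>
{
    private readonly List<Action<TDerived>> _startingCallbacks = new List<Action<TDerived>>();

    // Returns TDerived, so fluent calls keep the derived type.
    public TDerived AddStartingCallback(Action<TDerived> callback)
    {
        _startingCallbacks.Add(callback ?? throw new ArgumentNullException(nameof(callback)));
        return (TDerived)this; // The only cast lives here, not in user code.
    }

    public void Start()
    {
        foreach (var callback in _startingCallbacks)
            callback((TDerived)this);
    }
}

// The derived class passes its own name as the type argument.
public sealed class FileStep : StepBase<FileStep>
{
    public string FileName { get; set; }
}
```

With this in place, `step.AddStartingCallback(s => s.FileName = "input.csv")` compiles because `s` is already typed as FileStep.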
Compose Worker Children
In this example we define the new worker type LoadFlatFileToTableWorker<T>, using inheritance, and add two worker children to implement the desired functionality of loading CSV files into a database table. Note that:
- LoadFlatFileToTableWorker<T> is not hard-coded for any particular schema. Instead the schema (Category and Product below) is provided by the caller, so the worker can be used to load different files/rows/tables. A row class is a reusable schema (and a struct is a reusable partial schema), see Dataflow Columns.
- We create the AdbConnectionBuilder outside the worker, and pass it in, to increase flexibility.
- The file name can also be set in a thread-safe manner via the FileName property, before the worker is started (guaranteed by SafeWorkerParentValue<T>). This allows it to be set from sibling workers that calculate the value on the fly.
Define the worker:
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
using System;
using System.Threading.Tasks;
public class LoadFlatFileToTableWorker<T>
: WorkerBase<LoadFlatFileToTableWorker<T>>
where T : class // Dataflow rows must have the 'class' constraint
{
private readonly IAdbConnectionBuilder _connectionBuilder;
private readonly string _tableName;
public string FileName // Make thread-safe and ensure worker status
{
get => _fileName.GetValueUnchecked();
set => _fileName.SetValueChecked(value, WorkerParentState.Created);
}
private readonly SafeWorkerParentValue<string> _fileName;
public LoadFlatFileToTableWorker(
WorkerParent workerParent
, string workerName
, Func<bool> isStartableFunc
, IAdbConnectionBuilder connectionBuilder
, string fileName
, string tableName
)
: base(workerParent, workerName, isStartableFunc)
{
_connectionBuilder = connectionBuilder;
_fileName = new SafeWorkerParentValue<string>(this, fileName);
_tableName = tableName;
}
protected override Task<OutcomeStatus> RunAsync()
{
new FileHelperFileSource<T>(this, "Read CSV", FileName)
.Output.Link.AdbInsertTarget("Insert", _connectionBuilder, _tableName);
return OutcomeStatus.SucceededTask;
}
}
The worker can now be used exactly like any of the out-of-box workers:
[DelimitedRecord(",")]
public class Category
{
public int CategoryId { get; set; }
public string CategoryName { get; set; }
}
[DelimitedRecord(",")]
public class Product
{
public int ProductId { get; set; }
public string ProductName { get; set; }
}
public static SystemOutcomeStatus RunExample()
{
var workerSystem = new WorkerSystem().Root(ws =>
{
var acs = AdbSqlClientProvider.Get()
.CreateConnectionString(ws.Config["SqlServer"]);
_ = new LoadFlatFileToTableWorker<Category>(ws, "Load Category", null
, acs.CreateConnectionBuilder()
, @"Src/CreateWorkerViaInheritance/Category.csv"
, "dbo.Category"
);
_ = new LoadFlatFileToTableWorker<Product>(ws, "Load Product", null
, acs.CreateConnectionBuilder()
, @"Src/CreateWorkerViaInheritance/Product.csv"
, "dbo.Product");
}
);
return workerSystem.Start();
}
/* The example assumes the following tables already exist:
CREATE TABLE Category
(
CategoryId int,
CategoryName varchar(50)
)
CREATE TABLE Product
(
ProductId int,
ProductName varchar(50)
)
*/
Key Points
- It is very straightforward, and only takes a few lines, to define a new worker. Furthermore, there are a number of base classes (see Workers) whose only purpose is to assist with defining new kinds of workers.
- The more a piece of code is reused, the bigger the benefit from creating a new worker type. A well-chosen worker type name 'explains' what the worker does, and is often preferable to a code snippet that adds existing workers but can't change their type names.
- Functionality is added by overriding the method RunAsync(), which the system calls when the worker is 'started'.
- Some workers already override RunAsync(), and provide a further refined API for derived workers to use. E.g. with RowTransformBase<TDerived, TInputOutputError>, the OnInputRow(TInputOutputError) method must be overridden instead; always check the API documentation of the worker you derive from.
- Using inheritance allows adding custom members and encapsulating contained workers (e.g. the new parent worker would normally automatically configure its child workers)
- The above examples inherit from the least derived WorkerBase<TDerived> worker - the same approach is used when inheriting from more derived workers
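The refined-API approach mentioned above, where a base class implements RunAsync() itself and asks derived classes for a narrower hook, can be sketched in plain C# as below. RowProcessorSketchBase<TRow>, SendToOutput and PositiveFilter are hypothetical and greatly simplified compared with RowTransformBase<TDerived, TInputOutputError>, which processes rows from dataflow ports rather than from an in-memory sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical base class (NOT actionETL's RowTransformBase) showing how a base
// can implement RunAsync() itself and expose a narrower per-row hook.
public abstract class RowProcessorSketchBase<TRow>
{
    private readonly IEnumerable<TRow> _input;

    protected RowProcessorSketchBase(IEnumerable<TRow> input)
    {
        _input = input ?? throw new ArgumentNullException(nameof(input));
    }

    public List<TRow> Output { get; } = new List<TRow>();

    // The base class owns the processing loop...
    public Task RunAsync()
    {
        foreach (var row in _input)
            OnInputRow(row);
        return Task.CompletedTask;
    }

    // ...and derived classes only implement the per-row logic.
    protected abstract void OnInputRow(TRow row);

    protected void SendToOutput(TRow row) => Output.Add(row);
}

// Example derived class: passes through positive values only.
public sealed class PositiveFilter : RowProcessorSketchBase<int>
{
    public PositiveFilter(IEnumerable<int> input) : base(input) { }

    protected override void OnInputRow(int row)
    {
        if (row > 0)
            SendToOutput(row);
    }
}
```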
Also see Slowly Changing Dimension Example.
Asynchronous Programming
actionETL makes heavy use of and widely supports async and await, a facility introduced with .NET Framework 4.5 that makes asynchronous programming quite straightforward to implement.
This is valuable, since file, network, database and other interactions are inherently asynchronous, and supporting asynchronous programming well is one of the enablers for building larger ETL systems that perform well.
actionETL API methods that support asynchrony have a name ending in *Async, and an awaitable return type, typically Task, Task<TResult>, or ValueTask<TResult>. When you implement or override such a method (e.g. RunAsync()), you can use the async keyword; you are however not required to do so:
- DO use async on a method if you also need to use the await keyword in the method.
  - If the return type is Task<int>, then you return an integer from the method, which automatically gets wrapped in a task.
  - Using async adds a (fairly small) performance overhead, so only use it if it's needed.
- DON'T use async on a method if you don't need the await keyword in the method.
  - If the return type is Task<int>, then you must return an integer wrapped in a task, typically with Task.FromResult().
  - As the FileExistsInheritWorker and LoadFlatFileToTableWorker<T> examples showed, OutcomeStatus (and several other status classes) have members providing wrapped versions of the status, e.g. SucceededTask, which you can use instead of creating your own.
- Some workers have both synchronous and asynchronous APIs, allowing you to call the most appropriate one.
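The DO/DON'T guidance can be illustrated in plain C#: CountLinesAsync needs await and is therefore marked async, while LengthAsync needs no await and returns an explicitly wrapped result, reusing a cached completed task for the common zero case in the same spirit as OutcomeStatus.SucceededTask. AsyncStyleSketch and its method names are hypothetical.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncStyleSketch
{
    // Cached completed task, analogous in spirit to OutcomeStatus.SucceededTask.
    private static readonly Task<int> s_zeroTask = Task.FromResult(0);

    // Uses 'await', so 'async' is required; the returned int is wrapped in a Task automatically.
    public static async Task<int> CountLinesAsync(TextReader reader)
    {
        int count = 0;
        while (await reader.ReadLineAsync() != null)
            count++;
        return count;
    }

    // No 'await' needed, so no 'async' (and no state machine overhead);
    // the result must be wrapped explicitly.
    public static Task<int> LengthAsync(string text)
    {
        return string.IsNullOrEmpty(text)
            ? s_zeroTask
            : Task.FromResult(text.Length);
    }
}
```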