    Class WorkerBase

    The main unit of execution. Specify this class when referring to workers without knowing the final derived worker type. All workers inherit from this class, indirectly via WorkerBase<TDerived>. Use the latter class when you know the final derived worker type.

    When starting a worker, the library calls RunAsync() during the worker Running phase. A derived class must override this abstract method, which normally contains the bulk of the worker logic.

    Note that the worker Running phase also includes additional places where logic can optionally be inserted via callbacks, to e.g. customize the initialization, cleanup, and error handling of existing workers. This is mostly used when customizing workers that are not designed to be derived from (i.e. without a "Base" suffix). See Worker Life-cycle for details.

    See Workers for how to use workers, and see Custom Workers for how to develop your own workers.

    Inheritance
    Object
    WorkerParent
    WorkerBase
    WorkerBase<TDerived>
    Implements
    IDisposeOnFinished
    Inherited Members
    WorkerParent.AddChildCompletedCallback(Action<WorkerBase>)
    WorkerParent.AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>)
    WorkerParent.BytesPerRowBuffer
    WorkerParent.Children
    WorkerParent.DisposeOnFinished<TDisposable>(TDisposable)
    WorkerParent.GetDownstreamFactory<TInput>()
    WorkerParent.HasChildren
    WorkerParent.IsCanceled
    WorkerParent.IsCompleted
    WorkerParent.IsCreated
    WorkerParent.IsError
    WorkerParent.IsFailed
    WorkerParent.IsFatal
    WorkerParent.IsRunning
    WorkerParent.IsSucceeded
    WorkerParent.KeepChildrenLevels
    WorkerParent.Locator
    WorkerParent.LogFactory
    WorkerParent.Logger
    WorkerParent.MaxRunningChildren
    WorkerParent.Name
    WorkerParent.RemoveChildren()
    WorkerParent.RescheduleChildren()
    WorkerParent.RunAsync()
    WorkerParent.RunChildrenAsync(Boolean)
    WorkerParent.RunChildrenAsync()
    WorkerParent.Status
    WorkerParent.Item[String]
    WorkerParent.ToString()
    WorkerParent.WorkerSystem
    WorkerParent.DebugCommands
    WorkerParent.AggregateErrorOutputRows
    WorkerParent.AggregateOutputRows
    WorkerParent.AggregateWorkersCompleted
    WorkerParent.InstantCompleted
    WorkerParent.InstantCreated
    WorkerParent.InstantStarted
    WorkerParent.RunningDuration
    Namespace: actionETL
    Assembly: actionETL.dll
    Syntax
    public abstract class WorkerBase : WorkerParent, IDisposeOnFinished

    Properties

    ErroredPortErrorsWorkerProtected

    Gets or sets whether an Error port should automatically fault the worker (the default), or not. If this setting is exposed to the library user, the recommended approach is a public bool property on the worker named ErroredPortErrorsWorker.

    Set to true (the default) to automatically fault the worker if any port is errored.

    Set to false to not automatically fault the worker if a port is errored. This can e.g. be useful for a worker with multiple inputs where it is acceptable that some (but not all) input ports fail.

    Note that a Fatal port will always fault the worker, irrespective of this setting.

    Note: This property is thread-safe.
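
    The recommendation above (a public ErroredPortErrorsWorker property that forwards to the protected one) can be sketched as follows. To keep the sketch self-contained, StandInWorkerBase is a stand-in that only mimics the protected property, and TolerantMergeWorker is a hypothetical worker name; neither is part of the library.

```csharp
// Stand-in for the library base class, reduced to the single protected
// property discussed above. It is NOT the actionETL WorkerBase.
public abstract class StandInWorkerBase
{
    protected bool ErroredPortErrorsWorkerProtected { get; set; } = true;
}

// Hypothetical worker with several inputs that lets its users choose
// whether an errored port should fault the whole worker.
public sealed class TolerantMergeWorker : StandInWorkerBase
{
    // Public wrapper that forwards to the protected setting.
    public bool ErroredPortErrorsWorker
    {
        get => ErroredPortErrorsWorkerProtected;
        set => ErroredPortErrorsWorkerProtected = value;
    }
}
```

    In a real worker the same wrapper would presumably sit in a class deriving from WorkerBase<TDerived>, so that users can write worker.ErroredPortErrorsWorker = false.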

    Declaration
    protected bool ErroredPortErrorsWorkerProtected { get; set; }
    Property Value
    Type Description
    Boolean

    ErrorOutputs

    Gets the error output ports. Note that these are 'untyped', and do not specify the type of the error output rows. They are however useful for performing all operations that do not require the row type of the port.

    Note that workers with ports typically add additional members that allow access to typed versions of the ports.

    Note: This property is thread-safe.

    Declaration
    public ErrorOutputPortCollection ErrorOutputs { get; }
    Property Value
    Type Description
    ErrorOutputPortCollection

    The error output ports.

    EscalateError

    Gets or sets a value indicating whether to escalate an Error worker completion to the parent worker. Defaults to true. Note that a Fatal status will always be escalated, irrespective of this setting. Cannot be set after the worker has started running.

    Suppressing the escalation allows handling or taking an alternative action on error, which usually involves setting IsStartable constraints.

    Suppressing the escalation can also be used to implement retry-on-error functionality, typically by using an iterating worker such as WhileActionWorker<T> or ForEachActionWorker<TItem>, and setting EscalateError to false on some of their child workers.

    Note: This property is thread-safe.
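
    The escalation rule described above can be summarized in a few lines. This is only a model of the documented rule, not library code: ChildOutcome and EscalationSketch are stand-in names, and treating IsError and IsFatal as independent flags is an assumption made for the sketch.

```csharp
using System.Collections.Generic;
using System.Linq;

// Stand-in for a completed child worker; NOT an actionETL type.
public sealed class ChildOutcome
{
    public bool IsError { get; set; }
    public bool IsFatal { get; set; }
    public bool EscalateError { get; set; } = true; // documented default
}

public static class EscalationSketch
{
    // True when at least one child outcome would be passed up to the parent:
    // Fatal always escalates, Error only when EscalateError is true.
    public static bool EscalatesToParent(IEnumerable<ChildOutcome> children) =>
        children.Any(c => c.IsFatal || (c.IsError && c.EscalateError));
}
```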

    Declaration
    public bool EscalateError { get; set; }
    Property Value
    Type Description
    Boolean

    true (the default) to escalate an Error completion; false to not escalate it.

    Exceptions
    Type Condition
    InvalidOperationException

    Cannot set the value after the worker has started running.

    Inputs

    Gets the input ports. Note that these are 'untyped', and do not specify the type of the input rows. They are however useful for performing all operations that do not require the row type of the port.

    Note that workers with ports typically add additional members that allow access to typed versions of the ports.

    Note: This property is thread-safe.

    Declaration
    public InputPortCollection Inputs { get; }
    Property Value
    Type Description
    InputPortCollection

    The input ports.

    IsStartable

    A callback that limits when a worker can be started. Set this (usually via a constructor parameter) to define a custom start constraint (also known as a dependency). After the worker has started, this property is unused, and can then be set.

    The default value is DefaultIsStartable(): workers without input ports are always startable, and workers with input ports (i.e. transforms and targets) are blocked until at least one input has gone beyond the Ready state (i.e. until AnyActivated becomes true).

    Note however that to avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.

    Also note that MaxRunningChildren can stop a worker from being run.

    For further details, see Start Constraints.

    Note: This property is thread-safe.

    Declaration
    public Func<bool> IsStartable { get; set; }
    Property Value
    Type Description
    Func<Boolean>

    A callback (often an anonymous function) that returns false as long as the worker should not be started, and true when it should be allowed to start.

    Set to null to get the default behavior.
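
    As an illustration of such a callback, the sketch below gives two follower workers start constraints that depend on a predecessor: one may start once the predecessor has succeeded, the other once it has an error. SketchWorker is a stand-in exposing only IsSucceeded, IsError and IsStartable; it is not a library type, and the names onSuccess and onError are hypothetical.

```csharp
using System;

// Stand-in worker exposing only the members used below; NOT an actionETL type.
public sealed class SketchWorker
{
    public bool IsSucceeded { get; set; }
    public bool IsError { get; set; }
    public Func<bool> IsStartable { get; set; } = () => true;
}

public static class StartConstraintSketch
{
    // Wires two followers to one predecessor: one for success, one for error.
    public static (SketchWorker onSuccess, SketchWorker onError) Wire(SketchWorker predecessor)
    {
        var onSuccess = new SketchWorker { IsStartable = () => predecessor.IsSucceeded };
        var onError = new SketchWorker { IsStartable = () => predecessor.IsError };
        return (onSuccess, onError);
    }
}
```

    With the real library, taking the error branch would typically also involve setting EscalateError to false on the predecessor, as described under EscalateError.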

    Exceptions
    Type Condition
    InvalidOperationException

    Cannot set the value after the worker has started running.

    Outputs

    Gets the output ports. Note that these are 'untyped', and do not specify the type of the output rows. They are however useful for performing all operations that do not require the row type of the port.

    Note that workers with ports typically add additional members that allow access to typed versions of the ports.

    Note: This property is thread-safe.

    Declaration
    public OutputPortCollection Outputs { get; }
    Property Value
    Type Description
    OutputPortCollection

    The output ports.

    Parent

    Reference to the parent of this worker.

    Note: This property is thread-safe.

    Declaration
    public WorkerParent Parent { get; }
    Property Value
    Type Description
    WorkerParent

    Methods

    AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>)

    Adds a callback which will be called as the last step of the worker Running phase, which is after any children and ports have completed. Multiple callbacks can be added.

    These callbacks are useful for performing cleanup tasks. Also see DisposeOnFinished<TDisposable>(TDisposable) and UsingActionWorker<TDisposable> which are specifically for disposing resources.

    If the worker is started, these callbacks will run, even if earlier callbacks, RunAsync(), or any children have failures. If multiple callbacks have been added, they will all be called (even if one fails) in the reverse of the order in which they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.

    Each callback takes the worker itself (typed as WorkerBase) and the OutcomeStatus from previous steps (including any children) as parameters.

    Note: Adding the callback is thread-safe.

    Declaration
    public WorkerBase AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync)
    Parameters
    Type Name Description
    Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync

    The asynchronous callback that will be called when the worker has completed.

    Returns
    Type Description
    WorkerBase

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes two parameters:

    • WorkerBase: The worker itself.
    • OutcomeStatus: From previous execution steps (including any children).

    The callback implementation is either synchronous and returns a Task<OutcomeStatus> explicitly, or is async and returns an OutcomeStatus.

    • To leave the OutcomeStatus unchanged, return the OutcomeStatus that was passed in as a parameter (and use ToTask() in synchronous callbacks).
    • To change the OutcomeStatus (e.g. changing Error to Succeeded), return a new OutcomeStatus instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.

    If you need to access members of the derived worker type, there are three options:

    • Use AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>) instead if you know the derived worker type.
    • If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
    • If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.

    Examples

    In this example we roll back a transaction on failure:

    worker.AddCompletedCallback((w, outcomeStatus) =>
    {
        if (!outcomeStatus.IsSucceeded)
            transaction?.Rollback();
        return outcomeStatus.ToTask();
    });
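
    The two callback forms (synchronous with an explicit Task, and async) and the reverse calling order can be tried out with the stand-alone sketch below. It uses stand-in types only: SketchOutcome and CompletedCallbackSketch are not library types. It also assumes that each callback receives the status returned by the callback before it, which this page does not state explicitly, and it does not model a callback failing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for an outcome value; NOT the actionETL OutcomeStatus type.
public enum SketchOutcome { Succeeded, Error }

public sealed class CompletedCallbackSketch
{
    private readonly List<Func<SketchOutcome, Task<SketchOutcome>>> _callbacks =
        new List<Func<SketchOutcome, Task<SketchOutcome>>>();

    // Returns the instance itself so that calls can be chained.
    public CompletedCallbackSketch Add(Func<SketchOutcome, Task<SketchOutcome>> callback)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        _callbacks.Add(callback);
        return this;
    }

    // Invokes every callback, last added first, passing on the outcome so far.
    public async Task<SketchOutcome> RunAsync(SketchOutcome outcome)
    {
        for (int i = _callbacks.Count - 1; i >= 0; i--)
            outcome = await _callbacks[i](outcome).ConfigureAwait(false);
        return outcome;
    }
}

public static class CompletedCallbackDemo
{
    // Records the order in which the two callbacks are invoked.
    public static async Task<List<string>> RunAsync()
    {
        var order = new List<string>();
        var sketch = new CompletedCallbackSketch()
            // Synchronous form: builds the Task explicitly.
            .Add(o => { order.Add("base"); return Task.FromResult(o); })
            // Async form: returns the outcome directly.
            .Add(async o => { await Task.Yield(); order.Add("derived"); return o; });
        await sketch.RunAsync(SketchOutcome.Succeeded).ConfigureAwait(false);
        return order; // "derived" first, then "base"
    }
}
```
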
    Exceptions
    Type Condition
    ArgumentNullException

    completedFuncAsync

    InvalidOperationException

    Cannot add 'Completed' callback after the worker has completed.

    AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>)

    Adds a callback which will be called immediately after RunAsync() finishes, e.g. for performing cleanup.

    Note that after these callbacks return, ports will be checked for any error status or for not being completed, and child workers will be run (unless they have already been run explicitly, or the worker's RunAsync() failed). Both the port check and the child workers can fail the current worker.

    If multiple callbacks have been added, they will all be called (even if one fails) in the reverse of the order in which they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.

    If RunAsync() does not get called (due to an AddStartingCallback() callback), AddRanCallback() callbacks will not run.

    Each callback takes the worker itself, the OutcomeStatus from RunAsync(), and WorkerParentChildrenState as parameters.

    Note: Adding the callback is thread-safe.

    Declaration
    public WorkerBase AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> ranFuncAsync)
    Parameters
    Type Name Description
    Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> ranFuncAsync

    The asynchronous callback that will be called immediately after RunAsync() finishes. See "Remarks" below for details.

    Returns
    Type Description
    WorkerBase

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes three parameters:

    • WorkerBase: The worker itself.
    • OutcomeStatus: The RunAsync return value.
    • WorkerParentChildrenState: Describes whether children have already been added or completed. If completed, you must RemoveChildren() before adding any new child workers. You can also call a RunChildrenAsync() overload, but must then also await its completion.

    The callback implementation is either synchronous and returns a Task<OutcomeStatus> explicitly, or is async and returns an OutcomeStatus.

    • To leave the OutcomeStatus unchanged, return the OutcomeStatus that was passed in as a parameter (and use ToTask() in synchronous callbacks).
    • To change the OutcomeStatus (e.g. changing Error to Succeeded), return a new OutcomeStatus instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.

    If you need to access members of the derived worker type, there are three options:

    • Use AddRanCallback(Func<TDerived, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>) instead if you know the derived worker type.
    • If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
    • If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.

    Examples

    In this example we roll back a transaction on failure:

    worker.AddRanCallback((w, outcomeStatus, childrenStatus) =>
    {
        if (!outcomeStatus.IsSucceeded)
            transaction?.Rollback();
        return outcomeStatus.ToTask();
    });
    Exceptions
    Type Condition
    ArgumentNullException

    ranFuncAsync

    InvalidOperationException

    Cannot add 'Ran' callback after the worker has completed.

    AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)

    Adds a callback which will be called as the first step of the worker Running phase, which is before RunAsync() is called. Multiple callbacks can be added. Also see AddStartingCallback(Func<TDerived, Task<ProgressStatus>>).

    These callbacks are often used for initialization, or for adding child workers, e.g. when not having access to the worker implementation.

    If the worker is started, these callbacks will run, even if some fail, in the order they were added, e.g. callbacks added in base classes will be called before callbacks added in derived classes.

    The callback takes the worker itself as a parameter, typed as WorkerBase.

    Note: Adding the callback is thread-safe.

    Declaration
    public WorkerBase AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>> startingFuncAsync)
    Parameters
    Type Name Description
    Func<WorkerBase, Task<ProgressStatus>> startingFuncAsync

    The asynchronous callback that will be called when the worker has started. See "Remarks" below for details.

    Returns
    Type Description
    WorkerBase

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes the worker itself as a parameter, typed as WorkerBase.

    If you need to access members of the derived worker type, there are three options:

    • Use AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) instead if you know the derived worker type.
    • If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
    • If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.

    The callback implementation is either synchronous and returns a Task<ProgressStatus> explicitly, or is async and returns a ProgressStatus.

    The callback should return:

    • ProgressStatus.NotCompletedTask (or ProgressStatus.NotCompleted) to continue normal processing.
    • ProgressStatus.SucceededTask (or ProgressStatus.Succeeded) to silently empty any input ports and transmit SendSucceeded() to any output ports. RunAsync(), AddRanCallback() and AddStartingChildrenCallback() callbacks will not be called, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback() and parent AddChildCompletedCallback(Action<WorkerBase>) callbacks will be called.
    • A failure ProgressStatus on failure, which will give the worker that failure status. Neither RunAsync() nor AddRanCallback() callbacks will be called, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback() and parent AddChildCompletedCallback() callbacks will however be called.

    Examples

    In this example we allocate a row buffer:

    worker.AddStartingCallback(w =>
    {
        _outputRows = new TOutput[Output.RowsPerBuffer];
        return ProgressStatus.NotCompletedTask;
    });
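
    How the value returned from a starting callback steers the rest of the Running phase can be modeled as below. This is a simplified stand-alone sketch with a single callback; SketchProgress and StartingCallbackSketch are stand-ins rather than library types, and port handling and child workers are left out.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for the progress value; NOT the actionETL ProgressStatus type.
public enum SketchProgress { NotCompleted, Succeeded, Error }

public static class StartingCallbackSketch
{
    // Runs the starting callback, then the main work only if asked to continue.
    public static async Task<(SketchProgress status, bool mainWorkRan)> StartAsync(
        Func<Task<SketchProgress>> startingCallback,
        Func<Task<SketchProgress>> mainWork)
    {
        SketchProgress status = await startingCallback().ConfigureAwait(false);
        if (status != SketchProgress.NotCompleted)
            return (status, false); // Succeeded or failure: main work is skipped
        return (await mainWork().ConfigureAwait(false), true);
    }
}
```

    A typical use of the Succeeded case would be a callback that detects there is nothing to process and lets the worker complete without running its main logic.
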
    Exceptions
    Type Condition
    ArgumentNullException

    startingFuncAsync

    InvalidOperationException

    Cannot add 'Starting' callback after the worker has started running.

    DefaultIsStartable()

    The default callback used by IsStartable, unless overridden by the library user.

    With this default callback, workers without input ports are always startable, and workers with input ports (i.e. transforms and targets) are blocked until at least one input has gone beyond the Ready state (i.e. until AnyActivated becomes true). This default callback can be used to incorporate the default start constraint with your custom start constraint, e.g.:

    worker.IsStartable = () => worker.DefaultIsStartable() && MyCustomStartableCondition;

    Note however that to avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.

    Also note that MaxRunningChildren can stop a worker from being run.

    Note: This method is thread-safe.
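
    The default rule and its combination with a custom condition can be written out as plain predicates. The sketch below is a model of the rule as described here, not the library implementation; inputPortCount and anyInputActivated are stand-in parameters for the worker's input ports and their AnyActivated state, and group starting of linked sources and MaxRunningChildren are not modeled.

```csharp
using System;

public static class DefaultStartableSketch
{
    // The documented default rule, expressed over two stand-in inputs:
    // no input ports means always startable, otherwise wait for an activated input.
    public static bool DefaultIsStartable(int inputPortCount, bool anyInputActivated) =>
        inputPortCount == 0 || anyInputActivated;

    // Combines a default rule with a custom condition; both must hold.
    public static Func<bool> Combine(Func<bool> defaultRule, Func<bool> customCondition) =>
        () => defaultRule() && customCondition();
}
```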

    Declaration
    public bool DefaultIsStartable()
    Returns
    Type Description
    Boolean

    true to make the worker startable, false to make it not startable.

    SucceededSequence<TLastWorker>(TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(TLastWorker firstWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    TLastWorker firstWorker

    First worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.
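
    The idea behind the SucceededSequence overloads (each worker waits for its predecessor to succeed, and the last worker is returned with its own type) can be sketched with stand-in types. SequenceWorker, ReportWorker and InSequence are hypothetical names used only for this illustration; the sketch shows the idea, not how the library method is implemented.

```csharp
using System;

// Stand-in worker exposing only the members used below; NOT an actionETL type.
public class SequenceWorker
{
    public bool IsSucceeded { get; set; }
    public Func<bool> IsStartable { get; set; } = () => true;
}

// Hypothetical derived worker, to show that the last worker keeps its type.
public sealed class ReportWorker : SequenceWorker
{
    public string Title { get; set; } = "";
}

public static class SequenceSketch
{
    // Makes each later worker wait for its predecessor to succeed, and
    // returns the last worker with its own (derived) type.
    public static TLast InSequence<TLast>(SequenceWorker first, SequenceWorker second, TLast last)
        where TLast : SequenceWorker
    {
        second.IsStartable = () => first.IsSucceeded;
        last.IsStartable = () => second.IsSucceeded;
        return last;
    }
}
```

    Because the return type is the type of the last worker, members of the derived type (Title in this sketch) remain accessible on the returned reference without a cast.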

    SucceededSequence<TLastWorker>(WorkerBase, TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, TLastWorker secondWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    WorkerBase firstWorker

    First worker to add.

    TLastWorker secondWorker

    Second worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.

    SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, TLastWorker thirdWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    WorkerBase firstWorker

    First worker to add.

    WorkerBase secondWorker

    Second worker to add.

    TLastWorker thirdWorker

    Third worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.

    SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, TLastWorker fourthWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    WorkerBase firstWorker

    First worker to add.

    WorkerBase secondWorker

    Second worker to add.

    WorkerBase thirdWorker

    Third worker to add.

    TLastWorker fourthWorker

    Fourth worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.

    SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, WorkerBase fourthWorker, TLastWorker fifthWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    WorkerBase firstWorker

    First worker to add.

    WorkerBase secondWorker

    Second worker to add.

    WorkerBase thirdWorker

    Third worker to add.

    WorkerBase fourthWorker

    Fourth worker to add.

    TLastWorker fifthWorker

    Fifth worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.

    SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)

    Sets the IsStartable function on the workers so that they run in sequence, i.e. each worker runs only if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function on each worker.

    Declaration
    public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, WorkerBase fourthWorker, WorkerBase fifthWorker, TLastWorker sixthWorker)
        where TLastWorker : WorkerBase
    Parameters
    Type Name Description
    WorkerBase firstWorker

    First worker to add.

    WorkerBase secondWorker

    Second worker to add.

    WorkerBase thirdWorker

    Third worker to add.

    WorkerBase fourthWorker

    Fourth worker to add.

    WorkerBase fifthWorker

    Fifth worker to add.

    TLastWorker sixthWorker

    Sixth worker to add.

    Returns
    Type Description
    TLastWorker
    Type Parameters
    Name Description
    TLastWorker
    Exceptions
    Type Condition
    ArgumentNullException

    Worker number.

    ArgumentException

    Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors.

    Implements

    IDisposeOnFinished
    Copyright © 2023 Envobi Ltd