    Class InputPort<TInput>

    Generic class for dataflow input ports, containing the functionality that depends on the data type of the row class.

    Library users can use this class via workers that already have input ports, and can also create and add input ports to workers with Create<TInput>(String, OutputPortBase<TInput>).

    The library user does not derive new port classes from this class.

    Note: As usual, unless otherwise noted, instance members are not thread-safe and should only be used from the worker that the port belongs to.

    Inheritance
    Object
    InputPort
    InputPort<TInput>
    Inherited Members
    InputPort.BufferCapacity
    InputPort.BufferingMode
    InputPort.HasRow
    InputPort.HasRowAsync()
    InputPort.Locator
    InputPort.Logger
    InputPort.Name
    InputPort.RowsPerBuffer
    InputPort.RowsTaken
    InputPort.SkipAllRowsAsync()
    InputPort.SkipRow()
    InputPort.SkipRowAsync()
    InputPort.TrySkipRow()
    InputPort.State
    InputPort.Status
    InputPort.ToLongString()
    InputPort.ToString()
    InputPort.TryCancel()
    InputPort.Worker
    Namespace: actionETL
    Assembly: actionETL.dll
    Syntax
    public sealed class InputPort<TInput> : InputPort where TInput : class
    Type Parameters
    Name Description
    TInput

    The type of the input rows.

    Properties

    LinkedFrom

    Gets the upstream output (or error output) port this input port is linked from, or null if it is not linked.

    Note: This property is thread-safe.

    Declaration
    public OutputPortBase<TInput> LinkedFrom { get; }
    Property Value
    Type Description
    OutputPortBase<TInput>

    Methods

    LinkFrom(OutputPortBase<TInput>)

    Link this input port from the specified upstream output (or error output) port, which must have the same row type.

    Note that all input and regular output ports must be linked to a single port when their parent worker starts. For error output ports, linking is optional.

    After linking, ports cannot be relinked. It is, however, possible to remove all child workers from a parent worker with RemoveChildren() and recreate them with different links.

    When linking from an ErrorOutputPort<TError>, the MaxRowsBeforeError and MaxRowsSent properties can limit the number of allowed error rows.

    Note: This method is thread-safe.

    Declaration
    public void LinkFrom(OutputPortBase<TInput> outputPort)
    Parameters
    Type Name Description
    OutputPortBase<TInput> outputPort

    The upstream output (or error output) port to link from.

    Exceptions
    Type Condition
    ArgumentNullException

    outputPort

    InvalidOperationException
    • Both ports must be in the Created state
    • An output (or error output) port can only be linked to one input port, and vice versa
    • Linked dataflow workers must have the same parent worker
    • Cannot link ports after the parent worker has started running its children.

    PeekRow()

    Returns the next incoming row synchronously, without removing it from the incoming buffer.

    Note that the caller must ensure a row is available before calling this method, e.g. by first calling HasRowAsync() and checking that the port is not completed.

    Declaration
    public TInput PeekRow()
    Returns
    Type Description
    TInput
    Exceptions
    Type Condition
    InvalidOperationException

    The input has no row to peek; ensure a row is available before calling PeekRow().

    PeekRowAsync()

    Returns the next incoming row asynchronously, if any is or becomes available, without removing it from the incoming buffer. Returns null if there is no row available and the port is or becomes completed.

    Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Declaration
    public ValueTask<TInput> PeekRowAsync()
    Returns
    Type Description
    ValueTask<TInput>
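
    PeekRowAsync() is useful when the next row decides what to do, e.g. when taking the consecutive rows that share a key from input sorted on that key. The following is a minimal sketch, not part of actionETL: TakeGroupAsync and its delegate parameters are hypothetical, chosen so the loop can run without a worker system. Inside a worker you would pass the port's own methods, e.g. inputPort.PeekRowAsync and inputPort.TakeRow, where inputPort is your InputPort<TInput>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PeekRowSketch
{
    // Takes the run of consecutive rows that share the first row's key.
    // The first row with a different key is only peeked, so it stays in the
    // buffer for the next call. Returns an empty list once the input completes.
    // peekRowAsync and takeRow are hypothetical stand-ins for
    // InputPort<TInput>.PeekRowAsync and InputPort<TInput>.TakeRow.
    public static async Task<List<T>> TakeGroupAsync<T, TKey>(
        Func<ValueTask<T>> peekRowAsync,
        Func<T> takeRow,
        Func<T, TKey> keySelector)
        where T : class
    {
        var group = new List<T>();
        T next = await peekRowAsync().ConfigureAwait(false);
        if (next == null)
            return group; // no row, and the port is completed

        TKey key = keySelector(next);
        var comparer = EqualityComparer<TKey>.Default;
        while (next != null && comparer.Equals(keySelector(next), key))
        {
            // The peek returned a row, so TakeRow has a row to take
            group.Add(takeRow());
            next = await peekRowAsync().ConfigureAwait(false);
        }
        return group;
    }
}
```

    For example, on input sorted by a hypothetical CustomerId column, repeatedly calling await PeekRowSketch.TakeGroupAsync(inputPort.PeekRowAsync, inputPort.TakeRow, row => row.CustomerId) would return one customer's rows per call, and an empty list once the port is completed.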

    TakeBuffer()

    Take at most RowsPerBuffer currently available rows from the upstream worker as an enumerable.

    This method does not ensure that the input port sees the latest rows sent by upstream. You should normally only call this method when you know rows are available, e.g. after HasRow or HasRowAsync() has returned true.

    Note that the returned enumerable must be enumerated exactly once, which consumes it. This includes counting methods such as Any<TSource>(IEnumerable<TSource>) and Count<TSource>(IEnumerable<TSource>), which also enumerate, and therefore consume, the enumerable.

    Declaration
    public IEnumerable<TInput> TakeBuffer()
    Returns
    Type Description
    IEnumerable<TInput>

    An enumerable with at most RowsPerBuffer currently available rows. The enumerable will be empty if there are no currently available rows.
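
    Because the enumerable can only be enumerated once, any counting must happen while processing the rows rather than before. A minimal sketch, assuming a hypothetical ProcessAvailable helper whose takeBuffer delegate would be the port's TakeBuffer method (e.g. inputPort.TakeBuffer):

```csharp
using System;
using System.Collections.Generic;

public static class TakeBufferSketch
{
    // Processes one buffer of currently available rows and returns how many
    // there were. The foreach enumerates the result exactly once; calling
    // Any() or Count() on it first would consume it, leaving nothing to process.
    // takeBuffer is a hypothetical stand-in for InputPort<TInput>.TakeBuffer.
    public static int ProcessAvailable<T>(Func<IEnumerable<T>> takeBuffer, Action<T> process)
    {
        int count = 0;
        foreach (T row in takeBuffer())
        {
            process(row);
            count++; // count while enumerating instead of calling Count()
        }
        return count;
    }
}
```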

    TakeBufferAsync(TInput[])

    Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.

    Note that this method does not require ensuring sufficient rows are available before calling it.

    Declaration
    public ValueTask<int> TakeBufferAsync(TInput[] array)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Returns
    Type Description
    ValueTask<Int32>

    Number of rows taken, which can be smaller than the length of the array. Returns 0 if no rows could be taken, in which case this input port will be completed (assuming the array length was greater than 0).

    Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Exceptions
    Type Condition
    ArgumentNullException

    array
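
    A common pattern is to allocate one array and call TakeBufferAsync(TInput[]) in a loop until it returns 0. The sketch below is an illustration under stated assumptions, not actionETL code: the helper name, its delegate parameter (which would be inputPort.TakeBufferAsync) and the default buffer size of 256 are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class TakeBufferAsyncSketch
{
    // Reads all rows in buffers until the input completes and returns the
    // total row count. takeBufferAsync is a hypothetical stand-in for
    // InputPort<TInput>.TakeBufferAsync(TInput[]). bufferSize must be greater
    // than 0, since a 0-length array would always yield 0 rows.
    public static async Task<long> ProcessAllAsync<T>(
        Func<T[], ValueTask<int>> takeBufferAsync,
        Action<T> process,
        int bufferSize = 256)
        where T : class
    {
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        var buffer = new T[bufferSize]; // allocated once, reused for every call
        long total = 0;
        int count;
        // A result of 0 means no rows could be taken, i.e. the port is completed
        while ((count = await takeBufferAsync(buffer).ConfigureAwait(false)) > 0)
        {
            for (int i = 0; i < count; i++)
            {
                process(buffer[i]);
                buffer[i] = null; // release the reference once processed
            }
            total += count;
        }
        return total;
    }
}
```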

    TakeBufferAsync(TInput[], Int32)

    Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.

    Note that this method does not require ensuring sufficient rows are available before calling it.

    Declaration
    public ValueTask<int> TakeBufferAsync(TInput[] array, int maxCount)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Int32 maxCount

    The maximum number of rows to take. If 0 or negative, no rows will be taken. The actual number of rows taken will be automatically limited to the smallest of: all remaining upstream rows, maxCount, and what will fit in the supplied array.

    Returns
    Type Description
    ValueTask<Int32>

    Number of rows taken, which can be smaller than maxCount. Returns 0 if no rows could be taken, in which case this input port will be completed (assuming maxCount was greater than 0).

    Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Exceptions
    Type Condition
    ArgumentNullException

    array

    TakeBufferAsync(TInput[], Int32, Int32)

    Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.

    Note that this method does not require ensuring sufficient rows are available before calling it.

    Declaration
    public ValueTask<int> TakeBufferAsync(TInput[] array, int start, int maxCount)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Int32 start

    The array start index.

    Int32 maxCount

    The maximum number of rows to take. If 0 or negative, no rows will be taken. The actual number of rows taken will be automatically limited to the smallest of: all remaining upstream rows, maxCount, and what will fit in the supplied array (without wrapping).

    Returns
    Type Description
    ValueTask<Int32>

    Number of rows taken, which can be smaller than maxCount. Returns 0 if no rows could be taken, in which case this input port will be completed (assuming maxCount was greater than 0).

    Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Exceptions
    Type Condition
    ArgumentNullException

    array

    ArgumentOutOfRangeException

    Array indexes are out of bounds.

    TakeRow()

    Returns the next incoming row synchronously.

    Note that the caller must ensure a row is available before calling this method, e.g. by calling HasRowAsync() first.

    Declaration
    public TInput TakeRow()
    Returns
    Type Description
    TInput

    The next incoming row.

    Exceptions
    Type Condition
    InvalidOperationException

    The input has no row to take; ensure a row is available before calling TakeRow().

    TakeRowAsync()

    Returns the next incoming row asynchronously, if any is or becomes available. Returns null if there is no row available and the port is or becomes completed.

    Note that the return type is ValueTask<TResult>, which reduces memory allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Declaration
    public ValueTask<TInput> TakeRowAsync()
    Returns
    Type Description
    ValueTask<TInput>
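
    The simplest way to consume a port is to take rows until TakeRowAsync() returns null. A minimal sketch, where the helper name is hypothetical and its takeRowAsync delegate would be inputPort.TakeRowAsync:

```csharp
using System;
using System.Threading.Tasks;

public static class TakeRowAsyncSketch
{
    // Takes rows one at a time until TakeRowAsync returns null, which means
    // the port is completed. takeRowAsync is a hypothetical stand-in for
    // InputPort<TInput>.TakeRowAsync.
    public static async Task<int> ProcessAllAsync<T>(
        Func<ValueTask<T>> takeRowAsync, Action<T> process)
        where T : class
    {
        int count = 0;
        T row;
        while ((row = await takeRowAsync().ConfigureAwait(false)) != null)
        {
            process(row);
            count++;
        }
        return count;
    }
}
```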

    TakeRowsFullyBlockingAsync(TInput[])

    Take rows asynchronously from the upstream worker until the supplied array is completely filled (including potentially more than InputPort.RowsPerBuffer rows), or until upstream has completed. This method is very efficient, but must only be used in the following two cases:

    1. Input ports/workers that are already fully blocking, such as the input port of a SortTransform, since the port/worker already requires waiting for all rows.

    2. Input ports on a target worker that is guaranteed to not send any rows to any downstream worker (including via any error output port), since this avoids the risk of dataflow deadlocks.

    To always fill the receiving buffer (unless the input completes), this asynchronous method is normally used on its own:

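    // inputPort is this InputPort<TInput>; localBuffer is a TInput[] that is
    // allocated once and reused (both names are placeholders)
    var takeRowsTask = inputPort.TakeRowsFullyBlockingAsync(localBuffer);
    var rows = takeRowsTask.IsCompletedSuccessfully
        ? takeRowsTask.Result : await takeRowsTask.ConfigureAwait(false);
    if (rows > 0)
    {
        // Process localBuffer[0] to localBuffer[rows - 1]...
    }
    else
    {
        // No more rows, potentially check if input succeeded or failed
    }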

    For non-blocking and partially blocking ports on workers that send rows downstream, instead take rows one at a time, or use TakeBufferAsync(TInput[]) or one of its overloads. This avoids holding rows in the buffer while waiting for more, which can delay other workers from progressing and increase memory use.

    Note that this method does not require ensuring sufficient rows are available before calling it.

    Declaration
    public ValueTask<int> TakeRowsFullyBlockingAsync(TInput[] array)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Returns
    Type Description
    ValueTask<Int32>

    Number of rows taken, which can be smaller than the length of the array. Returns 0 if no rows could be taken, in which case this input port will be completed (assuming the array length was greater than 0).

    Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().

    Exceptions
    Type Condition
    ArgumentNullException

    array
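
    As a sketch of the first case listed above, a fully blocking worker can collect every row with TakeRowsFullyBlockingAsync(TInput[]) and sort once the input completes. The helper, its delegate parameter (which would be inputPort.TakeRowsFullyBlockingAsync) and the buffer size are hypothetical, and the code is not meant to reflect how SortTransform is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FullyBlockingSketch
{
    // Collects every input row, then sorts them. This is only appropriate for
    // a fully blocking port, since no row is released until all have arrived.
    // takeRowsFullyBlockingAsync is a hypothetical stand-in for
    // InputPort<TInput>.TakeRowsFullyBlockingAsync; each call fills the whole
    // array unless the input completes first.
    public static async Task<List<T>> CollectSortedAsync<T>(
        Func<T[], ValueTask<int>> takeRowsFullyBlockingAsync,
        IComparer<T> comparer,
        int bufferSize = 1024)
        where T : class
    {
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        var buffer = new T[bufferSize];
        var all = new List<T>();
        int count;
        // A result of 0 means the port is completed
        while ((count = await takeRowsFullyBlockingAsync(buffer).ConfigureAwait(false)) > 0)
        {
            for (int i = 0; i < count; i++)
                all.Add(buffer[i]);
        }
        all.Sort(comparer); // only possible once all rows have arrived
        return all;
    }
}
```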

    TryPeekRow()

    Returns the next incoming row synchronously, if there is one available, without removing it from the incoming buffer. Returns null if there is no row available.

    This method does not ensure that the input port sees the latest rows sent by upstream. You should normally call PeekRowAsync() if this method returns null.

    Declaration
    public TInput TryPeekRow()
    Returns
    Type Description
    TInput

    TryTakeBuffer(TInput[])

    Take currently available rows into an array, at most RowsPerBuffer rows.

    The actual number of rows taken will be automatically limited to the smallest of: upstream rows currently available, RowsPerBuffer, and what will fit in the supplied array.

    Declaration
    public int TryTakeBuffer(TInput[] array)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Returns
    Type Description
    Int32

    Number of rows taken, which can be smaller than the array length. Returns 0 if no rows could be taken.

    Exceptions
    Type Condition
    ArgumentNullException

    array

    TryTakeBuffer(TInput[], Int32, Int32)

    Take currently available rows into an array, at most RowsPerBuffer rows.

    Declaration
    public int TryTakeBuffer(TInput[] array, int start, int maxCount)
    Parameters
    Type Name Description
    TInput[] array

    The array to put rows in.

    Int32 start

    The array start index.

    Int32 maxCount

    The maximum number of rows to take. If 0 or negative, no rows will be taken. The actual number of rows taken will be automatically limited to the smallest of: upstream rows currently available, RowsPerBuffer, and what will fit in the supplied array (without wrapping).

    Returns
    Type Description
    Int32

    Number of rows taken, which can be smaller than maxCount. Returns 0 if no rows could be taken.

    Exceptions
    Type Condition
    ArgumentNullException

    array

    ArgumentOutOfRangeException

    Array indexes are out of bounds.

    TryTakeRow()

    Returns the next incoming row synchronously, if there is one available. Returns null if there is no row available.

    This method does not ensure that the input port sees the latest rows sent by upstream. You should normally call TakeRowAsync() if this method returns null.

    Declaration
    public TInput TryTakeRow()
    Returns
    Type Description
    TInput
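
    TryTakeRow() pairs naturally with TakeRowAsync(): take rows synchronously while they are already buffered, and only await when TryTakeRow() returns null. A minimal sketch with hypothetical names, where the delegates would be inputPort.TryTakeRow and inputPort.TakeRowAsync:

```csharp
using System;
using System.Threading.Tasks;

public static class TryTakeRowSketch
{
    // Takes rows synchronously while they are already buffered, and only
    // awaits takeRowAsync when tryTakeRow returns null. tryTakeRow and
    // takeRowAsync are hypothetical stand-ins for InputPort<TInput>.TryTakeRow
    // and InputPort<TInput>.TakeRowAsync.
    public static async Task<int> ProcessAllAsync<T>(
        Func<T> tryTakeRow, Func<ValueTask<T>> takeRowAsync, Action<T> process)
        where T : class
    {
        int count = 0;
        while (true)
        {
            T row = tryTakeRow(); // fast path: no await needed
            if (row == null)
            {
                // Slow path: wait for upstream; null here means the port is completed
                row = await takeRowAsync().ConfigureAwait(false);
                if (row == null)
                    break;
            }
            process(row);
            count++;
        }
        return count;
    }
}
```

    Whether the synchronous fast path measurably helps depends on the workload; the plain TakeRowAsync() loop is also correct.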

    See Also

    InputPort
    InputPortState
    InputPortCollection
    OutputPort<TOutput>
    Copyright © 2023 Envobi Ltd