Class InputPort<TInput>
Generic class for dataflow input ports, containing the functionality that depends on the data type of the row class.
The library user can use this class via workers that already have input ports present, as well as create and add input ports to workers by using Create<TInput>(String, OutputPortBase<TInput>).
The library user does not derive new port classes from this class.
Note: As usual, unless otherwise noted, the instance members are not thread-safe and should only be used from the worker that the port belongs to.
Namespace: actionETL
Assembly: actionETL.dll
Syntax
public sealed class InputPort<TInput> : InputPort where TInput : class
Type Parameters
Name | Description |
---|---|
TInput | The type of the input rows. |
Properties
LinkedFrom
Gets the upstream output (or error output) port this input port is linked from, or null if it is not linked.
Note: This property is thread-safe.
Declaration
public OutputPortBase<TInput> LinkedFrom { get; }
Property Value
Type | Description |
---|---|
OutputPortBase<TInput> |
Methods
LinkFrom(OutputPortBase<TInput>)
Link this input port from the specified upstream output (or error output) port, which must have the same row type.
Note that every input port and regular output port must be linked to exactly one port when its parent worker starts. For error output ports, linking is optional.
After linking, ports cannot be relinked. It is however possible to remove all child workers from a parent worker with RemoveChildren(), and recreate them with different links.
When linking from an ErrorOutputPort<TError>, the MaxRowsBeforeError and MaxRowsSent properties can limit the number of allowed error rows.
Note: This method is thread-safe.
Declaration
public void LinkFrom(OutputPortBase<TInput> outputPort)
Parameters
Type | Name | Description |
---|---|---|
OutputPortBase<TInput> | outputPort | The upstream output (or error output) port to link from. |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
InvalidOperationException |
PeekRow()
Returns the next incoming row synchronously, without removing it from the incoming buffer.
Note that a row must be available before calling this method. Ensure this e.g. by calling HasRowAsync() first and checking that the port is not completed.
Declaration
public TInput PeekRow()
Returns
Type | Description |
---|---|
TInput |
Exceptions
Type | Condition |
---|---|
InvalidOperationException | Input has no row to peek, ensure rows are available before calling. |
PeekRowAsync()
Returns the next incoming row asynchronously, if any is or becomes available, without removing it from the incoming buffer. Returns null if there is no row available and the port is or becomes completed.
Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask().
Declaration
public ValueTask<TInput> PeekRowAsync()
Returns
Type | Description |
---|---|
ValueTask<TInput> |
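As a minimal sketch of how peeking can be combined with taking, the hypothetical helper below collects the leading run of rows that satisfy a caller-supplied predicate and leaves the first non-matching row on the port. The class name, method name, and predicate are illustrative assumptions, not part of actionETL. It also assumes that once PeekRowAsync() has returned a row, that same row is still available to TakeRow(). Like the other instance members, it should only be called from the worker that owns the port.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using actionETL;

public static class PeekExample // hypothetical helper, not part of actionETL
{
    public static async Task<List<TInput>> TakeWhileAsync<TInput>(
        InputPort<TInput> input, Func<TInput, bool> predicate)
        where TInput : class
    {
        var taken = new List<TInput>();
        while (true)
        {
            // null means no row is available and the port is (or became) completed.
            TInput next = await input.PeekRowAsync().ConfigureAwait(false);
            if (next == null || !predicate(next))
                return taken; // A non-matching row stays in the incoming buffer.

            // Assumption: the row just peeked is still buffered, so TakeRow() does not throw.
            taken.Add(input.TakeRow());
        }
    }
}
```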
TakeBuffer()
Take at most RowsPerBuffer currently available rows from the upstream worker as an enumerable.
This method does not ensure the input port is seeing the latest rows upstream has sent. You should normally only call this method when you know there are rows available, e.g. after HasRow or HasRowAsync() has returned true (which signifies rows are available).
Note that the returned enumerable must be enumerated exactly once, which consumes it. Counting methods such as Any<TSource>(IEnumerable<TSource>) and Count<TSource>(IEnumerable<TSource>) also consume the enumerable, so do not call them before enumerating the rows.
Declaration
public IEnumerable<TInput> TakeBuffer()
Returns
Type | Description |
---|---|
IEnumerable<TInput> | An enumerable with at most RowsPerBuffer currently available rows. The enumerable will be empty if there are no currently available rows. |
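The usage described above might look roughly like the following sketch. The helper class, method name, and processRow callback are hypothetical, and the loop assumes that awaiting HasRowAsync() yields false once the port has completed with no rows left.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class TakeBufferExample // hypothetical helper, not part of actionETL
{
    public static async Task ProcessAllAsync<TInput>(
        InputPort<TInput> input, Action<TInput> processRow)
        where TInput : class
    {
        // HasRowAsync() returning true signifies that rows are available.
        // Assumption: it returns false once the port is completed and drained.
        while (await input.HasRowAsync().ConfigureAwait(false))
        {
            // Enumerate the returned buffer exactly once. Calling Any() or
            // Count() on it first would consume the rows.
            foreach (TInput row in input.TakeBuffer())
                processRow(row);
        }
    }
}
```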
TakeBufferAsync(TInput[])
Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public ValueTask<int> TakeBufferAsync(TInput[] array)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Returns
Type | Description |
---|---|
ValueTask<Int32> | Number of rows taken, which can be smaller than the length of the array.
Returns Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask(). |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
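A buffer-at-a-time loop could be sketched as follows. The helper class, method name, bufferLength parameter, and processRow callback are hypothetical. The loop also assumes, by analogy with the TakeRowsFullyBlockingAsync(TInput[]) example on this page, that a return value of zero means no more rows will arrive.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class TakeBufferAsyncExample // hypothetical helper, not part of actionETL
{
    public static async Task<long> ProcessBuffersAsync<TInput>(
        InputPort<TInput> input, int bufferLength, Action<TInput> processRow)
        where TInput : class
    {
        // The array is reused for every call, which avoids per-buffer allocations.
        var buffer = new TInput[bufferLength];
        long total = 0;
        int count;

        // Assumption: zero rows taken means upstream completed and the port is drained.
        while ((count = await input.TakeBufferAsync(buffer).ConfigureAwait(false)) > 0)
        {
            // Only the first 'count' elements hold rows from this call.
            for (int i = 0; i < count; i++)
                processRow(buffer[i]);
            total += count;
        }
        return total;
    }
}
```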
TakeBufferAsync(TInput[], Int32)
Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public ValueTask<int> TakeBufferAsync(TInput[] array, int maxCount)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Int32 | maxCount | The maximum number of rows to take. If |
Returns
Type | Description |
---|---|
ValueTask<Int32> | Number of rows taken, which can be smaller than Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask(). |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
TakeBufferAsync(TInput[], Int32, Int32)
Wait asynchronously (if needed) until there are rows available (or upstream completes), and take at most RowsPerBuffer rows into an array.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public ValueTask<int> TakeBufferAsync(TInput[] array, int start, int maxCount)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Int32 | start | The array start index. |
Int32 | maxCount | The maximum number of rows to take. If |
Returns
Type | Description |
---|---|
ValueTask<Int32> | Number of rows taken, which can be smaller than Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask(). |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
ArgumentOutOfRangeException | The array indexes are out of bounds. |
TakeRow()
Returns the next incoming row synchronously.
Note that a row must be available before calling this method. Ensure this e.g. by calling HasRowAsync() first.
Declaration
public TInput TakeRow()
Returns
Type | Description |
---|---|
TInput | The next incoming row. |
Exceptions
Type | Condition |
---|---|
InvalidOperationException | Input has no row to take, ensure rows are available before calling TakeRow(). |
TakeRowAsync()
Returns the next incoming row asynchronously, if any is or becomes available.
Returns null if there is no row available and the port is or becomes completed.
Note that the return type is ValueTask<TResult>, which reduces memory allocations.
Declaration
public ValueTask<TInput> TakeRowAsync()
Returns
Type | Description |
---|---|
ValueTask<TInput> |
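The simplest way to consume a port with this method is a loop that runs until null is returned. The following is only a sketch: the helper class, method name, and processRow callback are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class TakeRowAsyncExample // hypothetical helper, not part of actionETL
{
    public static async Task<long> ProcessRowsAsync<TInput>(
        InputPort<TInput> input, Action<TInput> processRow)
        where TInput : class
    {
        long count = 0;
        TInput row;

        // TakeRowAsync() returns null when no row is available
        // and the port is or becomes completed.
        while ((row = await input.TakeRowAsync().ConfigureAwait(false)) != null)
        {
            processRow(row);
            count++;
        }
        return count;
    }
}
```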
TakeRowsFullyBlockingAsync(TInput[])
Take rows asynchronously from the upstream worker until the supplied array is completely filled (including potentially more than InputPort.RowsPerBuffer rows), or until upstream has completed.
This method is very efficient, but must only be used in the following two cases:
1. Input ports/workers that are already fully blocking, such as the input port of a SortTransform, since the port/worker already requires waiting for all rows.
2. Input ports on a target worker that is guaranteed to not send any rows to any downstream worker (including via any error output port), since this avoids the risk of dataflow deadlocks.
To always fill up the receiving buffer (unless the input completes), normally this asynchronous method is used on its own:
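    // localBuffer is a TInput[] allocated by the caller; passing null would
    // throw ArgumentNullException.
    var takeRowsTask = this.TakeRowsFullyBlockingAsync(localBuffer);
    // Skip the await when the task has already completed synchronously.
    var rows = takeRowsTask.IsCompletedSuccessfully
        ? takeRowsTask.Result : await takeRowsTask.ConfigureAwait(false);
    if (rows > 0)
    {
        // Process rows in localBuffer...
    }
    else
    {
        // No more rows, potentially check if input is succeeded or failed
    }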
For non-blocking and partially blocking ports on workers that send rows downstream, instead either take rows one by one, or use TakeBufferAsync(TInput[]) or one of its overloads. This avoids waiting while holding rows in the buffer, which can delay other workers from progressing and increase memory use.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public ValueTask<int> TakeRowsFullyBlockingAsync(TInput[] array)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Returns
Type | Description |
---|---|
ValueTask<Int32> | Number of rows taken, which can be smaller than the length of the array.
Returns Note that the return type is ValueTask<TResult>, which reduces allocations. In the rare scenarios where a Task<TResult> is needed instead, this can be created with AsTask(). |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
TryPeekRow()
Returns the next incoming row synchronously, if there is one available, without removing it from the incoming buffer. Returns null if there is no row available.
This method does not ensure the input port is seeing the latest rows upstream has sent. You should normally call PeekRowAsync() if this method returns null.
Declaration
public TInput TryPeekRow()
Returns
Type | Description |
---|---|
TInput |
TryTakeBuffer(TInput[])
Take currently available rows into an array, at most RowsPerBuffer rows.
The actual number of rows taken will be automatically limited to the smallest of: upstream rows currently available, RowsPerBuffer, and what will fit in the supplied array.
Declaration
public int TryTakeBuffer(TInput[] array)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Returns
Type | Description |
---|---|
Int32 | Number of rows taken, which can be smaller than the array length.
Returns |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
TryTakeBuffer(TInput[], Int32, Int32)
Take currently available rows into an array, at most RowsPerBuffer rows.
Declaration
public int TryTakeBuffer(TInput[] array, int start, int maxCount)
Parameters
Type | Name | Description |
---|---|---|
TInput[] | array | The array to put rows in. |
Int32 | start | The array start index. |
Int32 | maxCount | The maximum number of rows to take. If |
Returns
Type | Description |
---|---|
Int32 | Number of rows taken, which can be smaller than |
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
ArgumentOutOfRangeException | The array indexes are out of bounds. |
TryTakeRow()
Returns the next incoming row synchronously, if there is one available. Returns null if there is no row available.
This method does not ensure the input port is seeing the latest rows upstream has sent. You should normally call TakeRowAsync() if this method returns null.
Declaration
public TInput TryTakeRow()
Returns
Type | Description |
---|---|
TInput |
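The recommendation above, falling back to TakeRowAsync() when TryTakeRow() returns null, can be sketched as a synchronous fast path with an asynchronous fallback. The helper class, method name, and processRow callback are hypothetical, and whether the fast path is worthwhile in practice depends on the workload.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class TryTakeRowExample // hypothetical helper, not part of actionETL
{
    public static async Task<long> ProcessRowsAsync<TInput>(
        InputPort<TInput> input, Action<TInput> processRow)
        where TInput : class
    {
        long count = 0;
        while (true)
        {
            // Fast path: take a row synchronously if one is available right now.
            TInput row = input.TryTakeRow();
            if (row == null)
            {
                // null here only means no row is currently visible to the port,
                // so wait asynchronously for a row or for completion.
                row = await input.TakeRowAsync().ConfigureAwait(false);
                if (row == null)
                    return count; // No row available and the port has completed.
            }
            processRow(row);
            count++;
        }
    }
}
```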