Class InputPort
Non-generic base class for dataflow input ports, containing the functionality that doesn't depend on the type of the row class.
The library user does not derive new port classes directly from
InputPort, but instead creates or uses instances of the derived class
InputPort<TInput>.
Note: As per normal, unless otherwise noted, the instance members are not thread-safe, and should only be accessed from the worker that the port belongs to.
Namespace: actionETL
Assembly: actionETL.dll
Syntax
public abstract class InputPort
Properties
BufferCapacity
Deprecated, use RowsPerBuffer instead.
Gets or sets the port buffer capacity, which is the recommended and maximum number of rows when taking rows synchronously in batches (instead of either asynchronously or row by row).
Increasing BufferCapacity can improve performance, but also consumes
more memory. Setting BufferCapacity too large should be avoided,
since it can cause the application to swap large amounts of memory out to disk,
leading to drastically reduced performance.
You can set the value with the AConfig configuration,
using the InputPortRowsPerBuffer key,
before the port is created.
Alternatively, set this property before the worker or its siblings run,
which will also override any AConfig setting.
The value is automatically rounded up to the nearest power of two.
Any value less than 1 will be set to the default value.
Also see PortBufferingMode, and Dataflow Blocking and Row Buffering.
Note: Reading the property is thread-safe, and any one thread can set it. Multiple threads (or workers) setting it simultaneously is not supported.
Declaration
[Obsolete("use RowsPerBuffer instead.")]
public int BufferCapacity { get; set; }
Property Value
| Type | Description |
|---|---|
| Int32 | The buffer capacity as number of rows. |
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Cannot set RowsPerBuffer after workers run. |
BufferingMode
Gets or sets the row buffering strategy an input port uses, i.e. whether the input port only buffers a smaller amount of rows to minimize the time the worker is stalled for input, or allows full buffering for input ports on workers that could otherwise dead-lock (typically having multiple inter-dependent inputs, e.g. MergeSortedTransform<TInputOutput>).
Defaults to Default, where the actual mode will be picked when the worker starts. The buffering mode can be set explicitly, although any attempt to change Full, as well as any attempt to change Limited to anything except Full, will be silently ignored.
Also see PortBufferingMode, and Dataflow Blocking and Row Buffering.
Note: Reading the property is thread-safe, and any one thread can set it. Multiple threads (or workers) setting it simultaneously is not supported.
Declaration
public PortBufferingMode BufferingMode { get; set; }
Property Value
| Type | Description |
|---|---|
| PortBufferingMode | Get or set the buffering mode. It cannot be set after either of the upstream or downstream ports become active. |
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException |
|
HasRow
Returns true if this port has at least one row available (irrespective of
the port state); otherwise, false.
Declaration
public bool HasRow { get; }
Property Value
| Type | Description |
|---|---|
| Boolean |
LinkedFrom
Returns the upstream output (or error output) port this input port is linked
from, or null if it is not linked.
Note: This property is thread-safe.
Declaration
public OutputPortBase LinkedFrom { get; }
Property Value
| Type | Description |
|---|---|
| OutputPortBase |
Locator
Gets the locator string, which represents where in the worker hierarchy this
input port resides. It consists of the names of the worker system and
all ancestor workers, separated by /, followed by the port type and
its name, e.g.:
/Extract Data/Extract Products/Read CSV Product File.Inputs[LeftInput].
Note: This property is thread-safe.
Declaration
public string Locator { get; }
Property Value
| Type | Description |
|---|---|
| String | The input port locator string. |
Logger
Gets the logger instance for this input port, which should be used for
logging messages related to this port. The logger's name is set to the
Locator string of the input port, which will appear in the logged output.
This property is thread-safe.
Note: This property is thread-safe.
Declaration
public ALog Logger { get; }
Property Value
| Type | Description |
|---|---|
| ALog |
Name
Gets the input port name.
Note: This property is thread-safe.
Declaration
public string Name { get; }
Property Value
| Type | Description |
|---|---|
| String |
RowsPerBuffer
Gets or sets the port buffer capacity, which is the recommended and maximum
number of rows when sending or taking rows to or from the ports
synchronously in batches.
Note that you both get and set this value on the input port; on the
output port you can only get the value.
See Port RowsPerBuffer and WorkerParent.BytesPerRowBuffer
for full details.
By default and when this property is not set, RowsPerBuffer is automatically calculated
by dividing BytesPerRowBuffer with an upfront estimate of the
average row size in bytes. The calculation is then skewed to reduce the likelihood of
using very small buffers (64 rows or less).
In most cases, this approach results in a good balance
between dataflow memory use, buffering to avoid row starvation, and speed of
dataflow transfers.
Increasing RowsPerBuffer can improve performance, but also consumes
more memory. Setting RowsPerBuffer too large should be avoided,
since it can cause the application to swap large amounts of memory out to disk,
leading to drastically reduced performance.
You can set the value with the AConfig configuration,
using the InputPortRowsPerBuffer key,
before the port is created, either as a global setting, or for a specific
input port.
Alternatively, set this property before the worker or its siblings run,
which will also override any AConfig setting.
The value is automatically rounded up to the nearest power of two.
Any value less than 1 will be set to the default value.
Note: This property is thread-safe.
Declaration
public int RowsPerBuffer { get; set; }
Property Value
| Type | Description |
|---|---|
| Int32 | The buffer capacity as number of rows. |
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Cannot set RowsPerBuffer after workers run. |
RowsTaken
Gets the number of rows taken from this input port. Note: This property is only thread-safe for other workers etc. to read after the port has completed.
Declaration
public long RowsTaken { get; }
Property Value
| Type | Description |
|---|---|
| Int64 |
State
Gets the input port state.
Note: This property is thread-safe. Also note however that unless the port is in a final state, then the state can change at any time.
Declaration
public InputPortState State { get; }
Property Value
| Type | Description |
|---|---|
| InputPortState |
Status
Gets the progress status of the port, with descriptive information if the port failed. Also see State.
Note: This property is thread-safe. Also note however that unless the port is in a completed state, then the status can change at any time.
Declaration
public ProgressStatus Status { get; }
Property Value
| Type | Description |
|---|---|
| ProgressStatus |
Worker
Gets the worker instance this port belongs to.
Note: This property is thread-safe.
Declaration
public WorkerBase Worker { get; }
Property Value
| Type | Description |
|---|---|
| WorkerBase |
Methods
HasRowAsync()
Waits (if needed) until there is at least one available row, or the port is completed.
Note that this method should be used also when waiting for multiple rows with non-blocking and partially blocking ports, since this avoids waiting with rows in the buffer which could otherwise delay other ports and workers from progressing, and also increase memory use.
Declaration
public Task<bool> HasRowAsync()
Returns
| Type | Description | ||||
|---|---|---|---|---|---|
| Task<Boolean> |
|
SkipAllRowsAsync()
Discard all incoming rows asynchronously until completed.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public async Task<bool> SkipAllRowsAsync()
Returns
| Type | Description |
|---|---|
| Task<Boolean> |
|
SkipRow()
Skip one incoming row synchronously.
Note that this method requires ensuring a row is available before calling it, e.g. by calling HasRow before calling this method.
Declaration
public void SkipRow()
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Input has no row to skip. |
SkipRowAsync()
Skip one incoming row asynchronously, or return if there are no rows to skip and the port is completed.
Note that this method does not require ensuring sufficient rows are available before calling it.
Declaration
public Task<bool> SkipRowAsync()
Returns
| Type | Description |
|---|---|
| Task<Boolean> |
|
ToLongString()
Returns a verbose description of this instance.
Note that this method is not thread-safe.
Declaration
public string ToLongString()
Returns
| Type | Description |
|---|---|
| String |
ToString()
Returns a description of this instance.
Note: This method is thread-safe.
Declaration
public override string ToString()
Returns
| Type | Description |
|---|---|
| String |
Overrides
TryCancel()
If the input port is not already canceled or completed, cancels it, and requests cancellation of the upstream output (or error output) port, which must stop sending data and clean up resources when it detects the cancellation. When the method returns, the input port state is either canceled or completed.
The caller should not take or skip any rows from the input port after this method returns. It is also not necessary to wait for the canceled port to complete before completing the worker (normally with a failure status), the system will do this automatically. Also see actionETL.InputPortCollection.TryCancel(System.Boolean).
Note that calling this method does not by itself cancel the worker system.
Declaration
public bool TryCancel()
Returns
| Type | Description |
|---|---|
| Boolean |
|
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Port must be linked before canceling. |
TrySkipRow()
Tries to skip one incoming row.
This method does not ensure the input port is seeing the latest rows
upstream has sent. You should normally call SkipRowAsync() if this
method returns false.
Declaration
public bool TrySkipRow()
Returns
| Type | Description |
|---|---|
| Boolean |
|