Dataflow Rows
Each row that is sent between two ports is an instance of a class, with public fields and properties (a.k.a. columns) to hold the data values being sent. This class is a reusable schema that multiple workers and their ports can use for sending and processing the rows. The class must have a parameterless constructor, so that the system can instantiate rows when needed.
E.g., this class can be used as a row with two columns:
public class Category
{
public int CategoryId;
public string CategoryName { get; set; }
}
Row Creation
- Source workers allocate data row instances and set their column values before sending them to downstream workers
- Transform workers can modify incoming data row instances, and/or allocate new data row instances, before passing them on to downstream workers
- Target workers only consume incoming data row instances
Note
Using an error output port also requires passing on incoming data row instances and/or allocating and setting column values of new row instances.
Most dataflow workers handle the allocation of rows internally, without any coding needed by the library user. A few dataflow workers do, however, allow or require the developer to allocate rows, as in this trivial example that allocates, initializes, and outputs a single new row:
var source = new RowsActionSource<Category>(parentWorker, "Generate Row", ras =>
{
var row = new Category() { CategoryId = 1, CategoryName = "Food" };
ras.Output.SendRow(row);
return ProgressStatus.Succeeded;
});
If the developer is creating the rows, consider adding one or more constructors to the row class to calculate and initialize columns, especially if the rows are created by more than one worker. E.g.:
public class Category
{
public int CategoryId;
public string CategoryName;
public string CategoryLCName => CategoryName.ToLowerInvariant();
public Category() { } // Parameterless constructor, so the system can instantiate rows
public Category(int categoryId, string categoryName)
{
CategoryId = categoryId;
CategoryName = categoryName;
}
}
// ...
var row = new Category(1, "Food");
Note
By using properties (instead of fields) in the class, you can also create rows using Object Initializers, without explicitly creating a constructor. Note though that object initializers are less appropriate for any properties in Column Schemas, since that leads to extra copies being made during initialization.
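As a sketch of this, using a settable-property variant of the Category class from above, a row can be created with an object initializer and no explicit constructor:

```csharp
// Object initializer syntax requires settable properties (or fields):
var row = new Category { CategoryId = 1, CategoryName = "Food" };
System.Console.WriteLine(row.CategoryName); // prints "Food"

public class Category
{
    public int CategoryId { get; set; }
    public string CategoryName { get; set; }
}
```

The compiler still supplies the parameterless constructor here, so the class also remains usable by workers that instantiate rows themselves.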
Row Type
The row type specified to dataflow workers is normally a class, but can also be an interface, which can further help writing reusable code in some scenarios. Specifying a struct as a row type will generate a compile time error.
Note
While a struct can implement an interface in .NET, using a struct via an interface as a row type is not supported in actionETL.
Row Type Inheritance
The row type (or schema) can optionally have a base class, whose fields will be treated the same as any fields declared in the derived class. This allows creating multiple new (derived) row schemas based on a base class row schema, thereby reducing code duplication and simplifying maintenance. For large schemas, this becomes hugely beneficial.
This approach is particularly appropriate when a downstream worker only uses a subset of the columns that its upstream worker(s) use, since upstream (derived) instances can be automatically (i.e. implicitly) cast to downstream (base) instances.
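As a plain C# sketch of that implicit cast (using row classes like the ProductCore/ProductFull pair in the example below), assigning a derived row to a base-typed variable needs no explicit cast and makes no copy:

```csharp
var full = new ProductFull
{
    ProductId = 1, ProductName = "Apple", ProductWeight = 0.18,
    ProductCategory = "Food", ProductSubcategory = "Fruit"
};

// Implicit cast from derived to base: the same instance is now
// seen through the base row type, exposing only its three columns.
ProductCore core = full;
System.Console.WriteLine(core.ProductName); // prints "Apple"

public class ProductCore
{
    public int ProductId { get; set; }
    public string ProductName { get; set; }
    public double ProductWeight { get; set; }
}

public class ProductFull : ProductCore
{
    public string ProductCategory { get; set; }
    public string ProductSubcategory { get; set; }
}
```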
In the following example:
- ProductFull inherits from ProductCore, and will therefore have five columns, which is what the source sends
- The transform uses the base class ProductCore for its output, so rows will be automatically cast to ProductCore
- The target doesn't have to explicitly exclude the unwanted ProductFull columns (and doesn't even have access to them, short of explicitly casting back to ProductFull)
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
using System;
public static partial class DataflowRowTypeInheritance
{
public class ProductCore
{
public int ProductId { get; set; }
public string ProductName { get; set; }
public double ProductWeight { get; set; }
}
[DelimitedRecord(",")]
public class ProductFull : ProductCore
{
public string ProductCategory { get; set; }
public string ProductSubcategory { get; set; }
}
public static SystemOutcomeStatus RunExample()
{
return new WorkerSystem("Stage ProductCore")
.Root(ws =>
{
var provider = AdbSqlClientProvider.Get();
new FileHelperFileSource<ProductFull>(ws, "Read ProductFull"
, @"Src/DataflowRowTypeInheritance/ProductFull.csv")
.Output.Link.RowActionTransform2<ProductFull, ProductCore>(
"Round ProductWeight", row =>
{
row.ProductWeight = Math.Round(row.ProductWeight, 2);
return (TransformRowTreatment.Send, row);
})
.Output.Link.AdbInsertTarget("Insert ProductCore"
, provider.CreateConnectionBuilder(ws.Config["SqlServer"])
, "dbo.ProductCore");
})
.Start();
}
}
/* The example assumes the following table already exists:
CREATE TABLE [dbo].[ProductCore]
(
[ProductId] INT NOT NULL PRIMARY KEY,
[ProductName] NCHAR(50) NOT NULL,
[ProductWeight] FLOAT NULL
)
*/
Note
Row schemas can also be combined and reused using struct, which provides more flexibility in constructing the schema. See Column Schemas for details.
Row Ownership
At any time, a row (and its column values) must be 'owned' by at most a single worker, which is the worker that currently holds a reference to the row (or to any of its reference type column values). To accomplish this, all workers must follow these rules:
Important
- After a worker has sent a row to downstream workers, it must not retain any reference to that row, or to any of its reference type column values
- When a row or column is duplicated, it must be done as a deep copy
Rule 2 means that:
- For whole rows, it is allowed to (optionally modify and) pass an input row once on to a single output port. To pass the same data additional times to the same or a different output port, the input row must first be duplicated using deep copy.
- For individual columns in a row, it is allowed to (optionally modify and) copy an input column once to a single output column. To copy the same input column data to additional output columns (in the same or a different output row), the input column must first be duplicated using deep copy.
- Technically, deep copy is not needed for column types that are either immutable (e.g. string) or are value types without any embedded instance references (int, DateTime, etc.). The mapping facilities use the correct and fastest approach for each type.
Note
If two workers did hold references to the same row or column, changes to the row would be seen by both workers; they could overwrite each other's changes and easily corrupt the row.
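To illustrate why the deep copy rule matters, here is a plain C# sketch with a hypothetical TagRow class (not part of actionETL) whose Tags column is a mutable reference type:

```csharp
using System.Collections.Generic;
using System.Linq;

var original = new TagRow { Id = 1, Tags = new List<string> { "a" } };

// Shallow copy: both rows share the same Tags list, so two owners
// would see (and could corrupt) each other's changes.
var shallow = new TagRow { Id = original.Id, Tags = original.Tags };

// Deep copy: the mutable reference-type column is duplicated; the
// value type column (Id) can simply be copied.
var deep = new TagRow { Id = original.Id, Tags = original.Tags.ToList() };

original.Tags.Add("b");
System.Console.WriteLine(shallow.Tags.Count); // prints 2 - sees the change
System.Console.WriteLine(deep.Tags.Count);    // prints 1 - unaffected

public class TagRow
{
    public int Id { get; set; }
    public List<string> Tags { get; set; }
}
```

Immutable column types such as string need no such duplication, since neither owner can mutate them in place.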
actionETL includes helper facilities that make it easy to implement these rules, see Dataflow Column Mapping for details.