Custom Dataflow Column Mapping
This article describes the rules workers must adhere to and the facilities available
for mapping, copying, and comparing dataflow columns and rows when implementing
new workers, or performing the same tasks in ad-hoc *Action* dataflow workers.
Also see Dataflow Column Mapping and Compare Dataflow Columns for how to use existing workers that take advantage of these facilities.
Note
Column name matching should be ordinal and case insensitive, with a case sensitive match taking precedence over a case insensitive match. In the examples below, this is handled automatically by out-of-box functionality in FromTypeRowMapper<TFrom>, RowComparer<T>, etc.
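For workers that cannot use the out-of-box mappers, the matching rule above can be sketched with plain .NET string comparisons. This is a minimal illustration only: `ColumnNameMatcher` and `FindColumn` are hypothetical names, not part of the library, and picking the first case insensitive candidate when several exist is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

public static class ColumnNameMatcher // Hypothetical helper, not a library type
{
    // Returns the index of the column matching 'name', or -1 if there is none.
    // An ordinal case sensitive match wins over an ordinal case insensitive one.
    public static int FindColumn(IReadOnlyList<string> columnNames, string name)
    {
        int caseInsensitiveIndex = -1;
        for (int i = 0; i < columnNames.Count; i++)
        {
            if (string.Equals(columnNames[i], name, StringComparison.Ordinal))
                return i; // Case sensitive match takes precedence

            if (caseInsensitiveIndex < 0
                && string.Equals(columnNames[i], name, StringComparison.OrdinalIgnoreCase))
                caseInsensitiveIndex = i; // Remember the first case insensitive candidate
        }
        return caseInsensitiveIndex;
    }
}
```

With the column names `"sales"`, `"Sales"`, `"Budget"`, looking up `"Sales"` returns index 1 (the exact match), `"SALES"` returns index 0 (the first case insensitive match), and `"Cost"` returns -1.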
Row Ownership
At any time, a row (and its column values) must be 'owned' by at most a single worker, which is the worker that currently holds a reference to the row (or to any of its reference type column values). To accomplish this, all workers must follow these rules:
Important
1. After a worker has sent a row to downstream workers, it must not retain any reference to that row, or to any of its reference type column values
2. When a row or column is duplicated, it must be done as a deep copy
Rule 2 means that:
- For whole rows, it is allowed to (optionally modify and) pass an input row once on to a single output port. To pass the same data additional times to the same or a different output port, the input row must first be duplicated using deep copy.
- For individual columns in a row, it is allowed to (optionally modify and) copy
an input column once to a single output column. To copy the same input column data to
additional output columns (in the same or a different output row), the input column
must first be duplicated using deep copy.
- Technically, deep copy is not needed for column types that are either immutable (e.g. string) or are value types without any embedded instance references (int, DateTime etc.). The mapping facilities automatically use the correct and fastest approach for each type.
Note
If two workers did hold references to the same row or column, changes to the row would be seen by both workers, they could overwrite each other's changes, and could easily corrupt the row.
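The difference between sharing a reference and deep copying can be illustrated with plain .NET types. This is a sketch under stated assumptions: `OrderRow`, its `DeepClone` method, and `OwnershipDemo` are hypothetical names for illustration, and in a real worker the mapping facilities would normally generate the copying code.

```csharp
using System;

public class OrderRow // Hypothetical row type
{
    public int Id { get; set; }            // Value type: a plain copy is safe
    public string Customer { get; set; }   // Immutable reference type: sharing is safe
    public int[] Quantities { get; set; }  // Mutable reference type: must be deep copied

    // Hypothetical helper: allocate a new row and deep copy the mutable column.
    // Cloning the array is sufficient here because its elements are value types.
    public OrderRow DeepClone() => new OrderRow
    {
        Id = Id,
        Customer = Customer,
        Quantities = (int[])Quantities?.Clone()
    };
}

public static class OwnershipDemo
{
    public static (int shared, int cloned) Run()
    {
        var original = new OrderRow { Id = 1, Customer = "Acme", Quantities = new[] { 5 } };
        var alias = original;             // Second reference to the same row: breaks ownership
        var copy = original.DeepClone();  // Independent row: safe to send to another port

        original.Quantities[0] = 99;      // Simulates a modification by another worker
        return (alias.Quantities[0], copy.Quantities[0]);
    }
}
```

`OwnershipDemo.Run()` returns `(99, 5)`: the aliased row sees the modification, while the deep copied row keeps its original value.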
TypeRowSchema and FlatRowSchema
These classes contain information about the columns in a dataflow row and their data types, or the columns of an external data source. This is useful for working with column names and their types, e.g. for implementing column mapping directly. More commonly, users access these via higher level constructs such as TypeColumnCopier<TFrom, TTo>.
Use TypeRowSchema when you have access to the required .NET CLR type, since it can populate the instance automatically from the type. Otherwise use FlatRowSchema and specify columns explicitly.
Mapping and Copying
The mapping and copying facilities:
- Allow mapping to and from rows of the same or different types
- Map based on column name or ordinal position
- Can automatically map names
- TypeColumnCopier can create a high performance copying method, including using deep copy (adhering to the second Row Ownership rule above) as needed
You can add this functionality to the dataflow workers you develop yourself to make them easy to configure, using one of:
- FromTypeRowMapper<TFrom>, FromTypeColumnMapper<TFrom> - map from dataflow columns (in a .NET CLR type) to a list of names
- ToTypeRowMapper<TTo>, ToTypeColumnMapper<TTo> - map from a list of names to dataflow columns (in a .NET CLR type)
- TypeRowMapper<TFrom, TTo>, TypeColumnMapper<TFrom, TTo> - map from dataflow columns to dataflow columns, i.e. between .NET CLR types
- TypeColumnCopier<TFrom, TTo> - copy dataflow columns between two .NET CLR types
This custom MyCopyTransform worker copies the specified columns (by default the auto-mapped ones, in this
case "Sales") from an input port to an output port. This is useful when changing
the row type and copying the columns that both row types support:
```csharp
using actionETL;
using actionETL.Logging;
using System;

public class MyCopyTransform<TInputError, TOutput>
    : RowTransformBase<MyCopyTransform<TInputError, TOutput>
        , TInputError, TOutput>
    where TInputError : class
    where TOutput : class, new()
{
    private readonly Func<TInputError, TOutput> _cloneFunc;

    internal MyCopyTransform(
        in DownstreamFactory<TInputError> downstreamFactory
        , string workerName
        , Action<IColumnMapperCommand> columnMapperCommandAction = null
        )
        : base(downstreamFactory, workerName, true)
    {
        // Create a column copier, defaulting to auto-mapping columns by name
        var columnCopier = new TypeColumnCopier<TInputError, TOutput>(
            columnMapperCommandAction ?? (rcc => rcc.AutoName()));
        _cloneFunc = columnCopier.CreateDeepCloneFunc();

        // Log column mappings to aid log analysis
        Logger.Info(ALogCategory.RowColumnMapperMappings
            , columnCopier.Mappings.ToStringMappedUnmapped());
    }

    protected override (TransformRowTreatment, TOutput) OnInputRow(TInputError inputRow)
    {
        // Clone specified columns, and send to output port
        return (TransformRowTreatment.Send, _cloneFunc(inputRow));
    }
}

public static class MyCopyTransformFactory // Define factory method(s)
{
    public static MyCopyTransform<TInputError, TOutput>
        MyCopyTransform<TInputError, TOutput>(
            in this DownstreamFactory<TInputError> downstreamFactory
            , string workerName
            , Action<IColumnMapperCommand> columnMapperCommandAction = null
            )
        where TInputError : class
        where TOutput : class, new()
    {
        return new MyCopyTransform<TInputError, TOutput>(
            downstreamFactory
            , workerName
            , columnMapperCommandAction
            );
    }
}

public class MySales
{
    public decimal Sales { get; set; }
}

public class MySalesBudget
{
    public decimal Budget { get; set; }
    public decimal Sales { get; set; }
}

// ...

// Create MySalesBudget rows, copying "Sales" column from the incoming records. It logs:
// "Summary(1 member maps: 1 of 1 from columns, 1 of 2 to columns). Mapped(Sales>Sales).
// Unmapped-from(). Unmapped-to(Budget)."
var transform = source.Output.Link
    .MyCopyTransform<MySales, MySalesBudget>("transform");

// ...
```
Duplicating Rows
Whole dataflow rows can be duplicated using CreateCopyAction(Boolean) or CreateDeepCloneFunc(), i.e. a new row instance is allocated (using the required parameterless constructor), and all supported columns are (deep) copied to the new row instance.
Comparing
The RowComparer facilities:
- Allow comparing two rows by comparing one or more columns
- The rows can have the same or different types
- Columns can be addressed based on name or ordinal position
- By default, string comparisons are case sensitive and use the current culture
- Column comparisons can also be made by specifying a StringComparer (which has several predefined string comparers), or a Comparison<T> (which provides complete freedom in defining how a particular column is compared)
- Can create a high performance compare method
You can add this functionality to the dataflow workers you develop yourself to make them easy to configure. The implementation pattern is similar to the mapping and copying example above:
- Compare two rows of the same type (as used by SortTransform<TInputOutput>):
  - RowComparer<T> - creates the Comparison<T> or Comparer<T>
  - IRowComparerCommand<T> - specify columns to include and how to compare them
- Compare two rows of different type (as used by InnerJoinMergeSortedTransform<TLeftInput, TRightInput, TOutput>):
  - RowComparer<TLeft, TRight> - creates the Comparison<TLeft, TRight>
  - IRowComparerCommand<TLeft, TRight> - specify columns to include and how to compare them
Note
- The default comparison for string columns is case sensitive and uses the current culture
- Do specify explicitly how string columns are compared (e.g. by supplying a StringComparer), to minimize issues if e.g. the host current culture changes
- Please see Best Practices for Using Strings in .NET for details on string comparisons, current culture etc.
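To show why an explicit comparer matters, the following sketch uses only standard .NET types. `StringCompareDemo` and its members are hypothetical names for illustration. How a comparer is supplied to IRowComparerCommand<T> is not covered here, so the sketch calls the comparers directly.

```csharp
using System;

public static class StringCompareDemo // Hypothetical helper, not a library type
{
    // Explicit comparers: results do not depend on the host's current culture
    public static readonly StringComparer CaseSensitive = StringComparer.Ordinal;
    public static readonly StringComparer CaseInsensitive = StringComparer.OrdinalIgnoreCase;

    // A Comparison<string> gives full control; here: shorter strings first, then ordinal
    public static readonly Comparison<string> ByLengthThenOrdinal = (x, y) =>
    {
        int byLength = x.Length.CompareTo(y.Length);
        return byLength != 0 ? byLength : string.CompareOrdinal(x, y);
    };
}
```

With these, `CaseSensitive.Compare("a", "B")` is positive (ordinal code points put upper case first), `CaseInsensitive.Compare("a", "B")` is negative, and `ByLengthThenOrdinal("zz", "aaa")` is negative because the shorter string sorts first.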
See Also
- Dataflow
- Dataflow Rows
- Dataflow Columns
- Dataflow Column Mapping
- Mapping commands: IRowMapperCommand, IColumnMapperCommand
- Row mappers and mapping result:
- Row copier: TypeColumnCopier<TFrom, TTo>
- Compare Dataflow Columns
- Custom Dataflow Workers