Custom Dataflow Aggregations
This article describes implementing custom dataflow aggregations and groupings with the AggregateTransform workers. Predefined aggregations are instead described in Dataflow Aggregations.
Custom Aggregations
Beyond the available predefined aggregation functions, you can also provide the AggregateTransform workers with your own custom aggregation functions, implemented via one or more of the following delegates:
- Seed delegate - invoked once for the first row of each group
- Accumulation delegate - invoked once for every row
- Output delegate - invoked once for each group, after all its rows are processed
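The invocation order of these three delegates can be modeled in plain C#. The sketch below is a simplified, hypothetical model (AggregateLifecycleSketch and Run are illustrative names, not part of actionETL). It only shows when each delegate runs, not how the AggregateTransform worker is implemented: for each group, the seed delegate is called on the first row, the accumulation delegate on every row including the first, and the output delegate once after all rows.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of when the seed, accumulation and output
// delegates run. Not the actionETL implementation or API.
public static class AggregateLifecycleSketch
{
    public static List<TOut> Run<TIn, TKey, TAcc, TOut>(
        IEnumerable<TIn> rows,
        Func<TIn, TKey> keySelector,             // grouping key of a row
        Func<TIn, TAcc> seed,                    // first row of each group
        Func<TAcc, TIn, long, TAcc> accumulate,  // every row; long = rows already accumulated in the group
        Func<TAcc, long, TOut> output)           // once per group; long = total rows in the group
    {
        var accumulations = new Dictionary<TKey, TAcc>();
        var counts = new Dictionary<TKey, long>();
        var keysInArrivalOrder = new List<TKey>();

        foreach (var row in rows)
        {
            var key = keySelector(row);
            if (!accumulations.TryGetValue(key, out var accumulation))
            {
                // First row of a new group: seed the accumulation
                accumulation = seed(row);
                counts[key] = 0;
                keysInArrivalOrder.Add(key);
            }

            // Every row, including the first one, is accumulated
            accumulations[key] = accumulate(accumulation, row, counts[key]);
            counts[key]++;
        }

        // After all rows are processed: one output per group
        var results = new List<TOut>();
        foreach (var key in keysInArrivalOrder)
            results.Add(output(accumulations[key], counts[key]));
        return results;
    }
}
```

For example, with the Category/Metric sample values used in the first example below, a seed returning 0, an accumulation adding Metric, and an output returning the sum and the row count, this model yields (9, 3) for Category 1 and (10, 1) for Category 2.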
There are also several options for implementing grouping:
- Select grouping columns with RowEqualityComparer.Create<T>(Action<IGroupByCommand>)
- Calculate a grouping key with RowEqualityComparer.Create<T, TKey>(Func<T, TKey>)
- Implement a custom IEqualityComparer<T>
See the API documentation of the factory members below for details on the above options.
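As an illustration of the third option, the sketch below implements IEqualityComparer<T> so that rows with the same Category compare equal, and uses it with the LINQ GroupBy overload that accepts a comparer. Row, CategoryComparer and ComparerGroupingSketch are hypothetical names, and the sketch does not use actionETL at all; how such a comparer is passed to an AggregateTransform worker is described in the API documentation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Row
{
    public int Category;
    public int Metric;
}

// Treats two rows as equal when their Category values match,
// so they end up in the same group.
public sealed class CategoryComparer : IEqualityComparer<Row>
{
    public bool Equals(Row x, Row y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x is null || y is null) return false;
        return x.Category == y.Category;
    }

    // Must agree with Equals: equal rows must return the same hash code
    public int GetHashCode(Row row) => row.Category.GetHashCode();
}

public static class ComparerGroupingSketch
{
    // Uses each row itself as the grouping key, with CategoryComparer deciding equality
    public static List<(int Category, int Sum)> SumByCategory(IEnumerable<Row> rows) =>
        rows.GroupBy(r => r, new CategoryComparer())
            .Select(g => (Category: g.Key.Category, Sum: g.Sum(r => r.Metric)))
            .ToList();
}
```

Equals and GetHashCode must be consistent: if two equal rows returned different hash codes, hash-based grouping would split what should be a single group.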
Note
All the examples below:
- Show how to implement custom aggregation functions. Except for the string concatenation, the same results are more easily obtained with the predefined aggregation functions
- Group the rows. Pass null to the grouping parameter to skip grouping, i.e. to not perform a GROUP BY
- Use either the dataflow rows themselves, or the Accumulation property, to store accumulation values. There is also a CustomData property (of type object), which is useful if the accumulation type is not known in advance.
Custom AggregateTransform1()
AggregateTransform<TInputAccumulateOutput> uses the same row type for input, accumulation and output rows. Create it with this AggregateTransformFactory.AggregateTransform1() overload.
The example below calculates the sum of a column in the incoming rows (which can be done without any seed or output action), grouped by another column, and stores the output in a collection.
// using actionETL;
private class Row
{
public int Category;
public int Metric;
}
private readonly Row[] _sampleData = new Row[]
{
new Row { Category = 1, Metric = 2 }
, new Row { Category = 1, Metric = 2 }
, new Row { Category = 1, Metric = 5 }
, new Row { Category = 2, Metric = 10 }
};
public WorkerSystem RunExample()
{
var workerSystem = new WorkerSystem()
.Root(ws =>
{
new EnumerableSource<Row>(ws, "Source", _sampleData)
.Output.Link.AggregateTransform1(
"Transform"
, ra => // Set seed
ra.Accumulation = new Row { Category = ra.NextRow.Category }
, ra => // Accumulate values for all rows with the same [Category]
ra.Accumulation.Metric += ra.NextRow.Metric
, null // Output defaults to last Accumulation
// Group by column [Category]
, RowEqualityComparer.Create<Row>(g => g.Name(nameof(Row.Category)))
)
// Receives two rows:
// { Category = 1, Metric = 9 }
// { Category = 2, Metric = 10 }
.Output.Link.CollectionTarget();
});
workerSystem.Start().ThrowOnFailure();
return workerSystem;
}
The example below calculates statistics (sum, count, average, minimum and maximum) and a string concatenation on columns in the incoming rows, grouped by another column, and stores the output in a collection. Several of these calculations require a seed and/or output action.
Note
The string concatenation could also be done directly in the Text field, without using a StringBuilder. This would however be very inefficient (due to excessive memory allocation and string copying) for any group with a large number of rows.
// using actionETL;
// using System;
// using System.Collections.Generic;
// using System.Text;
private class Row
{
public int Category;
public int Metric;
public string Text;
public long Count;
public double Average;
public int Min;
public int Max;
public StringBuilder TextStringBuilder;
}
private readonly Row[] _sampleData = new Row[]
{
new Row { Category = 1, Metric = 2, Text = "Tic" }
, new Row { Category = 1, Metric = 2, Text = "Tac" }
, new Row { Category = 1, Metric = 5, Text = "Toe" }
, new Row { Category = 2, Metric = 10, Text = "Hello World" }
};
public WorkerSystem RunExample()
{
var workerSystem = new WorkerSystem()
.Root(ws =>
{
new EnumerableSource<Row>(ws, "Source", _sampleData)
.Output.Link.AggregateTransform1(
"Transform"
, ra => // Set ra.Accumulation members for first row of each new [Category]
{
ra.Accumulation = new Row { Category = ra.NextRow.Category };
ra.Accumulation.Min = ra.Accumulation.Max = ra.NextRow.Metric;
ra.Accumulation.TextStringBuilder = new StringBuilder(); // Starts empty, filled per row
}
, ra => // Accumulate values for all rows with the same [Category]
{
ra.Accumulation.Metric += ra.NextRow.Metric;
ra.Accumulation.Min = Math.Min(ra.Accumulation.Min, ra.NextRow.Metric);
ra.Accumulation.Max = Math.Max(ra.Accumulation.Max, ra.NextRow.Metric);
if (ra.Count > 0)
ra.Accumulation.TextStringBuilder.Append(", ");
ra.Accumulation.TextStringBuilder.Append(ra.NextRow.Text);
}
, ra =>
{ // Set final output values, [Metric] (i.e. Sum), [Min], [Max] already set
ra.Accumulation.Count = ra.Count;
ra.Accumulation.Average = (double)ra.Accumulation.Metric / ra.Count; // Avoid integer division
ra.Accumulation.Text = ra.Accumulation.TextStringBuilder.ToString();
ra.Accumulation.TextStringBuilder = null; // Release the StringBuilder, not needed in the output
return ra.Accumulation;
}
// Group by selecting grouping key [Category]
, RowEqualityComparer.Create<Row, int>(row => row.Category)
)
// Receives two rows:
// { Category = 1, Metric = 9, Count = 3, Average = 3.0, Min = 2, Max = 5,
// Text = "Tic, Tac, Toe" }
// { Category = 2, Metric = 10, Count = 1, Average = 10.0, Min = 10, Max = 10,
// Text = "Hello World" }
.Output.Link.CollectionTarget();
});
workerSystem.Start().ThrowOnFailure();
return workerSystem;
}
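As the note above explains, concatenating directly into the Text field would be inefficient for large groups. The two hypothetical helper methods below (ConcatSketch is an illustrative name, not part of actionETL) show the difference in plain C#: both produce the same string, but ConcatDirect creates a new string on every iteration and copies all previously accumulated characters into it, while ConcatWithBuilder appends to a single growable buffer and creates the final string once.

```csharp
using System.Collections.Generic;
using System.Text;

public static class ConcatSketch
{
    // Direct concatenation: every iteration allocates a new string and copies
    // all characters accumulated so far, so total copying grows roughly
    // quadratically with the number of rows in the group.
    public static string ConcatDirect(IEnumerable<string> texts)
    {
        var result = string.Empty;
        var first = true;
        foreach (var text in texts)
        {
            result = first ? text : result + ", " + text;
            first = false;
        }
        return result;
    }

    // StringBuilder: appends into one growable buffer and creates the final
    // string only once, in ToString().
    public static string ConcatWithBuilder(IEnumerable<string> texts)
    {
        var builder = new StringBuilder();
        var first = true;
        foreach (var text in texts)
        {
            if (!first)
                builder.Append(", ");
            builder.Append(text);
            first = false;
        }
        return builder.ToString();
    }
}
```

With the input "Tic", "Tac", "Toe", both methods return "Tic, Tac, Toe", matching the Text value for Category 1 in the example above.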
Custom AggregateTransform2()
AggregateTransform<TInput, TAccumulateOutput> uses the same row type for accumulation and output rows, which can be different from the input row type. Create it with this AggregateTransformFactory.AggregateTransform2() overload.
The example below calculates statistics (sum, count, average, minimum and maximum, several of which require a seed and/or output action) on columns in the incoming rows, grouped by another column, and stores the output in a collection.
// using actionETL;
// using System;
// using System.Collections.Generic;
private class In
{
public int Category;
public int Metric;
}
private readonly In[] _sampleData = new In[]
{
new In { Category = 1, Metric = 2 }
, new In { Category = 1, Metric = 2 }
, new In { Category = 1, Metric = 5 }
, new In { Category = 2, Metric = 10 }
};
private class AccOut
{
public int Category;
public double Sum;
public long Count;
public double Average;
public int Min;
public int Max;
}
public WorkerSystem RunExample()
{
var workerSystem = new WorkerSystem()
.Root(ws =>
{
new EnumerableSource<In>(ws, "Source", _sampleData)
.Output.Link.AggregateTransform2<In, AccOut>(
"Transform"
, ra =>
{ // Set ra.Accumulation for first row of each new [Category]
ra.Accumulation = new AccOut
{
Category = ra.NextRow.Category,
Sum = 0,
Min = ra.NextRow.Metric,
Max = ra.NextRow.Metric,
};
}
, ra =>
{ // Accumulate values for all rows with the same [Category]
ra.Accumulation.Sum += ra.NextRow.Metric;
ra.Accumulation.Min = Math.Min(ra.Accumulation.Min, ra.NextRow.Metric);
ra.Accumulation.Max = Math.Max(ra.Accumulation.Max, ra.NextRow.Metric);
}
, ra =>
{ // Set final output values, [Sum], [Min] and [Max] already set
ra.Accumulation.Count = ra.Count;
ra.Accumulation.Average = ra.Accumulation.Sum / ra.Count;
return ra.Accumulation;
}
// Group by column [Category]
, RowEqualityComparer.Create<In>(g => g.Name(nameof(In.Category)))
)
// Receives two rows:
// { Category = 1, Sum = 9, Count = 3, Average = 3.0, Min = 2, Max = 5 }
// { Category = 2, Sum = 10, Count = 1, Average = 10.0, Min = 10, Max = 10 }
.Output.Link.CollectionTarget();
});
workerSystem.Start().ThrowOnFailure();
return workerSystem;
}
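Minimum and maximum are examples of calculations that need a seed: initializing them to 0 instead of the first value would, for instance, report Min = 0 for a group where all values are positive. The hypothetical helper below (MinMaxSketch is an illustrative name, independent of actionETL) shows the same seeding in plain C#, and produces the same result regardless of the order of the values.

```csharp
using System;
using System.Collections.Generic;

public static class MinMaxSketch
{
    // Seeds both Min and Max from the first value, then folds in the rest.
    public static (int Min, int Max) MinMax(IReadOnlyList<int> values)
    {
        if (values.Count == 0)
            throw new ArgumentException("At least one value is required.", nameof(values));

        int min = values[0], max = values[0];   // seed from the first value
        for (var i = 1; i < values.Count; i++)
        {
            min = Math.Min(min, values[i]);      // compare with the running minimum
            max = Math.Max(max, values[i]);      // compare with the running maximum
        }
        return (min, max);
    }
}
```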
Custom AggregateTransform3()
AggregateTransform<TInput, TAccumulate, TOutput> can use different row types for input, accumulation and output rows. Create it with this AggregateTransformFactory.AggregateTransform3() overload.
The example below calculates the sum, the count, and the average of a column in the incoming rows, grouped by another column, and stores the output in a collection.
// using actionETL;
private class Row
{
public int Category;
public int Metric;
}
private readonly Row[] _sampleData = new Row[]
{
new Row { Category = 1, Metric = 2 }
, new Row { Category = 1, Metric = 4 }
, new Row { Category = 2, Metric = 10 }
};
private class Out
{
public int Category;
public int Sum;
public long Count;
public double Average;
}
public WorkerSystem RunExample()
{
var workerSystem = new WorkerSystem()
.Root(ws =>
{
new EnumerableSource<Row>(ws, "Source", _sampleData)
.Output.Link.AggregateTransform3<Row, int, Out>(
"Transform"
, ra => ra.Accumulation = 0 // Set seed
, ra => // Accumulate value for all rows with the same [Category]
ra.Accumulation += ra.NextRow.Metric
, ra => new Out
{ // Set final output values
Category = ra.NextRow.Category,
Sum = ra.Accumulation,
Count = ra.Count,
Average = (double)ra.Accumulation / ra.Count
}
// Group by column [Category]
, RowEqualityComparer.Create<Row>(g => g.Name(nameof(Row.Category)))
)
// Receives two rows:
// { Category = 1, Sum = 6, Count = 2, Average = 3.0 }
// { Category = 2, Sum = 10, Count = 1, Average = 10.0 }
.Output.Link.CollectionTarget();
});
workerSystem.Start().ThrowOnFailure();
return workerSystem;
}
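Using three distinct types for the seed, accumulation and output steps closely parallels LINQ's Enumerable.Aggregate overload that takes a seed, an accumulator function and a result selector. The plain C# sketch below (ThreeTypeAggregateSketch and its Out class are illustrative names, not part of actionETL) computes the same output as the example above. It differs in two ways: LINQ does not supply a row count, so the count is carried in the accumulator tuple, and the category is taken from the group key rather than from a row.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ThreeTypeAggregateSketch
{
    public sealed class Out
    {
        public int Category;
        public int Sum;
        public long Count;
        public double Average;
    }

    public static List<Out> Run(IEnumerable<(int Category, int Metric)> rows) =>
        rows.GroupBy(r => r.Category)
            .Select(g => g.Aggregate(
                (Sum: 0, Count: 0L),                                        // seed (accumulation type)
                (acc, r) => (Sum: acc.Sum + r.Metric, Count: acc.Count + 1), // accumulate every row
                acc => new Out                                              // output type, once per group
                {
                    Category = g.Key,
                    Sum = acc.Sum,
                    Count = acc.Count,
                    Average = (double)acc.Sum / acc.Count
                }))
            .ToList();
}
```

With the sample data (1, 2), (1, 4) and (2, 10), this returns the same two rows as the example above: Sum = 6, Count = 2, Average = 3.0 for Category 1, and Sum = 10, Count = 1, Average = 10.0 for Category 2.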