Dataflow Row Errors Reject Example
This is the combined example that is discussed in Rejecting Examples:
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
using System;
public static partial class RejectErrorRows
{
[DelimitedRecord(",")] // Needed for reading CSV file
public class Product
{
public int ProductId;
public string ProductName;
public double ProductWeight;
[FieldHidden] // Member not loaded from CSV file
public CaptureRowErrors CaptureRowErrors;
}
#pragma warning restore CA1051 // Do not declare visible instance fields
private static SystemOutcomeStatus RunExample()
{
var workerSystem = new WorkerSystem("Stage Product")
.Root(ws =>
{
var acs = AdbSqlClientProvider.Get()
.CreateConnectionString(ws.Config["SqlServer"]);
// Source worker uses a StringRowError error output type, since no original
// strongly typed row is available. If multiple workers uses StringRowErrors,
// they also could be combined and inserted into a single table:
var source = new FileHelperFileSource<Product>(ws, "Read Product"
, @"Src/RejectErrorRows/Product.csv");
// Any errors are inserted directly into a database table:
source.ErrorOutput.Link.AdbInsertTarget<StringRowErrors>(
"Insert Read Product Errors"
, acs.CreateConnectionBuilder()
, "dbo.StringRowErrors"
);
// Link source to downstream workers...
/* Assumes the following table already exist:
CREATE TABLE [dbo].[StringRowErrors]
(
[RowErrorRow] NTEXT NULL,
[RowErrorDateTime] DATETIME DEFAULT (Getdate()),
[RowErrorLocator] NTEXT NULL,
[RowErrorMessage] NTEXT NULL,
[RowErrorRowNumber] BIGINT NULL,
[RowErrorColumnName] NCHAR(128) NULL,
[RowErrorException] NTEXT NULL
)
*/
// Transform worker uses the incoming Product rows also as the error output
// rows, which will be linked to 'unionErrors' further down:
var trx1 = source.Output.Link.RowActionTransform1("Transform1"
, row => row.ProductWeight <= 10
? TransformRowTreatment.Reject : TransformRowTreatment.Send);
// Transform worker uses the incoming Product rows also as the error output
// rows:
var trx2 = trx1.Output.Link.RowActionTransform1("Transform2"
, row => row.ProductWeight <= 20
? TransformRowTreatment.Reject : TransformRowTreatment.Send);
// Some errors are corrected, and sent to the output port. Uncorrectable
// rows are sent to the error output port, in this case for further
// checks:
var trx2CheckA = trx2.ErrorOutput.Link.RowActionTransform1("Transform2 CheckA"
, row =>
{
if (row.ProductWeight <= 15)
{
row.ProductWeight = 30;
row.CaptureRowErrors.RowErrors?.Clear(); // Optional
return TransformRowTreatment.Send;
}
return TransformRowTreatment.Reject;
});
// Additional errors are corrected, and sent to the output port. Uncorrectable
// rows are sent to the error output port, which will be linked to
// 'unionErrors' further down:
var trx2CheckB = trx2CheckA.ErrorOutput.Link.RowActionTransform1("Transform2 CheckB"
, row =>
{
if (row.ProductWeight <= 18)
{
row.ProductWeight = 35;
return TransformRowTreatment.Send;
}
return TransformRowTreatment.Reject;
});
trx2CheckB.Output.ClearRowErrors = true; // Optional
// Combine and insert data and corrected error rows:
CreateUnionAndInsert("Product Data", "dbo.Product"
, trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
// Combine and insert error rows:
CreateUnionAndInsert("Product Errors", "dbo.ProductError"
, trx1.ErrorOutput, trx2CheckB.ErrorOutput);
// Local helper function to union and insert rows into a database table
void CreateUnionAndInsert(string name, string tableName,
params OutputPortBase<Product>[] inputsFrom)
{
if (inputsFrom.Length == 0)
return;
inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
.UnionAllTransform("Union " + name, inputsFrom)
.Output.Link
.AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
}
/* Assumes the following tables already exist:
CREATE TABLE [dbo].[Product]
(
[ProductId] INT NOT NULL PRIMARY KEY,
[ProductName] NCHAR(50) NOT NULL,
[ProductWeight] FLOAT NULL
)
CREATE TABLE [dbo].[ProductError]
(
[ProductId] INT NOT NULL PRIMARY KEY,
[ProductName] NCHAR(50) NOT NULL,
[ProductWeight] FLOAT NULL,
[RowErrorDateTime] DATETIME DEFAULT (Getdate()),
[RowErrorLocator] NTEXT NULL,
[RowErrorMessage] NTEXT NULL,
[RowErrorRowNumber] BIGINT NULL,
[RowErrorColumnName] NCHAR(128) NULL,
[RowErrorExceptionString] NTEXT NULL
)
*/
});
return workerSystem.Start();
}
}