Custom Non-Adb Development
In some cases there might not be a suitable Adb worker available, e.g. due to there not being an Adb provider available yet for a particular database, or the existing Adb classes not supporting a particular use case.
For these scenarios, one can create and execute standard .NET database queries and commands directly, using any .NET database provider, and integrate them with the actionETL dataflow.
Note that while fully functional, the resulting custom code and custom workers lack many of the features that the Adb workers provide out of the box.
Ad-hoc Data Source
This example demonstrates reading a database result set into the dataflow, using a .NET database provider directly (as opposed to using AdbDataReaderSource<TOutput>).
The ad-hoc approach in this example is suitable if the code is only ever used in a single place. If there is any reuse, it is much better to create a reusable worker.
While this example uses the OleDb .NET provider (which can access dozens of different types of databases, but is Windows only), the code would look very similar for other database providers.
using actionETL;
using actionETL.Logging;
using System;
using System.Data.OleDb;
using System.Runtime.InteropServices;
/// <summary>
/// Example: reads a database result set into the dataflow from an ad-hoc
/// <c>ActionSource</c> worker that uses the OleDb .NET provider directly.
/// </summary>
public static partial class AdHocOleDbSource
{
    // Dataflow row type; one instance is created per database row read.
    private class Product
    {
        public int Id;
        public string Name;
        public double Weight;
    }

    /// <summary>
    /// Builds and starts a worker system with a single source worker that
    /// queries the Product table and links its output to a collection target.
    /// <c>ThrowOnFailure()</c> is called on the result of <c>Start()</c>.
    /// </summary>
    public static void RunExample()
    {
        // Name of the configuration entry holding the connection string
        const string dbSetting = "OleDbServer";

        new WorkerSystem()
            .Root(root =>
            {
                new ActionSource<Product>(root, "Read Product", async s =>
                {
                    // Any exception will be caught and logged by the worker

                    // Fetch connection details
                    var cs = root.Config[dbSetting];

                    // The using blocks ensure the connection, command and reader are
                    // closed and disposed on every exit path, including the early
                    // error return and any exception.
                    using (var oleDbConnection = new OleDbConnection(cs))
                    using (var oleDbCommand = new OleDbCommand(
                        @"SELECT Id, Name, Weight
FROM Product
WHERE Weight > ?; "
                        , oleDbConnection))
                    {
                        // Add one parameter. OleDb binds parameters to the '?'
                        // placeholders by position, so the name is informational only.
                        oleDbCommand.Parameters.AddWithValue("@Weight", 5.5);

                        await oleDbConnection.OpenAsync().ConfigureAwait(false);

                        using (var reader = await oleDbCommand
                            .ExecuteReaderAsync().ConfigureAwait(false))
                        {
                            // Synchronous slightly faster with narrow rows:
                            //     reader.Read()
                            // Asynchronous more scalable with many workers in parallel:
                            //     await reader.ReadAsync().ConfigureAwait(false)
                            while (reader.Read())
                            {
                                // Create a dataflow row for each database row
                                var row = new Product
                                {
                                    Id = reader.GetInt32(0),
                                    Name = reader.GetString(1),
                                    Weight = reader.GetDouble(2)
                                };

                                // Try the synchronous send first, and only await the
                                // asynchronous send if that did not accept the row.
                                if (s.Output.TrySendRow(row)
                                    || await s.Output.SendRowAsync(row).ConfigureAwait(false))
                                {
                                    continue;
                                }

                                // Neither send accepted the row: fail the worker
                                return OutcomeStatus.Error(
                                    ALogCategory.OutputPortUnexpectedNoDemand);
                            }

                            // All rows were sent; mark the output port as succeeded
                            s.Output.SendSucceeded();
                        }
                    }

                    return OutcomeStatus.Succeeded;
                })

                // Use source data with dataflow transforms or targets
                .Output.Link.CollectionTarget("Collect Products");
            })
            .Start()
            .ThrowOnFailure();

        /* The example assumes the following table already exists:
            CREATE TABLE dbo.Product
            (
                Id int,
                Name nvarchar(50),
                Description nvarchar(1000),
                Weight float
            )
        */
    }
}
Custom Data Source Worker
This example performs the same work as the above Ad-hoc Data Source, reading a database result set into the dataflow, using a .NET database provider directly (as opposed to using AdbDataReaderSource<TOutput>).
Here, however, we create a reusable worker, which makes the code easy both to use in multiple places and to maintain, since it avoids code duplication.
using actionETL;
using actionETL.Logging;
using System;
using System.Data.Common;
using System.Data.OleDb;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
/// <summary>
/// Example: a reusable dataflow source worker that reads a database result
/// set with the OleDb .NET provider directly, plus a method that uses it.
/// </summary>
public static partial class CustomOleDbSourceExample
{
    // Dataflow row type; one instance is created per database row read.
    private class Product
    {
        public int Id;
        public string Name;
        public double Weight;
    }

    /// <summary>
    /// Reusable dataflow source worker. It executes an <see cref="OleDbCommand"/>
    /// and sends one <typeparamref name="TOutput"/> row to its output port for
    /// each row in the result set.
    /// </summary>
    /// <typeparam name="TOutput">The dataflow row type to create and send.</typeparam>
    public class CustomOleDbSource<TOutput>
        : SourceBase<CustomOleDbSource<TOutput>, TOutput>
        where TOutput : class
    {
        private readonly string _connectionString;
        private readonly Func<DbDataReader, TOutput> _createRowFunc;
        private readonly OleDbCommand _oleDbCommand;

        /// <summary>
        /// Creates the source worker.
        /// </summary>
        /// <param name="workerParent">Passed to the base class constructor.</param>
        /// <param name="workerName">Passed to the base class constructor.</param>
        /// <param name="isStartableFunc">Passed to the base class constructor.</param>
        /// <param name="connectionStringConfigurationName">
        /// Key used to look up the connection string in <c>WorkerSystem.Config</c>.
        /// </param>
        /// <param name="oleDbCommand">
        /// The command to execute. Its <c>Connection</c> is assigned when the worker
        /// runs. This class does not dispose the command.
        /// </param>
        /// <param name="createRowFunc">
        /// Called once per database row, with the reader positioned on that row,
        /// to create the dataflow row to send.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="oleDbCommand"/> or <paramref name="createRowFunc"/> is null.
        /// </exception>
        public CustomOleDbSource(WorkerParent workerParent, string workerName
            , Func<bool> isStartableFunc
            , string connectionStringConfigurationName
            , OleDbCommand oleDbCommand
            , Func<DbDataReader, TOutput> createRowFunc
            )
            : base(workerParent, workerName, isStartableFunc)
        {
            // Fail at construction rather than with a NullReferenceException
            // when the worker later runs.
            if (oleDbCommand == null)
                throw new ArgumentNullException(nameof(oleDbCommand));
            if (createRowFunc == null)
                throw new ArgumentNullException(nameof(createRowFunc));

            _connectionString = WorkerSystem.Config[connectionStringConfigurationName];
            _oleDbCommand = oleDbCommand;
            _createRowFunc = createRowFunc;
        }

        /// <summary>
        /// Creates the source worker, passing null as <c>isStartableFunc</c> to
        /// the other constructor overload.
        /// </summary>
        /// <param name="workerParent">Passed to the base class constructor.</param>
        /// <param name="workerName">Passed to the base class constructor.</param>
        /// <param name="connectionStringSettingsName">
        /// Key used to look up the connection string in <c>WorkerSystem.Config</c>.
        /// </param>
        /// <param name="command">The command to execute.</param>
        /// <param name="createRowFunc">
        /// Called once per database row to create the dataflow row to send.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="command"/> or <paramref name="createRowFunc"/> is null.
        /// </exception>
        public CustomOleDbSource(WorkerParent workerParent, string workerName
            , string connectionStringSettingsName
            , OleDbCommand command
            , Func<DbDataReader, TOutput> createRowFunc
            )
            : this(workerParent, workerName, null, connectionStringSettingsName
                  , command, createRowFunc)
        { }

        /// <summary>
        /// Opens a connection, executes the command, and sends one row to the
        /// output port per database row.
        /// </summary>
        /// <returns>
        /// <c>OutcomeStatus.Succeeded</c> when all rows were sent, or an error status
        /// with <c>ALogCategory.OutputPortUnexpectedNoDemand</c> if a row was not
        /// accepted by the output port.
        /// </returns>
        protected async override Task<OutcomeStatus> RunAsync()
        {
            // Any exception will be caught and logged by the base class worker

            // The using blocks ensure the connection and reader are closed and
            // disposed on every exit path, including the early error return.
            using (var oleDbConnection = new OleDbConnection(_connectionString))
            {
                _oleDbCommand.Connection = oleDbConnection;
                await oleDbConnection.OpenAsync().ConfigureAwait(false);

                using (var reader = await _oleDbCommand
                    .ExecuteReaderAsync().ConfigureAwait(false))
                {
                    // Synchronous slightly faster with narrow rows:
                    //     reader.Read()
                    // Asynchronous more scalable with many workers in parallel:
                    //     await reader.ReadAsync().ConfigureAwait(false)
                    while (reader.Read())
                    {
                        var row = _createRowFunc(reader);

                        // Try the synchronous send first, and only await the
                        // asynchronous send if that did not accept the row.
                        if (Output.TrySendRow(row)
                            || await Output.SendRowAsync(row).ConfigureAwait(false))
                        {
                            continue;
                        }

                        // Neither send accepted the row: fail the worker
                        return OutcomeStatus.Error(
                            ALogCategory.OutputPortUnexpectedNoDemand);
                    }

                    // All rows were sent; mark the output port as succeeded
                    Output.SendSucceeded();
                }
            }

            return OutcomeStatus.Succeeded;
        }
    }

    /// <summary>
    /// Builds and starts a worker system that uses <c>CustomOleDbSource</c> to
    /// query the Product table and links its output to a collection target.
    /// <c>ThrowOnFailure()</c> is called on the result of <c>Start()</c>.
    /// </summary>
    public static void RunExample()
    {
        new WorkerSystem()
            .Root(root =>
            {
                // Create a command, registered with root.DisposeOnFinished
                var oleDbCommand = root.DisposeOnFinished(new OleDbCommand(
                    @"SELECT Id, Name, Weight
FROM Product
WHERE Weight > ?; "));

                // Add one parameter. OleDb binds parameters to the '?'
                // placeholders by position, so the name is informational only.
                oleDbCommand.Parameters.AddWithValue("@Weight", 5.5);

                // Use the source worker created above
                new CustomOleDbSource<Product>(root, "Read Product"
                    , "OleDbServer", oleDbCommand, dr =>
                    {
                        // Create a dataflow row for each database row
                        return new Product
                        {
                            Id = dr.GetInt32(0),
                            Name = dr.GetString(1),
                            Weight = dr.GetDouble(2)
                        };
                    })

                // Use source data with dataflow transforms or targets
                .Output.Link.CollectionTarget("Collect Products");
            })
            .Start()
            .ThrowOnFailure();

        /* The example assumes the following table already exists:
            CREATE TABLE dbo.Product
            (
                Id int,
                Name nvarchar(50),
                Description nvarchar(1000),
                Weight float
            )
        */
    }
}