Adb Keep Open Connection
Regular connections from AdbConnectionBuilder can only be used with a single database worker. This article describes how to use AdbKeepOpenConnectionBuilder, which allows a single connection to be used with multiple workers.
Common use cases include:
- Working with a temporary database table across multiple workers
- Executing multiple database workers inside a single transaction
Temporary Table Example
This example uses a single keep-open connection across three workers to:
- Create a new "#TempProduct" temporary table
- Insert rows into the new temporary table from a CSV file
- Run a DELETE query that joins on the new temporary table
With regular connections, the temporary table would not have been retained across workers.
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
/// <summary>
/// Example that shares one keep-open connection between three workers so
/// that the temporary table created by the first worker remains available
/// to the workers that follow it.
/// </summary>
public static partial class KeepOpenTemporaryTable
{
    /// <summary>Record layout of the comma-delimited input file.</summary>
    [DelimitedRecord(",")]
    private sealed class Product
    {
        public int ProductId2;
    }

    /// <summary>
    /// Creates the temporary table, loads it from the CSV file, and then
    /// deletes the rows in dbo.Products that match the loaded ids.
    /// </summary>
    /// <returns>The status returned by starting the worker system.</returns>
    public static SystemOutcomeStatus RunExample()
    {
        var workerSystem = new WorkerSystem().Root(ws =>
        {
            const string tempTableName = "#TempProduct";

            string createTableSql =
                $@"CREATE TABLE {tempTableName} ([ProductId2] [int] NOT NULL)";

            // Verbatim string: the statement lines stay at column 0 so the
            // SQL text sent to the server is unchanged.
            string deleteRowsSql = $@"
DELETE p
FROM dbo.Products p
INNER JOIN {tempTableName} tt ON p.ProductId2 = tt.ProductId2
";

            // The builder is registered for disposal when the worker system
            // finishes; every worker below receives this same instance.
            var keepOpenBuilder = ws.DisposeOnFinished(
                AdbSqlClientProvider.Get()
                    .CreateKeepOpenConnectionBuilder(ws.Config["SqlServer"]));

            var createTable = new AdbExecuteNonQueryWorker(
                ws, "CreateTable", keepOpenBuilder, createTableSql);

            // Start constraint: read the file only after the table exists.
            var csvSource = new FileHelperFileSource<Product>(
                ws,
                "Read CSV",
                () => createTable.IsSucceeded,
                @"Src/KeepOpenTemporaryTable/Product.csv");

            var insertRows = csvSource.Output.Link.AdbInsertTarget(
                "Insert", keepOpenBuilder, tempTableName);

            // Start constraint: delete only after the insert has succeeded.
            _ = new AdbExecuteNonQueryWorker(
                ws,
                "DeleteRows",
                () => insertRows.IsSucceeded,
                keepOpenBuilder,
                deleteRowsSql);
        });

        return workerSystem.Start();
    }
}
/* The example assumes the following table already exists:
CREATE TABLE dbo.Products
(
ProductId2 int
)
*/
Key Points
- Unlike AdbConnectionBuilder, the AdbKeepOpenConnectionBuilder instance must be disposed, which in this example is done with DisposeOnFinished<TDisposable>(TDisposable). Other disposing options:
  - UsingActionWorker<TDisposable> (which would have the above workers as its children)
  - Calling Dispose() in an AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>) callback (on an additional worker, again with the above workers as its children)
- The AdbKeepOpenConnectionBuilder instance is passed to all workers that interact with the temporary table. Each of these workers creates, opens, closes, and disposes the connection as if it owned it, but the underlying .NET connection:
  - Is only created and opened by the first "CreateTable" worker
  - Is only closed and disposed when the connection builder in the parent is disposed
- Connections are not thread-safe - the library user must ensure that only one worker using the connection is running at any one time. Here this is done with:
  - Start constraints on "Read CSV" and "DeleteRows"
  - The dataflow link - "Insert" won't start until "Read CSV" is ready to send data
Note
- Always use a regular AdbConnectionBuilder if the keep-open feature is not needed
- Connections and AdbKeepOpenConnectionBuilder instances should be closed and disposed as early as possible - they consume resources, and can time out if left idle
Transactions
To run multiple Adb workers within a single transaction, AdbKeepOpenConnectionBuilder together with AdbTransaction must be used.
The simplest way to do this is to use AdbTransactionActionWorker, which comes pre-configured for this. Alternatively, AdbTransaction can be used explicitly for greater flexibility.
AdbTransactionActionWorker Example
This example uses AdbTransactionActionWorker to perform database work across two database workers, inside a single local transaction:
- AdbTransactionActionWorker automatically creates and opens a connection and transaction
- Rows are inserted from one CSV file into two database tables
- Note that the second insert explicitly waits for the first insert to complete, since a single connection must only be used by a single worker (and thread) at a time
- Since the two insert workers share an upstream dataflow worker, creating an explicit dependency between them with a start constraint could lead to a deadlock. This is avoided by making the worker with the start constraint buffer all incoming rows. Also see Worker Start Constraints.
- AdbTransactionActionWorker automatically commits the transaction on success, or rolls back on error, and disposes the transaction and connection
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
/// <summary>
/// Example that uses AdbTransactionActionWorker to insert rows from one CSV
/// file into two database tables through a single keep-open connection.
/// </summary>
public static partial class AdbTransactionWorker
{
    /// <summary>
    /// Record layout of the comma-delimited input file; the first line of
    /// the file is skipped.
    /// </summary>
    [DelimitedRecord(","), IgnoreFirst(1)]
    private sealed class Product
    {
        public int Id;
        public int CategoryId;
        public string Name;
        public string Description;
        public string Notes;
    }

    /// <summary>
    /// Builds and starts a worker system whose "Load Product" worker reads
    /// the CSV file and feeds two insert targets via a multicast.
    /// </summary>
    /// <returns>The status returned by starting the worker system.</returns>
    public static SystemOutcomeStatus RunExample()
    {
        var workerSystem = new WorkerSystem().Root(root =>
        {
            var keepOpenBuilder = AdbSqlClientProvider.Get()
                .CreateKeepOpenConnectionBuilder(root.Config["SqlServer"]);

            var transactionWorker = new AdbTransactionActionWorker(
                root,
                "Load Product",
                keepOpenBuilder,
                (parent, connectionBuilder) =>
                {
                    var csvSource = new FileHelperFileSource<Product>(
                        parent, "Read CSV", @"Src/AdbTransactionWorker/Product.csv");

                    var multicast = csvSource.Output.Link
                        .MulticastTransform("Multicast", 2);

                    var insertCore = multicast.TypedOutputs[0].Link.AdbInsertTarget(
                        "Insert ProductCore", connectionBuilder, "dbo.ProductCore");

                    var insertExtra = multicast.TypedOutputs[1].Link.AdbInsertTarget(
                        "Insert ProductExtra", connectionBuilder, "dbo.ProductExtra");

                    // The second insert waits for the first to complete, and
                    // buffers all of its incoming rows while it waits.
                    insertExtra.IsStartable = () => insertCore.IsCompleted;
                    insertExtra.Input.BufferingMode = PortBufferingMode.Full;
                });

            // Dispose the keep-open builder once the transaction worker finishes.
            transactionWorker.DisposeOnFinished(keepOpenBuilder);
        });

        return workerSystem.Start();
    }
}
/* The example assumes the following tables already exist:
CREATE TABLE dbo.ProductCore
(
Id int,
CategoryId int,
Name nvarchar(50)
)
CREATE TABLE dbo.ProductExtra
(
Id int,
Description nvarchar(1000),
Notes nvarchar(1000)
)
*/
AdbTransaction Example
This example uses AdbTransaction directly to perform the same database work as in the previous example, across two database workers, inside a single local transaction.
Note
Implementing the transaction explicitly as in this example is more complex, and should only very rarely be needed.
The UsingActionWorker<TDisposable>:
- Groups the workers, which provides a single point to check if any failed
- Automatically disposes the AdbKeepOpenConnectionBuilder after the worker finishes.
- The main action callback:
  - Creates a connection
  - Adds a callback that commits or rolls back the transaction, and disposes the connection and transaction, with exception handling, after the worker completes
  - Opens the connection and begins a transaction
  - Creates the child workers
- Rows are inserted from one CSV file into two database tables
- Note that the second insert explicitly waits for the first insert to complete, since a single connection can only be used by a single thread at a time
using System;
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using actionETL.Logging;
using FileHelpers;
/// <summary>
/// Example that manages the connection and transaction explicitly while
/// inserting rows from one CSV file into two database tables.
/// </summary>
public static partial class AdbExplicitTransaction
{
    /// <summary>
    /// Record layout of the comma-delimited input file; the first line of
    /// the file is skipped.
    /// </summary>
    [DelimitedRecord(",")]
    [IgnoreFirst(1)]
    private sealed class Product
    {
        public int Id;
        public int CategoryId;
        public string Name;
        public string Description;
        public string Notes;
    }

    /// <summary>
    /// Builds and starts a worker system whose "Load Product" worker opens a
    /// connection, begins a transaction, and runs two insert workers on it.
    /// A completion callback commits on success, rolls back otherwise, and
    /// disposes the connection.
    /// </summary>
    /// <returns>The status returned by starting the worker system.</returns>
    public static SystemOutcomeStatus RunExample()
    {
        var provider = AdbSqlClientProvider.Get();
        return new WorkerSystem()
            .Root(ws =>
            {
                _ = new UsingActionWorker<AdbKeepOpenConnectionBuilder>(ws, "Load Product"
                    , () => provider.CreateKeepOpenConnectionBuilder(ws.Config["SqlServer"])
                    , (uat, cb) =>
                    {
                        // Create connection, transaction, commit, rollback, and cleanup
                        var adbConnection = cb.Create();

                        // Registered before Open() so that the connection is
                        // disposed even if opening or beginning the
                        // transaction fails.
                        uat.AddCompletedCallback((uat2, os) =>
                        {
                            if (adbConnection.Transaction != null)
                            {
                                if (os.IsSucceeded)
                                {
                                    try
                                    {
                                        adbConnection.Transaction?.Commit();
                                    }
                                    catch (Exception ex)
                                    {
                                        // A failed commit turns the outcome into an
                                        // error, which triggers the rollback below.
                                        os = OutcomeStatus.Error(
                                            ALogCategory.DatabaseExceptionCommit, ex);
                                    }
                                }

                                if (!os.IsSucceeded)
                                {
                                    try
                                    {
                                        adbConnection.Transaction?.Rollback();
                                    }
                                    catch (Exception ex)
                                    {
                                        // Keep the original failure and add the
                                        // rollback failure to it.
                                        os = os.Combine(OutcomeStatus.Error(
                                            ALogCategory.DatabaseExceptionRollback, ex));
                                    }
                                }
                            }

                            adbConnection.Dispose();
                            return os.ToTask();
                        });

                        adbConnection.Open();
                        adbConnection.BeginTransaction();

                        // Create workers
                        var readCSV = new FileHelperFileSource<Product>(uat, "Read CSV"
                            , @"Src/AdbExplicitTransaction/Product.csv");
                        var mc = readCSV.Output.Link.MulticastTransform("Multicast", 2);
                        var ic = mc.TypedOutputs[0].Link.AdbInsertTarget(
                            "Insert ProductCore", cb, "dbo.ProductCore");
                        var ie = mc.TypedOutputs[1].Link.AdbInsertTarget(
                            "Insert ProductExtra", cb, "dbo.ProductExtra");

                        // The connection must only be used by one worker at a
                        // time, so the second insert waits for the first.
                        ie.IsStartable = () => ic.IsCompleted;

                        // Both inserts share the upstream multicast, so a start
                        // constraint between them could deadlock. Buffering all
                        // incoming rows on the constrained worker avoids that.
                        ie.Input.BufferingMode = PortBufferingMode.Full;
                    });
            })
            .Start();
    }
}
/* The example assumes the following tables already exist:
CREATE TABLE dbo.ProductCore
(
Id int,
CategoryId int,
Name nvarchar(50)
)
CREATE TABLE dbo.ProductExtra
(
Id int,
Description nvarchar(1000),
Notes nvarchar(1000)
)
*/