Process Incoming Files Example
A common ETL requirement is to load multiple feeds of data files into a database, where each feed can have its own file type (CSV, XLSX, ...), content (sales, inventory, ..., with different column types and names), and frequency (monthly, daily, ad-hoc, every 5 minutes, ...).
As the number of files and feeds grows, occasional processing interruptions become likely due to incorrect or unexpected data, especially with feeds coming from other systems or originally being manually entered. Since interruptions can occur at many different points in the processing, it becomes challenging and time consuming to deduce exactly which files have been loaded, and which need to be reprocessed.
This example defines one approach to addressing the above issues, and implements a custom worker that makes it easy and reliable to apply this approach across many feeds. It minimizes code duplication, which simplifies maintaining and extending the approach.
Note
The code is also available in the "samples/MyCompany.ETL.ProcessIncomingFiles" folder of the release archive; please see ProcessIncomingFiles Sample for how to run it.
The Further Ideas section below has suggestions for customizing and extending this example.
Requirements
To have a robust and reusable solution, the requirements are necessarily somewhat extensive, but as we will see, they are quite straightforward to implement with actionETL.
- MUST read folder locations and connection strings from a configuration system
- MUST support any number of feeds, with a subset loaded simultaneously, limited only by available resources
- MUST allow feeds to reside on different file systems
- MUST provide clear separation between incoming files, currently processing files, and archived files
- MUST process incoming files that match a specified filename pattern in alphabetical and numerical filename order
- MUST allow arbitrary code to process each incoming file
- MUST add a timestamp to the file name of archived files
- MUST compress archived files
- MUST perform comprehensive logging of processing progress and outcomes, to arbitrary logging destinations
- MUST support loading from CSV, fixed format, and XLSX file types, with different contents and data types
- MUST support loading into different databases and database types
- MUST guarantee that a partial incoming file is never created, even when running out of storage space, since that risks later loading the partial file
- MUST guarantee an incoming file is never loaded twice, unless there is a manual intervention
- MUST guarantee that if a feed experiences a processing interruption, further processing of that feed does not occur without manual intervention
- MUST exit with a success (0) or failure (non-0) exit code for integration with batch systems
High Level Design
- Each feed has a top folder, where its files reside in sub folders. Files must be moved (as opposed to copied) into and between folders (on the same device) to ensure the move is atomic. Folders are (see the example layout after this list):
  - Input - Incoming, unprocessed files
  - Process - File currently being processed and temporary files
  - Archive - Previously processed files
- Create a custom ProcessIncomingFilesWorker that processes all currently available files for one feed, one at a time. It handles the file manipulation itself, but delegates the work of reading and using the data in the file to a callback that is supplied as a parameter. The callback can create child workers, which will be automatically run.
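To illustrate the layout, one feed's top folder could look like the following between runs. The file names here are hypothetical; the archive name format matches the ProcessIncomingFilesWorker implementation further below:
Incoming/Offline/
    Input/       Offline_0042.csv   (waiting to be processed)
    Process/     (empty, no processing in progress)
    Archive/     _Offline_0041_2024-05-01T083000.000.csv.zip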
Example Use
This example meets all the above requirements, and it:
- Loads all incoming files in one XLSX feed into a target table
- Loads all incoming files in two CSV feeds, where each file is processed by:
- Truncating the target table
- Inserting the rows into the target table
- Executing a stored procedure
Here's the complete console program. Since the two CSV feeds are processed in the same way, we create a LoadFlatFile() helper method to process them:
using actionETL;
using actionETL.Adb;
using actionETL.Adb.MySqlClientExternal;
using actionETL.Adb.SqlClientExternal;
using actionETL.EPPlus;
using actionETL.FileHelper;
using System.Configuration;
using System.Data;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
static class Program
{
static async Task Main(string[] args)
{
// Select appropriate database provider (AdbMySqlClientProvider, ...)
var provider = AdbSqlClientProvider.Get();
// Supply connection string on command line or set appropriate configuration setting
var cs = args.Length > 0
? provider.CreateConnectionString(args[0])
: provider.CreateConnectionStringFromDefaultAConfig("SqlServer");
var outcomeStatus = await new WorkerSystem().Root(ws =>
{
// Handle different start folders. Can be replaced with e.g.
// absolute paths in configuration file.
Directory.SetCurrentDirectory(
Path.GetDirectoryName(
ConfigurationManager.OpenExeConfiguration(
ConfigurationUserLevel.None).FilePath));
// Run a source and a target worker for every budget file
_ = new ProcessIncomingFilesWorker(ws, "Process Budget", null
, ws.Config["BudgetPath"], "Budget*.xlsx"
, (pif, fileName) =>
{
new XlsxSource<Budget>(pif, $"Extract Budget"
, fileName, xsc => xsc.FromAllByName())
.Output.Link.AdbInsertTarget("Insert Budget"
, cs.CreateConnectionBuilder(), "Sales.Budget");
});
// Run the LoadFlatFile() method for every Offline and Online file,
// which creates four workers for each file to process it
_ = new ProcessIncomingFilesWorker(ws, "Process Offline", null
, ws.Config["OfflinePath"], "Offline*.csv"
, (pif, fileName) => LoadFlatFile<Sales>(pif, cs, fileName
, "Sales.Offline", "Sales.process_offline")
);
_ = new ProcessIncomingFilesWorker(ws, "Process Online", null
, ws.Config["OnlinePath"], "Online*.csv"
, (pif, fileName) => LoadFlatFile<Sales>(pif, cs, fileName
, "Sales.Online", "Sales.process_online")
);
})
.StartAsync(); // Run the worker system
outcomeStatus.Exit(); // Exit with a success or failure exit code when finished
}
// Create four workers (under the specified parentWorker) which truncate the
// target table, insert rows from the CSV file, and run a stored procedure
static void LoadFlatFile<TRow>(
WorkerParent parentWorker
, AdbConnectionString cs
, string fileName
, string tableName
, string procedureName
)
where TRow : class
{
var entityName = tableName.Split('.').Last();
var truncate = new AdbExecuteNonQueryWorker(parentWorker
, "Truncate Table " + entityName
, cs.CreateConnectionBuilder(), $"Truncate Table {tableName};");
var source = new FileHelperFileSource<TRow>(parentWorker, $"Extract {entityName} CSV"
, () => truncate.IsSucceeded
, fileName);
var target = source.Output.Link.AdbInsertTarget("Insert " + entityName
, cs.CreateConnectionBuilder(), tableName);
var procedure = new AdbExecuteNonQueryWorker(parentWorker
, "Call SPROC " + procedureName
, () => target.IsSucceeded
, cs.CreateConnectionBuilder(), procedureName, CommandType.StoredProcedure);
}
}
Dataflow row types:
using System;
public class Budget
{
public DateTime Month;
public decimal OfflineRevenue;
public decimal OnlineRevenue;
}
using actionETL;
using FileHelpers;
using System;
[DelimitedRecord(",")] // CSV file with comma separator
[IgnoreFirst(1)] // Skip one header row
public class Sales
{
[Order] // File column order is same as class member order
public int ProductId;
[Order]
public int CustomerId;
[Order]
[FieldConverter(ConverterKind.Date, "yyyy-MM-dd")] // Specify date format
public DateTime TransactionDate;
[Order]
public int Quantity;
}
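As an illustration, an incoming Offline or Online CSV file matching this record layout could look like the following, with one header row (skipped by IgnoreFirst(1)), comma separated columns in the declared member order, and dates in yyyy-MM-dd format. The header text and values are hypothetical:
ProductId,CustomerId,TransactionDate,Quantity
1001,42,2024-01-15,3
1002,57,2024-01-16,1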
Database objects:
CREATE SCHEMA Sales
GO
CREATE TABLE [Sales].[Budget]
(
[Month] DATE NOT NULL,
[OfflineRevenue] DECIMAL NOT NULL,
[OnlineRevenue] DECIMAL NOT NULL
)
GO
CREATE TABLE [Sales].[Offline]
(
[ProductId] INT NOT NULL,
[CustomerId] INT NOT NULL,
[TransactionDate] DATE NOT NULL,
[Quantity] INT NOT NULL
)
GO
CREATE TABLE [Sales].[Online]
(
[ProductId] INT NOT NULL,
[CustomerId] INT NOT NULL,
[TransactionDate] DATE NOT NULL,
[Quantity] INT NOT NULL
)
GO
CREATE PROCEDURE [Sales].[process_online]
AS
-- Process rows...
RETURN 0
GO
CREATE PROCEDURE [Sales].[process_offline]
AS
-- Process rows...
RETURN 0
GO
"actionetl.aconfig.json" configuration file:
{
"configurations": [
{ "SqlServer": "Trusted_Connection=true;Encrypt=true;TrustServerCertificate=true;server=actionetldb;database=actionetl_tester" },
{ "SqlServerWithPassword": "Encrypt=true;TrustServerCertificate=true;server=actionetldb;database=actionetl_tester;Uid=actionetl_tester;Pwd=MyPassword" },
{ "MySql": "Server=actionetldb;Database=actionetl_tester;Uid=actionetl_tester;Pwd=MyPassword" },
{ "BudgetPath": "Incoming/Budget" },
{ "OfflinePath": "Incoming/Offline" },
{ "OnlinePath": "Incoming/Online" }
]
}
ProcessIncomingFilesWorker Implementation
using actionETL;
using actionETL.Logging;
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;
public class ProcessIncomingFilesWorker
: WorkerBase<ProcessIncomingFilesWorker>
{
// Allow setting properties after creating the worker (but before it runs)
public string FileNamePattern
{
get => _fileNamePattern.GetValueUnchecked();
set => _fileNamePattern.SetValueChecked(value, WorkerParentState.Created);
}
private readonly SafeWorkerParentValue<string> _fileNamePattern;
public bool LogProgress
{
get => _logProgress.GetValueUnchecked();
set => _logProgress.SetValueUnchecked(value);
}
private readonly SafeWorkerParentValue<bool> _logProgress;
public Action<WorkerBase, string> ProcessFileAction
{
get => _processFileAction.GetValueUnchecked();
set => _processFileAction.SetValueChecked(value, WorkerParentState.Created);
}
private readonly SafeWorkerParentValue<Action<WorkerBase, string>> _processFileAction;
public string TopFolder
{
get => _topFolder.GetValueUnchecked();
set => _topFolder.SetValueChecked(value, WorkerParentState.Created);
}
private readonly SafeWorkerParentValue<string> _topFolder;
public ProcessIncomingFilesWorker(
WorkerParent workerParent
, string workerName
, Func<bool> isStartableFunc
, string topFolder
, string fileNamePattern
, Action<WorkerBase, string> processFileAction
)
: base(workerParent, workerName, isStartableFunc)
{
_logProgress = new SafeWorkerParentValue<bool>(this, true);
_topFolder = new SafeWorkerParentValue<string>(this, topFolder);
_fileNamePattern = new SafeWorkerParentValue<string>(this, fileNamePattern);
_processFileAction = new SafeWorkerParentValue<Action<WorkerBase, string>>(
this,
processFileAction);
}
protected async override Task<OutcomeStatus> RunAsync()
{
if (string.IsNullOrWhiteSpace(TopFolder))
throw new ArgumentException("Parameter must not be blank or null"
, nameof(TopFolder));
if (string.IsNullOrWhiteSpace(FileNamePattern))
throw new ArgumentException("Parameter must not be blank or null"
, nameof(FileNamePattern));
if (ProcessFileAction == null)
throw new ArgumentNullException(nameof(ProcessFileAction));
// 1. Set folder variables
string incomingFolder = Path.Combine(TopFolder, "Input/");
string processFolder = Path.Combine(TopFolder, "Process/");
// 2. Loop over all matching files, in ordinal (alphabetical and numerical) filename order
foreach (string fileName in Directory.EnumerateFiles(incomingFolder, FileNamePattern)
.Select(fn => Path.GetFileName(fn))
.OrderBy(fn => fn, StringComparer.Ordinal))
{
// 3. If Process/ has any left-over files, stop by returning Fatal
string oldProcessFullName = Directory
.EnumerateFiles(processFolder, FileNamePattern).FirstOrDefault();
if (oldProcessFullName != null)
return OutcomeStatus.Fatal(ALogCategory.TargetExists
, $"Files already present, including: {oldProcessFullName}");
// 4. Log current filename
if (LogProgress)
Logger.Info(ALogCategory.ProgressWorker, "Processing file: " + fileName);
// 5. Move Input/ file to Process/
string processFullName = Path.Combine(processFolder, fileName);
File.Move(Path.Combine(incomingFolder, fileName), processFullName);
// 6. Run processFileAction(), which can also add child workers if desired
ProcessFileAction(this, processFullName);
// 7. Run & remove child workers (if any were created), return on failure
OutcomeStatus outcomeStatus;
if (HasChildren && (outcomeStatus
= await RunChildrenAsync(true).ConfigureAwait(false)).IsFailed)
return outcomeStatus;
// 8. Compress file with timestamp, and delete Process/ file
string zipProcessFullName = processFullName + ".zip";
using (var zip = ZipFile.Open(zipProcessFullName, ZipArchiveMode.Create))
zip.CreateEntryFromFile(processFullName, Path.GetFileName(processFullName));
File.Delete(processFullName);
// 9. Move zip file to Archive/. Add a leading underscore to minimize risk
// of matching the input filename pattern.
string archiveFullName = Path.Combine(TopFolder, "Archive/"
, $@"_{Path.GetFileNameWithoutExtension(fileName)}_{
DateTime.Now:yyyy-MM-ddTHHmmss.fff}{
Path.GetExtension(fileName)}.zip");
File.Move(zipProcessFullName, archiveFullName);
}
return OutcomeStatus.Succeeded;
}
}
Further Ideas
The following are some ideas for customizing and extending the above approach:
- Add a constructor overload without the isStartableFunc parameter to ProcessIncomingFilesWorker (a sketch follows after this list)
- Add standardized ETL metadata to all loaded data rows, e.g. load start time and a unique value shared by all rows loaded by the same ETL run (which can be very useful during troubleshooting)
- Use worker start constraints, MaxRunningChildren, and/or grouping of multiple ProcessIncomingFilesWorker workers to set processing order and load limits
- Add a constructor parameter or overload, or a property, to ProcessIncomingFilesWorker to, on a processing error, reject the file to an Error folder and continue processing the feed
- In LoadFlatFile(), make truncating the table and calling the stored procedure optional
- Create a LoadXlsxFile() method, similar to LoadFlatFile(), if loading multiple XLSX files
- If incoming CSV files are compressed and too large or too slow to uncompress to a new file, instead uncompress to a stream, and load the data via FileHelperStreamSource<TOutput>
- Avoid re-compressing already compressed files when archiving, e.g. by checking against a list of filename extensions such as new[] { ".gz", ".xlsx", ".zip" } (a sketch also follows after this list)
- Create a ProcessAllIncomingFilesWorker worker that allows the callback to process all (or up to a maximum of) currently available files in one go, which could improve performance in some cases if there are a very large number of small files to process
- Reference data in ETL applications is often only kept in a database. Consider using flat files under source control (e.g. CSV files) as the system of record, and load them into the database e.g. on demand or on every batch run.
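As a sketch of the first idea above, a constructor overload could simply forward a null isStartableFunc to the existing six-parameter constructor (hypothetical code, not part of the sample):
// Hypothetical overload; chains to the existing constructor with a null isStartableFunc
public ProcessIncomingFilesWorker(
    WorkerParent workerParent
    , string workerName
    , string topFolder
    , string fileNamePattern
    , Action<WorkerBase, string> processFileAction
    )
    : this(workerParent, workerName, null, topFolder, fileNamePattern, processFileAction)
{
}
And as a sketch of avoiding re-compressing already compressed files, step 8 of RunAsync() could be guarded roughly as follows, assuming step 9 is adjusted to archive the original file name when compression is skipped (Contains with a comparer uses the existing System.Linq import):
// Skip compression when the file extension is already a compressed format
var compressedExtensions = new[] { ".gz", ".xlsx", ".zip" };
bool alreadyCompressed = compressedExtensions.Contains(
    Path.GetExtension(fileName), StringComparer.OrdinalIgnoreCase);
if (!alreadyCompressed)
{
    string zipProcessFullName = processFullName + ".zip";
    using (var zip = ZipFile.Open(zipProcessFullName, ZipArchiveMode.Create))
        zip.CreateEntryFromFile(processFullName, Path.GetFileName(processFullName));
    File.Delete(processFullName);
}
// else: leave processFullName uncompressed and move it to Archive/ in step 9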
worker that allows the callback to process all (or up to a maximum) currently available files in one go, which could improve performance in some cases if there are a very large number of small files to process - Reference data in ETL applications is often only kept in a database. Consider using flat files under source control (e.g. CSV files) as the system of record, and load them into the database e.g. on demand or on every batch run.