Loadable Table Mixins
Infrastructure for staged, file-based ingestion into ORM tables.
Supports:

- CSV-based ingestion
- optional fast-path database COPY
- dialect-aware merge strategies
- Parquet loading hooks
CSVLoadableTableInterface
Bases: ORMTableBase
Mixin for ORM tables that support staged CSV-based ingestion.
This interface implements a database-portable ingestion workflow based on temporary staging tables. It supports:

- dialect-aware staging table creation
- fast-path COPY-based loading where available
- ORM-based fallback loading
- configurable merge strategies
- explicit staging table lifecycle management
The class is designed for controlled ingestion pipelines and does not attempt to provide concurrency guarantees.
create_staging_table(session) `classmethod`
Create a fresh staging table for ingestion.
Any existing staging table with the same name is dropped first. The staging table schema mirrors the target table schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `session` | `Session` | An active SQLAlchemy session bound to an engine. | required |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the session is not bound to an engine. |
| `NotImplementedError` | If the database dialect is unsupported. |
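The drop-then-recreate behaviour described above can be illustrated with the standard-library `sqlite3` module. This is a minimal sketch, not the class's actual implementation: it uses raw SQL rather than SQLAlchemy, and the helper signature and the table names `person` and `person_staging` are assumptions made for the example.

```python
import sqlite3


def create_staging_table(conn: sqlite3.Connection, target: str, staging: str) -> None:
    """Drop any existing staging table, then recreate it with the target's columns."""
    # Identifiers are interpolated directly, so they must come from trusted code.
    conn.execute(f'DROP TABLE IF EXISTS "{staging}"')
    # "WHERE 0" copies the column names but no rows.
    # Constraints and indexes are not copied by CREATE TABLE ... AS SELECT.
    conn.execute(f'CREATE TABLE "{staging}" AS SELECT * FROM "{target}" WHERE 0')


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO person VALUES (1, 'Ada')")

create_staging_table(conn, "person", "person_staging")
# Calling it again is safe because the old staging table is dropped first.
create_staging_table(conn, "person", "person_staging")

staging_columns = [row[1] for row in conn.execute('PRAGMA table_info("person_staging")')]
staging_rows = conn.execute("SELECT COUNT(*) FROM person_staging").fetchone()[0]
```

Because the staging table is always rebuilt from scratch, rows left over from an earlier failed run cannot leak into the next load.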
csv_columns() `classmethod`
Return a mapping of CSV column names to model columns.
By default this is equivalent to `model_columns`.
Override this method to implement custom column mappings.
Returns:
| Type | Description |
|---|---|
| `dict[str, ColumnElement]` | Mapping of input column names to SQLAlchemy columns. |
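To show what a custom column mapping achieves, the sketch below renames CSV headers to model attribute names using only the standard library. It is an illustration under assumptions: the real method returns SQLAlchemy columns as values, whereas this example substitutes plain strings, and the header names and `rename_row` helper are hypothetical.

```python
import csv
import io

# Hypothetical mapping of CSV header names to model attribute names.
# The real csv_columns() maps to SQLAlchemy columns, not strings.
CSV_TO_MODEL = {"Person ID": "id", "Full Name": "name"}


def rename_row(row: dict[str, str], mapping: dict[str, str]) -> dict[str, str]:
    """Keep only mapped CSV columns and rename them to model attribute names."""
    return {mapping[key]: value for key, value in row.items() if key in mapping}


sample = io.StringIO("Person ID,Full Name,Notes\n1,Ada,ignored\n")
rows = [rename_row(row, CSV_TO_MODEL) for row in csv.DictReader(sample)]
```

Columns absent from the mapping (here `Notes`) are dropped in this sketch; whether the real loader ignores or rejects unmapped columns is not stated in this reference.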
drop_staging_table(session) `classmethod`
Drop the staging table if it exists.
get_staging_table(session) `classmethod`
Return the reflected staging table, creating it if necessary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `session` | `Session` | An active SQLAlchemy session bound to an engine. | required |
Returns:
| Type | Description |
|---|---|
| `Table` | The reflected staging table. |
load_csv(session, path, *, loader=None, normalise=True, dedupe=False, chunksize=None, dedupe_incl_db=False, merge_strategy='replace') `classmethod`
Load a CSV (or CSV-like) file into the target table.
This method orchestrates the full staged ingestion lifecycle:

- staging table creation
- file loading
- merge into the target table
- staging table cleanup
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `session` | `Session` | An active SQLAlchemy session. | required |
| `path` | `Path` | Path to the input CSV or Parquet file. | required |
| `loader` | `LoaderInterface \| None` | Optional explicit loader instance. | `None` |
| `normalise` | `bool` | Whether to apply table-level normalisation. | `True` |
| `dedupe` | `bool` | Whether to deduplicate incoming rows. | `False` |
| `chunksize` | `int \| None` | Optional chunk size for incremental loading. | `None` |
| `dedupe_incl_db` | `bool` | Whether deduplication should include existing database rows. | `False` |
| `merge_strategy` | `str` | Merge strategy to apply (e.g. …). | `'replace'` |
Returns:
| Type | Description |
|---|---|
| `int` | Number of rows loaded. |
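The four lifecycle steps listed above (create, load, merge, clean up) can be sketched end to end with `sqlite3` and `csv` from the standard library. This is not the method's real implementation: the function name `load_csv_staged`, the fixed `person` schema, and the use of SQLite's `INSERT OR REPLACE` for the merge are all assumptions chosen to keep the example self-contained.

```python
import csv
import io
import sqlite3


def load_csv_staged(conn: sqlite3.Connection, lines) -> int:
    """Stage rows from CSV text, merge them into ``person``, and always drop staging."""
    # Step 1: create a fresh staging table.
    conn.execute("DROP TABLE IF EXISTS person_staging")
    conn.execute("CREATE TABLE person_staging (id INTEGER, name TEXT)")
    try:
        # Step 2: load the file into staging.
        rows = [(int(r["id"]), r["name"]) for r in csv.DictReader(lines)]
        conn.executemany("INSERT INTO person_staging VALUES (?, ?)", rows)
        # Step 3: merge. Rows whose primary key already exists overwrite the old row.
        conn.execute("INSERT OR REPLACE INTO person SELECT id, name FROM person_staging")
        conn.commit()
        return len(rows)
    except Exception:
        # Undo any partial load or merge before re-raising.
        conn.rollback()
        raise
    finally:
        # Step 4: cleanup runs on success and on failure.
        conn.execute("DROP TABLE IF EXISTS person_staging")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO person VALUES (1, 'Old')")
conn.commit()

loaded = load_csv_staged(conn, io.StringIO("id,name\n1,Ada\n2,Grace\n"))
people = conn.execute("SELECT id, name FROM person ORDER BY id").fetchall()
leftover = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE name = 'person_staging'"
).fetchone()[0]
```

Placing the cleanup in `finally` mirrors the "explicit staging table lifecycle management" goal: the staging table does not outlive the call, whatever happens in between.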
load_staging(loader, loader_context) `classmethod`
Load data into the staging table.
This method attempts a fast-path database-native load where supported, falling back to an ORM-based loader if necessary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `loader` | `LoaderInterface` | Loader implementation used for ORM-based loading. | required |
| `loader_context` | `LoaderContext` | Context object containing session, path, and load options. | required |
Returns:
| Type | Description |
|---|---|
| `int` | Number of rows loaded into the staging table. |
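The fast-path-with-fallback pattern described above can be sketched in plain Python. How the real method detects that a native load is unavailable is not documented here, so the use of `NotImplementedError` as the signal, and every function name below, is an assumption made for illustration.

```python
from typing import Callable, Sequence

Row = tuple


def load_with_fallback(
    rows: Sequence[Row],
    fast_path: Callable[[Sequence[Row]], int],
    slow_path: Callable[[Sequence[Row]], int],
) -> int:
    """Try the database-native loader first; fall back if it is unavailable."""
    try:
        return fast_path(rows)
    except NotImplementedError:
        # Hypothetical signal that the dialect has no native bulk load.
        return slow_path(rows)


def copy_unsupported(rows: Sequence[Row]) -> int:
    """Stand-in for a COPY-style loader on a dialect that lacks one."""
    raise NotImplementedError("no native bulk load on this dialect")


staged: list[Row] = []


def row_by_row(rows: Sequence[Row]) -> int:
    """Stand-in for the ORM-based loader: append rows one batch at a time."""
    staged.extend(rows)
    return len(rows)


count = load_with_fallback([(1, "Ada"), (2, "Grace")], copy_unsupported, row_by_row)
```

Both paths return a row count, so the caller does not need to know which one ran.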
merge_from_staging(session, merge_strategy='replace') `classmethod`
Merge data from the staging table into the target table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `session` | `Session` | An active SQLAlchemy session. | required |
| `merge_strategy` | `str` | Merge strategy to apply. | `'replace'` |
orm_staging_load(loader, loader_context) `classmethod`
Load data into the staging table using an ORM-based loader.
Returns:
| Type | Description |
|---|---|
| `int` | Number of rows loaded. |
staging_tablename() `classmethod`
Return the name of the staging table for this model.
If a custom staging table name has been set on the class, it is used; otherwise a default name derived from `__tablename__` is returned.
Returns:
| Type | Description |
|---|---|
| `str` | The staging table name. |
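The override-or-default rule can be sketched as a small mixin. The attribute name `_staging_tablename` and the `_staging_` prefix are hypothetical: this reference does not say how the custom name is stored or how the default is derived from `__tablename__`.

```python
class StagingNameMixin:
    # Hypothetical attribute; the real class may store the override differently.
    _staging_tablename = None

    @classmethod
    def staging_tablename(cls) -> str:
        """Return the custom staging name if set, else one derived from the table name."""
        if cls._staging_tablename is not None:
            return cls._staging_tablename
        # Assumed default: prefix the target table name.
        return f"_staging_{cls.__tablename__}"


class Person(StagingNameMixin):
    __tablename__ = "person"


class Visit(StagingNameMixin):
    __tablename__ = "visit"
    _staging_tablename = "visit_tmp"


default_name = Person.staging_tablename()
custom_name = Visit.staging_tablename()
```

Deriving the default from `__tablename__` keeps staging tables for different models from colliding without any per-model configuration.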