elasticrash/datafowk

Datafowk

Terminal ETL tool for MySQL and PostgreSQL.

Disclaimer

To keep things clear, this is my first "vibe-coding" project to test the waters :D. It's about 20% manual coding and 80% AI-generated.

It's based on an old project design I had semi-abandoned. I've worked on ETL projects before, and they can get very complicated very quickly. Up to a certain point this was going well, but as the project grew it got messier. Now I write about half of the code by hand and suggest specific changes for the rest, as I think it's getting too complicated for the machine.

I was always on the lookout for a project like this in the open source community, but in most cases the best ETL projects are closed source. This is me trying my best. I hope someone out there finds it useful.

files

root

  • mysql_to_mysql.sh — starts two MySQL containers (source + destination) using the bundled MySQL example
  • postgis_to_mysql.sh — starts a PostGIS source and a MySQL destination for the geometry example

projects

Config files that are shown in the project picker when the UI starts without --config:

  • mysql_config.toml — MySQL → MySQL example pipeline
  • postgis_config.toml — PostGIS → MySQL example pipeline (uses area / perimeter transforms)

ops

  • mysql.compose.yaml — Docker Compose for the MySQL example

  • postgis_mysql.compose.yaml — Docker Compose for the PostGIS → MySQL example

  • /mysql/Dockerfile.source.mysql — Dockerfile for source MySQL db

  • /mysql/Dockerfile.destination.mysql — Dockerfile for destination MySQL db

  • /mysql/mysql_schema_source.sql — schema for the source MySQL db

  • /mysql/mysql_schema_destination.sql — schema for the destination MySQL db

  • /postgis/Dockerfile.source.postgis — Dockerfile for the PostGIS source db

  • /postgis/Dockerfile.destination.mysql — Dockerfile for the MySQL destination db

  • /postgis/postgis_schema_source.sql — schema with a regions table containing polygon geometry

  • /postgis/mysql_schema_destination.sql — schema for region_areas and region_perimeters destination tables

src

Rust CLI that reads rules from a .toml config file, pulls rows from the origin DB, applies a transformation chain, and inserts them into the destination DB.

configuration

[connection_properties_origin]
kind = "mysql"
address = "127.0.0.1"
port = 3306
user = "root"
password = "password"
schema = "test"

[connection_properties_destination]
kind = "mysql"
address = "127.0.0.1"
port = 3308
user = "root"
password = "password"
schema = "test"

Supported kind values:

  • "mysql"
  • "postgres"
  • "csv" — read from a CSV file (source only; address field is the file path, schema is a logical name for rule matching)
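As a minimal sketch of how the three `kind` values could be validated, the snippet below maps them onto an enum and encodes the "CSV is source only" restriction. The names `ConnectionKind` and `allowed_as_destination` are hypothetical and are not taken from the datafowk source.

```rust
use std::str::FromStr;

/// Hypothetical enum mirroring the three documented `kind` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionKind {
    Mysql,
    Postgres,
    Csv,
}

impl FromStr for ConnectionKind {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "mysql" => Ok(ConnectionKind::Mysql),
            "postgres" => Ok(ConnectionKind::Postgres),
            "csv" => Ok(ConnectionKind::Csv),
            other => Err(format!("unsupported kind: {other}")),
        }
    }
}

impl ConnectionKind {
    /// CSV is documented as source only, so it is rejected as a destination.
    pub fn allowed_as_destination(self) -> bool {
        !matches!(self, ConnectionKind::Csv)
    }
}
```

With this shape, `"postgres".parse::<ConnectionKind>()` succeeds, while an unknown string such as `"oracle"` returns an error instead of failing later at connection time.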

Rules live under [[rules]] in the config file:

[[rules]]
expression = "(origin:users,address){users.address_id=address.id}[users.firstname,users.lastname,address.address,address.number]<trim>(destination:spot)[name,surname,address,number]"

[[rules]]
expression = "(origin:order_totals)[amount]<sum(10)>(destination:order_totals_plus_ten)[amount]"

[[rules]]
expression = "(origin:sensor_weights)[weight]<multiply(5)>(destination:sensor_weights_scaled)[weight]"

[[rules]]
expression = "(origin:customer_aliases)[email,label]<trim,unique(email)>(destination:customer_aliases_unique)[email,label]"

Rule format:

(database_alias:table1[,table2...]){table1.column=table2.column[,...]}[field1,table2.field2]?(filter1,filter2,...)<transform1,transform2(arg),...>(database_alias:table)[field1,field2]

The ?(...) filter section is optional and placed between source fields and transforms. It supports the following operators: =, !=, >, >=, <, <=, IS NULL, IS NOT NULL.

Filter examples:

?(age>18)
?(status=active,score>=100)
?(deleted_at IS NULL)
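To make the filter syntax concrete, here is a rough sketch of how a `?(...)` section could be split into conditions. It is an illustration under assumptions, not the parser datafowk uses: `Condition`, `parse_condition` and `parse_filter` are hypothetical names, and values containing commas or operator characters are not handled.

```rust
/// Hypothetical parsed form of one filter condition.
#[derive(Debug, PartialEq)]
pub enum Condition {
    Compare { field: String, op: &'static str, value: String },
    IsNull(String),
    IsNotNull(String),
}

// Two-character operators are listed first so ">=" is never read as ">".
const OPERATORS: [&str; 6] = ["!=", ">=", "<=", "=", ">", "<"];

pub fn parse_condition(input: &str) -> Option<Condition> {
    let s = input.trim();
    if let Some(field) = s.strip_suffix(" IS NOT NULL") {
        return Some(Condition::IsNotNull(field.trim().to_string()));
    }
    if let Some(field) = s.strip_suffix(" IS NULL") {
        return Some(Condition::IsNull(field.trim().to_string()));
    }
    // Locate the first operator character, then match the longest operator there.
    let pos = s.find(|c| matches!(c, '!' | '=' | '>' | '<'))?;
    let rest = &s[pos..];
    let op = OPERATORS.iter().copied().find(|op| rest.starts_with(*op))?;
    let field = s[..pos].trim();
    let value = rest[op.len()..].trim();
    if field.is_empty() || value.is_empty() {
        return None;
    }
    Some(Condition::Compare {
        field: field.to_string(),
        op,
        value: value.to_string(),
    })
}

/// Parses a whole section such as `?(status=active,score>=100)`.
pub fn parse_filter(section: &str) -> Option<Vec<Condition>> {
    let inner = section.trim().strip_prefix("?(")?.strip_suffix(')')?;
    inner.split(',').map(parse_condition).collect()
}
```

Under these assumptions, `parse_filter("?(status=active,score>=100)")` yields two `Compare` conditions, and `parse_condition("deleted_at IS NULL")` yields `IsNull("deleted_at")`.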

Supported database aliases:

  • origin for connection_properties_origin
  • destination for connection_properties_destination

When you use multiple source tables, source fields must be written as table.column and the join conditions describe the relationship path.

Supported transforms

| Transform | Alias | Description |
| --- | --- | --- |
| copy | identity | Pass value through unchanged |
| trim | | Strip leading/trailing whitespace from strings |
| lowercase | | Convert strings to lowercase |
| uppercase | | Convert strings to uppercase |
| sum(n) | add(n) | Add n to numeric values |
| subtract(n) | sub(n) | Subtract n from numeric values |
| multiply(n) | mul(n) | Multiply numeric values by n |
| divide(n) | div(n) | Divide numeric values by n (errors if n is 0) |
| constant(v) | | Replace the field value with the constant v; v is parsed as integer, float, or string |
| concat(sep?) | | Concatenate the first two remaining source fields into a single string, using sep as the separator (default: ""); quote the separator to preserve whitespace, e.g. concat(" "). Requires one more source field than destination field per concat in the chain. Fields to be concatenated must appear first in the source field list. |
| unique(field[,field...]) | | Skip rows that duplicate any of the listed destination fields; skipped rows are logged to datafowk-skipped-duplicates.log |
| area | | Replace a PostGIS geometry column with its computed area (shoelace formula) |
| perimeter | | Replace a PostGIS geometry column with its computed perimeter length |

Multiple transforms can be chained with commas: <trim,lowercase> or <sum(10),unique(amount)>.
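Chaining can be pictured as folding a value through each transform in order. The sketch below covers only a subset of the transforms and uses hypothetical types (`Value`, `Transform`, `apply`, `apply_chain`); in particular, passing a value through unchanged when the transform does not match its type is an assumption, not documented behaviour.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Text(String),
    Number(f64),
}

/// Hypothetical subset of the documented transforms.
#[derive(Debug, Clone, PartialEq)]
pub enum Transform {
    Copy,
    Trim,
    Lowercase,
    Uppercase,
    Sum(f64),
    Subtract(f64),
    Multiply(f64),
    Divide(f64),
}

pub fn apply(transform: &Transform, value: Value) -> Result<Value, String> {
    Ok(match (transform, value) {
        (Transform::Copy, v) => v,
        (Transform::Trim, Value::Text(s)) => Value::Text(s.trim().to_string()),
        (Transform::Lowercase, Value::Text(s)) => Value::Text(s.to_lowercase()),
        (Transform::Uppercase, Value::Text(s)) => Value::Text(s.to_uppercase()),
        (Transform::Sum(n), Value::Number(x)) => Value::Number(x + n),
        (Transform::Subtract(n), Value::Number(x)) => Value::Number(x - n),
        (Transform::Multiply(n), Value::Number(x)) => Value::Number(x * n),
        // The table documents divide(n) as an error when n is 0.
        (Transform::Divide(n), Value::Number(_)) if *n == 0.0 => {
            return Err("division by zero".to_string())
        }
        (Transform::Divide(n), Value::Number(x)) => Value::Number(x / n),
        // Assumption: a transform that does not fit the value's type is a no-op.
        (_, v) => v,
    })
}

/// Applies the transforms left to right, stopping at the first error.
pub fn apply_chain(chain: &[Transform], value: Value) -> Result<Value, String> {
    chain.iter().try_fold(value, |acc, t| apply(t, acc))
}
```

In this model `<trim,lowercase>` applied to `"  Hello "` gives `"hello"`, and order matters: `<sum(10),multiply(5)>` on `2` gives `60`, whereas the reverse order would give `20`.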

running it

MySQL → MySQL example

  1. Start the sample databases:

    ./mysql_to_mysql.sh start
  2. Run the pipeline:

    cargo run -- run --config projects/mysql_config.toml
  3. Stop the containers:

    ./mysql_to_mysql.sh stop

PostGIS → MySQL example

  1. Start the PostGIS source and MySQL destination:

    ./postgis_to_mysql.sh start
  2. Run the pipeline (reads polygon geometry, writes computed area and perimeter):

    cargo run -- run --config projects/postgis_config.toml
  3. Stop the containers:

    ./postgis_to_mysql.sh stop
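For readers unfamiliar with the shoelace formula mentioned for the area transform, the following sketch computes the area and perimeter of a single polygon ring. It assumes planar (x, y) coordinates and ignores holes and multi-polygons; the function names are hypothetical and the project's own geometry handling may differ.

```rust
/// Area of a simple polygon ring via the shoelace formula.
/// The ring may or may not repeat its first vertex at the end.
pub fn shoelace_area(ring: &[(f64, f64)]) -> f64 {
    let n = ring.len();
    if n < 3 {
        return 0.0;
    }
    let mut twice_area = 0.0;
    for i in 0..n {
        let (x1, y1) = ring[i];
        // Wrap around so the last vertex connects back to the first.
        let (x2, y2) = ring[(i + 1) % n];
        twice_area += x1 * y2 - x2 * y1;
    }
    // The sign depends on vertex order; the area itself is the magnitude.
    twice_area.abs() / 2.0
}

/// Sum of the edge lengths of the ring, including the closing edge.
pub fn ring_perimeter(ring: &[(f64, f64)]) -> f64 {
    let n = ring.len();
    (0..n)
        .map(|i| {
            let (x1, y1) = ring[i];
            let (x2, y2) = ring[(i + 1) % n];
            (x2 - x1).hypot(y2 - y1)
        })
        .sum()
}
```

For a 4 by 3 rectangle with vertices `(0,0), (4,0), (4,3), (0,3)`, this gives an area of 12 and a perimeter of 14, in the units of the input coordinates.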

interactive UI

Launch the UI without arguments to get a project picker that lists all .toml files in ./projects/:

cargo run

Or open a specific config directly:

cargo run -- --config projects/mysql_config.toml

The footer keeps a single ? shortcuts hint; press ? to open the shortcuts popup.

Main keys

| Key | Action |
| --- | --- |
| n | Create a new rule |
| c | Clone the selected rule |
| e / Enter | Edit the selected rule |
| d / Delete | Delete the selected rule |
| o | Edit origin connection |
| p | Edit destination connection |
| v | Open schema preview (origin + destination) |
| g | Preview live source data for the selected rule (up to 10 rows) |
| s | Save config |
| t | Dry-run simulation |
| r | Run pipeline |
| x | Run pipeline with destination truncation |
| q | Quit |

Inside schema preview

| Key | Action |
| --- | --- |
| Arrow keys | Pan horizontally and vertically |
| 1 | Show table names only |
| 2 | Show table names and column names |
| 3 | Show table names, column names, and column types |
| + / - | Cycle zoom levels |
| Esc | Close the preview |

Inside data preview (g)

Shows up to 10 source rows for the selected rule in a scrollable table. Geometry columns are rendered as geometry N polygon(s) ~<area>cm² ~<perimeter>cm.

| Key | Action |
| --- | --- |
| ↑ / ↓ | Scroll rows |
| g / Esc | Close the preview |

CLI reference

# Open project picker (interactive UI)
cargo run

# Open UI with a specific config
cargo run -- --config projects/mysql_config.toml

# Run the pipeline
cargo run -- run --config projects/mysql_config.toml

# Dry-run (reads + validates, rolls back all writes)
cargo run -- run --config projects/mysql_config.toml --dry-run

# Run and truncate destination tables first
cargo run -- run --config projects/mysql_config.toml --truncate-destination

--dry-run performs a full simulation: it reads source rows and attempts destination inserts inside a transaction that is rolled back, so missing tables, missing columns, and destination constraints surface without persisting changes.
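The dry-run idea, attempting every insert inside a transaction and then rolling back, can be sketched against an in-memory stand-in. Everything here is hypothetical (`Destination`, `MemoryTable`, `run_pipeline`); a real run would go through the database driver's transaction API, and the column-count check only imitates a destination constraint.

```rust
/// Hypothetical destination interface, for illustration only.
pub trait Destination {
    fn begin(&mut self);
    fn insert(&mut self, row: &[String]) -> Result<(), String>;
    fn commit(&mut self);
    fn rollback(&mut self);
}

/// In-memory stand-in for a destination table with a fixed column count.
pub struct MemoryTable {
    pub columns: usize,
    pub committed: Vec<Vec<String>>,
    pub pending: Vec<Vec<String>>,
}

impl Destination for MemoryTable {
    fn begin(&mut self) {
        self.pending.clear();
    }

    fn insert(&mut self, row: &[String]) -> Result<(), String> {
        // Stands in for "missing column" style errors from a real database.
        if row.len() != self.columns {
            return Err(format!("expected {} columns, got {}", self.columns, row.len()));
        }
        self.pending.push(row.to_vec());
        Ok(())
    }

    fn commit(&mut self) {
        self.committed.append(&mut self.pending);
    }

    fn rollback(&mut self) {
        self.pending.clear();
    }
}

/// Inserts every row; persists only when `dry_run` is false and no insert failed.
pub fn run_pipeline<D: Destination>(
    dest: &mut D,
    rows: &[Vec<String>],
    dry_run: bool,
) -> Result<usize, String> {
    dest.begin();
    for row in rows {
        if let Err(err) = dest.insert(row) {
            dest.rollback();
            return Err(err);
        }
    }
    if dry_run {
        dest.rollback();
    } else {
        dest.commit();
    }
    Ok(rows.len())
}
```

The point of the design is that a dry run exercises the same insert path as a real run, so destination errors surface, while the final rollback guarantees nothing is persisted.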

bundled sample data (MySQL example)

  • users + address → destination spot with trim
  • order_totals → order_totals_plus_ten with sum(10)
  • sensor_weights → sensor_weights_scaled with multiply(5)
  • customer_aliases → customer_aliases_unique with trim,unique(email)
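The `unique(email)` step in the last example can be pictured as a set-based filter over the transformed rows. The sketch below reads "duplicate any of the listed fields" literally, tracking each listed column separately; that reading, the function name `split_unique`, and the use of column indexes instead of field names are all assumptions.

```rust
use std::collections::HashSet;

/// Splits rows into (kept, skipped). A row is skipped when any of the listed
/// columns repeats a value already seen in a kept row.
/// Panics if an index in `unique_columns` is out of range for a row.
pub fn split_unique(
    rows: Vec<Vec<String>>,
    unique_columns: &[usize],
) -> (Vec<Vec<String>>, Vec<Vec<String>>) {
    // One set of seen values per listed column.
    let mut seen: Vec<HashSet<String>> = vec![HashSet::new(); unique_columns.len()];
    let mut kept = Vec::new();
    let mut skipped = Vec::new();

    for row in rows {
        let duplicate = unique_columns
            .iter()
            .zip(seen.iter())
            .any(|(&col, values)| values.contains(&row[col]));
        if duplicate {
            // datafowk logs these rows; here they are simply returned.
            skipped.push(row);
            continue;
        }
        for (&col, values) in unique_columns.iter().zip(seen.iter_mut()) {
            values.insert(row[col].clone());
        }
        kept.push(row);
    }
    (kept, skipped)
}
```

Running `trim` before `unique`, as the sample rule does, matters in this model: `" a@x.io"` and `"a@x.io"` only collide once the whitespace is stripped.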

CSV source example

To read from a CSV file, configure the origin connection with kind = "csv" and set address to the file path:

[connection_properties_origin]
kind = "csv"
address = "data/users.csv"
schema = "csvdb"

[[rules]]
expression = "(csvdb:users)[firstname,lastname]<trim>(destination:spot)[name,surname]"

Filter conditions can be used to select rows in-memory:

[[rules]]
expression = "(csvdb:users)[email]?(active=1)<copy>(destination:active_users)[email]"
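Because a CSV file has no query engine, a filter such as `?(active=1)` has to be evaluated row by row after reading. A naive, std-only sketch of that idea follows; it splits on commas without handling quoted fields, supports only equality, and `filter_csv` is a hypothetical name rather than the project's function.

```rust
/// Returns the data rows whose `column` equals `expected`.
/// The first line of `text` is treated as the header row.
pub fn filter_csv(text: &str, column: &str, expected: &str) -> Vec<Vec<String>> {
    let mut lines = text.lines();
    let header: Vec<&str> = match lines.next() {
        Some(line) => line.split(',').map(str::trim).collect(),
        None => return Vec::new(),
    };
    // An unknown column matches nothing in this sketch.
    let index = match header.iter().position(|name| *name == column) {
        Some(i) => i,
        None => return Vec::new(),
    };
    lines
        .filter(|line| !line.trim().is_empty())
        .map(|line| {
            line.split(',')
                .map(|cell| cell.trim().to_string())
                .collect::<Vec<String>>()
        })
        .filter(|row| row.get(index).map(String::as_str) == Some(expected))
        .collect()
}
```

Given a file with header `email,active` and rows `a@x.io,1` and `b@x.io,0`, filtering on `active` equal to `1` keeps only the first row.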
