Binary file added docs/assets/dask-dataframe-column-ops.png
Binary file added docs/assets/dask-dataframe-concat.png
Binary file added docs/assets/dask-dataframe-groupby.png
Binary file added docs/assets/dask-dataframe-merge.png
Binary file added docs/assets/dask-dataframe-partitions.png
Binary file added docs/assets/distributed-benchmarks.png
46 changes: 46 additions & 0 deletions docs/configuration.md
Expand Up @@ -72,6 +72,52 @@ config:
| (optional) | `show_progress` | `boolean` | Whether to show a progress bar for each Dask transformation | `False` |
| (optional) | `git_auth_timeout` | `integer` | Number of seconds to wait for the user to enter Git credentials if needed during package installation (see [project composition](usage.md#earthmover-deps)) | `60` |

#### Dask Distributed
If using earthmover with [parallel processing](./usage#parallel-processing), the following additional configuration may be provided (optional if calling with `--workers`/`-w`):
```yaml
config:
  dask_cluster_kwargs:
    n_workers: - # `--workers` value if passed; no default
    memory_limit: - # `--mem_per_worker` value if passed; no default
    threads_per_worker: 1 # (not recommended to change this)
    processes: True # (not recommended to change this)
```
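
As a rough illustration of how CLI values and `dask_cluster_kwargs` could be combined, here is a minimal sketch. The helper name `build_cluster_kwargs` is hypothetical, and the precedence (explicit YAML settings win over CLI-derived defaults) is an assumption; this page does not state the actual merge order. The sketch handles numeric `workers` values only, not `auto`.

```python
def build_cluster_kwargs(yaml_kwargs, workers=None, mem_per_worker=None):
    """Hypothetical helper: combine CLI-derived defaults with YAML settings."""
    # Defaults mirroring the YAML comments above.
    kwargs = {"threads_per_worker": 1, "processes": True}
    # CLI values are only applied when they were actually passed.
    if workers:
        kwargs["n_workers"] = int(workers)
    if mem_per_worker:
        kwargs["memory_limit"] = mem_per_worker
    # Assumption: explicit YAML settings take precedence over the values above.
    kwargs.update(yaml_kwargs or {})
    return kwargs


print(build_cluster_kwargs({"memory_limit": "2GB"}, workers="4"))
```

The resulting dictionary has the same keys as the YAML block above, so it could be passed as keyword arguments when the cluster is created.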

#### Dask
In addition to the above `config`, one may also optionally pass any [supported Dask configuration](https://docs.dask.org/en/latest/configuration.html#dask) via `config.dask`. The default configurations are given below:
```yaml
config:
  dask:
    temporary_directory: `tempfile.gettempdir()`
    dataframe:
      backend: pandas # (not recommended to change this)
      convert-string: False
      query-planning: False
      shuffle:
        method: tasks
    multiprocessing:
      context: fork
    distributed:
      scheduler:
        active-memory-manager:
          measure: managed
        processes: True
        worker-saturation: 1.0
      worker:
        memory:
          recent-to-old-time: 5s
          monitor-interval: 15s
          rebalance:
            measure: managed
          spill: 0.5
          pause: 0.95
          terminate: 0.99
          max-spill: False
      nanny:
        pre-spawn-environ:
          MALLOC_TRIM_THRESHOLD_: 0
```
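
Dask's documentation usually refers to settings by dotted key (for example `distributed.worker.memory.spill`), while the YAML above is nested. The following sketch shows one way to translate between the two forms; `flatten_config` is a hypothetical helper written for illustration and is not part of earthmover or Dask.

```python
def flatten_config(nested, prefix=""):
    """Flatten a nested dict into {"a.b.c": value} pairs."""
    flat = {}
    for key, value in nested.items():
        dotted = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Recurse into sub-sections, extending the dotted prefix.
            flat.update(flatten_config(value, dotted))
        else:
            flat[dotted] = value
    return flat


# A small excerpt of the defaults listed above.
dask_config = {
    "dataframe": {"shuffle": {"method": "tasks"}},
    "distributed": {"worker": {"memory": {"spill": 0.5, "pause": 0.95}}},
}

for key, value in flatten_config(dask_config).items():
    print(f"{key} = {value}")
```

This can make it easier to look up an individual setting in the Dask configuration reference.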


### `definitions`

84 changes: 76 additions & 8 deletions docs/design.md
Expand Up @@ -44,14 +44,6 @@ Each component is materialized in [topological order](https://en.wikipedia.org/w
All data processing is done using [Pandas Dataframes](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) and [Dask](https://www.dask.org/), with values stored as strings (or [Categoricals](https://pandas.pydata.org/docs/user_guide/categorical.html), for memory efficiency in columns with few unique values). This choice of datatypes prevents issues arising from Pandas' datatype inference (like inferring numbers as dates), but does require casting string-representations of numeric values using Jinja when doing comparisons or computations.


## Performance
Tool performance depends on a variety of factors including source file size and/or database performance, the system's storage performance (HDD vs. SSD), memory, and transformation complexity. But some effort has been made to engineer this tool for high throughput and to work in memory- and compute-constrained environments.

Smaller source data (which all fits into memory) processes very quickly. Larger chunked sources are necessarily slower. We have tested with sources files of 3.3GB, 100M rows (synthetic attendance data): creating 100M lines of JSONL (30GB) takes around 50 minutes on a modern laptop.

The [state feature](usage.md#state) adds some overhead, as hashes of input data and JSON payloads must be computed and stored, but this can be disabled if desired.


## Comparison to `dbt`
`earthmover` is similar in a number of ways to [`dbt`](). Some ways in which the tools are similar include...

Expand All @@ -67,3 +59,79 @@ But there are some significant differences between the tools too, including...
* `earthmover`'s data transformation instructions are `operations` expressed as YAML, while `dbt`'s transformation instructions are (Jinja-templated) SQL queries.

The team that maintains `earthmover` also uses (and loves!) `dbt`. Our data engineers typically use `dbt` for large datasets (GB+) in a cloud database (like Snowflake) and `earthmover` for smaller datasets (< GB) in files (CSV, etc.).


## Performance
Tool performance depends on a variety of factors including source file size and/or database performance, the system's storage performance (HDD vs. SSD), memory, and transformation complexity. But some effort has been made to engineer this tool for high throughput and to work in memory- and compute-constrained environments.

Smaller source data (which all fits into memory) processes very quickly. Larger chunked sources are necessarily slower. We have tested with source files of 3.3GB, 100M rows (synthetic attendance data): creating 100M lines of JSONL (30GB) takes around 50 minutes on a modern laptop.

The [state feature](usage.md#state) adds some overhead, as hashes of input data and JSON payloads must be computed and stored, but this can be disabled if desired.

### Parallel Processing
`earthmover` supports [parallel processing](./usage#parallel-processing) across several CPU cores via [Dask distributed](https://distributed.dask.org/en/stable/) with a [LocalCluster](https://distributed.dask.org/en/stable/api.html#cluster). In this section we discuss how this works, and give some performance benchmark data.

A core innovation of [Dask](https://www.dask.org/) is the representation of dataframes as a collection of [partitions](https://docs.dask.org/en/latest/dataframe-design.html#partitions), each being a [Pandas dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). This, together with writing intermediate Pandas partitions to disk as needed and clever re-implementation of some Pandas dataframe methods, enables Dask to _handle dataframes larger than memory_ - the main reason we initially built earthmover on Dask.

![dask dataframe partitions](./assets/dask-dataframe-partitions.png)


Once data is partitioned, it becomes possible to _process_ it in parallel - this is the functionality that [Dask distributed](https://distributed.dask.org/en/stable/) provides. It consists of
1. a scheduler
1. a cluster of Dask workers - which can be CPU cores on a single machine (how earthmover works) or even separate machines on a network

The scheduler (1) carves up data transformations on an entire dataframe into a graph of smaller transformations on each partition ("tasks"), and it coordinates tasks across the workers (2).
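
The idea of carving one transformation into per-partition tasks can be sketched with the standard library alone. This is a simplified stand-in, not Dask's scheduler: the function names are hypothetical, plain lists of dicts replace Pandas dataframes, and threads replace Dask workers (earthmover's default configuration uses processes).

```python
from concurrent.futures import ThreadPoolExecutor


def partition(rows, size):
    """Split a list of rows into consecutive partitions of at most `size` rows."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def uppercase_names(part):
    """One 'task': a transformation applied to a single partition."""
    return [{**row, "name": row["name"].upper()} for row in part]


rows = [{"id": i, "name": f"student{i}"} for i in range(10)]
parts = partition(rows, size=4)  # 3 partitions: 4 + 4 + 2 rows

# Threads stand in for Dask workers in this sketch.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(uppercase_names, parts))

# Reassemble the partitions in their original order.
transformed = [row for part in results for row in part]
print(len(parts), transformed[0])
```

Each call to `uppercase_names` needs only its own partition, which is why such work can be handed to whichever worker is free.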

Next, we discuss how [earthmover's `operations`](./configuration/#transformations) work with Dask's partitioning.

#### `union`
Union is quite straightforward; the partitions of two (or more) dataframes are stacked to form a single larger dataframe.

![dask dataframe concat](./assets/dask-dataframe-concat.png)


#### column operations
[Column operations](./configuration#column-operations) such as `add_columns`, `modify_columns`, `keep_columns`, `drop_columns`, `map_values`, etc. can be applied to each partition separately, and so are highly parallelizable. They can increase the size of partitions; in this case, one can keep partitions smaller by specifying a `source`'s `blocksize` smaller than the default `25MB`.

![dask dataframe column operations](./assets/dask-dataframe-column-ops.png)


#### `join`
Join can be fast and parallelizable _if_ both frames are sorted on the join key(s) - then each worker can take one or a few partitions from each dataframe and join those (in memory). If one or both dataframes are large and not sorted, then each partition from one dataframe may need to be joined against every partition of the other dataframe (the dotted lines in the picture below), which is _much_ slower.

![dask dataframe merge](./assets/dask-dataframe-merge.png)
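
To make the difference concrete, the sketch below counts how many pairs of partitions would have to be compared in each case. It is a simplified model under stated assumptions, not a description of how Dask actually schedules joins: each partition is represented only by the (min, max) range of its join keys, and `overlapping_pairs` is a hypothetical helper.

```python
def overlapping_pairs(left_ranges, right_ranges):
    """Count partition pairs whose [min, max] key ranges overlap."""
    return sum(
        1
        for l_min, l_max in left_ranges
        for r_min, r_max in right_ranges
        if l_min <= r_max and r_min <= l_max
    )


# Sorted frames: each partition covers a distinct slice of the key space.
sorted_left = [(0, 24), (25, 49), (50, 74), (75, 99)]
sorted_right = [(0, 33), (34, 66), (67, 99)]

# Unsorted frames: any partition may hold any key.
unsorted_left = [(0, 99)] * 4
unsorted_right = [(0, 99)] * 3

print("sorted:", overlapping_pairs(sorted_left, sorted_right))
print("unsorted:", overlapping_pairs(unsorted_left, unsorted_right))
```

In this model, sorted input needs a number of comparisons that grows roughly with the sum of the partition counts, while unsorted input grows with their product.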


#### `group_by`
Group-by also benefits from sorted input - each worker produces a few groups, and communication between workers is minimized.

![dask dataframe groupby](./assets/dask-dataframe-groupby.png)
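
A grouped aggregation over partitions can be thought of as two steps: aggregate within each partition, then combine the partial results. The sketch below illustrates this with a simple count per group; the data and the `partial_counts` helper are made up for illustration, and real Dask group-bys are more involved.

```python
from collections import Counter


def partial_counts(part):
    """Per-partition aggregation: count rows per group key."""
    return Counter(row["status"] for row in part)


partitions = [
    [{"status": "present"}, {"status": "absent"}],
    [{"status": "present"}, {"status": "present"}],
    [{"status": "tardy"}, {"status": "absent"}],
]

# Step 1 runs independently on each partition (parallelizable).
partials = [partial_counts(part) for part in partitions]

# Step 2 combines the small partial results into the final answer.
totals = sum(partials, Counter())
print(dict(totals))
```

When input is sorted by the group key, each group's rows sit in few partitions, so the partial results exchanged in step 2 stay small.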


### Parallelism benchmarks
To understand the performance characteristics of `earthmover[distributed]`, we have run a battery of workloads and configurations. We present some data below.

![distributed benchmarks](./assets/distributed-benchmarks.png)

* **big attendance** is a fairly simple workload that takes a 1B-row, 3.2GB TSV input file (of school attendance data) and produces a 1B-line, 27.9GB JSONL output file.
* **TPC-H query X** are earthmover implementations of the first 5 of [the canonical TPC-H queries](https://github.com/dragansah/tpch-dbgen/tree/master/tpch-queries). (TPC-H is a standard data [schema](https://docs.snowflake.com/en/user-guide/sample-data-tpch) representing a business's operations: parts, suppliers, orders, customers, etc. It has been used for decades to test relational database systems.) We used [tpch-kit](https://github.com/gregrahn/tpch-kit) to generate various (doubling) sizes (from about 10MB to 5GB) of the TPC-H dataset.
- **TPC-H query 1** exercises `group_by`: it selects from a single table, filters rows by date, groups by statuses and computes the sum and average of several columns, and finally sorts the result by statuses. (The query ultimately produces very few rows, less than a dozen.)
- **TPC-H query 2** exercises `join`: it computes account balances for suppliers via selecting from 5 tables (joined together), filtering rows by supplier region and part type and size, and finally sorting the result by balance and other fields. (Our `earthmover.yml` implementation pushes the filters up front, to minimize the number of rows that must be joined.)
- **TPC-H query 3** exercises `join` and `group_by`: it computes revenue per order via selecting from 3 tables (joined together), filtering rows by market segment and for a specific date, grouping by several order-related fields, and finally sorting the result by revenue and order date.
- **TPC-H query 4** exercises `join` and `group_by`: it computes the number of orders by priority over a certain date range via selecting from one table, filtering rows by a specific date range, grouping by order priority, and finally sorting the result by order priority.
- **TPC-H query 5** exercises `join` and `group_by`: it computes revenue by country over a certain date range via selecting from 6 tables, filtering rows by a specific date range, grouping by country name and summing a revenue expression, and finally sorting the result by revenue.
* (Missing datapoints above indicate runs that did not complete successfully in a reasonable amount of time - typically either Dask killed all workers, or Dask got stuck in a loop where workers are replaced and their tasks retried.)

#### Benchmark takeaways:
* parallelizing earthmover can cut runtimes in half (or better)
* 4 workers generally perform best - likely a good balance between parallelism and communication overhead
* eventually memory - not compute - becomes the constraint; without enough memory, workers may not successfully complete


### Parallelism FAQs
* **When should I use parallel earthmover?**

With large input files and/or longer (1m+) runtimes.
* **What's an optimal configuration?**

Start with 4 workers. If there's memory pressure, reduce number of workers and/or use a smaller `blocksize` on `sources`.
4 changes: 2 additions & 2 deletions docs/index.md
@@ -1,13 +1,13 @@
#

<center>
![earthmover](https://edanalytics.github.io/earthmover/assets/ea-earthmover.png)
![earthmover](https://raw.githubusercontent.com/edanalytics/earthmover/main/docs/assets/ea-earthmover.png)
</center>

`earthmover` transforms collections of tabular source data (flat files, FTP files, database tables/queries) into text-based (JSONL, XML) data via YAML configuration.

<center>
![earthmover demo](https://edanalytics.github.io/earthmover/assets/earthmover_demo.gif)
![earthmover demo](https://raw.githubusercontent.com/edanalytics/earthmover/main/docs/assets/earthmover_demo.gif)
</center>

## Quick-start
25 changes: 25 additions & 0 deletions docs/usage.md
Expand Up @@ -325,3 +325,28 @@ In the example below, an `earthmover` project `projA` depends on a package `proj
* There is no limit to the number of packages that can be imported and no limit to how deeply they can be nested (i.e. packages can import other packages). However, there are a few things to keep in mind with using multiple packages.
- If multiple packages at the same level (e.g. `projA/packages/pkgB` and `projA/packages/pkgC`, not `projA/packages/pkgB/packages/pkgC`) include same-named nodes, the package specified later in the `packages` list will overwrite. If the node is also specified in the top-level project, its version of the node will overwrite as usual.
- A similar limitation exists for macros &ndash; a single definition of each macro will be applied everywhere in the project and packages using the same overwrite logic used for the nodes. When you are creating projects that are likely to be used as packages, consider including a namespace in the names of macros with more common operations, such as `assessment123_filter()` instead of the more generic `filter()`.


### Parallel Processing
By default, `earthmover run` - like Python - is single-threaded. For compute-heavy workloads, however, it also supports parallel processing across CPU cores via a [Dask Distributed](https://distributed.dask.org/en/stable/) [LocalCluster](https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster).

The easiest way to enable distributed is:
```bash
pip install earthmover[distributed]
earthmover run --workers auto
```

The `--workers` or `-w` flag specifies the number of workers (and CPU cores) to use. Specifying `auto` will detect the number of cores available and run with one fewer.

The amount of memory available to each worker can be controlled with `--mem_per_worker` or `-m`, passing human-readable values like `200MB` or `1.8GB`. If unspecified, available memory is detected and divided evenly across CPU cores.
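
To illustrate how such values could be interpreted, here is a minimal sketch. The helper names `parse_memory` and `resolve_workers` are hypothetical and this is not earthmover's actual parsing code; decimal units (1MB = 10^6 bytes) are an assumption.

```python
import os
import re

# Assumption: decimal units, i.e. 1MB = 10**6 bytes.
UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_memory(text):
    """Convert a string like '1.8GB' into a number of bytes."""
    match = re.fullmatch(r"\s*([0-9]*\.?[0-9]+)\s*([KMG]?B)\s*", text.upper())
    if not match:
        raise ValueError(f"unrecognized memory value: {text!r}")
    number, unit = match.groups()
    return round(float(number) * UNITS[unit])


def resolve_workers(value):
    """Turn 'auto' into (available cores - 1); otherwise parse an integer."""
    if value == "auto":
        # os.cpu_count() may return None; fall back to 2 so one worker remains.
        return max(1, (os.cpu_count() or 2) - 1)
    return int(value)


print(parse_memory("200MB"), parse_memory("1.8GB"), resolve_workers("4"))
```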

Dask distributed configuration may also be specified in the `earthmover.yml` config; see [`config.dask`](./configuration/#dask) and [`config.dask_cluster_kwargs`](./configuration/#dask-distributed) for documentation and defaults.

Making earthmover use parallel processing may not always improve wall-clock completion time. For small input datasets (10s of MB), the overhead of Dask Distributed's scheduler and worker communication will likely result in _slower_ runtime than earthmover with no parallelism. For larger input datasets, parallelism can help but it depends on the type of workload... more workers means more data processed in parallel, but also more communication between workers for some operations.

Empirically, we find 4 workers optimal for many types of workloads. See [Design / Performance](./design#performance) for further optimization discussion and benchmarking.

#### Optimization tips
* Using `sort_rows` before `join` or `group_by` operations can help them be more performant.
* If Dask is running out of memory during `earthmover run` (Dask workers getting "killed", Dask errors about shuffle, etc.), try specifying `sources.{source}.blocksize` in `earthmover.yml`: the default is `25MB`, but if transformations significantly increase data size (creating JSONL often does), a smaller value like `5MB` or even `2MB` may work better.
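
As a back-of-the-envelope aid for choosing a `blocksize`, the sketch below estimates how many partitions a file would be split into. It assumes the file is cut into fixed-size blocks and that 1MB = 10^6 bytes; `estimate_partitions` is a hypothetical helper, and actual partition counts may differ.

```python
def estimate_partitions(file_bytes, blocksize_bytes):
    """Approximate partition count when a file is split into fixed-size blocks."""
    # Integer ceiling division: a trailing partial block still counts as one.
    return (file_bytes + blocksize_bytes - 1) // blocksize_bytes


MB = 10**6
file_size = 3200 * MB  # e.g. a 3.2GB input file

for blocksize in (25 * MB, 5 * MB, 2 * MB):
    count = estimate_partitions(file_size, blocksize)
    print(f"{blocksize // MB}MB blocks -> about {count} partitions")
```

Smaller blocks mean more, smaller partitions: each one needs less memory, at the cost of more tasks for the scheduler to coordinate.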

16 changes: 15 additions & 1 deletion earthmover/__main__.py
Expand Up @@ -108,12 +108,22 @@ def main(argv=None):
nargs="*",
help='overrides a setting in the config YAML; example: --set config.tmp_dir /tmp'
)
parser.add_argument("-w", "--workers",
type=str,
help='distributes processing across several CPU cores; `auto` uses available-1 cores; example: --workers 4'
)
parser.add_argument("-m", "--mem_per_worker",
type=str,
help='memory limit per worker; if unspecified, memory is evenly divided; example: --mem_per_worker 2.4GB'
)

# Set empty defaults in case they've not been populated by the user.
parser.set_defaults(**{
"selector": "*",
"params": "",
"results_file": "",
"workers": "",
"mem_per_worker": ""
})

### Parse the user-inputs and run Earthmover, depending on the command and subcommand passed.
Expand Down Expand Up @@ -153,7 +163,9 @@ def main(argv=None):
logger=logger,
params='{"BASE_DIR": "' + tests_dir + '"}',
force=True,
skip_hashing=True
skip_hashing=True,
workers=args.workers,
mem_per_worker=args.mem_per_worker,
)
em.logger.info("running tests...")
em.test(tests_dir)
Expand Down Expand Up @@ -200,6 +212,8 @@ def main(argv=None):
cli_state_configs=cli_state_configs,
results_file=args.results_file,
overrides=overrides,
workers=args.workers,
mem_per_worker=args.mem_per_worker,
)

except Exception as err: