Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ This page tracks releases of `earthmover`, with a summary of what was changed, f
## Unreleased changes

* feature: Add `melt` and `pivot` as dataframe operations
* feature: Introduce precedence order for resolving configuration when using project composition

## 2025 releases

Expand Down
2 changes: 1 addition & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ In the example below, an `earthmover` project `projA` depends on a package `proj
```

#### Composition considerations
* The `config` section is **not** composed from the installed packages, with the exception of `macros` and `parameter_defaults`. Specify all desired configuration in the top-level project.
* The `config` section can be composed from the installed packages, but all config settings in the top-level project take precedence. If your project imports multiple packages with conflicting configuration, those settings must be specified in the top-level project to remove ambiguity.

* There is no limit to the number of packages that can be imported and no limit to how deeply they can be nested (i.e. packages can import other packages). However, there are a few things to keep in mind with using multiple packages.
- If multiple packages at the same level (e.g. `projA/packages/pkgB` and `projA/packages/pkgC`, not `projA/packages/pkgB/packages/pkgC`) include same-named nodes, the package specified later in the `packages` list will overwrite. If the node is also specified in the top-level project, its version of the node will overwrite as usual.
Expand Down
133 changes: 125 additions & 8 deletions earthmover/earthmover.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import dask
import itertools
import json
Expand All @@ -6,6 +7,7 @@
import networkx as nx
import pathlib
import os
import re
import shutil
import string
import time
Expand Down Expand Up @@ -96,12 +98,6 @@ def __init__(self,
# convert state_configs to YamlMapping so we can inject CLI overrides
self.state_configs = YamlMapping().update(self.state_configs)
self.state_configs = self.inject_cli_overrides(self.state_configs, "config.")

# Prepare the output directory for destinations.
self.state_configs['output_dir'] = os.path.expanduser(self.state_configs['output_dir'])

# Set the temporary directory in cases of disk-spillage.
dask.config.set({'temporary_directory': self.state_configs['tmp_dir']})

# Set a directory for installing packages.
self.packages_dir = os.path.join(os.getcwd(), 'packages')
Expand All @@ -115,12 +111,28 @@ def __init__(self,
"row_counts": {}
}

# Update state configs
self.update_state_configs(self.state_configs)

### Prepare objects that are initialized during compile and deps.
self.user_configs: 'YamlMapping' = None
self.package_graph: Graph = None
self.graph: Graph = None


### Abstract state_configs updates so they can be called by `__init__()` and `compile()`
def update_state_configs(self, configs):
    """
    Apply the directory-related settings found in `configs` to the run state.

    Invoked by `__init__()` with the initial state configs and again by `compile()`
    with the resolved config block of the composed project.

    :param configs: mapping of config settings; `output_dir` and `tmp_dir` are read when present
    :return: None
    """
    # A composed config block is not guaranteed to define `output_dir`;
    # in that case keep whatever output directory is already in effect.
    if 'output_dir' in configs:
        raw_output_dir = configs['output_dir']
    else:
        raw_output_dir = self.state_configs['output_dir']

    # Prepare the output directory for destinations (expand a leading `~`).
    output_dir = os.path.expanduser(raw_output_dir)
    self.state_configs['output_dir'] = output_dir

    # Set the temporary directory in cases of disk-spillage.
    if 'tmp_dir' in configs:
        dask.config.set({'temporary_directory': configs['tmp_dir']})

    # Record the same (expanded) path in the run metadata so it always
    # agrees with the directory that destinations actually write to.
    self.metadata["output_dir"] = output_dir


### Template-Parsing Methods
def load_project_configs(self, filepath: str):
"""
Expand All @@ -130,7 +142,8 @@ def load_project_configs(self, filepath: str):
configs = JinjaEnvironmentYamlLoader.load_project_configs(filepath, params=self.params)

# 1. Update project parameter defaults from the template, if any
for key, val in configs.get("parameter_defaults", {}).items():
self.parameter_defaults = configs.get("parameter_defaults", {})
for key, val in self.parameter_defaults.items():
self.params.setdefault(key, val)

# 2. There may be config keys that expect an environment variable but for which a default is also defined.
Expand Down Expand Up @@ -166,10 +179,18 @@ def compile(self, to_disk: bool = False):
self.graph = Graph(error_handler=self.error_handler)

### Optionally merge packages to update user-configs and write the composed YAML to disk.
# Before merging, check for potential conflicts if installing multiple packages
self.detect_package_config_conflicts()
self.user_configs = self.merge_packages() or self.user_configs
self.user_configs = self.inject_cli_overrides(self.user_configs)

# Apply precedence rules for composed projects
composed_config = self.resolve_composed_config(self.user_configs)
self.user_configs['config'] = composed_config
self.update_state_configs(composed_config)

if to_disk:
self.user_configs.to_disk(self.compiled_yaml_file)
self.user_configs = self.inject_cli_overrides(self.user_configs)

### Compile the nodes and add to the graph type-by-type.
self.sources = self.compile_node_configs(
Expand Down Expand Up @@ -575,6 +596,102 @@ def build_package_graph(self, root_node: str, package_subgraph: Graph, packages_
nested_package_subgraph = nx.ego_graph(self.package_graph, package_name)
self.build_package_graph(root_node=package_name, package_subgraph=nested_package_subgraph, packages_dir=nested_package_dir, install=install)

def resolve_composed_config(self, composed_user_configs):
    """
    Resolve final parameter values with correct precedence and expand any remaining
    `${PARAM}` variables in the config block.

    Precedence, lowest to highest:
        5. (lowest) config_defaults (hardcoded fallbacks)
        4. Parent parameter_defaults (from composed YAML)
        3. Child parameter_defaults (from original YAML)
        2. CLI -p params
        1. (highest) CLI --set overrides (already present in the config block passed in)

    Only top-level string values of the config block are substituted; unknown
    `${PARAM}` references are left untouched (`safe_substitute`).

    :param composed_user_configs: composed project configs; only the `config` block is read
    :return: a deep copy of the `config` block with parameter references expanded
    """
    # 1. CLI --set overrides are already in composed_user_configs.config.
    # A `config:` key that is present but empty may come back as None.
    config_with_vars = composed_user_configs.get("config")
    if config_with_vars is None:
        config_with_vars = {}

    # 5. config_defaults are already set as fallbacks.

    # 4. Parent parameter defaults from composed YAML (tolerate an empty block).
    final_params = {}
    final_params.update(config_with_vars.get("parameter_defaults") or {})

    # 3. Child parameter defaults (override parents); may be unset or None.
    final_params.update(getattr(self, "parameter_defaults", None) or {})

    # 2. CLI -p params (override defaults).
    final_params.update(self.params or {})

    # Re-apply parameter substitution for any ${PARAM} values, working on a
    # copy so the caller's config block is not modified.
    final_config = copy.deepcopy(config_with_vars)
    for key, val in final_config.items():
        if isinstance(val, str):
            final_config[key] = string.Template(val).safe_substitute(final_params)

    return final_config

def detect_package_config_conflicts(self):
    """
    Before merging packages, check if parents at the same level have conflicting config settings.
    If the child does not set a value for that setting, throw an error. We will not pick between
    the parents.

    This only runs on packages we directly import, but that seems reasonable. If my grandparents
    have a conflict, that is my parent's problem.

    Two kinds of conflict are checked:
      * hard-coded values in the parents' `config` blocks (except logging/graph/macro settings);
      * `parameter_defaults` for parameters referenced as `${PARAM}` in the child's config.

    :return: None
    """
    parent_packages = list(self.package_graph.successors('root'))
    if len(parent_packages) <= 1:
        # Zero or one directly-imported package: nothing to conflict with.
        return

    self.build_package_graph(root_node='root', package_subgraph=self.package_graph, packages_dir=self.packages_dir, install=False)

    # Collect parent configs and child config.
    # A `config:` / `parameter_defaults:` key that is present but empty may load
    # as None, so normalize those to empty mappings before inspecting them.
    parent_configs = {}
    for pkg_name in parent_packages:
        pkg_node = self.package_graph.nodes[pkg_name]['package']
        pkg_node.load_package_yaml(self.params, self.macros)
        parent_configs[pkg_name] = pkg_node.package_yaml.get("config") or {}

    child_config = JinjaEnvironmentYamlLoader.load_project_configs(self.config_file, params={})
    child_parameter_defaults = child_config.get("parameter_defaults") or {}

    # Check hard-coded config block values.
    skip_keys = {'parameter_defaults', 'macros', 'log_level', 'show_stacktrace', 'show_graph'}
    parent_keys = {k for pkg_config in parent_configs.values() for k in pkg_config}
    # Sorted so that the first conflict reported is the same on every run.
    for config_key in sorted(parent_keys - skip_keys, key=str):
        parent_values = {
            pkg_name: pkg_config[config_key]
            for pkg_name, pkg_config in parent_configs.items()
            if config_key in pkg_config
        }
        values_differ = len({str(v) for v in parent_values.values()}) > 1
        if values_differ and config_key not in child_config:
            self._throw_config_conflict(config_key, parent_values, is_parameter=False)

    # Check parameter_defaults conflicts, for params used in child config.
    param_pattern = re.compile(r'\$\{([^}]+)\}')
    child_params = {
        param
        for val in child_config.values() if isinstance(val, str)
        for param in param_pattern.findall(val)
    }

    parent_param_defaults = {
        pkg_name: pkg_config["parameter_defaults"]
        for pkg_name, pkg_config in parent_configs.items()
        if pkg_config.get("parameter_defaults")
    }
    parent_param_names = {k for params in parent_param_defaults.values() for k in params}

    for param_name in sorted(child_params & parent_param_names, key=str):
        parent_param_values = {
            pkg_name: params[param_name]
            for pkg_name, params in parent_param_defaults.items()
            if param_name in params
        }
        values_differ = len({str(v) for v in parent_param_values.values()}) > 1
        if values_differ and param_name not in child_parameter_defaults:
            self._throw_config_conflict(param_name, parent_param_values, is_parameter=True)

def _throw_config_conflict(self, key_name, parent_values, is_parameter=False):
    """
    Report a conflict between imported packages through the error handler.

    :param key_name: the config key or parameter name the packages disagree on
    :param parent_values: mapping of package name -> that package's value for `key_name`
    :param is_parameter: True when `key_name` is a `parameter_defaults` entry rather than a config key
    :return: whatever `self.error_handler.throw` does with the message (nothing is returned here)
    """
    if is_parameter:
        msg_type = location = "parameter_defaults"
    else:
        msg_type, location = "config values", "config"

    # One line per package showing the value it contributes.
    conflict_lines = []
    for pkg_name, value in parent_values.items():
        conflict_lines.append(f" - {pkg_name}: {value}\n")
    conflict_list = "".join(conflict_lines)

    conflict_msg = f"""Conflicting {msg_type} for '{key_name}' between imported packages:
{conflict_list}
Child project does not define '{key_name}' in {location}, so Earthmover cannot determine which value to use. Please define '{key_name}' in the child project's {location}."""
    self.error_handler.throw(conflict_msg)


def clean(self):
"""
Removes local artifacts created by `earthmover run` and `earthmover compile`
Expand Down
1 change: 1 addition & 0 deletions example_projects/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*/packages
*/earthmover_compiled.yaml
4 changes: 3 additions & 1 deletion example_projects/09_edfi/earthmover.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ version: 2

config:
log_level: DEBUG
output_dir: ./output/
output_dir: ./config-output/ #${OUTPUT_DIR}
# show_graph: True
parameter_defaults:
OUTPUT_DIR: ./imported-output/


sources:
Expand Down
6 changes: 4 additions & 2 deletions example_projects/11_composition/earthmover.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ version: 2

config:
log_level: DEBUG
output_dir: ./output/
show_stacktrace: True

# uncomment this block to test child override behavior
# output_dir: ${OUTPUT_DIR}
# parameter_defaults:
# OUTPUT_DIR: ./child_override/

packages:
edfi_example_project:
Expand Down
29 changes: 29 additions & 0 deletions example_projects/11a_multi_parent/earthmover.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: 2

config:
log_level: DEBUG
show_stacktrace: True
# uncomment this block to resolve conflict between parents
# output_dir: ./resolved_output/

packages:
parent1:
local: 'imports/11b_multi_parent_parent1/'
parent2:
local: 'imports/11b_multi_parent_parent2/'

sources:
test_source:
file: ./sources/test.csv
header_rows: 1
columns:
- id
- name

destinations:
test_output:
source: $sources.test_source
template: ./templates/test.jsont
extension: jsonl
linearize: True

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: 2

config:
log_level: DEBUG
output_dir: ./output-1/
parameter_defaults:
OUTPUT_DIR: ./output-1/
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: 2

config:
log_level: DEBUG
output_dir: ./output-2/
parameter_defaults:
OUTPUT_DIR: ./output-2/
5 changes: 5 additions & 0 deletions example_projects/11a_multi_parent/sources/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,name
1,Test Item
2,Another Item
3,Third Item

5 changes: 5 additions & 0 deletions example_projects/11a_multi_parent/templates/test.jsont
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": {{ id }},
"name": "{{ name }}"
}

13 changes: 13 additions & 0 deletions example_projects/run_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,18 @@ earthmover run -f
rm -rf outputs/*, packages/*
echo " ... done!"

# Multi-parent composition example: two imported packages with differing config.
# NOTE(review): the example ships with the child's `output_dir` override commented out,
# so this run presumably stops on the parent-config conflict — confirm that is intended here.
echo " running 11a_multi_parent..."
cd ../11a_multi_parent/
earthmover deps
earthmover run -f
# No comma between paths: `outputs/*,` would be taken as a single glob ending in a comma.
rm -rf outputs/* packages/*
echo " ... done!"

echo " running 12_melt_pivot..."
cd ../12_melt_pivot/
earthmover run -f
rm -rf outputs/*
echo " ... done!"

cd ../
echo "all examples have run, goodbye"