Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ tmp/
.mypy_cache/

logs/
.claude/
74 changes: 49 additions & 25 deletions nac_yaml/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@ def from_yaml(cls, loader: Any, node: Any) -> str:
def load_yaml_files(paths: list[Path], deduplicate: bool = True) -> dict[str, Any]:
"""Load all yaml files from a provided directory."""

# Create YAML instance once and reuse it for all files
y = yaml.YAML()
y.preserve_quotes = True
y.register_class(VaultTag)
y.register_class(EnvTag)

def _load_file(file_path: Path, data: dict[str, Any]) -> None:
with open(file_path) as file:
if file_path.suffix in [".yaml", ".yml"]:
data_yaml = file.read()
y = yaml.YAML()
y.preserve_quotes = True
y.register_class(VaultTag)
y.register_class(EnvTag)
dict = y.load(data_yaml)
merge_dict(dict, data)

Expand All @@ -92,34 +94,49 @@ def _load_file(file_path: Path, data: dict[str, Any]) -> None:

def merge_list_item(source_item: Any, destination: list[Any]) -> None:
"""Merge item into list."""
if isinstance(source_item, dict):
# check if we have an item in destination with matching primitives
for dest_item in destination:
match = True
comparison = False
unique_source = False
unique_dest = False
for k, v in source_item.items():
if isinstance(v, dict) or isinstance(v, list):
continue
if k not in dest_item:
unique_source = True
continue
comparison = True
if v != dest_item[k]:
match = False
if not isinstance(source_item, dict):
destination.append(source_item)
return

# check if we have an item in destination with matching primitives
for dest_item in destination:
if not isinstance(dest_item, dict):
continue

match = True
comparison = False
unique_source = False
unique_dest = False

# Check source item keys
for k, v in source_item.items():
if isinstance(v, (dict, list)):
continue
if k not in dest_item:
unique_source = True
continue
comparison = True
if v != dest_item[k]:
match = False
break # Early exit on mismatch

# Only check dest if we still have a match
if match:
for k, v in dest_item.items():
if isinstance(v, dict) or isinstance(v, list):
if isinstance(v, (dict, list)):
continue
if k not in source_item:
unique_dest = True
continue
comparison = True
if v != source_item[k]:
match = False
if comparison and match and not (unique_source and unique_dest):
merge_dict(source_item, dest_item)
return
break # Early exit on mismatch

if comparison and match and not (unique_source and unique_dest):
merge_dict(source_item, dest_item)
return

destination.append(source_item)


Expand All @@ -130,13 +147,16 @@ def merge_dict(source: dict[str, Any], destination: dict[str, Any]) -> dict[str,
for key, value in source.items():
if key not in destination or destination[key] is None:
destination[key] = value
elif value is None:
# Skip None values when destination already has a value
continue
elif isinstance(value, dict):
if isinstance(destination[key], dict):
merge_dict(value, destination[key])
elif isinstance(value, list):
if isinstance(destination[key], list):
destination[key] += value
elif value is not None:
else:
destination[key] = value
return destination

Expand All @@ -147,9 +167,13 @@ def deduplicate_list_items(data: dict[str, Any]) -> dict[str, Any]:
if isinstance(value, dict):
deduplicate_list_items(value)
elif isinstance(value, list):
# Only deduplicate if there are items
if not value:
continue
deduplicated_list: list[Any] = []
for i in value:
merge_list_item(i, deduplicated_list)
# Recursively deduplicate nested dicts
for i in deduplicated_list:
if isinstance(i, dict):
deduplicate_list_items(i)
Expand Down
Loading
Loading