24 commits
efe0fa0
feat: Change how error messages are generated (by writing in batches)…
stevenhsd Jan 26, 2026
f1fd8aa
style: run black, isort and resolve linting issues
stevenhsd Jan 27, 2026
4869482
fix: deal with pathing assumption that file had been moved to process…
georgeRobertson Jan 28, 2026
243a319
style: linting and static typing now passing
stevenhsd Jan 28, 2026
233e18f
test: reenabled tests removed in error
stevenhsd Jan 28, 2026
cee39a0
fix: re-add audit table logging after accidental removal
georgeRobertson Jan 28, 2026
2d886ac
bump: version 0.5.0 → 0.5.1
georgeRobertson Jan 28, 2026
cefa553
docs: condense patch notes for v0.5.1
georgeRobertson Jan 28, 2026
0f0e72c
Merge pull request #31 from NHSDigital/bugfix/gr-ndit-710-fix_assumpt…
georgeRobertson Jan 28, 2026
c64da59
refactor: Modified business rule step to write feedback messages in b…
stevenhsd Jan 28, 2026
4bbdf02
build: add pytest dev settings with live log
georgeRobertson Jan 30, 2026
cdd5fc8
refactor: improve the logging around dve processing errors and align …
georgeRobertson Jan 30, 2026
5c96472
refactor: ensure traceback in broad exceptions
georgeRobertson Feb 2, 2026
7110ef6
refactor: add sense check for text based file (#32)
stevenhsd Feb 2, 2026
8aa4973
refactor: allow passing of custom loggers into pipeline objects
georgeRobertson Feb 2, 2026
ab8467a
Merge pull request #33 from NHSDigital/bugfix/gr-ndit-850-improve_log…
stevenhsd Feb 2, 2026
9b8f3f5
bump: version 0.5.1 → 0.5.2
georgeRobertson Feb 2, 2026
b111c1c
style: fix linting
stevenhsd Feb 2, 2026
6f6d218
Merge pull request #34 from NHSDigital/release_v052
georgeRobertson Feb 2, 2026
141e1b3
refactor: merge in main and resolve conflicts and linting issues
stevenhsd Feb 3, 2026
aaea473
style: Additional logging around file transformation, business rules …
stevenhsd Feb 6, 2026
fc120b1
refactor: merging logging additions from release branch
stevenhsd Feb 6, 2026
68f5686
feat: Add read of arrow ipc files to reference data loaders
stevenhsd Feb 6, 2026
70d2399
feat: added reference data loading of arrow ipc files including enhan…
stevenhsd Feb 9, 2026
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,18 @@
## v0.5.2 (2026-02-02)

### Refactor

- allow passing of custom loggers into pipeline objects
- ensure traceback in broad exceptions
- improve the logging around dve processing errors and align reporting to module name rather than legacy name
- add sense check for text based file (#32)

## v0.5.1 (2026-01-28)

### Fix

- deal with pathing assumption that file had been moved to processed_file_path during file transformation

## v0.5.0 (2026-01-16)

### Feat
2 changes: 1 addition & 1 deletion Makefile
@@ -17,7 +17,7 @@ behave:
${activate} behave

pytest:
${activate} pytest tests/
${activate} pytest -c pytest-dev.ini

all-tests: pytest behave

4 changes: 2 additions & 2 deletions docs/README.md
@@ -165,8 +165,8 @@ for entity in data_contract_config.schemas:

# Data contract step here
data_contract = SparkDataContract(spark_session=spark)
entities, validation_messages, success = data_contract.apply_data_contract(
entities, data_contract_config
entities, feedback_errors_uri, success = data_contract.apply_data_contract(
entities, None, data_contract_config
)
```
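The second return value is now a URI pointing at a JSON-lines errors file rather than an in-memory list of validation messages. A minimal sketch of reading it back with the new `load_feedback_messages` helper from `dve.common.error_utils`; the guard on `success` and the print loop are illustrative, and it assumes the returned URI points at the step's `.jsonl` file rather than its containing directory:

```python
# Illustrative only: read back the feedback errors written by the data
# contract step. load_feedback_messages yields nothing if the file is absent.
from dve.common.error_utils import load_feedback_messages

if success:
    for message in load_feedback_messages(feedback_errors_uri):
        print(message)
```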

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "0.5.0"
version = "0.5.2"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
3 changes: 3 additions & 0 deletions pytest-dev.ini
@@ -0,0 +1,3 @@
[pytest]
log_cli = true
log_cli_level = INFO
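With `log_cli` enabled at `INFO` level, pytest streams log records to the terminal while tests run, which is what the new `make pytest` target above relies on via `-c pytest-dev.ini`. A minimal illustration, assuming an ordinary test module; the logger name and test are made up:

```python
# Illustrative only: with pytest-dev.ini in effect, this INFO record is shown
# live in the terminal rather than only in the captured-log section on failure.
import logging

logger = logging.getLogger(__name__)


def test_live_logging_is_visible():
    logger.info("business rule step started")
    assert True
```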
File renamed without changes.
187 changes: 187 additions & 0 deletions src/dve/common/error_utils.py
@@ -0,0 +1,187 @@
"""Utilities to support reporting"""

import datetime as dt
import json
import logging
from collections.abc import Iterable
from itertools import chain
from multiprocessing import Queue
from threading import Thread
from typing import Optional, Union

import dve.parser.file_handling as fh
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import UserMessage
from dve.core_engine.type_hints import URI, DVEStage, Messages


def get_feedback_errors_uri(working_folder: URI, step_name: DVEStage) -> URI:
"""Determine the location of json lines file containing all errors generated in a step"""
return fh.joinuri(working_folder, "errors", f"{step_name}_errors.jsonl")


def get_processing_errors_uri(working_folder: URI) -> URI:
"""Determine the location of json lines file containing all processing
errors generated from DVE run"""
return fh.joinuri(working_folder, "errors", "processing_errors", "processing_errors.jsonl")


def dump_feedback_errors(
working_folder: URI,
step_name: DVEStage,
messages: Messages,
key_fields: Optional[dict[str, list[str]]] = None,
) -> URI:
"""Write out captured feedback error messages."""
if not working_folder:
raise AttributeError("processed files path not passed")

if not key_fields:
key_fields = {}

error_file = get_feedback_errors_uri(working_folder, step_name)
processed = []

for message in messages:
if message.original_entity is not None:
primary_keys = key_fields.get(message.original_entity, [])
elif message.entity is not None:
primary_keys = key_fields.get(message.entity, [])
else:
primary_keys = []

error = message.to_dict(
key_field=primary_keys,
value_separator=" -- ",
max_number_of_values=10,
record_converter=None,
)
error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
processed.append(error)

with fh.open_stream(error_file, "a") as f:
f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")
return error_file


def dump_processing_errors(
working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
):
"""Write out critical processing errors"""
if not working_folder:
raise AttributeError("processed files path not passed")
if not step_name:
raise AttributeError("step name not passed")
if not errors:
raise AttributeError("errors list not passed")

error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
processed = []

for error in errors:
processed.append(
{
"step_name": step_name,
"error_location": "processing",
"error_level": "integrity",
"error_message": error.error_message,
"error_traceback": error.messages,
}
)

with fh.open_stream(error_file, "a") as f:
f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")

return error_file


def load_feedback_messages(feedback_messages_uri: URI) -> Iterable[UserMessage]:
"""Load user messages from jsonl file"""
if not fh.get_resource_exists(feedback_messages_uri):
return
with fh.open_stream(feedback_messages_uri) as errs:
yield from (UserMessage(**json.loads(err)) for err in errs.readlines())


def load_all_error_messages(error_directory_uri: URI) -> Iterable[UserMessage]:
"Load user messages from all jsonl files"
return chain.from_iterable(
[
load_feedback_messages(err_file)
for err_file, _ in fh.iter_prefix(error_directory_uri)
if err_file.endswith(".jsonl")
]
)


class BackgroundMessageWriter:
"""Controls batch writes to error jsonl files"""

def __init__(
self,
working_directory: URI,
dve_stage: DVEStage,
key_fields: Optional[dict[str, list[str]]] = None,
logger: Optional[logging.Logger] = None,
):
self._working_directory = working_directory
self._dve_stage = dve_stage
self._feedback_message_uri = get_feedback_errors_uri(
self._working_directory, self._dve_stage
)
self._key_fields = key_fields
self.logger = logger or get_logger(type(self).__name__)
self._write_thread: Optional[Thread] = None
self._queue: Queue = Queue()

@property
def write_queue(self) -> Queue: # type: ignore
"""Queue for storing batches of messages to be written"""
return self._queue

@property
def write_thread(self) -> Thread: # type: ignore
"""Thread to write batches of messages to jsonl file"""
if not self._write_thread:
self._write_thread = Thread(target=self._write_process_wrapper)
return self._write_thread

def _write_process_wrapper(self):
"""Wrapper for dump feedback errors to run in background process"""
while True:
if msgs := self.write_queue.get():
dump_feedback_errors(
self._working_directory, self._dve_stage, msgs, self._key_fields
)
else:
break

def __enter__(self) -> "BackgroundMessageWriter":
self.write_thread.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
self.logger.exception(
"Issue occured during background write process:",
exc_info=(exc_type, exc_value, traceback),
)
self.write_queue.put(None)
self.write_thread.join()


def conditional_cast(value, primary_keys: list[str], value_separator: str) -> Union[list[str], str]:
"""Determines what to do with a value coming back from the error list"""
if isinstance(value, list):
casts = [
conditional_cast(val, primary_keys, value_separator) for val in value
] # type: ignore
return value_separator.join(
[f"{pk}: {id}" if pk else "" for pk, id in zip(primary_keys, casts)]
)
if isinstance(value, dt.date):
return value.isoformat()
if isinstance(value, dict):
return ""
return str(value)
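`BackgroundMessageWriter` is intended to be used as a context manager: batches of messages put on its queue are appended to the step's jsonl file by a background thread, and the `None` queued on exit acts as the stop sentinel. A minimal usage sketch; the working folder, stage name and `messages` batch are placeholders rather than values taken from the codebase:

```python
# Minimal usage sketch: "file:///tmp/work" and "business_rules" are placeholder
# values, and `messages` stands in for a batch of UserMessage objects.
from dve.common.error_utils import BackgroundMessageWriter

with BackgroundMessageWriter("file:///tmp/work", "business_rules") as writer:
    writer.write_queue.put(messages)  # written to jsonl by the background thread
# __exit__ enqueues the None sentinel and joins the writer thread
```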
57 changes: 30 additions & 27 deletions src/dve/core_engine/backends/base/backend.py
@@ -17,13 +17,8 @@
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
from dve.core_engine.loggers import get_logger
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import (
URI,
EntityLocations,
EntityName,
EntityParquetLocations,
Messages,
)
from dve.core_engine.type_hints import URI, EntityLocations, EntityName, EntityParquetLocations
from dve.parser.file_handling.service import get_parent, joinuri


class BaseBackend(Generic[EntityType], ABC):
@@ -148,65 +143,71 @@ def convert_entities_to_spark(

def apply(
self,
working_dir: URI,
entity_locations: EntityLocations,
contract_metadata: DataContractMetadata,
rule_metadata: RuleMetadata,
submission_info: Optional[SubmissionInfo] = None,
) -> tuple[Entities, Messages, StageSuccessful]:
) -> tuple[Entities, URI, StageSuccessful]:
"""Apply the data contract and the rules, returning the entities and all
generated messages.
"""
reference_data = self.load_reference_data(
rule_metadata.reference_data_config, submission_info
)
entities, messages, successful = self.contract.apply(entity_locations, contract_metadata)
entities, dc_feedback_errors_uri, successful, processing_errors_uri = self.contract.apply(
working_dir, entity_locations, contract_metadata
)
if not successful:
return entities, messages, successful
return entities, get_parent(processing_errors_uri), successful

for entity_name, entity in entities.items():
entities[entity_name] = self.step_implementations.add_row_id(entity)

# TODO: Handle entity manager creation errors.
entity_manager = EntityManager(entities, reference_data)
# TODO: Add stage success to 'apply_rules'
rule_messages = self.step_implementations.apply_rules(entity_manager, rule_metadata)
messages.extend(rule_messages)
# TODO: In case of large errors in business rules, write messages to jsonl file
# TODO: and return uri to errors
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)

for entity_name, entity in entity_manager.entities.items():
entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)

return entity_manager.entities, messages, True
return entity_manager.entities, get_parent(dc_feedback_errors_uri), True

def process(
self,
working_dir: URI,
entity_locations: EntityLocations,
contract_metadata: DataContractMetadata,
rule_metadata: RuleMetadata,
cache_prefix: URI,
submission_info: Optional[SubmissionInfo] = None,
) -> tuple[MutableMapping[EntityName, URI], Messages]:
) -> tuple[MutableMapping[EntityName, URI], URI]:
"""Apply the data contract and the rules, write the entities out to parquet
and returning the entity locations and all generated messages.
"""
entities, messages, successful = self.apply(
entity_locations, contract_metadata, rule_metadata, submission_info
entities, feedback_errors_uri, successful = self.apply(
working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
)
if successful:
parquet_locations = self.write_entities_to_parquet(entities, cache_prefix)
parquet_locations = self.write_entities_to_parquet(
entities, joinuri(working_dir, "outputs")
)
else:
parquet_locations = {}
return parquet_locations, messages
return parquet_locations, get_parent(feedback_errors_uri)

def process_legacy(
self,
working_dir: URI,
entity_locations: EntityLocations,
contract_metadata: DataContractMetadata,
rule_metadata: RuleMetadata,
cache_prefix: URI,
submission_info: Optional[SubmissionInfo] = None,
) -> tuple[MutableMapping[EntityName, DataFrame], Messages]:
) -> tuple[MutableMapping[EntityName, DataFrame], URI]:
"""Apply the data contract and the rules, create Spark `DataFrame`s from the
entities and return the Spark entities and all generated messages.
@@ -221,17 +222,19 @@ def process_legacy(
category=DeprecationWarning,
)

entities, messages, successful = self.apply(
entity_locations, contract_metadata, rule_metadata, submission_info
entities, errors_uri, successful = self.apply(
working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
)

if not successful:
return {}, messages
return {}, errors_uri

if self.__entity_type__ == DataFrame:
return entities, messages # type: ignore
return entities, errors_uri # type: ignore

return (
self.convert_entities_to_spark(entities, cache_prefix, _emit_deprecation_warning=False),
messages,
self.convert_entities_to_spark(
entities, joinuri(working_dir, "outputs"), _emit_deprecation_warning=False
),
errors_uri,
)
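Taken together, the backend changes mean `apply` (and the `process` / `process_legacy` wrappers) now take a `working_dir` first and hand back an errors-directory URI in place of the old in-memory `Messages` list. A hedged sketch of the new call shape; `backend`, `working_dir`, `entity_locations`, `contract_metadata`, `rule_metadata` and `submission_info` are placeholders for objects constructed elsewhere in the pipeline:

```python
# Hedged sketch of the updated apply() contract: the second return value is a
# URI to the directory of jsonl error files, not a list of messages.
from dve.common.error_utils import load_all_error_messages

entities, errors_uri, successful = backend.apply(
    working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
)
if successful:
    feedback = list(load_all_error_messages(errors_uri))  # UserMessage objects
```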