We need to create tasks based on the different logs produced by the dataset-resource pipeline and the collection pipeline. This is primarily intended for use in async after a resource has been collected and transformed. We should build it in digital-land-python in case we want to reuse it elsewhere.
Technical Approach
- In the pipeline subpackage, create a task.py module containing a TaskPipeline class. It should have a run method that runs the pipeline. Arguments can be supplied either as method arguments or through a pydantic dataclass Config passed into the TaskPipeline class, which holds the relevant information. This is similar to the pipelines in the pyspark-jobs repo.
- It should create tasks according to the agreed task schema, which is defined in the specification; start-date and end-date can be ignored for now.
- It needs to create tasks from:
  - log (collection log): create a task row if the collection was not successful.
  - issue (issue log): group issues by issue_type, resource, field and dataset. For now, focus on issues with error severity and external responsibility, but allow this to change in the future. One task row should represent one issue type for a particular field.
- Use polars to run transformations on the input CSVs and generate a single task output CSV. The polars functions can live in the task.py module for now; when we need them elsewhere, we can create a separate Python subpackage for polars transformation functions.
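A minimal sketch of how TaskPipeline could accept its inputs either through a config object or as arguments to run. It uses a standard-library dataclass so it runs without extra dependencies; swapping the decorator for `pydantic.dataclasses.dataclass` would add the type validation the ticket asks for. The field names, the `resolve_config` helper, and the rule that a `None` argument means "not supplied" are hypothetical, and run is only a placeholder for the real transformation steps.

```python
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Optional, Tuple


@dataclass(frozen=True)
class TaskPipelineConfig:
    # Hypothetical fields: the ticket only says the config holds "the relevant information".
    collection_log_path: Optional[Path] = None
    issue_log_path: Optional[Path] = None
    output_path: Optional[Path] = None
    # Filters are configurable so the current error/external focus can change later.
    severities: Tuple[str, ...] = ("error",)
    responsibilities: Tuple[str, ...] = ("external",)


class TaskPipeline:
    def __init__(self, config: Optional[TaskPipelineConfig] = None) -> None:
        # Fall back to default settings when no config object is given.
        self.config = config or TaskPipelineConfig()

    def resolve_config(self, **overrides) -> TaskPipelineConfig:
        """Merge keyword arguments over the stored config; None means 'not supplied'."""
        supplied = {key: value for key, value in overrides.items() if value is not None}
        # replace() builds a new config and raises TypeError for unknown field names.
        return replace(self.config, **supplied)

    def run(self, **overrides) -> TaskPipelineConfig:
        config = self.resolve_config(**overrides)
        # Placeholder: read config.collection_log_path and config.issue_log_path,
        # build the log and issue tasks, and write them to config.output_path.
        # The resolved config is returned only so the argument handling can be checked.
        return config
```

Letting method arguments override a stored config keeps both calling styles from the ticket available without duplicating the pipeline logic.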
Expected JSON for the detail column of log tasks:
{
"status":500,
"exception":""
}
Expected JSON for the detail column of issue tasks:
{
"issue_type":"invalid geom",
"count":10,
"field":"geometry"
}
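Building the detail values with a JSON serializer, rather than by hand, guarantees valid JSON with the keys shown above. A minimal standard-library sketch; the helper names are hypothetical, and mapping a missing exception to an empty string is an assumption based on the example.

```python
import json
from typing import Optional


def log_task_detail(status: Optional[int], exception: Optional[str]) -> str:
    # Keys match the log-task example; a missing status is serialized as JSON null.
    return json.dumps({"status": status, "exception": exception or ""})


def issue_task_detail(issue_type: str, count: int, field: str) -> str:
    # Keys match the issue-task example.
    return json.dumps({"issue_type": issue_type, "count": count, "field": field})
```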
Acceptance criteria:
- Write the code to do the above and produce an example output that can be shared with, and signed off by, the team.
- Merge the code into digital-land-python. There is no need to use it in async yet; that is the next step.