diff --git a/DATA_GOV_HARVESTER.md b/DATA_GOV_HARVESTER.md new file mode 100644 index 0000000..d409f6f --- /dev/null +++ b/DATA_GOV_HARVESTER.md @@ -0,0 +1,390 @@ +# Data.gov Harvester + +This document describes the Data.gov harvester implementation for CKAN. + +## Overview + +The Data.gov harvester (`DataGovHarvester`) enables CKAN to harvest datasets from Data.gov's catalog using their current Search API. This replaces the legacy CKAN-based API with support for Data.gov's new API endpoint and DCAT-US metadata format. + +## Features + +- **Cursor-based pagination**: Efficiently handles large result sets using Data.gov's `after` cursor +- **DCAT-US metadata mapping**: Maps Data.gov's DCAT-US metadata format to CKAN's internal structure +- **Configurable query parameters**: Supports all Data.gov API parameters including `q`, `keyword`, `org_slug`, `spatial_geometry`, and more +- **Resource tracking**: Preserves resource IDs across harvests to maintain datastore data +- **Comprehensive metadata**: Maps core DCAT-US fields including spatial, temporal, contact points, and more +- **Extensible configuration**: Supports all configuration processors from CustomHarvester base class + +## Installation + +1. Add the harvester plugin to your CKAN config: + +```ini +ckan.plugins = ... custom_harvest datagov_harvester +``` + +2. Restart CKAN + +## Creating a Harvest Source + +1. Navigate to Organizations → [Your Organization] → Harvest +2. Click "Add Harvest Source" +3. Fill in the form: + - **URL**: Data.gov Search API endpoint with optional query parameters + - Example: `https://catalog.data.gov/search?q=fish&org_slug=epa&keyword=California&spatial_within=false&spatial_geometry={%22type%22:%22Polygon%22,%22coordinates%22:[[[-127.0,32.5],[-127.0,42.0],[-114.1,42.0],[-114.1,32.5],[-127.0,32.5]]]}` + - **Title**: Descriptive name for the harvest source + - **Type**: Select "Data.gov Catalog" (datagov_harvest) + - **Configuration**: JSON configuration (optional) + - **Organization**: Select the organization that will own the harvested datasets + +## API Endpoint + +**URL**: `https://catalog.data.gov/search` + +## Query Parameters + +Query parameters can be included in the harvest source URL. The harvester preserves ALL query parameters during pagination. + +| Parameter | Type | Description | Example | +|-----------|------|-------------|---------| +| `q` | string | Full-text search query | `q=water+quality` | +| `org_slug` | string | Filter by organization slug | `org_slug=epa-gov` | +| `keyword` | string/array | Filter by exact keyword match | `keyword=California` | +| `spatial_geometry` | GeoJSON | Filter by spatial geometry | See example below | +| `spatial_filter` | string | Filter by spatial type | `spatial_filter=geospatial` | +| `spatial_within` | boolean | Spatial containment (true=within, false=intersects) | `spatial_within=true` | +| `org_type` | string | Filter by organization type | `org_type=Federal+Government` | +| `sort` | string | Sort order (relevance, popularity, last_harvested_date, distance) | `sort=popularity` | +| `per_page` | integer | Results per page (default: 10) | `per_page=100` | + +**Example URLs**: + +``` +# Simple text search +https://catalog.data.gov/search?q=water + +# Filter by organization +https://catalog.data.gov/search?q=fish&org_slug=epa + +# Complex query with multiple filters and spatial geometry +https://catalog.data.gov/search?q=fish&org_slug=epa&keyword=California&spatial_within=false&spatial_geometry={%22type%22:%22Polygon%22,%22coordinates%22:[[[-127.0,32.53433506002680],[-127.0,42.00956632985810],[-114.13078987213299,42.00956632985810],[-114.13078987213299,32.53433506002680],[-127.0,32.53433506002680]]]} +``` + +**Note:** For complex spatial queries, URL-encode the GeoJSON geometry parameter (use `%22` for quotes, `%3A` for colons, etc.). + +## Configuration Options + +The harvester supports JSON configuration via the harvest source config field. All options are optional. + +### Harvester Configuration + +Inherited from CustomHarvester base class: + +```json +{ + "default_tags": [{"name": "federal-data"}, {"name": "datagov"}], + "clean_tags": true, + "default_extras": [ + {"key": "harvest_source", "value": "data.gov"} + ], + "mapping_fields": { + "bureauCode": "dcat.bureauCode", + "programCode": "dcat.programCode" + }, + "keep_existing_resources": false +} +``` + +- `default_tags` (array): Tags to add to all harvested datasets +- `clean_tags` (boolean): Sanitize tag names (remove invalid characters) +- `default_extras` (array): Custom extras to add to all datasets +- `mapping_fields` (object): Map additional DCAT-US fields to CKAN extras +- `organizations_filter_include` (array): Whitelist of organization slugs (only harvest these orgs) +- `organizations_filter_exclude` (array): Blacklist of organization slugs (harvest all except these) +- `keep_existing_resources` (boolean): Preserve unmatched existing resources on update + +**Note on Organization Filtering:** +- Filtering happens **after** fetching results from Data.gov API +- Use `organizations_filter_include` for whitelist (only specified orgs are harvested) +- Use `organizations_filter_exclude` for blacklist (all orgs except specified are harvested) +- Organization slugs match the `organization.slug` field from Data.gov (e.g., `"epa"`, `"noaa"`) +- For API-level filtering, use the `org_slug` parameter in the URL instead + +### Example Configurations + +**Basic harvest with tags**: +```json +{ + "default_tags": [{"name": "federal-data"}], + "clean_tags": true +} +``` + +**Filter by organization (post-harvest filtering)**: +```json +{ + "organizations_filter_include": ["epa", "noaa", "usgs"], + "default_tags": [{"name": "federal-science"}] +} +``` + +**Exclude specific organizations**: +```json +{ + "organizations_filter_exclude": ["gsa", "dhs"], + "clean_tags": true +} +``` + +**Map extended DCAT-US fields**: +```json +{ + "mapping_fields": { + "bureauCode": "dcat.bureauCode", + "programCode": "dcat.programCode", + "theme": "dcat.theme", + "accrualPeriodicity": "dcat.accrualPeriodicity" + } +} +``` + +**Map to composite fields** (requires ckanext-scheming): +```json +{ + "composite_field_mapping": [ + { + "idInfoCitation": { + "publicationDate": "extras.dcat_modified", + "title": "extras.landing_page" + } + } + ] +} +``` + +**Note on composite_field_mapping:** +- Use `extras.{key_name}` to reference converted extras (e.g., `extras.dcat_modified`, `extras.contact_name`) +- The harvester automatically copies converted extras to `source_dict` before applying config processors +- Available extras: `dcat_modified`, `contact_name`, `contact_email`, `landing_page`, `spatial`, `temporal`, `guid`, `datagov_slug`, etc. +- See "Metadata Mapping" section below for complete list of extras + +## Metadata Mapping + +The harvester maps Data.gov's DCAT-US metadata format to CKAN's internal structure: + +### Core Fields + +| Data.gov Field | CKAN Field | Notes | +|----------------|------------|-------| +| `title` | `title` | Dataset title | +| `description` | `notes` | Dataset description | +| `keyword[]` | `tags` | Tags (sanitized) | +| `theme[]` | `groups` | Themes mapped to CKAN groups (see note below) | +| `identifier` | extra: `guid` | UUID identifier (used as GUID) | +| `slug` | extra: `datagov_slug` | Human-readable slug | +| `publisher` | extra: `publisher` | Publishing organization name | +| `last_harvested_date` | extra: `source_metadata_modified` | Last harvest date (trimmed to date if midnight) | + +**Note on Theme → Group Mapping:** +Data.gov's `theme` field (from both top-level and `dcat.theme`) is automatically mapped to CKAN groups. Theme names are: +- Converted to lowercase +- Spaces replaced with hyphens +- Special characters removed +- Deduplicated if a theme appears in both locations + +**Important:** CKAN groups must exist before datasets can be assigned to them. The `RemoteGroups` configuration processor will validate that groups exist and filter out any that don't. To use this feature: +1. Create groups in CKAN matching your Data.gov themes (e.g., `environment`, `health`, `natural-resources`) +2. Themes will automatically be assigned to matching groups during harvest +3. Non-existent groups will be silently skipped + +### DCAT-US Core Fields + +| Data.gov Field | CKAN Field | Notes | +|----------------|------------|-------| +| `dcat.accessLevel` | extra: `dcat_access_level` | Access level (public, restricted, etc.) | +| `dcat.modified` | extra: `dcat_modified` | Last modified date (trimmed to date if midnight) | +| `dcat.issued` | extra: `dcat_issued` | Date of formal issuance (trimmed to date if midnight) | +| `dcat.contactPoint.fn` | extra: `contact_name` | Contact person name | +| `dcat.contactPoint.hasEmail` | extra: `contact_email` | Contact email (mailto: removed) | +| `dcat.license` | `license_id` | License (mapped to CKAN license) | +| `spatial_shape` | extra: `spatial` | GeoJSON Polygon (preferred) | +| `dcat.spatial` | extra: `spatial` | Bounding box string (fallback) | +| `spatial_centroid` | extra: `spatial_centroid` | Center point as "lat,lon" | +| `dcat.temporal` | extra: `temporal` | Temporal coverage | +| `dcat.landingPage` | extra: `landing_page` | Dataset landing page URL | +| `dcat.rights` | extra: `dcat_rights` | Rights statement or URL | + +**Note on Spatial Data:** +Data.gov provides spatial data in multiple formats: +- `spatial_shape`: A proper GeoJSON Polygon object (preferred for mapping) +- `dcat.spatial`: A bounding box string like `"west,south,east,north"` (used as fallback) +- `spatial_centroid`: Center point with `lat` and `lon` (stored as `"lat,lon"` string) + +The harvester prefers `spatial_shape` for the `spatial` extra as it provides the actual boundary polygon rather than just a bounding box. + +**Note on Date Formatting:** +Timestamps at midnight (00:00:00) are automatically trimmed to date-only format for cleaner display: +- `2015-10-02T00:00:00.000+00:00` → `2015-10-02` +- `2015-10-02T14:30:00.000+00:00` → `2015-10-02T14:30:00.000+00:00` (time preserved) + +This applies to `dcat_modified` and `source_metadata_modified` extras. + +### Resources (Distributions) + +| Data.gov Field | CKAN Field | Notes | +|----------------|------------|-------| +| `distribution[].title` | `resource['name']` | Resource name | +| `distribution[].description` | `resource['description']` | Resource description | +| `distribution[].downloadURL` | `resource['url']` | Download URL (preferred) | +| `distribution[].accessURL` | `resource['url']` | Access URL (fallback) | +| `distribution[].format` | `resource['format']` | File format | +| `distribution[].mediaType` | `resource['mimetype']` | MIME type | +| `distribution[].byteSize` | `resource['size']` | File size in bytes | + +### Organization Fields + +| Data.gov Field | CKAN Field | Notes | +|----------------|------------|-------| +| `organization.name` | extra: `source_organization_name` | Source org name | +| `organization.organization_type` | extra: `source_organization_type` | Organization type | + +**Note:** The following Data.gov fields are NOT mapped to extras (available in source but not stored): +- `harvest_record`: URL to harvest metadata page +- `harvest_record_raw`: URL to raw harvest record +- `distribution_titles`: Array of distribution titles (distributions themselves are mapped to resources) +- `popularity`: Popularity score (search metadata, not stored) + +### Extended Fields + +Additional DCAT-US fields can be mapped using the `mapping_fields` configuration option: + +- `dcat.bureauCode`: Bureau codes +- `dcat.programCode`: Program codes +- `dcat.theme`: Theme categories +- `dcat.accrualPeriodicity`: Update frequency +- `dcat.references`: Related documents +- And more... + +## Creating Groups for Themes + +Data.gov datasets often include `theme` fields that categorize datasets (e.g., "Environment", "Health", "Natural Resources"). The harvester automatically maps these themes to CKAN groups, but the groups must exist in CKAN first. + +### Common Data.gov Themes + +Common themes found in Data.gov datasets include: +- `environment` +- `health` +- `natural-resources` +- `water-quality` +- `public-safety` +- `transportation` +- `education` +- `energy` +- `agriculture` + +### Discovering Themes in Your Harvest + +To see what themes are available in your Data.gov query results, check the API response: + +```bash +curl "https://catalog.data.gov/search?q=your-query&per_page=10" | \ + jq '.results[].theme[]' | sort -u +``` + +Or after harvesting, check the logs for group validation messages. + +## Architecture + +### Class Structure + +``` +CustomHarvester (base class) +└── DataGovHarvester + ├── info() + ├── gather_stage() + ├── fetch_stage() + ├── import_stage() + └── _search_for_datasets() +``` + +### Harvester Lifecycle + +1. **gather_stage**: + - Parses source URL and extracts query parameters + - Fetches all datasets using cursor-based pagination + - Creates HarvestObject records with status (new/change/delete) + - Uses `identifier` (UUID) as GUID + +2. **fetch_stage**: + - No-op (data already fetched during gather) + +3. **import_stage**: + - Converts Data.gov format to CKAN format using `datagov_to_ckan()` + - Applies configuration processors + - Copies resource IDs for updates + - Creates or updates CKAN packages + +### Pagination + +The harvester uses cursor-based pagination to handle large result sets: + +1. Initial request with no `after` parameter +2. Response includes `after` cursor if more pages exist +3. Subsequent requests include `after` parameter +4. Loop continues until `after` is absent from response + +This is more efficient than offset-based pagination and handles concurrent updates better. + +## Troubleshooting + +### No datasets found + +- Check that the harvest source URL is correct +- Verify query parameters are properly formatted + +### Datasets not updating + +- Verify the `identifier` field is present in Data.gov responses +- Check that harvest jobs are completing successfully +- Review harvest source status page for errors + +### Resource IDs changing + +- Ensure resources have stable URLs +- Check that `copy_across_resource_ids()` is working correctly +- Resources are matched by URL + name + format + +### Missing metadata + +- Verify DCAT-US fields are present in Data.gov response +- Use `mapping_fields` config to map additional fields +- Check converter logs for any mapping errors + +### License not mapped + +- Add custom license mapping in `map_datagov_license()` +- Or license URL will be stored in extras as fallback + +## Implementation Files + +- `ckanext/custom_harvest/harvesters/datagov.py`: Main harvester class +- `ckanext/custom_harvest/converter.py`: Data mapping functions (datagov_to_ckan, extract_format, map_datagov_license, munge_tag) +- `ckanext/custom_harvest/tests/harvesters/test_datagov_harvester.py`: Test suite +- `ckanext/custom_harvest/tests/harvesters/mock_datagov.py`: Mock server for testing +- `setup.py`: Entry point registration + +## Contributing + +When modifying the harvester: + +1. Update converter functions in `converter.py` for new field mappings +2. Add test cases to `test_datagov_harvester.py` +3. Update mock datasets in `mock_datagov.py` if needed +4. Run tests to verify changes +5. Update this documentation + +## Resources + +- [Data.gov Catalog API Documentation](https://resources.data.gov/catalog-api/) +- [DCAT-US Schema](https://resources.data.gov/resources/dcat-us/) +- [CKAN Harvesting Documentation](https://github.com/ckan/ckanext-harvest) diff --git a/ckanext/custom_harvest/converter.py b/ckanext/custom_harvest/converter.py index 1071061..723f502 100644 --- a/ckanext/custom_harvest/converter.py +++ b/ckanext/custom_harvest/converter.py @@ -1,12 +1,29 @@ import logging import mimetypes +import re +from urllib.parse import urlparse from ckan.common import config from ckan.plugins import toolkit +from ckan.lib.helpers import json log = logging.getLogger(__name__) mimetypes.init() +# Cache for license list +_license_list_cache = None + + +def _get_license_list(): + '''Get cached license list to avoid repeated API calls''' + global _license_list_cache + if _license_list_cache is None: + try: + _license_list_cache = toolkit.get_action('license_list')({}, {}) + except Exception: + _license_list_cache = [] + return _license_list_cache + def package_search_to_ckan(source_dict): package_dict = {} @@ -14,7 +31,7 @@ def package_search_to_ckan(source_dict): package_dict['title'] = source_dict.get('title') package_dict['notes'] = source_dict.get('notes', '') - if 'fluent' in config.get('ckan.plugins'): + if 'fluent' in config.get('ckan.plugins', ''): package_dict['title_translated'] = {'en': source_dict.get('title')} package_dict['notes_translated'] = {'en': source_dict.get('notes', '') or ''} @@ -35,7 +52,7 @@ def package_search_to_ckan(source_dict): package_dict['extras'].append({'key': extra.get('key'), 'value': extra.get('value')}) if source_dict.get('license'): - for license in toolkit.get_action('license_list')({}, {}): + for license in _get_license_list(): if license.get('url') == source_dict.get('license'): package_dict['license_id'] = license.get('id') break @@ -69,7 +86,7 @@ def package_search_to_ckan(source_dict): 'format': format, } - if 'fluent' in config.get('ckan.plugins'): + if 'fluent' in config.get('ckan.plugins', ''): resource['name_translated'] = {'en': source_resource.get('name')} resource['description_translated'] = {'en': source_resource.get('description', '') or ''} @@ -117,3 +134,349 @@ def convert_to_filter_list(filter_string): except Exception as e: log.error(e) return format_list + + +def datagov_to_ckan(source_dict): + ''' + Converts a Data.gov Search API result to CKAN package dict format. + + Maps both top-level fields and nested DCAT-US metadata. + ''' + package_dict = {} + + # Get DCAT object + dcat = source_dict.get('dcat', {}) + + # Basic metadata from top-level fields + package_dict['title'] = source_dict.get('title', 'Untitled Dataset') + package_dict['notes'] = source_dict.get('description', '') + + # Fluent support (if enabled) + if 'fluent' in config.get('ckan.plugins', ''): + package_dict['title_translated'] = {'en': source_dict.get('title', 'Untitled Dataset')} + package_dict['notes_translated'] = {'en': source_dict.get('description', '') or ''} + + # Tags from keywords + package_dict['tags'] = [] + for keyword in source_dict.get('keyword', []): + if keyword: + munged_tag = munge_tag(keyword) + if munged_tag: + package_dict['tags'].append({'name': munged_tag}) + + # Groups - Map Data.gov themes to CKAN groups + package_dict['groups'] = [] + + # Get themes from both top-level and dcat fields + themes = set() + if source_dict.get('theme'): + themes.update(source_dict.get('theme')) + if dcat.get('theme'): + themes.update(dcat.get('theme')) + + # Convert themes to group format with slugified names + for theme in themes: + if theme: + # Create a slug-friendly name from the theme + group_name = munge_tag(theme.lower().replace(' ', '-')) + if group_name: + package_dict['groups'].append({'name': group_name}) + + # Extras - store comprehensive metadata + package_dict['extras'] = [] + + # Core identifiers + package_dict['extras'].append({ + 'key': 'guid', + 'value': source_dict.get('identifier') + }) + if source_dict.get('slug'): + package_dict['extras'].append({ + 'key': 'datagov_slug', + 'value': source_dict.get('slug') + }) + + # Temporal information + if source_dict.get('last_harvested_date'): + package_dict['extras'].append({ + 'key': 'source_metadata_modified', + 'value': trim_date(source_dict.get('last_harvested_date')) + }) + + # Organization/Publisher + if source_dict.get('publisher'): + package_dict['extras'].append({ + 'key': 'publisher', + 'value': source_dict.get('publisher') + }) + + org = source_dict.get('organization', {}) + if org: + if org.get('name'): + package_dict['extras'].append({ + 'key': 'source_organization_name', + 'value': org.get('name') + }) + if org.get('organization_type'): + package_dict['extras'].append({ + 'key': 'source_organization_type', + 'value': org.get('organization_type') + }) + + # Spatial information + if source_dict.get('has_spatial'): + package_dict['extras'].append({ + 'key': 'has_spatial', + 'value': str(source_dict.get('has_spatial')) + }) + + # DCAT-US metadata (core fields) + # Access level + if dcat.get('accessLevel'): + package_dict['extras'].append({ + 'key': 'dcat_access_level', + 'value': dcat.get('accessLevel') + }) + + # Modified date + if dcat.get('modified'): + package_dict['extras'].append({ + 'key': 'dcat_modified', + 'value': trim_date(dcat.get('modified')) + }) + + # Issued date + if dcat.get('issued'): + package_dict['extras'].append({ + 'key': 'dcat_issued', + 'value': trim_date(dcat.get('issued')) + }) + + # Contact point + contact_point = dcat.get('contactPoint', {}) + if contact_point: + if contact_point.get('fn'): + package_dict['extras'].append({ + 'key': 'contact_name', + 'value': contact_point.get('fn') + }) + if contact_point.get('hasEmail'): + email = contact_point.get('hasEmail', '').replace('mailto:', '') + package_dict['extras'].append({ + 'key': 'contact_email', + 'value': email + }) + + # License + if dcat.get('license'): + license_id = map_datagov_license(dcat.get('license')) + if license_id: + package_dict['license_id'] = license_id + + # Rights + if dcat.get('rights'): + package_dict['extras'].append({ + 'key': 'dcat_rights', + 'value': dcat.get('rights') + }) + + # Spatial (GeoJSON) - prefer spatial_shape over dcat.spatial + # spatial_shape is a proper GeoJSON polygon, dcat.spatial is just a bbox string + spatial_value = None + + # First try spatial_shape from top-level (GeoJSON polygon) + if source_dict.get('spatial_shape'): + spatial_value = source_dict.get('spatial_shape') + if isinstance(spatial_value, dict): + spatial_value = json.dumps(spatial_value) + # Fall back to dcat.spatial (bounding box string) + elif dcat.get('spatial'): + spatial_value = dcat.get('spatial') + if isinstance(spatial_value, dict): + spatial_value = json.dumps(spatial_value) + + if spatial_value: + package_dict['extras'].append({ + 'key': 'spatial', + 'value': spatial_value + }) + + # Spatial centroid (for map display/search) + if source_dict.get('spatial_centroid'): + centroid = source_dict.get('spatial_centroid') + if isinstance(centroid, dict): + # Store as "lat,lon" string for compatibility + lat = centroid.get('lat') + lon = centroid.get('lon') + if lat is not None and lon is not None: + package_dict['extras'].append({ + 'key': 'spatial_centroid', + 'value': '{},{}'.format(lat, lon) + }) + + # Temporal + if dcat.get('temporal'): + package_dict['extras'].append({ + 'key': 'temporal', + 'value': dcat.get('temporal') + }) + + # Landing page + if dcat.get('landingPage'): + package_dict['extras'].append({ + 'key': 'landing_page', + 'value': dcat.get('landingPage') + }) + + # Resources/Distributions + package_dict['resources'] = [] + distributions = dcat.get('distribution', []) if dcat else [] + + for idx, dist in enumerate(distributions): + # Skip if no access URL + if not dist.get('accessURL') and not dist.get('downloadURL'): + continue + + # Prefer downloadURL over accessURL + resource_url = dist.get('downloadURL') or dist.get('accessURL') + + resource = { + 'name': dist.get('title', '') or dist.get('description', '') or 'Distribution {}'.format(idx + 1), + 'description': dist.get('description', ''), + 'url': resource_url, + 'format': extract_format(dist), + } + + # Fluent support + if 'fluent' in config.get('ckan.plugins', ''): + resource['name_translated'] = {'en': resource['name']} + resource['description_translated'] = {'en': resource['description'] or ''} + + # Media type + if dist.get('mediaType'): + resource['mimetype'] = dist.get('mediaType') + + # Size + if dist.get('byteSize'): + try: + resource['size'] = int(dist.get('byteSize')) + except (ValueError, TypeError): + pass + + # Position for ordering + resource['position'] = idx + + # Skip disallowed formats + clean_format = ''.join(resource['format'].split()).lower() + if disallow_file_format(clean_format): + log.debug('Skip disallowed format %s: %s', resource['format'], resource['url']) + continue + + package_dict['resources'].append(resource) + + return package_dict + + +def extract_format(distribution): + '''Extract format from distribution metadata''' + # Try format field first + if distribution.get('format'): + return distribution.get('format') + + # Try to extract from mediaType + if distribution.get('mediaType'): + mimetype = distribution.get('mediaType') + ext = mimetypes.guess_extension(mimetype) + if ext: + return ext[1:].upper() # Remove leading dot and uppercase + + # Try to extract from URL + url = distribution.get('downloadURL') or distribution.get('accessURL', '') + if url: + # Check URL extension + parsed = urlparse(url) + path = parsed.path + if '.' in path: + ext = path.split('.')[-1].lower() + # Common data formats + if ext in ['csv', 'json', 'xml', 'pdf', 'xlsx', 'xls', 'zip', + 'geojson', 'shp', 'kml', 'kmz', 'txt', 'html']: + return ext.upper() + + return '' + + +def map_datagov_license(license_url): + ''' + Maps license URLs to CKAN license IDs. + Returns None if no match found. + ''' + # Common license mappings + license_mappings = { + 'http://www.opendefinition.org/licenses/cc-zero': 'cc-zero', + 'https://creativecommons.org/publicdomain/zero/1.0/': 'cc-zero', + 'http://creativecommons.org/publicdomain/zero/1.0/': 'cc-zero', + 'http://www.opendefinition.org/licenses/cc-by': 'cc-by', + 'https://creativecommons.org/licenses/by/4.0/': 'cc-by', + 'http://creativecommons.org/licenses/by/4.0/': 'cc-by', + 'http://www.opendefinition.org/licenses/odc-pddl': 'odc-pddl', + 'http://opendatacommons.org/licenses/pddl/': 'odc-pddl', + } + + if license_url in license_mappings: + return license_mappings[license_url] + + # Try to match against CKAN's license list + for license in _get_license_list(): + if license.get('url') == license_url: + return license.get('id') + elif license.get('title') == license_url: + return license.get('id') + + return None + + +def munge_tag(tag): + '''Sanitize tag string for CKAN''' + from ckan.lib.munge import substitute_ascii_equivalents + + tag = substitute_ascii_equivalents(tag) + tag = tag.strip() + # Remove invalid characters + tag = re.sub(r'[^a-zA-Z0-9 \-_.]', '', tag) + tag = tag.strip() + + tag = _munge_to_length(tag, 2, 100) + + return tag + + +def _munge_to_length(string, min_length, max_length): + '''Pad/truncates a string''' + if len(string) < min_length: + string += '_' * (min_length - len(string)) + if len(string) > max_length: + string = string[:max_length] + return string + + +def trim_date(date_string): + ''' + Trim ISO 8601 timestamp to date if time is midnight. + + Examples: + '2015-10-02T00:00:00.000+00:00' -> '2015-10-02' + '2015-10-02T14:30:00.000+00:00' -> '2015-10-02T14:30:00.000+00:00' (kept) + '2015-10-02' -> '2015-10-02' (no change) + ''' + if not date_string or not isinstance(date_string, str): + return date_string + + # Check if it looks like an ISO timestamp with time + if 'T' in date_string: + # Check if time is midnight (00:00:00) + if re.match(r'.*T00:00:00(\.\d+)?(Z|[+-]\d{2}:\d{2})?$', date_string): + # Extract just the date part + return date_string.split('T')[0] + + return date_string diff --git a/ckanext/custom_harvest/harvesters/__init__.py b/ckanext/custom_harvest/harvesters/__init__.py index 6b2a998..bc001dd 100644 --- a/ckanext/custom_harvest/harvesters/__init__.py +++ b/ckanext/custom_harvest/harvesters/__init__.py @@ -1 +1,2 @@ from ckanext.custom_harvest.harvesters.package_search import PackageSearchHarvester +from ckanext.custom_harvest.harvesters.datagov import DataGovHarvester diff --git a/ckanext/custom_harvest/harvesters/datagov.py b/ckanext/custom_harvest/harvesters/datagov.py new file mode 100644 index 0000000..92f4ef8 --- /dev/null +++ b/ckanext/custom_harvest/harvesters/datagov.py @@ -0,0 +1,438 @@ +import uuid +import logging +import requests +import traceback +from requests.exceptions import HTTPError, RequestException +from urllib.parse import urlencode, urlparse, parse_qs + +from ckan import model +from ckan import logic +from ckan import plugins as p +from ckan.lib.helpers import json +from ckanext.harvest.model import HarvestObject, HarvestObjectExtra +from ckanext.harvest.logic.schema import unicode_safe +from ckanext.custom_harvest import converter +from ckanext.custom_harvest.harvesters.base import CustomHarvester +from ckanext.custom_harvest.harvesters.package_search import ( + copy_across_resource_ids, + upload_resources_to_datastore +) + + +log = logging.getLogger(__name__) + + +class DataGovHarvester(CustomHarvester): + ''' + A Harvester for Data.gov catalog using the Search API + ''' + + def info(self): + return { + 'name': 'datagov_harvest', + 'title': 'Data.gov Harvester', + 'description': 'Harvester for Data.gov using their Catalog API', + 'form_config_interface': 'Text' + } + + def _get_content(self, url): + headers = {} + api_key = self.config.get('api_key') + if api_key: + headers['Authorization'] = api_key + + try: + http_request = requests.get(url, headers=headers, timeout=30) + http_request.raise_for_status() + except HTTPError as e: + raise ContentFetchError('HTTP error: %s %s' % (e.response.status_code, e.request.url)) + except RequestException as e: + raise ContentFetchError('Request error: %s' % e) + except Exception as e: + raise ContentFetchError('HTTP general exception: %s' % e) + return http_request.text + + def _set_config(self, config_str): + if config_str: + self.config = json.loads(config_str) + log.debug('Using config: %r', self.config) + else: + self.config = {} + + def gather_stage(self, harvest_job): + log.debug('In DataGovHarvester gather_stage (%s)', + harvest_job.source.url) + + ids = [] + + # Get the previous guids for this source + query = \ + model.Session.query(HarvestObject.guid, HarvestObject.package_id) \ + .filter(HarvestObject.current == True) \ + .filter(HarvestObject.harvest_source_id == harvest_job.source.id) + guid_to_package_id = {} + + for guid, package_id in query: + guid_to_package_id[guid] = package_id + + guids_in_db = list(guid_to_package_id.keys()) + guids_in_source = [] + + self._set_config(harvest_job.source.config) + + # Parse source URL + parsed_url = urlparse(harvest_job.source.url) + base_search_url = parsed_url.scheme + '://' + parsed_url.netloc + parsed_url.path + + # Extract ALL query parameters from URL to preserve them during pagination + url_params = {} + if parsed_url.query: + query_dict = parse_qs(parsed_url.query) + # Convert parse_qs result (dict of lists) to dict of single values + for key, value_list in query_dict.items(): + if len(value_list) == 1: + url_params[key] = value_list[0] + else: + # Join multiple values with space + url_params[key] = ' '.join(value_list) + + # Request all remote datasets + try: + dataset_dicts = self._search_for_datasets( + base_search_url, + url_params + ) + log.info('Found %s datasets at Data.gov: %s', + len(dataset_dicts), base_search_url) + except SearchError as e: + log.info('Searching for all datasets gave an error: %s', e) + self._save_gather_error( + 'Unable to search Data.gov for datasets: %s url: %s ' + 'params: %s' % (e, base_search_url, url_params), + harvest_job) + return None + if not dataset_dicts: + self._save_gather_error( + 'No datasets found at Data.gov: %s' % base_search_url, + harvest_job) + return [] + + # Filter datasets by organization if configured + org_filter_include = self.config.get('organizations_filter_include', []) + org_filter_exclude = self.config.get('organizations_filter_exclude', []) + + if org_filter_include or org_filter_exclude: + filtered_datasets = [] + for dataset_dict in dataset_dicts: + org_slug = dataset_dict.get('organization', {}).get('slug', '') + + # Apply include filter + if org_filter_include: + if org_slug in org_filter_include: + filtered_datasets.append(dataset_dict) + else: + log.debug('Excluding dataset %s - org %s not in include list', + dataset_dict.get('identifier'), org_slug) + # Apply exclude filter + elif org_filter_exclude: + if org_slug not in org_filter_exclude: + filtered_datasets.append(dataset_dict) + else: + log.debug('Excluding dataset %s - org %s in exclude list', + dataset_dict.get('identifier'), org_slug) + + log.info('Filtered %s datasets to %s based on organization filters', + len(dataset_dicts), len(filtered_datasets)) + dataset_dicts = filtered_datasets + + if not dataset_dicts: + log.info('No datasets remaining after organization filtering') + return [] + + # Create harvest objects for each dataset + try: + guids_in_source = [] + for dataset_dict in dataset_dicts: + guid = dataset_dict.get('identifier') + + if not guid: + log.warning('Dataset missing identifier: %s', dataset_dict.get('title')) + continue + + log.info('Got identifier: {0}'.format(guid)) + guids_in_source.append(guid) + log.info('Creating HarvestObject for %s', guid) + + if guid in guids_in_db: + # Dataset needs to be updated + obj = HarvestObject(guid=guid, job=harvest_job, + package_id=guid_to_package_id[guid], + content=json.dumps(dataset_dict), + extras=[ + HarvestObjectExtra(key='status', value='change'), + HarvestObjectExtra(key='base_search_url', value=base_search_url) + ]) + else: + # Dataset needs to be created + obj = HarvestObject(guid=guid, job=harvest_job, + content=json.dumps(dataset_dict), + extras=[ + HarvestObjectExtra(key='status', value='new'), + HarvestObjectExtra(key='base_search_url', value=base_search_url) + ]) + obj.save() + ids.append(obj.id) + + except ValueError as e: + msg = 'Error parsing dataset: {0}'.format(str(e)) + self._save_gather_error(msg, harvest_job) + return None + + # Check datasets that need to be deleted + guids_to_delete = set(guids_in_db) - set(guids_in_source) + for guid in guids_to_delete: + obj = HarvestObject( + guid=guid, job=harvest_job, + package_id=guid_to_package_id[guid], + extras=[HarvestObjectExtra(key='status', value='delete')]) + ids.append(obj.id) + model.Session.query(HarvestObject).\ + filter_by(guid=guid).\ + update({'current': False}, False) + obj.save() + + # Rename package before delete so that its url can be reused + context = {'model': model, 'session': model.Session, + 'user': self._get_user_name()} + p.toolkit.get_action('package_patch')(context, { + 'id': guid_to_package_id[guid], + 'name': guid_to_package_id[guid] + '-deleted' + }) + + return ids + + def _search_for_datasets(self, base_search_url, url_params=None): + ''' + Performs dataset search on Data.gov with cursor-based pagination. + Returns all results across all pages. + + Args: + base_search_url: Base URL for the search endpoint + url_params: Dictionary of query parameters from the source URL + ''' + # Start with URL params (preserves all original query parameters) + params = dict(url_params) if url_params else {} + + # Per page setting from config or default to 100 + per_page = self.config.get('per_page', 100) + params['per_page'] = str(per_page) + + # Add other params from config (these override URL params if present) + if self.config.get('sort'): + params['sort'] = self.config.get('sort') + if self.config.get('org_slug'): + params['org_slug'] = self.config.get('org_slug') + + datasets = [] + after_cursor = None + page_num = 0 + + while True: + page_num += 1 + if after_cursor: + params['after'] = after_cursor + + url = base_search_url + '?' + urlencode(params) + log.info('Fetching Data.gov page %s: %s', page_num, url) + + try: + content = self._get_content(url) + except ContentFetchError as e: + raise SearchError( + 'Error sending request to search Data.gov ' + 'instance %s using URL %r. Error: %s' % + (base_search_url, url, e)) + + try: + response_dict = json.loads(content) + except ValueError: + raise SearchError('Response from Data.gov was not JSON: %r' + % content) + + # Data.gov returns results directly (not wrapped in 'result') + results = response_dict.get('results', []) + + if not isinstance(results, list): + raise SearchError('Response JSON did not contain ' + 'results array: %r' % response_dict) + + datasets.extend(results) + log.info('Page %s: Got %s datasets (total so far: %s)', + page_num, len(results), len(datasets)) + + # Check for next page cursor + after_cursor = response_dict.get('after') + if not after_cursor: + log.info('No more pages. Total datasets: %s', len(datasets)) + break + + # Safety check to prevent infinite loops + if page_num > 1000: + log.warning('Reached maximum page limit (1000). Stopping pagination.') + break + + return datasets + + def fetch_stage(self, harvest_object): + # Nothing to do here - we got the dataset in the search in the + # gather stage + return True + + def import_stage(self, harvest_object): + log.debug('In DataGovHarvester import_stage') + + if not harvest_object: + log.error('No harvest object received') + return False + + base_search_url = self._get_object_extra(harvest_object, 'base_search_url') + status = self._get_object_extra(harvest_object, 'status') + + if status == 'delete': + # Delete package + delete_context = { + 'model': model, + 'session': model.Session, + 'user': self._get_user_name() + } + p.toolkit.get_action('package_delete')(delete_context, {'id': harvest_object.package_id}) + log.info('Deleted package {0} with guid {1}' + .format(harvest_object.package_id, harvest_object.guid)) + + return True + + if harvest_object.content is None: + self._save_object_error('Empty content for object %s' % + harvest_object.id, + harvest_object, 'Import') + return False + + # Get the last harvested object (if any) + previous_object = model.Session.query(HarvestObject) \ + .filter(HarvestObject.guid == harvest_object.guid) \ + .filter(HarvestObject.current == True) \ + .first() + + # Flag previous object as not current anymore + if previous_object and not self.force_import: + previous_object.current = False + previous_object.add() + + self._set_config(harvest_object.job.source.config) + + # Convert Data.gov format to CKAN format + source_dict = json.loads(harvest_object.content) + package_dict = converter.datagov_to_ckan(source_dict) + + try: + # Copy across ids from the existing dataset, otherwise they'll + # be recreated with new ids + if status == 'change': + existing_dataset = self._get_existing_dataset(harvest_object.guid) + if existing_dataset: + copy_across_resource_ids(existing_dataset, package_dict, self.config) + package_dict['name'] = existing_dataset.get('name') + # Copy across private status + if 'private' in existing_dataset.keys(): + package_dict['private'] = existing_dataset['private'] + + # Set name for new package to prevent name conflict + if not package_dict.get('name'): + # Use slug if available, otherwise use identifier + name = source_dict.get('slug') or harvest_object.guid + package_dict['name'] = self._gen_new_name(name) + + # Ensure source_dict has fields expected by config processors + # Copy groups from package_dict (which were mapped from themes) to source_dict + # so that RemoteGroups processor can validate them + source_dict['groups'] = package_dict.get('groups', []) + + # Copy extras from package_dict to source_dict so CompositeMapping and other + # processors can reference converted fields (e.g., extras.dcat_modified) + if 'extras' not in source_dict: + source_dict['extras'] = [] + source_dict['extras'].extend(package_dict.get('extras', [])) + + # Apply config processors + package_dict = self.modify_package_dict(package_dict, source_dict, harvest_object) + + # Get owner organization from the harvest source dataset + harvest_source_dataset = model.Package.get(harvest_object.source.id) + if harvest_source_dataset.owner_org: + package_dict['owner_org'] = harvest_source_dataset.owner_org + + # Flag this object as the current one + harvest_object.current = True + harvest_object.add() + + # Context for package create/update + package_context = { + 'user': self._get_user_name(), + 'return_id_only': True, + 'ignore_auth': True, + } + + if status == 'new': + package_schema = logic.schema.default_create_package_schema() + package_context['schema'] = package_schema + + # We need to explicitly provide a package ID + package_dict['id'] = str(uuid.uuid4()) + package_schema['id'] = [unicode_safe] + + # Save reference to the package on the object + harvest_object.package_id = package_dict['id'] + harvest_object.add() + + # Defer constraints and flush so the dataset can be indexed with + # the harvest object id (on the after_show hook from the harvester + # plugin) + model.Session.execute( + 'SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + model.Session.flush() + + elif status == 'change': + package_dict['id'] = harvest_object.package_id + + if status in ['new', 'change']: + action = 'package_create' if status == 'new' else 'package_update' + message_status = 'Created' if status == 'new' else 'Updated' + + package_id = p.toolkit.get_action(action)(package_context, package_dict) + log.info('%s dataset with id %s', message_status, package_id) + + # Upload tabular resources to datastore + upload_to_datastore = self.config.get('upload_to_datastore', True) + if upload_to_datastore and p.plugin_loaded('xloader'): + # Get package dict again in case there's new resource ids + pkg_dict = p.toolkit.get_action('package_show')(package_context, {'id': package_id}) + upload_resources_to_datastore(package_context, pkg_dict, source_dict, base_search_url) + + except Exception as e: + dataset_name = source_dict.get('slug') or source_dict.get('identifier', '') + + self._save_object_error('Error importing dataset %s: %r / %s' % (dataset_name, e, traceback.format_exc()), harvest_object, 'Import') + return False + + finally: + model.Session.commit() + + return True + + +class ContentFetchError(Exception): + pass + + +class SearchError(Exception): + pass diff --git a/ckanext/custom_harvest/harvesters/package_search.py b/ckanext/custom_harvest/harvesters/package_search.py index b8c4f58..8854c84 100644 --- a/ckanext/custom_harvest/harvesters/package_search.py +++ b/ckanext/custom_harvest/harvesters/package_search.py @@ -42,7 +42,7 @@ def _get_content(self, url): headers['Authorization'] = api_key try: - http_request = requests.get(url, headers=headers) + http_request = requests.get(url, headers=headers, timeout=30) except HTTPError as e: raise ContentFetchError('HTTP error: %s %s' % (e.response.status_code, e.request.url)) except RequestException as e: @@ -140,7 +140,7 @@ def gather_stage(self, harvest_job): guids_in_source = [] for pkg_dict in pkg_dicts: guid = pkg_dict.get('name') - log.info('Got identifier: {0}'.format(guid.encode('utf8'))) + log.info('Got identifier: {0}'.format(guid)) guids_in_source.append(guid) log.info('Creating HarvestObject for %s %s', pkg_dict['name'], pkg_dict['id']) if guid in guids_in_db: diff --git a/ckanext/custom_harvest/tests/harvesters/mock_datagov.py b/ckanext/custom_harvest/tests/harvesters/mock_datagov.py new file mode 100644 index 0000000..9cfedc0 --- /dev/null +++ b/ckanext/custom_harvest/tests/harvesters/mock_datagov.py @@ -0,0 +1,278 @@ +from __future__ import print_function + +import json +import re +from urllib.parse import unquote_plus + +from threading import Thread + +from http.server import SimpleHTTPRequestHandler +from socketserver import TCPServer + + +PORT = 8999 + + +class MockDataGovHandler(SimpleHTTPRequestHandler): + def do_GET(self): + if self.path.startswith('/search'): + params = self.get_url_params() + + # Simulate cursor-based pagination + after = params.get('after') + + if not after: + # First page - return first 2 datasets + results = DATASETS[:2] + after_cursor = 'page2_cursor' + elif after == 'page2_cursor': + # Second page - return remaining datasets, no more pages + results = DATASETS[2:] + after_cursor = None + else: + # Unknown cursor + results = [] + after_cursor = None + + response = { + 'results': results, + 'total': len(DATASETS) + } + + if after_cursor: + response['after'] = after_cursor + + return self.respond_json(response) + + self.respond('Mock Data.gov doesnt recognize that call', status=400) + + def get_url_params(self): + if '?' not in self.path: + return {} + params_str = self.path.split('?')[-1] + params_unicode = unquote_plus(params_str) + params = params_unicode.split('&') + return dict([param.split('=', 1) for param in params if '=' in param]) + + def respond_json(self, content_dict, status=200): + self.send_response(status) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(content_dict).encode('utf-8')) + self.wfile.close() + + def respond(self, content, status=200, content_type='application/json'): + self.send_response(status) + self.send_header('Content-Type', content_type) + self.end_headers() + self.wfile.write(content.encode('utf-8')) + self.wfile.close() + + def log_message(self, format, *args): + # Suppress request logging to avoid cluttering test output + pass + + +def serve(port=PORT): + '''Runs a Data.gov-alike app (over HTTP) that is used for harvesting tests''' + + class TestServer(TCPServer): + allow_reuse_address = True + + httpd = TestServer(('', PORT), MockDataGovHandler) + + print('Serving test Data.gov HTTP server at port {}'.format(PORT)) + + httpd_thread = Thread(target=httpd.serve_forever) + httpd_thread.setDaemon(True) + httpd_thread.start() + + +# Sample datasets in Data.gov Search API format +DATASETS = [ + { + 'identifier': 'dataset1-uuid-12345678-1234-1234-1234-123456789012', + 'slug': 'test-water-quality-dataset-1', + 'title': 'Water Quality Monitoring Data 2020-2025', + 'description': 'Comprehensive water quality monitoring data from EPA monitoring stations across California.', + 'publisher': 'Environmental Protection Agency', + 'keyword': ['environment', 'water', 'quality', 'monitoring', 'california'], + 'theme': ['Environment'], + 'has_spatial': True, + 'popularity': 85, + 'last_harvested_date': '2026-04-20T10:00:00Z', + 'distribution_titles': ['Water Quality CSV', 'Water Quality API'], + 'organization': { + 'id': 'org1-epa-id', + 'name': 'epa-gov', + 'slug': 'epa-gov', + 'organization_type': 'Federal Government', + 'logo': 'https://www.epa.gov/sites/all/themes/epa/logo.png', + 'aliases': ['EPA', 'Environmental Protection Agency'] + }, + 'dcat': { + 'accessLevel': 'public', + 'modified': '2026-04-15T12:00:00Z', + 'issued': '2020-01-15T00:00:00Z', + 'contactPoint': { + 'fn': 'John Doe', + 'hasEmail': 'mailto:john.doe@epa.gov' + }, + 'license': 'https://creativecommons.org/publicdomain/zero/1.0/', + 'rights': 'This dataset is in the public domain.', + 'spatial': '-127.00000000,32.50000000,-114.10000000,42.00000000', + 'temporal': '2020-01-01/2026-01-01', + 'bureauCode': ['020:00'], + 'programCode': ['020:072'], + 'theme': ['Environment'], + 'landingPage': 'https://www.epa.gov/waterdata/water-quality', + 'accrualPeriodicity': 'R/P1M', + 'distribution': [ + { + 'title': 'Water Quality Data CSV', + 'description': 'Downloadable CSV file with water quality measurements', + 'downloadURL': 'https://www.epa.gov/sites/default/files/water-quality.csv', + 'mediaType': 'text/csv', + 'format': 'CSV', + 'byteSize': 1024000 + }, + { + 'title': 'Water Quality API', + 'description': 'RESTful API endpoint for programmatic access', + 'accessURL': 'https://api.epa.gov/water-quality', + 'mediaType': 'application/json', + 'format': 'JSON' + } + ] + }, + 'harvest_record': 'https://catalog.data.gov/harvest/object/dataset1-uuid-12345678-1234-1234-1234-123456789012', + 'harvest_record_raw': 'https://catalog.data.gov/harvest/raw/dataset1-uuid-12345678-1234-1234-1234-123456789012', + 'spatial_shape': { + 'type': 'Polygon', + 'coordinates': [[[-127.0, 32.5], [-127.0, 42.0], [-114.1, 42.0], [-114.1, 32.5], [-127.0, 32.5]]] + }, + 'spatial_centroid': { + 'lat': 37.25, + 'lon': -120.55 + } + }, + { + 'identifier': 'dataset2-uuid-abcdefab-abcd-abcd-abcd-abcdefabcdef', + 'slug': 'fish-wildlife-survey-2025', + 'title': 'Fish and Wildlife Population Survey 2025', + 'description': 'Annual survey of fish and wildlife populations in California coastal regions.', + 'publisher': 'U.S. Fish and Wildlife Service', + 'keyword': ['fish', 'wildlife', 'survey', 'population', 'california'], + 'theme': ['Environment', 'Biology'], + 'has_spatial': True, + 'popularity': 72, + 'last_harvested_date': '2026-04-21T08:30:00Z', + 'distribution_titles': ['Survey Results PDF', 'GeoJSON Data'], + 'organization': { + 'id': 'org2-fws-id', + 'name': 'fws-gov', + 'slug': 'fws-gov', + 'organization_type': 'Federal Government', + 'logo': 'https://www.fws.gov/logo.png', + 'aliases': ['FWS', 'U.S. Fish and Wildlife Service'] + }, + 'dcat': { + 'accessLevel': 'public', + 'modified': '2026-04-18T14:20:00Z', + 'contactPoint': { + 'fn': 'Jane Smith', + 'hasEmail': 'mailto:jane.smith@fws.gov' + }, + 'license': 'https://creativecommons.org/licenses/by/4.0/', + 'spatial': '-124.50000000,32.50000000,-114.10000000,42.00000000', + 'temporal': '2025-01-01/2025-12-31', + 'bureauCode': ['010:18'], + 'programCode': ['010:094'], + 'theme': ['Environment', 'Biology'], + 'landingPage': 'https://www.fws.gov/program/fisheries', + 'distribution': [ + { + 'title': 'Survey Results Report', + 'description': 'Complete survey results and analysis', + 'downloadURL': 'https://www.fws.gov/sites/default/files/survey-2025.pdf', + 'mediaType': 'application/pdf', + 'format': 'PDF', + 'byteSize': 5242880 + }, + { + 'title': 'Population Data GeoJSON', + 'description': 'Geographic population distribution data', + 'downloadURL': 'https://www.fws.gov/sites/default/files/population.geojson', + 'mediaType': 'application/geo+json', + 'format': 'GeoJSON', + 'byteSize': 2097152 + } + ] + }, + 'harvest_record': 'https://catalog.data.gov/harvest/object/dataset2-uuid-abcdefab-abcd-abcd-abcd-abcdefabcdef', + 'harvest_record_raw': 'https://catalog.data.gov/harvest/raw/dataset2-uuid-abcdefab-abcd-abcd-abcd-abcdefabcdef', + 'spatial_shape': { + 'type': 'Polygon', + 'coordinates': [[[-124.5, 32.5], [-124.5, 42.0], [-114.1, 42.0], [-114.1, 32.5], [-124.5, 32.5]]] + }, + 'spatial_centroid': { + 'lat': 37.25, + 'lon': -119.3 + } + }, + { + 'identifier': 'dataset3-uuid-99999999-9999-9999-9999-999999999999', + 'slug': 'air-quality-index-data', + 'title': 'Daily Air Quality Index 2024-2026', + 'description': 'Daily air quality index measurements from monitoring stations nationwide.', + 'publisher': 'Environmental Protection Agency', + 'keyword': ['air', 'quality', 'aqi', 'pollution', 'environment'], + 'theme': ['Environment', 'Health'], + 'has_spatial': False, + 'popularity': 95, + 'last_harvested_date': '2026-04-22T06:00:00Z', + 'distribution_titles': ['AQI Data API'], + 'organization': { + 'id': 'org1-epa-id', + 'name': 'epa-gov', + 'slug': 'epa-gov', + 'organization_type': 'Federal Government', + 'logo': 'https://www.epa.gov/sites/all/themes/epa/logo.png', + 'aliases': ['EPA', 'Environmental Protection Agency'] + }, + 'dcat': { + 'accessLevel': 'public', + 'modified': '2026-04-22T00:00:00Z', + 'contactPoint': { + 'fn': 'Air Quality Team', + 'hasEmail': 'mailto:airquality@epa.gov' + }, + 'license': 'http://www.opendefinition.org/licenses/cc-zero', + 'temporal': '2024-01-01/2026-12-31', + 'bureauCode': ['020:00'], + 'programCode': ['020:033'], + 'theme': ['Environment', 'Health'], + 'landingPage': 'https://www.epa.gov/outdoor-air-quality-data', + 'accrualPeriodicity': 'R/P1D', + 'distribution': [ + { + 'title': 'AQI Data API', + 'description': 'Real-time air quality index data API', + 'accessURL': 'https://api.epa.gov/air-quality/aqi', + 'mediaType': 'application/json', + 'format': 'API' + }, + { + 'title': 'Historical AQI Data', + 'description': 'Historical air quality measurements', + 'downloadURL': 'https://www.epa.gov/sites/default/files/aqi-historical.xlsx', + 'mediaType': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'format': 'XLSX', + 'byteSize': 15728640 + } + ] + }, + 'harvest_record': 'https://catalog.data.gov/harvest/object/dataset3-uuid-99999999-9999-9999-9999-999999999999', + 'harvest_record_raw': 'https://catalog.data.gov/harvest/raw/dataset3-uuid-99999999-9999-9999-9999-999999999999' + } +] diff --git a/ckanext/custom_harvest/tests/harvesters/test_datagov_harvester.py b/ckanext/custom_harvest/tests/harvesters/test_datagov_harvester.py new file mode 100644 index 0000000..774a7d3 --- /dev/null +++ b/ckanext/custom_harvest/tests/harvesters/test_datagov_harvester.py @@ -0,0 +1,401 @@ +from __future__ import absolute_import + +import json +import pytest + +from ckantoolkit.tests.factories import Organization + +from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, + HarvestObjectObj) +from ckanext.harvest.model import HarvestObjectExtra +from ckanext.harvest.tests.lib import run_harvest_job +import ckanext.harvest.model as harvest_model + +from ckanext.custom_harvest.harvesters.datagov import DataGovHarvester +from ckanext.custom_harvest.tests.harvesters import mock_datagov + + +# Start Data.gov-alike server we can test harvesting against it +mock_datagov.serve() + + +@pytest.mark.usefixtures('with_plugins', 'clean_db', 'clean_index') +class TestDataGovHarvester(object): + + def test_gather_normal(self): + source = HarvestSourceObj( + url='http://localhost:%s/search?q=water' % mock_datagov.PORT + ) + job = HarvestJobObj(source=source) + + harvester = DataGovHarvester() + obj_ids = harvester.gather_stage(job) + + assert job.gather_errors == [] + assert isinstance(obj_ids, list) + assert len(obj_ids) == len(mock_datagov.DATASETS) + harvest_object = harvest_model.HarvestObject.get(obj_ids[0]) + assert harvest_object.guid == mock_datagov.DATASETS[0]['identifier'] + assert json.loads(harvest_object.content) == mock_datagov.DATASETS[0] + + def test_gather_with_cursor_pagination(self): + '''Test that cursor-based pagination works correctly''' + source = HarvestSourceObj( + url='http://localhost:%s/search' % mock_datagov.PORT + ) + job = HarvestJobObj(source=source) + + harvester = DataGovHarvester() + obj_ids = harvester.gather_stage(job) + + # Should get all datasets across multiple pages + assert len(obj_ids) == len(mock_datagov.DATASETS) + assert job.gather_errors == [] + + def test_fetch_normal(self): + source = HarvestSourceObj( + url='http://localhost:%s/search' % mock_datagov.PORT + ) + job = HarvestJobObj(source=source) + harvest_object = HarvestObjectObj( + guid=mock_datagov.DATASETS[0]['identifier'], + job=job, + content=json.dumps(mock_datagov.DATASETS[0])) + + harvester = DataGovHarvester() + result = harvester.fetch_stage(harvest_object) + + assert harvest_object.errors == [] + assert result is True + + def test_import_normal(self): + org = Organization() + harvest_object = HarvestObjectObj( + guid=mock_datagov.DATASETS[0]['identifier'], + content=json.dumps(mock_datagov.DATASETS[0]), + job__source__owner_org=org['id']) + + harvester = DataGovHarvester() + result = harvester.import_stage(harvest_object) + + assert harvest_object.errors == [] + assert result is True + assert harvest_object.guid + + def test_harvest_full(self): + '''Test complete harvest cycle''' + source = HarvestSourceObj( + url='http://localhost:%s/search' % mock_datagov.PORT, + config='', + source_type='test' + ) + job = HarvestJobObj(source=source, run=False) + results_by_guid = run_harvest_job(job, DataGovHarvester()) + + for dataset in mock_datagov.DATASETS: + result = results_by_guid[dataset['identifier']] + assert result['state'] == 'COMPLETE' + assert result['errors'] == [] + + def test_import_creates_correct_metadata(self): + '''Test that DCAT-US metadata is correctly mapped''' + org = Organization() + harvest_object = HarvestObjectObj( + guid=mock_datagov.DATASETS[0]['identifier'], + content=json.dumps(mock_datagov.DATASETS[0]), + job__source__owner_org=org['id'] + ) + + # Add status extra + extra = HarvestObjectExtra(object=harvest_object, key='status', value='new') + extra.save() + + harvester = DataGovHarvester() + result = harvester.import_stage(harvest_object) + + assert result is True + assert harvest_object.package_id is not None + + # Verify package was created with correct fields + import ckan.plugins as p + from ckan import model + context = {'ignore_auth': True, 'model': model, 'session': model.Session} + package = p.toolkit.get_action('package_show')( + context, {'id': harvest_object.package_id} + ) + + assert package['title'] == mock_datagov.DATASETS[0]['title'] + assert package['notes'] == mock_datagov.DATASETS[0]['description'] + + # Check tags + tag_names = [tag['name'] for tag in package['tags']] + assert 'environment' in tag_names + assert 'water' in tag_names + + # Check extras + extras_dict = {e['key']: e['value'] for e in package['extras']} + assert extras_dict['guid'] == mock_datagov.DATASETS[0]['identifier'] + assert extras_dict['datagov_slug'] == mock_datagov.DATASETS[0]['slug'] + assert 'spatial' in extras_dict + assert extras_dict['publisher'] == mock_datagov.DATASETS[0]['publisher'] + assert extras_dict['contact_name'] == 'John Doe' + assert extras_dict['contact_email'] == 'john.doe@epa.gov' + assert extras_dict['dcat_rights'] == 'This dataset is in the public domain.' + + # Check resources + assert len(package['resources']) == 2 + assert package['resources'][0]['format'] == 'CSV' + assert package['resources'][0]['name'] == 'Water Quality Data CSV' + assert package['resources'][1]['format'] == 'JSON' + assert package['resources'][1]['name'] == 'Water Quality API' + + def test_query_parameters(self): + '''Test that all query parameters are preserved during harvest''' + source = HarvestSourceObj( + url='http://localhost:%s/search?q=fish&org_slug=epa&keyword=California&spatial_within=false&spatial_geometry={"type":"Polygon","coordinates":[[[-127.0,32.5],[-127.0,42.0],[-114.1,42.0],[-114.1,32.5],[-127.0,32.5]]]}' % mock_datagov.PORT + ) + job = HarvestJobObj(source=source) + + harvester = DataGovHarvester() + obj_ids = harvester.gather_stage(job) + + # Should successfully parse all parameters and fetch datasets + assert job.gather_errors == [] + assert isinstance(obj_ids, list) + + def test_organizations_filter_include(self): + '''Test organizations_filter_include configuration''' + source = HarvestSourceObj( + url='http://localhost:%s/search' % mock_datagov.PORT, + config='{"organizations_filter_include": ["epa-gov"]}' + ) + job = HarvestJobObj(source=source) + + harvester = DataGovHarvester() + obj_ids = harvester.gather_stage(job) + + # Should only get datasets from EPA (dataset1 and dataset3 are from epa-gov) + assert job.gather_errors == [] + assert len(obj_ids) == 2 + + def test_organizations_filter_exclude(self): + '''Test organizations_filter_exclude configuration''' + source = HarvestSourceObj( + url='http://localhost:%s/search' % mock_datagov.PORT, + config='{"organizations_filter_exclude": ["epa-gov"]}' + ) + job = HarvestJobObj(source=source) + + harvester = DataGovHarvester() + obj_ids = harvester.gather_stage(job) + + # Should only get datasets NOT from EPA (dataset2 is from fws-gov) + assert job.gather_errors == [] + assert len(obj_ids) == 1 + + +class TestTrimDate(object): + def test_trim_midnight_timestamps(self): + '''Test that midnight timestamps are trimmed to date only''' + from ckanext.custom_harvest.converter import trim_date + + assert trim_date('2015-10-02T00:00:00.000+00:00') == '2015-10-02' + assert trim_date('2015-10-02T00:00:00Z') == '2015-10-02' + assert trim_date('2015-10-02T00:00:00') == '2015-10-02' + + def test_keep_non_midnight_timestamps(self): + '''Test that non-midnight timestamps are kept as-is''' + from ckanext.custom_harvest.converter import trim_date + + assert trim_date('2015-10-02T14:30:00.000+00:00') == '2015-10-02T14:30:00.000+00:00' + assert trim_date('2026-04-15T12:00:00Z') == '2026-04-15T12:00:00Z' + + def test_date_only_unchanged(self): + '''Test that date-only strings pass through unchanged''' + from ckanext.custom_harvest.converter import trim_date + + assert trim_date('2015-10-02') == '2015-10-02' + assert trim_date('') == '' + assert trim_date(None) is None + + +class TestDataGovConverter(object): + def test_datagov_to_ckan_basic(self): + '''Test basic conversion of Data.gov format to CKAN''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = mock_datagov.DATASETS[0] + ckan_dict = datagov_to_ckan(source_dict) + + assert ckan_dict['title'] == source_dict['title'] + assert ckan_dict['notes'] == source_dict['description'] + assert len(ckan_dict['tags']) > 0 + assert len(ckan_dict['resources']) > 0 + + def test_datagov_to_ckan_resources(self): + '''Test distribution to resource mapping''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = mock_datagov.DATASETS[0] + ckan_dict = datagov_to_ckan(source_dict) + + resources = ckan_dict['resources'] + assert len(resources) == 2 + + # First resource (CSV) + assert resources[0]['name'] == 'Water Quality Data CSV' + assert resources[0]['url'] == 'https://www.epa.gov/sites/default/files/water-quality.csv' + assert resources[0]['format'] == 'CSV' + assert resources[0]['mimetype'] == 'text/csv' + + # Second resource (JSON API) + assert resources[1]['format'] == 'JSON' + assert resources[1]['url'] == 'https://api.epa.gov/water-quality' + + def test_datagov_to_ckan_extras(self): + '''Test that DCAT extras are correctly mapped''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = mock_datagov.DATASETS[0] + ckan_dict = datagov_to_ckan(source_dict) + + extras_dict = {e['key']: e['value'] for e in ckan_dict['extras']} + + # Core identifiers + assert extras_dict['guid'] == source_dict['identifier'] + assert extras_dict['datagov_slug'] == source_dict['slug'] + + # DCAT fields + assert extras_dict['dcat_access_level'] == 'public' + assert extras_dict['dcat_issued'] == '2020-01-15' + assert extras_dict['contact_name'] == 'John Doe' + assert extras_dict['contact_email'] == 'john.doe@epa.gov' + assert 'spatial' in extras_dict + assert extras_dict['temporal'] == '2020-01-01/2026-01-01' + assert extras_dict['landing_page'] == 'https://www.epa.gov/waterdata/water-quality' + assert extras_dict['dcat_rights'] == 'This dataset is in the public domain.' + + def test_datagov_to_ckan_license_mapping(self): + '''Test license URL mapping''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = mock_datagov.DATASETS[0] + ckan_dict = datagov_to_ckan(source_dict) + + # CC0 license should be mapped + assert ckan_dict.get('license_id') == 'cc-zero' + + def test_datagov_to_ckan_tags(self): + '''Test tag sanitization''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = mock_datagov.DATASETS[0] + ckan_dict = datagov_to_ckan(source_dict) + + tag_names = [tag['name'] for tag in ckan_dict['tags']] + + # Check that keywords are converted to tags + assert 'environment' in tag_names + assert 'water' in tag_names + assert 'quality' in tag_names + assert 'monitoring' in tag_names + assert 'california' in tag_names + + def test_extract_format(self): + '''Test format extraction from distribution''' + from ckanext.custom_harvest.converter import extract_format + + # Format field present + dist1 = {'format': 'CSV'} + assert extract_format(dist1) == 'CSV' + + # MediaType only + dist2 = {'mediaType': 'text/csv'} + assert extract_format(dist2) == 'CSV' + + # URL extension + dist3 = {'downloadURL': 'https://example.com/data.geojson'} + assert extract_format(dist3) == 'GEOJSON' + + # No format info + dist4 = {'accessURL': 'https://example.com/api'} + assert extract_format(dist4) == '' + + def test_map_datagov_license(self): + '''Test license mapping function''' + from ckanext.custom_harvest.converter import map_datagov_license + + # CC0 + assert map_datagov_license('https://creativecommons.org/publicdomain/zero/1.0/') == 'cc-zero' + assert map_datagov_license('http://www.opendefinition.org/licenses/cc-zero') == 'cc-zero' + + # CC-BY + assert map_datagov_license('https://creativecommons.org/licenses/by/4.0/') == 'cc-by' + + # Unknown license + assert map_datagov_license('https://example.com/custom-license') is None + + def test_munge_tag(self): + '''Test tag sanitization function''' + from ckanext.custom_harvest.converter import munge_tag + + # Normal tag + assert munge_tag('environment') == 'environment' + + # Tag with spaces + assert munge_tag('water quality') == 'water quality' + + # Tag with invalid characters + assert munge_tag('tag@with#invalid$chars') == 'tagwithinvalidchars' + + # Long tag (should truncate to 100 chars) + long_tag = 'a' * 150 + assert len(munge_tag(long_tag)) == 100 + + def test_datagov_to_ckan_missing_fields(self): + '''Test handling of missing fields''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + # Minimal dataset + source_dict = { + 'identifier': 'test-uuid', + 'title': 'Test Dataset' + } + ckan_dict = datagov_to_ckan(source_dict) + + # Should have defaults for required fields + assert ckan_dict['title'] == 'Test Dataset' + assert ckan_dict['notes'] == '' + assert ckan_dict['tags'] == [] + assert ckan_dict['resources'] == [] + + def test_datagov_to_ckan_resource_url_priority(self): + '''Test that downloadURL is preferred over accessURL''' + from ckanext.custom_harvest.converter import datagov_to_ckan + + source_dict = { + 'identifier': 'test-uuid', + 'title': 'Test', + 'dcat': { + 'distribution': [ + { + 'title': 'Resource with both URLs', + 'downloadURL': 'https://example.com/download.csv', + 'accessURL': 'https://example.com/access', + 'format': 'CSV' + }, + { + 'title': 'Resource with accessURL only', + 'accessURL': 'https://example.com/api', + 'format': 'API' + } + ] + } + } + ckan_dict = datagov_to_ckan(source_dict) + + # First resource should use downloadURL + assert ckan_dict['resources'][0]['url'] == 'https://example.com/download.csv' + + # Second resource should use accessURL + assert ckan_dict['resources'][1]['url'] == 'https://example.com/api' diff --git a/setup.py b/setup.py index d739429..2bb519d 100644 --- a/setup.py +++ b/setup.py @@ -89,6 +89,7 @@ [ckan.plugins] custom_harvest=ckanext.custom_harvest.plugin:CustomHarvestPlugin package_search_harvester=ckanext.custom_harvest.harvesters:PackageSearchHarvester + datagov_harvester=ckanext.custom_harvest.harvesters:DataGovHarvester [babel.extractors] ckan = ckan.lib.extract:extract_ckan