diff --git a/phylogenetic/Snakefile b/phylogenetic/Snakefile index 56dc667..cc7cc11 100644 --- a/phylogenetic/Snakefile +++ b/phylogenetic/Snakefile @@ -20,6 +20,10 @@ rule all: auspice_json="", +# Shared Snakemake files with generic functions are shared across pathogens +include: "../shared/vendored/snakemake/config.smk" +include: "../shared/vendored/snakemake/remote_files.smk" + # These rules are imported in the order that they are expected to run. # Each Snakefile will have documented inputs and outputs that should be kept as # consistent interfaces across pathogen repos. This allows us to define typical @@ -30,6 +34,7 @@ rule all: # If there are build specific customizations, they should be added with the # custom_rules imported below to ensure that the core workflow is not complicated # by build specific rules. +include: "rules/merge_inputs.smk" include: "rules/prepare_sequences.smk" include: "rules/construct_phylogeny.smk" include: "rules/annotate_phylogeny.smk" diff --git a/phylogenetic/build-configs/ci/config.yaml b/phylogenetic/build-configs/ci/config.yaml index de89c67..ca630a9 100644 --- a/phylogenetic/build-configs/ci/config.yaml +++ b/phylogenetic/build-configs/ci/config.yaml @@ -1,7 +1,7 @@ # This configuration file contains the custom configurations parameters # for the CI workflow to run with the example data. -# Custom rules to run as part of the CI automated workflow -# The paths should be relative to the phylogenetic directory. -custom_rules: - - build-configs/ci/copy_example_data.smk +inputs: + - name: example_data + metadata: "example_data/metadata.tsv" + sequences: "example_data/sequences.fasta" diff --git a/phylogenetic/build-configs/ci/copy_example_data.smk b/phylogenetic/build-configs/ci/copy_example_data.smk deleted file mode 100644 index 4e47ee4..0000000 --- a/phylogenetic/build-configs/ci/copy_example_data.smk +++ /dev/null @@ -1,17 +0,0 @@ -rule copy_example_data: - input: - sequences="example_data/sequences.fasta", - metadata="example_data/metadata.tsv", - output: - sequences="data/sequences.fasta", - metadata="data/metadata.tsv", - shell: - """ - cp -f {input.sequences} {output.sequences} - cp -f {input.metadata} {output.metadata} - """ - -# Add a Snakemake ruleorder directive here if you need to resolve ambiguous rules -# that have the same output as the copy_example_data rule. - -# ruleorder: copy_example_data > ... diff --git a/phylogenetic/rules/annotate_phylogeny.smk b/phylogenetic/rules/annotate_phylogeny.smk index facc35f..0c7e297 100644 --- a/phylogenetic/rules/annotate_phylogeny.smk +++ b/phylogenetic/rules/annotate_phylogeny.smk @@ -3,7 +3,7 @@ This part of the workflow creates additonal annotations for the phylogenetic tre REQUIRED INPUTS: - metadata = data/metadata.tsv + metadata = results/metadata.tsv prepared_sequences = results/prepared_sequences.fasta tree = results/tree.nwk diff --git a/phylogenetic/rules/construct_phylogeny.smk b/phylogenetic/rules/construct_phylogeny.smk index 7652005..28aa232 100644 --- a/phylogenetic/rules/construct_phylogeny.smk +++ b/phylogenetic/rules/construct_phylogeny.smk @@ -3,7 +3,7 @@ This part of the workflow constructs the phylogenetic tree. REQUIRED INPUTS: - metadata = data/metadata.tsv + metadata = results/metadata.tsv prepared_sequences = results/prepared_sequences.fasta OUTPUTS: diff --git a/phylogenetic/rules/export.smk b/phylogenetic/rules/export.smk index a273659..c5fed06 100644 --- a/phylogenetic/rules/export.smk +++ b/phylogenetic/rules/export.smk @@ -4,7 +4,7 @@ export a Nextstrain dataset. REQUIRED INPUTS: - metadata = data/metadata.tsv + metadata = results/metadata.tsv tree = results/tree.nwk branch_lengths = results/branch_lengths.json node_data = results/*.json diff --git a/phylogenetic/rules/merge_inputs.smk b/phylogenetic/rules/merge_inputs.smk new file mode 100644 index 0000000..4bb502b --- /dev/null +++ b/phylogenetic/rules/merge_inputs.smk @@ -0,0 +1,184 @@ +""" +This part of the workflow merges inputs based on what is defined in the config. + +OUTPUTS: + + metadata = results/metadata.tsv + sequences = results/sequences.fasta + +The config dict is expected to have a top-level `inputs` list that defines the +separate inputs' name, metadata, and sequences. Optionally, the config can have +a top-level `additional-inputs` list that is used to define additional data that +are combined with the default inputs: + +```yaml +inputs: + - name: default + metadata: + sequences: + +additional_inputs: + - name: private + metadata: + sequences: +``` + +Supports any of the compression formats that are supported by `augur read-file`, +see + +NOTE: The included rules are written for workflows that do not use wildcards +for defining inputs such as zika. You will need to edit the rules to support wildcards + +1. If your workflow needs wildcards for both metadata and sequences, +e.g. serotypes for dengue, then you will need to edit the `output`, `log`, and +`benchmark` paths of the metadata and sequences rules. +The wildcards can then be directly used in the config for inputs: + +```yaml +inputs: + - name: default + metadata: https://data.nextstrain.org/files/workflows/dengue/metadata_{serotype}.tsv.zst + sequences: https://data.nextstrain.org/files/workflows/dengue/sequences_{serotype}.fasta.zst + +``` + +2. If your workflow only needs wildcards for sequences, e.g. segments for influenza, +then you will only need to edit the paths for the sequences rules. +The wildcards can then be directly used in the config for inputs: + +```yaml +inputs: + - name: default + metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst + sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst +``` +""" +from pathlib import Path + + +def _gather_inputs(): + all_inputs = [*config['inputs'], *config.get('additional_inputs', [])] + + if len(all_inputs)==0: + raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists") + if not all([isinstance(i, dict) for i in all_inputs]): + raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries. " + "If you've used a command line '--config' double check your quoting.") + if len({i['name'] for i in all_inputs})!=len(all_inputs): + raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique") + if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]): + raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'") + if not any(['metadata' in i for i in all_inputs]): + raise InvalidConfigError("At least one input must have 'metadata'") + if not any (['sequences' in i for i in all_inputs]): + raise InvalidConfigError("At least one input must have 'sequences'") + + available_keys = set(['name', 'metadata', 'sequences']) + if any([len(set(el.keys())-available_keys)>0 for el in all_inputs]): + raise InvalidConfigError(f"Each input (config.inputs and config.additional_inputs) can only include keys of {', '.join(available_keys)}") + + return {el['name']: {k:(v if k=='name' else path_or_url(v)) for k,v in el.items()} for el in all_inputs} + +input_sources = _gather_inputs() +_input_metadata = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)] +_input_sequences = [info['sequences'] for info in input_sources.values() if info.get('sequences', None)] + + +if len(_input_metadata) == 1: + + rule decompress_metadata: + """ + This rule is invoked when there is a single metadata input to + ensure that we have a decompressed input for downstream rules to match + the output of rule.merge_metadata. + """ + input: + metadata = _input_metadata[0], + output: + metadata = "results/metadata.tsv", + log: + "logs/decompress_metadata.txt", + benchmark: + "benchmarks/decompress_metadata.txt", + shell: + r""" + exec &> >(tee {log:q}) + + augur read-file {input.metadata:q} > {output.metadata:q} + """ + +else: + + rule merge_metadata: + """ + This rule is invoked when there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) + """ + input: + **{name: info['metadata'] for name,info in input_sources.items() if info.get('metadata', None)} + params: + metadata = lambda w, input: list(map("=".join, input.items())), + id_field = config['strain_id_field'], + output: + metadata = "results/metadata.tsv" + log: + "logs/merge_metadata.txt", + benchmark: + "benchmarks/merge_metadata.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur merge \ + --metadata {params.metadata:q} \ + --metadata-id-columns {params.id_field:q} \ + --output-metadata {output.metadata:q} + """ + + +if len(_input_sequences) == 1: + + rule decompress_sequences: + """ + This rule is invoked when there is a single sequences input to + ensure that we have a decompressed input for downstream rules to match + the output of rule.merge_sequences. + """ + input: + sequences = _input_sequences[0], + output: + sequences = "results/sequences.fasta", + log: + "logs/decompress_sequences.txt", + benchmark: + "benchmarks/decompress_sequences.txt", + shell: + r""" + exec &> >(tee {log:q}) + + augur read-file {input.sequences:q} > {output.sequences:q} + """ + +else: + + rule merge_sequences: + """ + This rule is invoked when there are multiple defined sequences inputs + (config.inputs + config.additional_inputs) + """ + input: + **{name: info['sequences'] for name,info in input_sources.items() if info.get('sequences', None)} + output: + sequences = "results/sequences.fasta", + log: + "logs/merge_sequences.txt", + benchmark: + "benchmarks/merge_sequences.txt" + shell: + r""" + exec &> >(tee {log:q}) + + augur merge \ + --sequences {input:q} \ + --output-sequences {output.sequences:q} + """ diff --git a/phylogenetic/rules/prepare_sequences.smk b/phylogenetic/rules/prepare_sequences.smk index c1c9e22..f5f4b74 100644 --- a/phylogenetic/rules/prepare_sequences.smk +++ b/phylogenetic/rules/prepare_sequences.smk @@ -3,8 +3,8 @@ This part of the workflow prepares sequences for constructing the phylogenetic t REQUIRED INPUTS: - metadata = data/metadata.tsv - sequences = data/sequences.fasta + metadata = results/metadata.tsv + sequences = results/sequences.fasta reference = ../shared/reference.fasta OUTPUTS: diff --git a/shared/vendored/.github/workflows/ci.yaml b/shared/vendored/.github/workflows/ci.yaml index c716277..94d3054 100644 --- a/shared/vendored/.github/workflows/ci.yaml +++ b/shared/vendored/.github/workflows/ci.yaml @@ -11,5 +11,5 @@ jobs: shellcheck: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: nextstrain/.github/actions/shellcheck@master diff --git a/shared/vendored/.github/workflows/pre-commit.yaml b/shared/vendored/.github/workflows/pre-commit.yaml index 70da533..a418753 100644 --- a/shared/vendored/.github/workflows/pre-commit.yaml +++ b/shared/vendored/.github/workflows/pre-commit.yaml @@ -7,7 +7,7 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: actions/setup-python@v5 with: python-version: "3.12" diff --git a/shared/vendored/.gitrepo b/shared/vendored/.gitrepo index b0244b0..5bf6c74 100644 --- a/shared/vendored/.gitrepo +++ b/shared/vendored/.gitrepo @@ -6,7 +6,7 @@ [subrepo] remote = https://github.com/nextstrain/shared branch = main - commit = 82880c8b026d4f317e3f3d9b1f8bb4db226ea01e - parent = d914b98862f08486b76121bf5f385938ff3152f7 + commit = 2d063cf2bae0cfc91d70fda2c36f1451656e5757 + parent = f3c792c2a4d6ebec3a70c3d65b61258208789c67 method = merge cmdver = 0.4.6 diff --git a/shared/vendored/README.md b/shared/vendored/README.md index b6aeadf..eb2e3ba 100644 --- a/shared/vendored/README.md +++ b/shared/vendored/README.md @@ -86,6 +86,7 @@ approach to "ingest" has been discussed in various internal places, including: Scripts for supporting workflow automation that don’t really belong in any of our existing tools. +- [assign-colors](scripts/assign-colors) - Generate colors.tsv for augur export based on ordering, color schemes, and what exists in the metadata. Used in the phylogenetic or nextclade workflows. - [notify-on-diff](scripts/notify-on-diff) - Send Slack message with diff of a local file and an S3 object - [notify-on-job-fail](scripts/notify-on-job-fail) - Send Slack message with details about failed workflow job on GitHub Actions and/or AWS Batch - [notify-on-job-start](scripts/notify-on-job-start) - Send Slack message with details about workflow job on GitHub Actions and/or AWS Batch @@ -97,6 +98,7 @@ Scripts for supporting workflow automation that don’t really belong in any of - [trigger-on-new-data](scripts/trigger-on-new-data) - Triggers downstream GitHub Actions if the provided `upload-to-s3` outputs do not contain the `identical_file_message` A hacky way to ensure that we only trigger downstream phylogenetic builds if the S3 objects have been updated. + NCBI interaction scripts that are useful for fetching public metadata and sequences. - [fetch-from-ncbi-entrez](scripts/fetch-from-ncbi-entrez) - Fetch metadata and nucleotide sequences from [NCBI Entrez](https://www.ncbi.nlm.nih.gov/books/NBK25501/) and output to a GenBank file. @@ -122,6 +124,8 @@ Potential Nextstrain CLI scripts Snakemake workflow functions that are shared across many pathogen workflows that don’t really belong in any of our existing tools. - [config.smk](snakemake/config.smk) - Shared functions for parsing workflow configs. +- [remote_files.smk](snakemake/remote_files.smk) - Exposes the `path_or_url` function which will use Snakemake's storage plugins to download/upload files to remote providers as needed. + ## Software requirements diff --git a/shared/vendored/scripts/assign-colors b/shared/vendored/scripts/assign-colors new file mode 100755 index 0000000..e42a44d --- /dev/null +++ b/shared/vendored/scripts/assign-colors @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Generate colors.tsv for augur export based on ordering, color schemes, and +traits that exists in the metadata. +""" +import argparse +import pandas as pd + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Assign colors based on defined ordering of traits.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument('--ordering', type=str, required=True, + help="""Input TSV file defining the color ordering where the first + column is the field and the second column is the trait in that field. + Blank lines are ignored. Lines starting with '#' will be ignored as comments.""") + parser.add_argument('--color-schemes', type=str, required=True, + help="Input color schemes where each line is a different color scheme separated by tabs.") + parser.add_argument('--metadata', type=str, + help="""If provided, restrict colors to only those traits found in + metadata. If the metadata includes a 'focal' column that only contains + boolean values, then restrict colors to traits for rows where 'focal' + is set to True.""") + parser.add_argument('--output', type=str, required=True, + help="Output colors TSV file to be passed to augur export.") + args = parser.parse_args() + + assignment = {} + with open(args.ordering) as f: + for line in f.readlines(): + array = line.strip().split("\t") + # Ignore empty lines or commented lines + if not array or not array[0] or array[0].startswith('#'): + continue + # Throw a warning if encountering a line not matching the expected number of columns, ignore line + elif len(array)!=2: + print(f"WARNING: Could not decode color ordering line: {line}") + continue + # Otherwise, process color ordering where we expect 2 columns: name, traits + else: + name = array[0] + trait = array[1] + if name not in assignment: + assignment[name] = [trait] + else: + assignment[name].append(trait) + + # if metadata supplied, go through and + # 1. remove assignments that don't exist in metadata + # 2. remove assignments that have 'focal' set to 'False' in metadata + if args.metadata: + metadata = pd.read_csv(args.metadata, delimiter='\t') + for name, trait in assignment.items(): + if name in metadata: + if 'focal' in metadata and metadata['focal'].dtype == 'bool': + focal_list = metadata.loc[metadata['focal'], name].unique() + subset_focal = [x for x in assignment[name] if x in focal_list] + assignment[name] = subset_focal + else: # no 'focal' present + subset_present = [x for x in assignment[name] if x in metadata[name].unique()] + assignment[name] = subset_present + + + schemes = {} + counter = 0 + with open(args.color_schemes) as f: + for line in f.readlines(): + counter += 1 + array = line.lstrip().rstrip().split("\t") + schemes[counter] = array + + with open(args.output, 'w') as f: + for trait_name, trait_array in assignment.items(): + if len(trait_array)==0: + print(f"No traits found for {trait_name}") + continue + if len(schemes)0): + if (remain>len(schemes)): + color_array = [*color_array, *schemes[len(schemes)]] + remain -= len(schemes) + else: + color_array = [*color_array, *schemes[remain]] + remain = 0 + else: + color_array = schemes[len(trait_array)] + + zipped = list(zip(trait_array, color_array)) + for trait_value, color in zipped: + f.write(trait_name + "\t" + trait_value + "\t" + color + "\n") + f.write("\n") diff --git a/shared/vendored/snakemake/remote_files.smk b/shared/vendored/snakemake/remote_files.smk new file mode 100644 index 0000000..844f80e --- /dev/null +++ b/shared/vendored/snakemake/remote_files.smk @@ -0,0 +1,159 @@ +""" +Helper functions to set-up storage plugins for remote inputs/outputs. See the +docstring of `path_or_url` for usage instructions. + +The errors raised by storage plugins are often confusing. For instance, a HTTP +404 error will result in a `MissingInputException` with little hint as to the +underlying issue. S3 credentials errors are similarly confusing and we attempt +to check these ourselves to improve UX here. +""" + +from urllib.parse import urlparse + +# Keep a list of known public buckets, which we'll allow uncredentialled (unsigned) access to +# We could make this config-definable in the future +PUBLIC_BUCKETS = set(['nextstrain-data']) + +# Keep track of registered storage plugins to enable reuse +_storage_registry = {} + +class RemoteFilesMissingCredentials(Exception): + pass + +def _storage_s3(*, bucket, keep_local, retries) -> snakemake.storage.StorageProviderProxy: + """ + Registers and returns an instance of snakemake-storage-plugin-s3. Typically AWS + credentials are required for _any_ request however we allow requests to known + public buckets (see `PUBLIC_BUCKETS`) to be unsigned which allows for a nice user + experience in the common case of downloading inputs from s3://nextstrain-data. + + The intended behaviour for various (S3) URIs supplied to `path_or_url` is: + + | | S3 buckets | credentials present | credentials missing | + |----------|----------------------------|---------------------|---------------------| + | download | private / private + public | signed | Credentials Error | + | | public | signed | unsigned | + | upload | private / private + public | signed | Credentials Error | + | | public | signed | AccessDenied Error | + """ + # If the bucket is public then we may use an unsigned request which has the nice UX + # of not needing credentials to be present. If we've made other signed requests _or_ + # credentials are present then we just sign everything. This has implications for upload: + # if you attempt to upload to a public bucket without credentials then we allow that here + # and you'll get a subsequent `AccessDenied` error when the upload is attempted. + if bucket in PUBLIC_BUCKETS and \ + "s3_signed" not in _storage_registry and \ + ("s3_unsigned" in _storage_registry or not _aws_credentials_present()): + + if provider:=_storage_registry.get('s3_unsigned', None): + return provider + + from botocore import UNSIGNED # dependency of snakemake-storage-plugin-s3 + storage s3_unsigned: + provider="s3", + signature_version=UNSIGNED, + retries=retries, + keep_local=keep_local, + + _storage_registry['s3_unsigned'] = storage.s3_unsigned + return _storage_registry['s3_unsigned'] + + # Resource fetched/uploaded via a signed request, which will require AWS credentials + if provider:=_storage_registry.get('s3_signed', None): + return provider + + # Enforce the presence of credentials to paper over + if not _aws_credentials_present(): + raise RemoteFilesMissingCredentials() + + # the tag appears in the local file path, so reference 'signed' to give a hint about credential errors + storage s3_signed: + provider="s3", + retries=retries, + keep_local=keep_local, + + _storage_registry['s3_signed'] = storage.s3_signed + return _storage_registry['s3_signed'] + +def _aws_credentials_present() -> bool: + import boto3 # dependency of snakemake-storage-plugin-s3 + session = boto3.Session() + creds = session.get_credentials() + return creds is not None + +def _storage_http(*, keep_local, retries) -> snakemake.storage.StorageProviderProxy: + """ + Registers and returns an instance of snakemake-storage-plugin-http + """ + if provider:=_storage_registry.get('http', None): + return provider + + storage: + provider="http", + allow_redirects=True, + supports_head=True, + keep_local=keep_local, + retries=retries, + + _storage_registry['http'] = storage.http + return _storage_registry['http'] + + +def path_or_url(uri, *, keep_local=True, retries=2) -> str: + """ + Intended for use in Snakemake inputs / outputs to transparently use remote + resources. Returns the URI wrapped by an applicable storage plugin. Local + filepaths will be returned unchanged. + + For example, the following rule will download inputs from HTTPs and upload + the output to S3: + + rule filter: + input: + sequences = path_or_url("https://data.nextstrain.org/..."), + metadata = path_or_url("https://data.nextstrain.org/..."), + output: + sequences = path_or_url("s3://...") + shell: + r''' + augur filter \ + --sequences {input.sequences:q} \ + --metadata {input.metadata:q} \ + --metadata-id-columns accession \ + --output-sequences {output.sequences:q} + ''' + + If *keep_local* is True (the default) then downloaded/uploaded files will + remain in `.snakemake/storage/`. The presence of a previously downloaded + file (via `keep_local=True`) does not guarantee that the file will not be + re-downloaded if the storage plugin decides the local file is out of date. + + Depending on the *uri* authentication may be required. See the specific + helper functions (such as `_storage_s3`) for more details. + + See for + more information on Snakemake storage plugins. Note: various snakemake + plugins will be required depending on the URIs provided. + """ + info = urlparse(uri) + + if info.scheme=='': # local + return uri # no storage wrapper + + if info.scheme=='s3': + try: + return _storage_s3(bucket=info.netloc, keep_local=keep_local, retries=retries)(uri) + except RemoteFilesMissingCredentials as e: + raise Exception(f"AWS credentials are required to access {uri!r}") from e + + if info.scheme=='https': + return _storage_http(keep_local=keep_local, retries=retries)(uri) + elif info.scheme=='http': + raise Exception(f"HTTP remote file support is not implemented in nextstrain workflows (attempting to access {uri!r}).\n" + "Please use an HTTPS address instead.") + + if info.scheme in ['gs', 'gcs']: + raise Exception(f"Google Storage is not yet implemented for nextstrain workflows (attempting to access {uri!r}).\n" + "Please get in touch if you require this functionality and we can add it to our workflows") + + raise Exception(f"Input address {uri!r} (scheme={info.scheme!r}) is from a non-supported remote")