# This script is the CLI uploader for http://covid19.genenetwork.org/ # # To upload a sequence with its metadata: # # python3 bh20sequploader/main.py example/sequence.fasta example/maximum_metadata_example.yaml # # Usage: described in http://covid19.genenetwork.org/blog?id=using-covid-19-pubseq-part3 import argparse import time import arvados import arvados.collection import json import logging import magic from pathlib import Path import urllib.request import socket import getpass import sys sys.path.insert(0,'.') from bh20sequploader.qc_metadata import qc_metadata from bh20sequploader.qc_fasta import qc_fasta logging.basicConfig(level=logging.DEBUG) log = logging.getLogger(__name__ ) log.debug("Entering sequence uploader") # ---- Tokens for uploading data to Arvados ARVADOS_API_HOST='lugli.arvadosapi.com' UPLOADER_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' ANONYMOUS_API_TOKEN='5o42qdxpxp5cj15jqjf7vnxx5xduhm4ret703suuoa3ivfglfh' UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa' VALIDATED_PROJECT='lugli-j7d0g-5ct8p1i1wrgyjvp' def qc_stuff(metadata, sequence_p1, sequence_p2, do_qc=True): """Quality control. Essentially it checks the RDF schema and the FASTA sequence for enough overlap with the reference genome """ failed = False sample_id = '' try: log.debug("Checking metadata" if do_qc else "Skipping metadata check") if do_qc: sample_id = qc_metadata(metadata.name) if not sample_id: log.warning("Failed metadata QC") failed = True except Exception as e: log.exception("Failed metadata QC") failed = True # continue with the FASTA checker target = [] if sequence_p1: try: log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC") target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc)) if sequence_p2: if target[0][2] == 'text/fasta': raise ValueError("It is possible to upload just one FASTA file at a time") target.append(qc_fasta(sequence_p2)) target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2]) target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2]) if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]: raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})") except Exception as e: log.exception("Failed sequence QC") failed = True if failed: log.debug("Bailing out!") exit(1) return target def upload_sequence(col, target, sequence): with col.open(target[0], "wb") as f: r = sequence.read(65536) while r: f.write(r) r = sequence.read(65536) def main(): parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis') parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json') parser.add_argument('sequence_p1', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTA/FASTQ') parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTQ pair') parser.add_argument("--validate", action="store_true", help="Dry run, validate only") parser.add_argument("--skip-qc", action="store_true", help="Skip local qc check") parser.add_argument("--trusted", action="store_true", help="Trust local validation and add directly to validated project") args = parser.parse_args() if args.trusted: # Use credentials from environment api = arvados.api() else: api = arvados.api(host=ARVADOS_API_HOST, token=UPLOADER_API_TOKEN, insecure=True) # ---- First the QC target = qc_stuff(args.metadata, args.sequence_p1, args.sequence_p2, not args.skip_qc) if target: seqlabel = target[0][1] else: seqlabel = "" if args.validate: log.info("Valid") exit(0) col = arvados.collection.Collection(api_client=api) # ---- Upload the sequence to Arvados if args.sequence_p1: upload_sequence(col, target[0], args.sequence_p1) if args.sequence_p2: upload_sequence(col, target[1], args.sequence_p2) # ---- Make sure the metadata YAML is valid log.info("Reading metadata") with col.open("metadata.yaml", "w") as f: r = args.metadata.read(65536) log.info(r[0:20]) while r: f.write(r) r = args.metadata.read(65536) # ---- Get the uploader IP address (gateway) and local user info external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') try: username = getpass.getuser() except KeyError: username = "unknown" properties = { "sequence_label": seqlabel, "upload_app": "bh20-seq-uploader", "upload_ip": external_ip, "upload_user": "%s@%s" % (username, socket.gethostname()) } # ---- Get ready for actual uploading api2 = arvados.api(host=ARVADOS_API_HOST, token=ANONYMOUS_API_TOKEN, insecure=True) dup = api2.collections().list(filters=[["owner_uuid", "in", [VALIDATED_PROJECT, UPLOAD_PROJECT]], ["portable_data_hash", "=", col.portable_data_hash()]]).execute() if dup["items"]: # This exact collection has been uploaded before. log.error("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) exit(1) if args.trusted: properties["status"] = "validated" owner_uuid = VALIDATED_PROJECT else: owner_uuid = UPLOAD_PROJECT # ---- and stream the 'collection' up col.save_new(owner_uuid=owner_uuid, name="%s uploaded by %s from %s" % (seqlabel, properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) log.info("Saved to %s" % col.manifest_locator()) log.info("Done") exit(0) if __name__ == "__main__": main()