From a6ba9a5203a568611a94c043fd13e2ec50f071da Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 10 Apr 2020 15:12:08 -0400 Subject: Validate & propagate metadata --- bh20seqanalyzer/main.py | 45 ++++++++++++++++++++++++++++++++------ bh20sequploader/bh20seq-schema.yml | 6 ++--- bh20sequploader/main.py | 2 +- bh20sequploader/qc_metadata.py | 29 +++++++++++++++++++++++- 4 files changed, 70 insertions(+), 12 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 2030c1e..1fb51b5 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -8,6 +8,8 @@ import json import logging import ruamel.yaml from bh20sequploader.qc_metadata import qc_metadata +import pkg_resources +from schema_salad.sourceline import add_lc_filename logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) @@ -24,8 +26,14 @@ def validate_upload(api, collection, validated_project, logging.warn("Upload '%s' missing metadata.yaml", collection["name"]) valid = False else: - metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) - #valid = qc_metadata(metadata_content) and valid + try: + metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) + metadata_content["id"] = "keep:%s/metadata.yaml" % collection["portable_data_hash"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) and valid + except Exception as e: + logging.warn(e) + valid = False if not valid: logging.warn("Failed metadata qc") @@ -52,9 +60,10 @@ def validate_upload(api, collection, validated_project, "owner_uuid": validated_project, "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute() else: + pass # It is invalid, delete it. - logging.warn("Deleting '%s'" % collection["name"]) - api.collections().delete(uuid=collection["uuid"]).execute() + #logging.warn("Deleting '%s'" % collection["name"]) + #api.collections().delete(uuid=collection["uuid"]).execute() return valid @@ -107,12 +116,17 @@ def start_fastq_to_fasta(api, collection, def start_pangenome_analysis(api, analysis_project, pangenome_workflow_uuid, - validated_project): + validated_project, + schema_ref): validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]]) inputobj = { "inputReads": [], "metadata": [], - "subjects": [] + "subjects": [], + "metadataSchema": { + "class": "File", + "location": schema_ref + } } for v in validated: inputobj["inputReads"].append({ @@ -166,12 +180,26 @@ def move_fastq_to_fasta_results(api, analysis_project, uploader_project): api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute() +def upload_schema(api, workflow_def_project): + schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml") + c = arvados.collection.Collection() + with c.open("schema.yml", "wb") as f: + f.write(schema_resource.read()) + pdh = c.portable_data_hash() + wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project], + ["portable_data_hash", "=", pdh]]).execute() + if len(wd["items"]) == 0: + c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True) + return "keep:%s/schema.yml" % pdh + + def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='') parser.add_argument('--pangenome-analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='') parser.add_argument('--fastq-project', type=str, default='lugli-j7d0g-xcjxp4oox2u1w8u', help='') parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='') + parser.add_argument('--workflow-def-project', type=str, default='lugli-j7d0g-5hswinmpyho8dju', help='') parser.add_argument('--pangenome-workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='') parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='') @@ -183,6 +211,8 @@ def main(): logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + schema_ref = upload_schema(api, args.workflow_def_project) + while True: move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) @@ -198,7 +228,8 @@ def main(): start_pangenome_analysis(api, args.pangenome_analysis_project, args.pangenome_workflow_uuid, - args.validated_project) + args.validated_project, + schema_ref) copy_most_recent_result(api, args.pangenome_analysis_project, diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml index cf9b015..a072bd7 100644 --- a/bh20sequploader/bh20seq-schema.yml +++ b/bh20sequploader/bh20seq-schema.yml @@ -174,9 +174,9 @@ $graph: jsonldPredicate: _id: "@id" #_type: "@id" - sequencefile: - doc: The subject (eg the fasta/fastq file) that this metadata describes + id: + doc: The subject (eg the fasta/fastq file) that the metadata describes type: string? jsonldPredicate: _id: "@id" - _type: "@id" \ No newline at end of file + _type: "@id" diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index bfb8c51..2032508 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -22,7 +22,7 @@ def main(): api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) - if not qc_metadata(args.metadata.name): + if not bh20sequploader.qc_metadata.qc_metadata(args.metadata.name): print("Failed metadata qc") exit(1) diff --git a/bh20sequploader/qc_metadata.py b/bh20sequploader/qc_metadata.py index ebe4dfc..38edcaa 100644 --- a/bh20sequploader/qc_metadata.py +++ b/bh20sequploader/qc_metadata.py @@ -1,12 +1,25 @@ import schema_salad.schema +import schema_salad.ref_resolver import logging import pkg_resources import logging +import traceback + +class CustomFetcher(schema_salad.ref_resolver.DefaultFetcher): + def check_exists(sup, url): + if url.startswith("keep:"): + return True + else: + return super().check_exists(url) + + def supported_schemes(self): # type: () -> List[str] + return ["file", "http", "https", "mailto", "keep"] + def qc_metadata(metadatafile): schema_resource = pkg_resources.resource_stream(__name__, "bh20seq-schema.yml") cache = {"https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml": schema_resource.read().decode("utf-8")} - (document_loader, + (loader, avsc_names, schema_metadata, metaschema_loader) = schema_salad.schema.load_schema("https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml", cache=cache) @@ -15,9 +28,23 @@ def qc_metadata(metadatafile): print(avsc_names) return False + document_loader = schema_salad.ref_resolver.Loader( + loader.ctx, + schemagraph=loader.graph, + foreign_properties=loader.foreign_properties, + idx=loader.idx, + cache=loader.cache, + fetcher_constructor=CustomFetcher, + skip_schemas=loader.skip_schemas, + url_fields=loader.url_fields, + allow_attachments=loader.allow_attachments, + session=loader.session, + ) + try: doc, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, metadatafile, True) return True except Exception as e: + traceback.print_exc() logging.warn(e) return False -- cgit v1.2.3