aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Amstutz2020-04-10 15:12:08 -0400
committerPeter Amstutz2020-04-10 15:12:42 -0400
commita6ba9a5203a568611a94c043fd13e2ec50f071da (patch)
tree88b3f2796ed6358f61abc9970c938f18a69bd5c0
parent9e44ae814862ea91456d95b31981c16ecae6d317 (diff)
downloadbh20-seq-resource-a6ba9a5203a568611a94c043fd13e2ec50f071da.tar.gz
bh20-seq-resource-a6ba9a5203a568611a94c043fd13e2ec50f071da.tar.lz
bh20-seq-resource-a6ba9a5203a568611a94c043fd13e2ec50f071da.zip
Validate & propagate metadata
-rw-r--r--bh20seqanalyzer/main.py45
-rw-r--r--bh20sequploader/bh20seq-schema.yml6
-rw-r--r--bh20sequploader/main.py2
-rw-r--r--bh20sequploader/qc_metadata.py29
4 files changed, 70 insertions, 12 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 2030c1e..1fb51b5 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -8,6 +8,8 @@ import json
import logging
import ruamel.yaml
from bh20sequploader.qc_metadata import qc_metadata
+import pkg_resources
+from schema_salad.sourceline import add_lc_filename
logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
@@ -24,8 +26,14 @@ def validate_upload(api, collection, validated_project,
logging.warn("Upload '%s' missing metadata.yaml", collection["name"])
valid = False
else:
- metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
- #valid = qc_metadata(metadata_content) and valid
+ try:
+ metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
+ metadata_content["id"] = "keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ add_lc_filename(metadata_content, metadata_content["id"])
+ valid = qc_metadata(metadata_content) and valid
+ except Exception as e:
+ logging.warn(e)
+ valid = False
if not valid:
logging.warn("Failed metadata qc")
@@ -52,9 +60,10 @@ def validate_upload(api, collection, validated_project,
"owner_uuid": validated_project,
"name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute()
else:
+ pass
# It is invalid, delete it.
- logging.warn("Deleting '%s'" % collection["name"])
- api.collections().delete(uuid=collection["uuid"]).execute()
+ #logging.warn("Deleting '%s'" % collection["name"])
+ #api.collections().delete(uuid=collection["uuid"]).execute()
return valid
@@ -107,12 +116,17 @@ def start_fastq_to_fasta(api, collection,
def start_pangenome_analysis(api,
analysis_project,
pangenome_workflow_uuid,
- validated_project):
+ validated_project,
+ schema_ref):
validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]])
inputobj = {
"inputReads": [],
"metadata": [],
- "subjects": []
+ "subjects": [],
+ "metadataSchema": {
+ "class": "File",
+ "location": schema_ref
+ }
}
for v in validated:
inputobj["inputReads"].append({
@@ -166,12 +180,26 @@ def move_fastq_to_fasta_results(api, analysis_project, uploader_project):
api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute()
+def upload_schema(api, workflow_def_project):
+ schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml")
+ c = arvados.collection.Collection()
+ with c.open("schema.yml", "wb") as f:
+ f.write(schema_resource.read())
+ pdh = c.portable_data_hash()
+ wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project],
+ ["portable_data_hash", "=", pdh]]).execute()
+ if len(wd["items"]) == 0:
+ c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True)
+ return "keep:%s/schema.yml" % pdh
+
+
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='')
parser.add_argument('--pangenome-analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='')
parser.add_argument('--fastq-project', type=str, default='lugli-j7d0g-xcjxp4oox2u1w8u', help='')
parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='')
+ parser.add_argument('--workflow-def-project', type=str, default='lugli-j7d0g-5hswinmpyho8dju', help='')
parser.add_argument('--pangenome-workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='')
parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='')
@@ -183,6 +211,8 @@ def main():
logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
+ schema_ref = upload_schema(api, args.workflow_def_project)
+
while True:
move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project)
@@ -198,7 +228,8 @@ def main():
start_pangenome_analysis(api,
args.pangenome_analysis_project,
args.pangenome_workflow_uuid,
- args.validated_project)
+ args.validated_project,
+ schema_ref)
copy_most_recent_result(api,
args.pangenome_analysis_project,
diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml
index cf9b015..a072bd7 100644
--- a/bh20sequploader/bh20seq-schema.yml
+++ b/bh20sequploader/bh20seq-schema.yml
@@ -174,9 +174,9 @@ $graph:
jsonldPredicate:
_id: "@id"
#_type: "@id"
- sequencefile:
- doc: The subject (eg the fasta/fastq file) that this metadata describes
+ id:
+ doc: The subject (eg the fasta/fastq file) that the metadata describes
type: string?
jsonldPredicate:
_id: "@id"
- _type: "@id" \ No newline at end of file
+ _type: "@id"
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index bfb8c51..2032508 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -22,7 +22,7 @@ def main():
api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True)
- if not qc_metadata(args.metadata.name):
+ if not bh20sequploader.qc_metadata.qc_metadata(args.metadata.name):
print("Failed metadata qc")
exit(1)
diff --git a/bh20sequploader/qc_metadata.py b/bh20sequploader/qc_metadata.py
index ebe4dfc..38edcaa 100644
--- a/bh20sequploader/qc_metadata.py
+++ b/bh20sequploader/qc_metadata.py
@@ -1,12 +1,25 @@
import schema_salad.schema
+import schema_salad.ref_resolver
import logging
import pkg_resources
import logging
+import traceback
+
+class CustomFetcher(schema_salad.ref_resolver.DefaultFetcher):
+ def check_exists(sup, url):
+ if url.startswith("keep:"):
+ return True
+ else:
+ return super().check_exists(url)
+
+ def supported_schemes(self): # type: () -> List[str]
+ return ["file", "http", "https", "mailto", "keep"]
+
def qc_metadata(metadatafile):
schema_resource = pkg_resources.resource_stream(__name__, "bh20seq-schema.yml")
cache = {"https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml": schema_resource.read().decode("utf-8")}
- (document_loader,
+ (loader,
avsc_names,
schema_metadata,
metaschema_loader) = schema_salad.schema.load_schema("https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml", cache=cache)
@@ -15,9 +28,23 @@ def qc_metadata(metadatafile):
print(avsc_names)
return False
+ document_loader = schema_salad.ref_resolver.Loader(
+ loader.ctx,
+ schemagraph=loader.graph,
+ foreign_properties=loader.foreign_properties,
+ idx=loader.idx,
+ cache=loader.cache,
+ fetcher_constructor=CustomFetcher,
+ skip_schemas=loader.skip_schemas,
+ url_fields=loader.url_fields,
+ allow_attachments=loader.allow_attachments,
+ session=loader.session,
+ )
+
try:
doc, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, metadatafile, True)
return True
except Exception as e:
+ traceback.print_exc()
logging.warn(e)
return False