about summary refs log tree commit diff
path: root/bh20seqanalyzer/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'bh20seqanalyzer/main.py')
-rw-r--r-- | bh20seqanalyzer/main.py | 71
1 files changed, 48 insertions, 23 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 31ad4c4..9a36cae 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -30,6 +30,7 @@ def validate_upload(api, collection, validated_project,
try:
metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ sample_id = metadata_content["sample"]["sample_id"]
add_lc_filename(metadata_content, metadata_content["id"])
valid = qc_metadata(metadata_content) and valid
except Exception as e:
@@ -39,21 +40,25 @@ def validate_upload(api, collection, validated_project,
logging.warn("Failed metadata qc")
if valid:
- tgt = None
- for n in ("sequence.fasta", "reads.fastq"):
- if n not in col:
- continue
- with col.open(n) as qf:
- tgt = qc_fasta(qf)
- if tgt != n:
- logging.info("Expected %s but magic says it should be %s", n, tgt)
- valid = False
- elif tgt == "reads.fastq":
- start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid)
- return False
- if tgt is None:
+ try:
+ tgt = None
+ paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+ for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ if n not in col:
+ continue
+ with col.open(n, 'rb') as qf:
+ tgt = qc_fasta(qf)[0]
+ if tgt != n and tgt != paired.get(n):
+ logging.info("Expected %s but magic says it should be %s", n, tgt)
+ valid = False
+ elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id)
+ return False
+ if tgt is None:
+ valid = False
+ logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
+ except ValueError as v:
valid = False
- logging.warn("Upload '%s' does not contain sequence.fasta or reads.fastq", collection["name"])
dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
["portable_data_hash", "=", col.portable_data_hash()]]).execute()
@@ -69,9 +74,8 @@ def validate_upload(api, collection, validated_project,
"owner_uuid": validated_project,
"name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute()
else:
- pass
# It is invalid, delete it.
- #logging.warn("Deleting '%s'" % collection["name"])
+ logging.warn("Suggest deleting '%s'" % collection["name"])
#api.collections().delete(uuid=collection["uuid"]).execute()
return valid
@@ -95,6 +99,7 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
tmp.name]
logging.info("Running %s" % ' '.join(cmd))
comp = subprocess.run(cmd, capture_output=True)
+ logging.info("Submitted %s", comp.stdout)
if comp.returncode != 0:
logging.error(comp.stderr.decode('utf-8'))
@@ -103,12 +108,11 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
def start_fastq_to_fasta(api, collection,
analysis_project,
- fastq_workflow_uuid):
- newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", {
- "fastq_forward": {
- "class": "File",
- "location": "keep:%s/reads.fastq" % collection["portable_data_hash"]
- },
+ fastq_workflow_uuid,
+ tgt,
+ sample_id):
+
+ params = {
"metadata": {
"class": "File",
"location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
@@ -116,8 +120,26 @@ def start_fastq_to_fasta(api, collection,
"ref_fasta": {
"class": "File",
"location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
+ },
+ "sample_id": sample_id
+ }
+
+ if tgt.startswith("reads.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
+ }
+ elif tgt.startswith("reads_1.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
}
- })
+ params["fastq_reverse"] = {
+ "class": "File",
+ "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+
+ newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params)
api.collections().update(uuid=collection["uuid"],
body={"owner_uuid": newproject["uuid"]}).execute()
@@ -222,6 +244,7 @@ def main():
parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='')
parser.add_argument('--kickoff', action="store_true")
+ parser.add_argument('--once', action="store_true")
args = parser.parse_args()
api = arvados.api()
@@ -265,4 +288,6 @@ def main():
args.pangenome_analysis_project,
args.latest_result_collection)
+ if args.once:
+ break
time.sleep(15)