about summary refs log tree commit diff
path: root/workflows
diff options
context:
space:
mode:
Diffstat (limited to 'workflows')
-rw-r--r--workflows/pull-data/genbank/genbank.py36
-rw-r--r--workflows/pull-data/genbank/ref.py237
2 files changed, 66 insertions, 207 deletions
diff --git a/workflows/pull-data/genbank/genbank.py b/workflows/pull-data/genbank/genbank.py
index 90f5a14..80432de 100644
--- a/workflows/pull-data/genbank/genbank.py
+++ b/workflows/pull-data/genbank/genbank.py
@@ -52,18 +52,28 @@ Example of an output JSON:
   }
 }
 
-Note: missing data should be None! Do not fill in other data by
-'guessing'.
-
-When data is malformed an warning should be issued.
 
 """
 
 def get_metadata(id, gbseq):
+    """This is a minimal data parser from genbank XML records. Inference
+    on, for example geo location, is not allowed in this function and
+    happens downstream.
+
+    That is to keep the parsing simple.
+
+    Important: missing data should be missing or None! Do not fill in
+    data by 'guessing'.
+
+    When data is malformed a warning should be logged and added to the
+    warning list.
+
+    """
     host = types.SimpleNamespace()
     sample = types.SimpleNamespace()
     submitter = types.SimpleNamespace()
     technology = types.SimpleNamespace()
+    virus = types.SimpleNamespace()
     warnings = []
 
     def warn(msg):
@@ -81,13 +91,14 @@ def get_metadata(id, gbseq):
     sample.sample_id = id
     sample.database = "https://www.ncbi.nlm.nih.gov/genbank/"
     sample.source_database_accession = f"http://identifiers.org/insdc/{id}#sequence"
-    # <GBQualifier>
-    #   <GBQualifier_name>country</GBQualifier_name>
     #   <GBQualifier_value>USA: Cruise_Ship_1, California</GBQualifier_value>
-    # </GBQualifier>
-    sample.collection_location = "FIXME"
+    n = fetch("host_species", ".//GBQualifier/GBQualifier_name/[.='country']/../GBQualifier_value")
+    if n: sample.collection_location = n
+    else: warn("Missing collection_location")
 
     submitter.authors = [n.text for n in gbseq.findall(".//GBAuthor")]
+    if not len(submitter.authors): warn("Missing authors")
+
     # <GBReference_journal>Submitted (28-OCT-2020) MDU-PHL, The Peter
     #   Doherty Institute for Infection and Immunity, 792 Elizabeth
     #   Street, Melbourne, Vic 3000, Australia
@@ -103,7 +114,6 @@ def get_metadata(id, gbseq):
         pass
     except ValueError:
         submitter.additional_submitter_information = n
-        pass
 
     try:
         n = gbseq.find("./GBSeq_comment").text
@@ -122,6 +132,7 @@ def get_metadata(id, gbseq):
         p = re.compile(r'.*Sequencing Technology :: ([^;]+).*')
         m = p.match(n)
         if m: technology.sample_sequencing_technology = m.group(1).strip()
+        else: warn("Missing sample_sequencing_technology")
 
     # --- Dates
     n = gbseq.find("./GBSeq_create-date")
@@ -171,13 +182,18 @@ def get_metadata(id, gbseq):
                 host.host_age = int(m.group(1))
                 host.host_age_unit = 'http://purl.obolibrary.org/obo/UO_0000036'
         # sys.exit(1)
+    n = fetch("virus_strain", ".//GBQualifier/GBQualifier_name/[.='isolate']/../GBQualifier_value")
+    if n: virus.virus_strain = n
+    n = fetch("virus_species", ".//GBQualifier/GBQualifier_name/[.='db_xref']/../GBQualifier_value")
+    if n: virus.virus_species = "http://purl.obolibrary.org/obo/NCBITaxon_"+n.split('taxon:')[1]
+
 
     info = {
         'id': 'placeholder',
         'update_date': str(update_date),
         'host': host.__dict__,
         'sample': sample.__dict__,
-        #'virus': virus,
+        'virus': virus.__dict__,
         'technology': technology.__dict__,
         'submitter': submitter.__dict__,
         'warnings': warnings,
diff --git a/workflows/pull-data/genbank/ref.py b/workflows/pull-data/genbank/ref.py
index a9b4f06..4d4df48 100644
--- a/workflows/pull-data/genbank/ref.py
+++ b/workflows/pull-data/genbank/ref.py
@@ -1,199 +1,42 @@
 # ---- BELOW IS JUST FOR REFERENCE ----
 
-                        if field_in_yaml == 'sequencing_coverage':
-                            # A regular expression would be better!
-                            try:
-                                technology[field_in_yaml] = [
-                                    float(tech_info_to_parse.replace('(average)', '').replace("reads/nt", '').
-                                          replace('(average for 6 sequences)', '').replace(',', '.').strip(' xX>'))
-                                ]
-                            except ValueError:
-                                print(accession_version, "Couldn't make sense of Coverage '%s'" % tech_info_to_parse)
-                                pass
-                        elif field_in_yaml == 'sample_sequencing_technology':
-                            new_seq_tec_list = []
-                            for seq_tec in tech_info_to_parse.split(';'):
-                                seq_tec = seq_tec.strip()
-                                if seq_tec in field_to_term_to_uri_dict['ncbi_sequencing_technology']:
-                                    seq_tec = field_to_term_to_uri_dict['ncbi_sequencing_technology'][seq_tec]
-                                    new_seq_tec_list.append(seq_tec)
-                                else:
-                                    missing_value_list.append('\t'.join([accession_version, 'sample_sequencing_technology', seq_tec]))
-
-                            if len(new_seq_tec_list) > 0:
-                                technology['sample_sequencing_technology'] = [x for x in new_seq_tec_list]
-                        else:
-                            technology[field_in_yaml] = tech_info_to_parse
-
-
-
-
-       elif 'gender' in GBQualifier_value_text_list[1]:
-                                    host_sex_one_lecter = GBQualifier_value_text_list[1].split(':')[-1].strip()
-                                    if host_sex_one_lecter in ['F', 'M']:
-                                        host_sex = 'female' if host_sex_one_lecter == 'F' else 'male'
-
-                                if host_sex in ['male', 'female']:
-                                    host['host_sex'] = "http://purl.obolibrary.org/obo/PATO_0000384" if host_sex == 'male' else "http://purl.obolibrary.org/obo/PATO_0000383"
-                                elif GBQualifier_value_text_list[1] in field_to_term_to_uri_dict['ncbi_host_health_status']:
-                                    host['host_health_status'] = field_to_term_to_uri_dict['ncbi_host_health_status'][GBQualifier_value_text_list[1]]
-                                else:
-                                    missing_value_list.append('\t'.join([accession_version, 'host_sex or host_health_status', GBQualifier_value_text_list[1]]))
-
-                                # Host age
-                                host_age = -1
-                                if len(GBQualifier_value_text_list[1].split(' ')) > 1 and is_integer(GBQualifier_value_text_list[1].split(' ')[-1]):
-                                    host_age = int(GBQualifier_value_text_list[1].split(' ')[-1])
-                                elif len(GBQualifier_value_text_list) > 2 and is_integer(GBQualifier_value_text_list[2].split(' ')[-1]):
-                                    host_age = int(GBQualifier_value_text_list[2].split(' ')[-1])
-
-                                if host_age >= 0 and host_age < 110:
-                                    host['host_age'] = host_age
-                                    host['host_age_unit'] = 'http://purl.obolibrary.org/obo/UO_0000036'
-                                elif len(GBQualifier_value_text_list) > 2:
-                                    missing_value_list.append('\t'.join([accession_version, 'host_age', GBQualifier_value_text_list[2]]))
-                    elif GBQualifier_name_text == 'collected_by':
-                        if any([x in GBQualifier_value_text.lower() for x in ['institute', 'hospital', 'city', 'center']]):
-                            sample['collecting_institution'] = GBQualifier_value_text
-                        else:
-                            sample['collector_name'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'isolation_source':
-                        if GBQualifier_value_text.upper() in field_to_term_to_uri_dict['ncbi_speciesman_source']:
-                            GBQualifier_value_text = GBQualifier_value_text.upper()  # For example, in case of 'usa: wa'
-
-                        # Little cleaning
-                        GBQualifier_value_text = GBQualifier_value_text.strip("/'")
-
-                        if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_speciesman_source']:
-                            sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source'][GBQualifier_value_text]]
-                        else:
-                            if GBQualifier_value_text.lower() in ['np/op', 'np-op', 'np/op swab', 'np/np swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'combined nasopharyngeal and oropharyngeal swab', 'naso and/or oropharyngeal swab']:
-                                sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
-                            elif GBQualifier_value_text.lower() in ['nasopharyngeal swab/throat swab', 'nasopharyngeal/throat swab', 'nasopharyngeal swab and throat swab', 'nasal swab and throat swab', 'nasopharyngeal aspirate/throat swab', 'Nasopharyngeal/Throat']:
-                                sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
-                            elif GBQualifier_value_text.lower() in ['nasopharyngeal aspirate & throat swab', 'nasopharyngeal aspirate and throat swab']:
-                                sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal aspirate'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
-                            elif GBQualifier_value_text.lower() in ['nasal swab and throat swab']:
-                                sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
-                            elif GBQualifier_value_text.lower() in ['nasal-swab and oro-pharyngeal swab']:
-                                sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
-                            else:
-                                missing_value_list.append('\t'.join([accession_version, 'specimen_source', GBQualifier_value_text]))
-                    elif GBQualifier_name_text == 'collection_date':
-                        # TO_DO: which format we will use?
-                        date_to_write = GBQualifier_value_text
-
-                        if len(GBQualifier_value_text.split('-')) == 1:
-                            if int(GBQualifier_value_text) < 2020:
-                                date_to_write = "{}-12-15".format(GBQualifier_value_text)
-                            else:
-                                date_to_write = "{}-01-15".format(GBQualifier_value_text)
-
-                            if 'additional_collection_information' in sample:
-                                sample['additional_collection_information'] += "; The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
-                            else:
-                                sample['additional_collection_information'] = "The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
-                        elif len(GBQualifier_value_text.split('-')) == 2:
-                            date_to_write = parse(GBQualifier_value_text).strftime('%Y-%m') + '-15'
-
-                            if 'additional_collection_information' in sample:
-                                sample['additional_collection_information'] += "; The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
-                            else:
-                                sample['additional_collection_information'] = "The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
-                        elif len(GBQualifier_value_text.split('-')) == 3:
-                            GBQualifier_value_text_list = GBQualifier_value_text.split('-')
-
-                            if GBQualifier_value_text_list[1].isalpha():
-                                date_to_write = parse(GBQualifier_value_text).strftime('%Y-%m-%d')
-
-                        sample['collection_date'] = date_to_write
-                    elif GBQualifier_name_text in ['lat_lon', 'country']:
-                        if GBQualifier_name_text == 'country' and ': ' in GBQualifier_value_text:
-                            GBQualifier_value_text = GBQualifier_value_text.replace(': ', ':')
-
-                        if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_countries']:
-                            sample['collection_location'] = field_to_term_to_uri_dict['ncbi_countries'][GBQualifier_value_text]
-                        else:
-                            missing_value_list.append('\t'.join([accession_version, GBQualifier_name_text, GBQualifier_value_text]))
-                    elif GBQualifier_name_text == 'note':
-                        if 'additional_collection_information' in sample:
-                            sample['additional_collection_information'] += '; ' + GBQualifier_value_text
-                        else:
-                            sample['additional_collection_information'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'isolate':
-                        virus['virus_strain'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'db_xref':
-                        virus['virus_species'] = "http://purl.obolibrary.org/obo/NCBITaxon_"+GBQualifier_value_text.split('taxon:')[1]
-
-            # Check if mandatory fields are missing
-            if 'sample_sequencing_technology' not in technology:
-                # print(accession_version, ' - technology not found')
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('sample_sequencing_technology not found')
-
-            if 'collection_location' not in sample:
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('collection_location not found')
-
-            if 'collection_date' not in sample:
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('collection_date not found')
-            else:
-                year, month, day = [int(x) for x in sample['collection_date'].split('-')]
-
-                collection_date_in_yaml = datetime(year, month, day)
-                if collection_date_in_yaml < min_acceptable_collection_date:
-                    if accession_version not in not_created_accession_dict:
-                        not_created_accession_dict[accession_version] = []
-                    not_created_accession_dict[accession_version].append('collection_date too early')
-
-            if 'authors' not in submitter:
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('authors not found')
-
-            if 'host_species' not in host:
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('host_species not found')
-
-            if len(GBSeq_sequence.text) < min_len_to_count:
-                if accession_version not in not_created_accession_dict:
-                    not_created_accession_dict[accession_version] = []
-                not_created_accession_dict[accession_version].append('sequence shorter than {} bp'.format(min_len_to_count))
-
-            if accession_version not in not_created_accession_dict:
-                num_seq_with_len_ge_X_bp += 1
-
-                # with open(os.path.join(dir_fasta_and_yaml, '{}.fasta'.format(accession_version)), 'w') as fw:
-                #    fw.write('>{}\n{}'.format(accession_version, GBSeq_sequence.text.upper()))
-
-                with open(os.path.join(dir_fasta_and_yaml, '{}.yaml'.format(accession_version)), 'w') as fw:
-                    json.dump(info, fw, indent=2)
-        except:
-            print("Unexpected error for the ID {}: {}".format(accession_version, sys.exc_info()[0]))
-            accession_with_errors_list.append(accession_version)
-            continue
-
-if len(missing_value_list) > 0:
-    path_missing_terms_tsv = 'missing_terms.genbank.tsv'
-    print('Written missing terms in {}'.format(path_missing_terms_tsv))
-    with open(path_missing_terms_tsv, 'w') as fw:
-        fw.write('\n'.join(missing_value_list))
-
-if len(accession_with_errors_list) > 0:
-    path_accession_with_errors_tsv = 'accession_with_errors.genbank.tsv'
-    print('Written the accession with errors in {}'.format(path_accession_with_errors_tsv))
-    with open(path_accession_with_errors_tsv, 'w') as fw:
-        fw.write('\n'.join(accession_with_errors_list))
-
-if len(not_created_accession_dict) > 0:
-    path_not_created_accession_tsv = 'not_created_accession.genbank.tsv'
-    print('Written not created accession in {}'.format(path_not_created_accession_tsv))
-    with open(path_not_created_accession_tsv, 'w') as fw:
-        fw.write('\n'.join(['\t'.join([accession_version, ','.join(missing_info_list)]) for accession_version, missing_info_list in not_created_accession_dict.items()]))
-
-print('Num. new sequences with length >= {} bp: {}'.format(min_len_to_count, num_seq_with_len_ge_X_bp))
+if field_in_yaml == 'sequencing_coverage':
+    # A regular expression would be better!
+    try:
+        technology[field_in_yaml] = [
+            float(tech_info_to_parse.replace('(average)', '').replace("reads/nt", '').
+                  replace('(average for 6 sequences)', '').replace(',', '.').strip(' xX>'))
+        ]
+    except ValueError:
+        print(accession_version, "Couldn't make sense of Coverage '%s'" % tech_info_to_parse)
+        pass
+
+
+elif GBQualifier_name_text == 'collected_by':
+    if any([x in GBQualifier_value_text.lower() for x in ['institute', 'hospital', 'city', 'center']]):
+        sample['collecting_institution'] = GBQualifier_value_text
+    else:
+        sample['collector_name'] = GBQualifier_value_text
+
+elif GBQualifier_name_text == 'isolation_source':
+if GBQualifier_value_text.upper() in field_to_term_to_uri_dict['ncbi_speciesman_source']:
+    GBQualifier_value_text = GBQualifier_value_text.upper()  # For example, in case of 'usa: wa'
+
+# Little cleaning
+GBQualifier_value_text = GBQualifier_value_text.strip("/'")
+
+if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_speciesman_source']:
+    sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source'][GBQualifier_value_text]]
+else:
+    if GBQualifier_value_text.lower() in ['np/op', 'np-op', 'np/op swab', 'np/np swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'combined nasopharyngeal and oropharyngeal swab', 'naso and/or oropharyngeal swab']:
+        sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
+    elif GBQualifier_value_text.lower() in ['nasopharyngeal swab/throat swab', 'nasopharyngeal/throat swab', 'nasopharyngeal swab and throat swab', 'nasal swab and throat swab', 'nasopharyngeal aspirate/throat swab', 'Nasopharyngeal/Throat']:
+        sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
+    elif GBQualifier_value_text.lower() in ['nasopharyngeal aspirate & throat swab', 'nasopharyngeal aspirate and throat swab']:
+        sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal aspirate'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
+    elif GBQualifier_value_text.lower() in ['nasal swab and throat swab']:
+        sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
+    elif GBQualifier_value_text.lower() in ['nasal-swab and oro-pharyngeal swab']:
+        sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
+    else:
+        missing_value_list.append('\t'.join([accession_version, 'specimen_source', GBQualifier_value_text]))