author | LLTommy | 2020-04-23 20:18:36 +0200 |
---|---|---|
committer | GitHub | 2020-04-23 20:18:36 +0200 |
commit | 7049cd5d29acd601ccbbc9d04f001b84a51e9bd5 (patch) | |
tree | 34a1254d81c2e526427fedb1deaa9f8441e8b260 /scripts/from_genbank_to_fasta_and_yaml.py | |
parent | f38b9c6f22b82327df9648938a5a4bcf863d8c41 (diff) | |
parent | c7612e7eda5cd38bfbb2d293bebf732893a41b6c (diff) | |
download | bh20-seq-resource-7049cd5d29acd601ccbbc9d04f001b84a51e9bd5.tar.gz bh20-seq-resource-7049cd5d29acd601ccbbc9d04f001b84a51e9bd5.tar.lz bh20-seq-resource-7049cd5d29acd601ccbbc9d04f001b84a51e9bd5.zip |
Merge branch 'master' into patch-3
Diffstat (limited to 'scripts/from_genbank_to_fasta_and_yaml.py')
-rwxr-xr-x[-rw-r--r--] | scripts/from_genbank_to_fasta_and_yaml.py | 349 |
1 file changed, 193 insertions, 156 deletions
diff --git a/scripts/from_genbank_to_fasta_and_yaml.py b/scripts/from_genbank_to_fasta_and_yaml.py
index 0c410d7..f76cb29 100644..100755
--- a/scripts/from_genbank_to_fasta_and_yaml.py
+++ b/scripts/from_genbank_to_fasta_and_yaml.py
@@ -1,59 +1,59 @@
+#!/usr/bin/env python3
+
 from Bio import Entrez
-Entrez.email = 'insert_your_email@gmail.com'
+Entrez.email = 'another_email@gmail.com'
 
 import xml.etree.ElementTree as ET
-import yaml
+import json
 import os
 
-from datetime import date
-today = date.today().strftime("%Y%m%d")
-
-dir_metadata_today = 'metadata_from_nuccore_{}'.format(today)
-dir_fasta_and_yaml_today = 'fasta_and_yaml_{}'.format(today)
+num_ids_for_request = 100
+dir_metadata = 'metadata_from_nuccore'
+dir_fasta_and_yaml = 'fasta_and_yaml'
 dir_dict_ontology_standardization = 'dict_ontology_standardization/'
-
 path_ncbi_virus_accession = 'sequences.acc'
 
-# Take all the ids
-id_set = set()
+def chunks(lst, n):
+    for i in range(0, len(lst), n):
+        yield lst[i:i + n]
 
-term_list = ['SARS-CoV-2', 'SARS-CoV2', 'SARS CoV2', 'SARSCoV2', 'txid2697049[Organism]']
-for term in term_list:
-    tmp_list = Entrez.read(
-        Entrez.esearch(db='nuccore', term=term, idtype='acc', retmax='10000')
-    )['IdList']
+if not os.path.exists(dir_metadata):
+    os.makedirs(dir_metadata)
 
-    # Remove mRNAs, ncRNAs, Proteins, and predicted models (more information here: https://en.wikipedia.org/wiki/RefSeq)
-    tmp_list = [x for x in tmp_list if x[:2] not in ['NM', 'NR', 'NP', 'XM', 'XR', 'XP', 'WP']]
+    # Take all the ids
+    id_set = set()
 
-    # Remove the version in the id
-    tmp_list = [x.split('.')[0] for x in tmp_list]
-
-    print(term, len(tmp_list))
+    term_list = ['SARS-CoV-2', 'SARS-CoV2', 'SARS CoV2', 'SARSCoV2', 'txid2697049[Organism]']
+    for term in term_list:
+        tmp_list = Entrez.read(
+            Entrez.esearch(db='nuccore', term=term, idtype='acc', retmax='10000')
+        )['IdList']
 
-    id_set.update([x.split('.')[0] for x in tmp_list])
+        # Remove mRNAs, ncRNAs, Proteins, and predicted models (more information here: https://en.wikipedia.org/wiki/RefSeq)
+        tmp_list = [x for x in tmp_list if x[:2] not in ['NM', 'NR', 'NP', 'XM', 'XR', 'XP', 'WP']]
 
-print(term_list, len(id_set))
+        # Remove the version in the id
+        tmp_list = [x.split('.')[0] for x in tmp_list]
 
-with open(path_ncbi_virus_accession) as f:
-    tmp_list = [line.strip('\n') for line in f]
+        print(term, len(tmp_list))
+        tmp_list=tmp_list
+        # tmp_list = tmp_list[0:2] # restricting to small run
 
-print('NCBI Virus', len(tmp_list))
-id_set.update(tmp_list)
+        id_set.update([x.split('.')[0] for x in tmp_list])
 
-print(term_list + ['NCBI Virus'], len(id_set))
+    print(term_list, len(id_set))
+
+    with open(path_ncbi_virus_accession) as f:
+        tmp_list = [line.strip('\n') for line in f]
+
+    print('NCBI Virus', len(tmp_list))
+    id_set.update(tmp_list)
+
+    print(term_list + ['NCBI Virus'], len(id_set))
 
-def chunks(lst, n):
-    for i in range(0, len(lst), n):
-        yield lst[i:i + n]
-
-num_ids_for_request = 100
-if not os.path.exists(dir_metadata_today):
-    os.makedirs(dir_metadata_today)
-
     for i, id_x_list in enumerate(chunks(list(id_set), num_ids_for_request)):
-        path_metadata_xxx_xml = os.path.join(dir_metadata_today, 'metadata_{}.xml'.format(i))
+        path_metadata_xxx_xml = os.path.join(dir_metadata, 'metadata_{}.xml'.format(i))
 
         print('Requesting {} ids --> {}'.format(len(id_x_list), path_metadata_xxx_xml))
 
         with open(path_metadata_xxx_xml, 'w') as fw:
@@ -61,7 +61,7 @@ if not os.path.exists(dir_metadata_today):
                 Entrez.efetch(db='nuccore', id=id_x_list, retmode='xml').read()
             )
 
-
+
 term_to_uri_dict = {}
 
 for path_dict_xxx_csv in [os.path.join(dir_dict_ontology_standardization, name_xxx_csv) for name_xxx_csv in os.listdir(dir_dict_ontology_standardization) if name_xxx_csv.endswith('.csv')]:
@@ -72,144 +72,181 @@ for path_dict_xxx_csv in [os.path.join(dir_dict_ontology_standardization, name_x
             if len(line.split(',')) > 2:
                 term, uri = line.strip('\n').split('",')
                 term = term.strip('"')
-            else: 
+            else:
                 term, uri = line.strip('\n').split(',')
 
             term_to_uri_dict[term] = uri
 
 species_to_taxid_dict = {
-    'Homo sapiens': 9606
+    'Homo sapiens': 'http://purl.obolibrary.org/obo/NCBITaxon_9606'
 }
 
-if not os.path.exists(dir_fasta_and_yaml_today):
-    os.makedirs(dir_fasta_and_yaml_today)
+if not os.path.exists(dir_fasta_and_yaml):
+    os.makedirs(dir_fasta_and_yaml)
 
-    for path_metadata_xxx_xml in [os.path.join(dir_metadata_today, name_metadata_xxx_xml) for name_metadata_xxx_xml in os.listdir(dir_metadata_today) if name_metadata_xxx_xml.endswith('.xml')]:
-        tree = ET.parse(path_metadata_xxx_xml)
-        GBSet = tree.getroot()
+missing_value_list = []
+
+for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml) for name_metadata_xxx_xml in os.listdir(dir_metadata) if name_metadata_xxx_xml.endswith('.xml')]:
+    tree = ET.parse(path_metadata_xxx_xml)
+    GBSet = tree.getroot()
+
+    for GBSeq in GBSet:
+        accession_version = GBSeq.find('GBSeq_accession-version').text
+
+        GBSeq_sequence = GBSeq.find('GBSeq_sequence')
+        if GBSeq_sequence is None:
+            print(accession_version, ' - sequence not found')
+            continue
+
+
+        # A general default-empty yaml could be read from the definitive one
+        info_for_yaml_dict = {
+            'id': 'placeholder',
+            'host': {},
+            'sample': {},
+            'virus': {},
+            'technology': {},
+            'submitter': {}
+        }
+
+        info_for_yaml_dict['sample']['sample_id'] = accession_version
+        info_for_yaml_dict['sample']['source_database_accession'] = accession_version
+        info_for_yaml_dict['submitter']['authors'] = ';'.join([x.text for x in GBSeq.iter('GBAuthor')])
+
+
+        GBSeq_comment = GBSeq.find('GBSeq_comment')
+        if GBSeq_comment is not None and 'Assembly-Data' in GBSeq_comment.text:
+            GBSeq_comment_text = GBSeq_comment.text.split('##Assembly-Data-START## ; ')[1].split(' ; ##Assembly-Data-END##')[0]
+
+            for info_to_check, field_in_yaml in zip(
+                ['Assembly Method', 'Coverage', 'Sequencing Technology'],
+                ['sequence_assembly_method', 'sequencing_coverage', 'sample_sequencing_technology']
+            ):
+                if info_to_check in GBSeq_comment_text:
+                    tech_info_to_parse = GBSeq_comment_text.split('{} :: '.format(info_to_check))[1].split(' ;')[0]
+
+                    if field_in_yaml == 'sequencing_coverage':
+                        # A regular expression would be better!
+                        try:
+                            info_for_yaml_dict['technology'][field_in_yaml] = float(
+                                tech_info_to_parse.strip('(average)').strip("reads/nt").replace(',', '.').strip(' xX>'))
+                        except ValueError:
+                            print(accession_version, "Couldn't make sense of Coverage '%s'" % tech_info_to_parse)
+                            pass
+                    elif field_in_yaml == 'sample_sequencing_technology':
+                        new_seq_tec_list = []
+                        for seq_tec in tech_info_to_parse.split(';'):
+                            seq_tec = seq_tec.strip()
+                            if seq_tec in term_to_uri_dict:
+                                seq_tec = term_to_uri_dict[seq_tec]
+                            else:
+                                #print(accession_version, 'missing sample_sequencing_technology:', seq_tec)
+                                missing_value_list.append('\t'.join([accession_version, 'sample_sequencing_technology', seq_tec]))
 
-        for GBSeq in GBSet:
-            accession_version = GBSeq.find('GBSeq_accession-version').text
+                            new_seq_tec_list.append(seq_tec)
 
-            GBSeq_sequence = GBSeq.find('GBSeq_sequence')
-            if GBSeq_sequence is None:
-                print(accession_version, ' - sequence not found')
-                continue
+                        for n, seq_tec in enumerate(new_seq_tec_list):
+                            info_for_yaml_dict['technology'][field_in_yaml + ('' if n == 0 else str(n + 1))] = seq_tec
+                    else:
+                        info_for_yaml_dict['technology'][field_in_yaml] = tech_info_to_parse
 
 
-            # A general default-empty yaml could be read from the definitive one
-            info_for_yaml_dict = {
-                'id': 'placeholder',
-                'host': {},
-                'sample': {},
-                'virus': {},
-                'technology': {},
-                'submitter': {}
-            }
-
-
-            info_for_yaml_dict['sample']['sample_id'] = accession_version
-            info_for_yaml_dict['submitter']['authors'] = ';'.join([x.text for x in GBSeq.iter('GBAuthor')])
-
-
-            GBSeq_comment = GBSeq.find('GBSeq_comment')
-            if GBSeq_comment is not None and 'Assembly-Data' in GBSeq_comment.text:
-                GBSeq_comment_text = GBSeq_comment.text.split('##Assembly-Data-START## ; ')[1].split(' ; ##Assembly-Data-END##')[0]
-
-                for info_to_check, field_in_yaml in zip(
-                    ['Assembly Method', 'Coverage', 'Sequencing Technology'],
-                    ['sequence_assembly_method', 'sequencing_coverage', 'sample_sequencing_technology']
-                ):
-                    if info_to_check in GBSeq_comment_text:
-                        tech_info_to_parse = GBSeq_comment_text.split('{} :: '.format(info_to_check))[1].split(' ;')[0]
-
-                        if field_in_yaml == 'sequencing_coverage':
-                            # A regular expression would be better!
-                            info_for_yaml_dict['technology'][field_in_yaml] = ';'.join(
-                                [x.strip('(average)').strip("reads/nt").replace(',', '.').strip(' xX>') for x in tech_info_to_parse.split(';')]
-                            )
-                        elif field_in_yaml == 'sample_sequencing_technology':
-                            new_seq_tec_list = []
-                            for seq_tec in tech_info_to_parse.split(';'):
-                                seq_tec = seq_tec.strip()
-                                if seq_tec in term_to_uri_dict:
-                                    seq_tec = term_to_uri_dict[seq_tec]
-                                else:
-                                    print(accession_version, 'missing technologies:', seq_tec)
-
-                                new_seq_tec_list.append(seq_tec)
-
-                            for n, seq_tec in enumerate(new_seq_tec_list):
-                                info_for_yaml_dict['technology'][field_in_yaml + ('' if n == 0 else str(n + 1))] = seq_tec
-                        else:
-                            info_for_yaml_dict['technology'][field_in_yaml] = tech_info_to_parse
+        #term_to_uri_dict
 
-
-            #term_to_uri_dict
+        for GBFeature in GBSeq.iter('GBFeature'):
+            if GBFeature.find('GBFeature_key').text != 'source':
+                continue
 
-            for GBFeature in GBSeq.iter('GBFeature'):
-                if GBFeature.find('GBFeature_key').text != 'source':
+            for GBQualifier in GBFeature.iter('GBQualifier'):
+                GBQualifier_value = GBQualifier.find('GBQualifier_value')
+                if GBQualifier_value is None:
                     continue
+                GBQualifier_value_text = GBQualifier_value.text
 
-                for GBQualifier in GBFeature.iter('GBQualifier'):
-                    GBQualifier_value = GBQualifier.find('GBQualifier_value')
-                    if GBQualifier_value is None:
-                        continue
-                    GBQualifier_value_text = GBQualifier_value.text
-
-                    GBQualifier_name_text = GBQualifier.find('GBQualifier_name').text
+                GBQualifier_name_text = GBQualifier.find('GBQualifier_name').text
 
-                    if GBQualifier_name_text == 'host':
-                        GBQualifier_value_text_list = GBQualifier_value_text.split('; ')
+                if GBQualifier_name_text == 'host':
+                    GBQualifier_value_text_list = GBQualifier_value_text.split('; ')
 
-                        info_for_yaml_dict['host']['host_common_name'] = GBQualifier_value_text_list[0]
+                    #info_for_yaml_dict['host']['host_common_name'] = GBQualifier_value_text_list[0] # Removed
 
-                        if GBQualifier_value_text_list[0] in species_to_taxid_dict:
-                            info_for_yaml_dict['host']['host_species'] = species_to_taxid_dict[GBQualifier_value_text_list[0]]
+                    if GBQualifier_value_text_list[0] in species_to_taxid_dict:
+                        info_for_yaml_dict['host']['host_species'] = species_to_taxid_dict[GBQualifier_value_text_list[0]]
 
-                        if len(GBQualifier_value_text_list) > 1:
-                            if GBQualifier_value_text_list[1] in ['male', 'female']:
-                                info_for_yaml_dict['host']['host_sex'] = GBQualifier_value_text_list[1]
-                            else:
-                                info_for_yaml_dict['host']['host_health_status'] = GBQualifier_value_text_list[1]
-
-                            if 'age' in GBQualifier_value_text:
-                                info_for_yaml_dict['host']['host_age'] = int(GBQualifier_value_text_list[2].split('age ')[1])
-                                info_for_yaml_dict['host']['host_age_unit'] = 'year'
-                    elif GBQualifier_name_text == 'collected_by':
-                        if any([x in GBQualifier_value_text.lower() for x in ['institute', 'hospital', 'city', 'center']]):
-                            info_for_yaml_dict['sample']['collecting_institution'] = GBQualifier_value_text
+                    if len(GBQualifier_value_text_list) > 1:
+                        if GBQualifier_value_text_list[1] in ['male', 'female']:
+                            if GBQualifier_value_text_list[1]=='male':
+                                info_for_yaml_dict['host']['host_sex'] = "http://purl.obolibrary.org/obo/PATO_0000384"
+                            elif GBQualifier_value_text_list[1]=='female':
+                                info_for_yaml_dict['host']['host_sex'] = "http://purl.obolibrary.org/obo/PATO_0000383"
+                        elif GBQualifier_value_text_list[1] in term_to_uri_dict:
+                            info_for_yaml_dict['host']['host_health_status'] = term_to_uri_dict[GBQualifier_value_text_list[1]]
                         else:
-                            info_for_yaml_dict['sample']['collector_name'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'isolation_source':
-                        if GBQualifier_value_text in term_to_uri_dict:
-                            info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict[GBQualifier_value_text]
+                            #print(accession_version, 'missing {}:'.format(GBQualifier_name_text), GBQualifier_value_text_list[1])
+                            missing_value_list.append('\t'.join([accession_version, GBQualifier_name_text, GBQualifier_value_text_list[1]]))
+
+                        if 'age' in GBQualifier_value_text:
+                            info_for_yaml_dict['host']['host_age'] = int(GBQualifier_value_text_list[2].split('age ')[1])
+                            info_for_yaml_dict['host']['host_age_unit'] = 'year'
+                elif GBQualifier_name_text == 'collected_by':
+                    if any([x in GBQualifier_value_text.lower() for x in ['institute', 'hospital', 'city', 'center']]):
+                        info_for_yaml_dict['sample']['collecting_institution'] = GBQualifier_value_text
+                    else:
+                        info_for_yaml_dict['sample']['collector_name'] = GBQualifier_value_text
+                elif GBQualifier_name_text == 'isolation_source':
+                    if GBQualifier_value_text.upper() in term_to_uri_dict:
+                        GBQualifier_value_text = GBQualifier_value_text.upper() # For example, in case of 'usa: wa'
+
+                    if GBQualifier_value_text in term_to_uri_dict:
+                        info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict[GBQualifier_value_text]
+                    else:
+                        if GBQualifier_value_text in ['NP/OP swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'np/np swab', 'np/op']:
+                            info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict['nasopharyngeal swab']
+                            info_for_yaml_dict['sample']['specimen_source2'] = term_to_uri_dict['oropharyngeal swab']
+                        elif GBQualifier_value_text in ['nasopharyngeal swab/throat swab']:
+                            info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict['nasopharyngeal swab']
+                            info_for_yaml_dict['sample']['specimen_source2'] = term_to_uri_dict['throat swab']
+                        elif GBQualifier_value_text in ['nasopharyngeal aspirate/throat swab']:
+                            info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict['nasopharyngeal aspirate']
+                            info_for_yaml_dict['sample']['specimen_source2'] = term_to_uri_dict['throat swab']
                         else:
-                            if GBQualifier_value_text in ['NP/OP swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'np/np swab', 'np/op']:
-                                info_for_yaml_dict['sample']['specimen_source'] = term_to_uri_dict['nasopharyngeal swab']
-                                info_for_yaml_dict['sample']['specimen_source2'] = term_to_uri_dict['oropharyngeal swab']
-                            else:
-                                print(accession_version, 'missing specimen_source:', GBQualifier_value_text)
-                    elif GBQualifier_name_text == 'collection_date':
-                        # TO_DO: which format we will use?
-                        info_for_yaml_dict['sample']['collection_date'] = GBQualifier_value_text
-                    elif GBQualifier_name_text in ['lat_lon', 'country']:
-                        if GBQualifier_value_text in term_to_uri_dict:
-                            GBQualifier_value_text = term_to_uri_dict[GBQualifier_value_text]
-                        else:
-                            print(accession_version, 'missing {}:'.format(GBQualifier_name_text), GBQualifier_value_text)
-
-                        info_for_yaml_dict['sample']['collection_location'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'note':
-                        info_for_yaml_dict['sample']['additional_collection_information'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'isolate':
-                        info_for_yaml_dict['virus']['virus_strain'] = GBQualifier_value_text
-                    elif GBQualifier_name_text == 'db_xref':
-                        info_for_yaml_dict['virus']['virus_species'] = int(GBQualifier_value_text.split('taxon:')[1])
-
-            with open(os.path.join(dir_fasta_and_yaml_today, '{}.fasta'.format(accession_version)), 'w') as fw:
-                fw.write('>{}\n{}'.format(accession_version, GBSeq_sequence.text.upper()))
-
-            with open(os.path.join(dir_fasta_and_yaml_today, '{}.yaml'.format(accession_version)), 'w') as fw:
-                yaml.dump(info_for_yaml_dict, fw, default_flow_style=False)
+                            #print(accession_version, 'missing specimen_source:', GBQualifier_value_text)
+                            missing_value_list.append('\t'.join([accession_version, 'specimen_source', GBQualifier_value_text]))
+                elif GBQualifier_name_text == 'collection_date':
+                    # TO_DO: which format we will use?
+                    info_for_yaml_dict['sample']['collection_date'] = GBQualifier_value_text
+                elif GBQualifier_name_text in ['lat_lon', 'country']:
+                    if GBQualifier_value_text == 'Hong Kong':
+                        GBQualifier_value_text = 'China: Hong Kong'
+
+
+                    if GBQualifier_value_text in term_to_uri_dict:
+                        GBQualifier_value_text = term_to_uri_dict[GBQualifier_value_text]
+                    else:
+                        #print(accession_version, 'missing {}:'.format(GBQualifier_name_text), GBQualifier_value_text)
+                        missing_value_list.append('\t'.join([accession_version, GBQualifier_name_text, GBQualifier_value_text]))
+
+                    info_for_yaml_dict['sample']['collection_location'] = GBQualifier_value_text
+                elif GBQualifier_name_text == 'note':
+                    info_for_yaml_dict['sample']['additional_collection_information'] = GBQualifier_value_text
+                elif GBQualifier_name_text == 'isolate':
+                    info_for_yaml_dict['virus']['virus_strain'] = GBQualifier_value_text
+                elif GBQualifier_name_text == 'db_xref':
+                    info_for_yaml_dict['virus']['virus_species'] = "http://purl.obolibrary.org/obo/NCBITaxon_"+GBQualifier_value_text.split('taxon:')[1]
+
+
+        # Remove technology key if empty!
+        if (info_for_yaml_dict['technology']=={}):
+            del info_for_yaml_dict['technology']
+
+        with open(os.path.join(dir_fasta_and_yaml, '{}.fasta'.format(accession_version)), 'w') as fw:
+            fw.write('>{}\n{}'.format(accession_version, GBSeq_sequence.text.upper()))
+
+        with open(os.path.join(dir_fasta_and_yaml, '{}.yaml'.format(accession_version)), 'w') as fw:
+            json.dump(info_for_yaml_dict, fw, indent=2)
+
+
+if len(missing_value_list) > 0:
+    with open('missing_terms.tsv', 'w') as fw:
+        fw.write('\n'.join(missing_value_list))
 
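For readers skimming the patch, the core change to the download step is the batching pattern at the top of the new file: accession ids from several Entrez searches (plus the NCBI Virus accession list) are collected into a set and fetched from the nuccore database in groups of num_ids_for_request. Below is a minimal, self-contained sketch of that pattern, not the full script: it assumes Biopython is installed, uses a placeholder email address, and hard-codes a single search term and a batch size of 100 for illustration.

#!/usr/bin/env python3
# Minimal sketch of the batched Entrez download pattern used in the patch.
# The email address is a placeholder; 'metadata_from_nuccore' is the output
# directory name used by the script.
import os
from Bio import Entrez

Entrez.email = 'your_email@example.com'  # placeholder

def chunks(lst, n):
    # Yield successive n-sized chunks from lst (same helper as in the script).
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Collect accession ids for one search term and drop the version suffix.
id_list = Entrez.read(
    Entrez.esearch(db='nuccore', term='txid2697049[Organism]', idtype='acc', retmax='10000')
)['IdList']
id_set = {x.split('.')[0] for x in id_list}

os.makedirs('metadata_from_nuccore', exist_ok=True)

# Fetch the GenBank records 100 ids at a time, one XML file per batch.
for i, id_x_list in enumerate(chunks(list(id_set), 100)):
    path_xml = os.path.join('metadata_from_nuccore', 'metadata_{}.xml'.format(i))
    with open(path_xml, 'w') as fw:
        fw.write(Entrez.efetch(db='nuccore', id=id_x_list, retmode='xml').read())

The rest of the patch then parses those XML files, maps free-text metadata to ontology URIs via the dict_ontology_standardization CSVs, writes one .fasta and one .yaml per accession (the .yaml is now serialized with json.dump, which remains valid YAML), and logs any unmapped terms to missing_terms.tsv.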