aboutsummaryrefslogtreecommitdiff
path: root/scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py')
-rwxr-xr-xscripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py82
1 files changed, 30 insertions, 52 deletions
diff --git a/scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py b/scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py
index 52aee4e..9b8fedc 100755
--- a/scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py
+++ b/scripts/download_genbank_data/from_genbank_to_fasta_and_yaml.py
@@ -16,11 +16,15 @@ import xml.etree.ElementTree as ET
import json
import os
import requests
-import sys
from datetime import date
from dateutil.parser import parse
+import sys
+sys.path.append('../')
+from utils import is_integer, chunks, check_and_get_ontology_dictionaries
+
+
num_ids_for_request = 100
dir_metadata = 'metadata_from_nuccore'
@@ -30,16 +34,9 @@ dir_dict_ontology_standardization = args.dict_ontology
today_date = date.today().strftime("%Y.%m.%d")
path_ncbi_virus_accession = 'sequences.{}.acc'.format(today_date)
-def is_integer(string_to_check):
- try:
- int(string_to_check)
- return True
- except ValueError:
- return False
-def chunks(lst, n):
- for i in range(0, len(lst), n):
- yield lst[i:i + n]
+field_to_term_to_uri_dict = check_and_get_ontology_dictionaries(dir_dict_ontology_standardization)
+
if os.path.exists(dir_metadata):
print("The directory '{}' already exists.".format(dir_metadata))
@@ -123,29 +120,10 @@ if not os.path.exists(dir_metadata):
)
-term_to_uri_dict = {}
-
-for path_dict_xxx_csv in [os.path.join(dir_dict_ontology_standardization, name_xxx_csv) for name_xxx_csv in os.listdir(dir_dict_ontology_standardization) if name_xxx_csv.endswith('.csv')]:
- print('Read {}'.format(path_dict_xxx_csv))
-
- with open(path_dict_xxx_csv) as f:
- for line in f:
- if len(line.split(',')) > 2:
- term, uri = line.strip('\n').split('",')
- else:
- term, uri = line.strip('\n').split(',')
-
- term = term.strip('"')
-
- if term in term_to_uri_dict:
- print('Warning: in the dictionaries there are more entries for the same term ({}).'.format(term))
- continue
-
- term_to_uri_dict[term] = uri
-
if not os.path.exists(dir_fasta_and_yaml):
os.makedirs(dir_fasta_and_yaml)
+
min_len_to_count = 15000
num_seq_with_len_ge_X_bp = 0
@@ -166,7 +144,7 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
continue
try:
- #print(path_metadata_xxx_xml, accession_version)
+ # print(path_metadata_xxx_xml, accession_version)
# A general default-empty yaml could be read from the definitive one
info_for_yaml_dict = {
@@ -230,8 +208,8 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
new_seq_tec_list = []
for seq_tec in tech_info_to_parse.split(';'):
seq_tec = seq_tec.strip()
- if seq_tec in term_to_uri_dict:
- seq_tec = term_to_uri_dict[seq_tec]
+ if seq_tec in field_to_term_to_uri_dict['ncbi_sequencing_technology']:
+ seq_tec = field_to_term_to_uri_dict['ncbi_sequencing_technology'][seq_tec]
new_seq_tec_list.append(seq_tec)
else:
missing_value_list.append('\t'.join([accession_version, 'sample_sequencing_technology', seq_tec]))
@@ -256,17 +234,17 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
if GBQualifier_name_text == 'host':
GBQualifier_value_text = GBQualifier_value_text.split(';')[0] # For case like Homo sapiens;sex:female
- if GBQualifier_value_text in term_to_uri_dict:
+ if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_host_species']:
# Cases like 'Felis catus; Domestic Shorthair'
- info_for_yaml_dict['host']['host_species'] = term_to_uri_dict[GBQualifier_value_text]
+ info_for_yaml_dict['host']['host_species'] = field_to_term_to_uri_dict['ncbi_host_species'][GBQualifier_value_text]
else:
GBQualifier_value_text_list = GBQualifier_value_text.split('; ')
- if GBQualifier_value_text_list[0] in term_to_uri_dict:
- info_for_yaml_dict['host']['host_species'] = term_to_uri_dict[GBQualifier_value_text_list[0]]
+ if GBQualifier_value_text_list[0] in field_to_term_to_uri_dict['ncbi_host_species']:
+ info_for_yaml_dict['host']['host_species'] = field_to_term_to_uri_dict['ncbi_host_species'][GBQualifier_value_text_list[0]]
elif GBQualifier_value_text_list[0] and ('MT215193' in accession_version or 'MT270814' in accession_version):
# Information checked manually from NCBI Virus
- info_for_yaml_dict['host']['host_species'] = term_to_uri_dict['Canis lupus familiaris']
+ info_for_yaml_dict['host']['host_species'] = field_to_term_to_uri_dict['ncbi_host_species']['Canis lupus familiaris']
else:
missing_value_list.append('\t'.join([accession_version, 'host_species', GBQualifier_value_text_list[0]]))
@@ -295,8 +273,8 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
if host_sex in ['male', 'female']:
info_for_yaml_dict['host']['host_sex'] = "http://purl.obolibrary.org/obo/PATO_0000384" if host_sex == 'male' else "http://purl.obolibrary.org/obo/PATO_0000383"
- elif GBQualifier_value_text_list[1] in term_to_uri_dict:
- info_for_yaml_dict['host']['host_health_status'] = term_to_uri_dict[GBQualifier_value_text_list[1]]
+ elif GBQualifier_value_text_list[1] in field_to_term_to_uri_dict['ncbi_host_health_status']:
+ info_for_yaml_dict['host']['host_health_status'] = field_to_term_to_uri_dict['ncbi_host_health_status'][GBQualifier_value_text_list[1]]
else:
missing_value_list.append('\t'.join([accession_version, 'host_sex or host_health_status', GBQualifier_value_text_list[1]]))
@@ -318,25 +296,25 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
else:
info_for_yaml_dict['sample']['collector_name'] = GBQualifier_value_text
elif GBQualifier_name_text == 'isolation_source':
- if GBQualifier_value_text.upper() in term_to_uri_dict:
- GBQualifier_value_text = GBQualifier_value_text.upper() # For example, in case of 'usa: wa'
+ if GBQualifier_value_text.upper() in field_to_term_to_uri_dict['ncbi_speciesman_source']:
+ GBQualifier_value_text = GBQualifier_value_text.upper() # For example, in case of 'usa: wa'
# Little cleaning
GBQualifier_value_text = GBQualifier_value_text.strip("/'")
- if GBQualifier_value_text in term_to_uri_dict:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict[GBQualifier_value_text]]
+ if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_speciesman_source']:
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source'][GBQualifier_value_text]]
else:
if GBQualifier_value_text.lower() in ['np/op', 'np/op swab', 'np/np swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'combined nasopharyngeal and oropharyngeal swab', 'naso and/or oropharyngeal swab']:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict['nasopharyngeal swab'], term_to_uri_dict['oropharyngeal swab']]
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
elif GBQualifier_value_text.lower() in ['nasopharyngeal swab/throat swab', 'nasopharyngeal/throat swab', 'nasopharyngeal swab and throat swab', 'nasal swab and throat swab', 'nasopharyngeal aspirate/throat swab', 'Nasopharyngeal/Throat']:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict['nasopharyngeal swab'], term_to_uri_dict['throat swab']]
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasopharyngeal aspirate & throat swab', 'nasopharyngeal aspirate and throat swab']:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict['nasopharyngeal aspirate'], term_to_uri_dict['throat swab']]
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal aspirate'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasal swab and throat swab']:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict['nasal swab'], term_to_uri_dict['throat swab']]
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasal-swab and oro-pharyngeal swab']:
- info_for_yaml_dict['sample']['specimen_source'] = [term_to_uri_dict['nasal swab'], term_to_uri_dict['oropharyngeal swab']]
+ info_for_yaml_dict['sample']['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
else:
missing_value_list.append('\t'.join([accession_version, 'specimen_source', GBQualifier_value_text]))
elif GBQualifier_name_text == 'collection_date':
@@ -371,8 +349,8 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
if GBQualifier_name_text == 'country' and ': ' in GBQualifier_value_text:
GBQualifier_value_text = GBQualifier_value_text.replace(': ', ':')
- if GBQualifier_value_text in term_to_uri_dict:
- info_for_yaml_dict['sample']['collection_location'] = term_to_uri_dict[GBQualifier_value_text]
+ if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_countries']:
+ info_for_yaml_dict['sample']['collection_location'] = field_to_term_to_uri_dict['ncbi_countries'][GBQualifier_value_text]
else:
missing_value_list.append('\t'.join([accession_version, GBQualifier_name_text, GBQualifier_value_text]))
elif GBQualifier_name_text == 'note':
@@ -387,7 +365,7 @@ for path_metadata_xxx_xml in [os.path.join(dir_metadata, name_metadata_xxx_xml)
# Check if mandatory fields are missing
if 'sample_sequencing_technology' not in info_for_yaml_dict['technology']:
- #print(accession_version, ' - technology not found')
+ # print(accession_version, ' - technology not found')
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('sample_sequencing_technology not found')