# Genbank XML parser
from collections import namedtuple
import dateutil
from dateutil.parser import parse as dateparse
from dateutil.tz import gettz
import os
import re
import sys
import types
import xml.etree.ElementTree as ET
class GBError(Exception):
pass
"""
Example of an output JSON:
{
"id": "placeholder",
"host": {
"host_species": "http://purl.obolibrary.org/obo/NCBITaxon_9606"
},
"sample": {
"sample_id": "MT890462.1",
"source_database_accession": [
"http://identifiers.org/insdc/MT890462.1#sequence"
],
"collection_location": "http://www.wikidata.org/entity/Q649",
"collection_date": "2020-04-17",
"collecting_institution": "N.A.Kovtun Clinical Hospital 1 of Departament of President Affairs"
},
"virus": {
"virus_strain": "SARS-CoV-2/human/RUS/20200417_10/2020",
"virus_species": "http://purl.obolibrary.org/obo/NCBITaxon_2697049"
},
"technology": {
"assembly_method": "http://purl.obolibrary.org/obo/GENEPIO_0001628",
"alignment_protocol": "bowtie2 v. 2.3.4",
"sample_sequencing_technology": [
"http://purl.obolibrary.org/obo/OBI_0000759"
]
},
"submitter": {
"authors": [
"Blagodatskikh,K.A."
],
"submitter_name": [
"R&D"
],
"submitter_address": "Pirogov Russian National Research Medical University, Ostrovityanova 1, Moscow 117997, Russia"
}
}
"""
def get_metadata(id, gbseq):
host = types.SimpleNamespace()
sample = types.SimpleNamespace()
submitter = types.SimpleNamespace()
warnings = []
def warn(msg):
print(f"WARNING: {msg}",file=sys.stderr)
warnings.append(msg)
host.host_species = "http://purl.obolibrary.org/obo/NCBITaxon_9606"
sample.sample_id = id
sample.database = "https://www.ncbi.nlm.nih.gov/genbank/"
sample.source_database_accession = f"http://identifiers.org/insdc/{id}#sequence"
#
# country
# USA: Cruise_Ship_1, California
#
sample.collection_location = "FIXME"
# --- Handling dates ---
# 29-JUL-2020
n = gbseq.find("./GBSeq_create-date")
creation_date = dateparse(n.text).date()
# 30-JUL-2020
n = gbseq.find("./GBSeq_update-date")
update_date = dateparse(n.text).date()
#
# collection_date
# 2020-04-01
#
n = gbseq.find(".//GBQualifier/GBQualifier_name/[.='collection_date']/../GBQualifier_value")
try:
date = dateparse(n.text).date()
sample.collection_date = str(date)
except dateutil.parser._parser.ParserError as e:
warn(str(e))
sample.collection_date = str(creation_date)
except AttributeError:
warn("Missing collection_date - used creation_date instead")
sample.collection_date = str(creation_date)
info = {
'id': 'placeholder',
'update_date': str(update_date),
'host': host,
'sample': sample,
#'virus': virus,
#'technology': technology,
'submitter': submitter,
'warnings': warnings,
}
print(info)
return True,info
def get_sequence(id, gbseq):
seq = None
count = 0
for gbseq_sequence in gbseq.findall('./GBSeq_sequence'):
count += 1
if count > 1:
raise GBError(f"Expected one sequence for {id}")
seq = gbseq_sequence.text.upper()
print(f"SEQ: size={len(seq)}",seq[0:30])
if len(seq) < 20_000:
raise GBError(f"Sequence too short")
return seq
# ---- BELOW IS JUST FOR REFERENCE ----
min_len_to_count = 15000
num_seq_with_len_ge_X_bp = 0
missing_value_list = []
not_created_accession_dict = {}
accession_with_errors_list = []
if None:
tree = ET.parse(path_metadata_xxx_xml)
GBSet = tree.getroot()
for GBSeq in GBSet:
accession_version = GBSeq.find('GBSeq_accession-version').text
try:
info = {
'id': 'placeholder',
'host': {},
'sample': {},
'virus': {},
'technology': {},
'submitter': {}
}
sample['sample_id'] = accession_version
sample['source_database_accession'] = ["http://identifiers.org/insdc/"+accession_version+"#sequence"] #accession is turned into resolvable URL/URI now
# submitter info
GBSeq_references = GBSeq.find('GBSeq_references')
if GBSeq_references is not None:
author_list = ["{}".format(x.text) for x in GBSeq_references.iter('GBAuthor')]
if len(author_list) > 0:
submitter['authors'] = author_list
GBReference = GBSeq_references.find('GBReference')
if GBReference is not None:
GBReference_journal = GBReference.find('GBReference_journal')
if GBReference_journal is not None and GBReference_journal.text != 'Unpublished':
if 'Submitted' in GBReference_journal.text:
submitter['submitter_name'] = ["{}".format(GBReference_journal.text.split(') ')[1].split(',')[0].strip())]
submitter['submitter_address'] = ','.join(GBReference_journal.text.split(') ')[1].split(',')[1:]).strip()
else:
submitter['additional_submitter_information'] = GBReference_journal.text
# This script download and prepare data and metadata for assemblies samples
technology['assembly_method'] = 'http://purl.obolibrary.org/obo/GENEPIO_0001628'
GBSeq_comment = GBSeq.find('GBSeq_comment')
if GBSeq_comment is not None and 'Assembly-Data' in GBSeq_comment.text:
prefix_split_string = '##Genome-Assembly' if GBSeq_comment.text.startswith('##Genome-') else '##Assembly'
GBSeq_comment_text = GBSeq_comment.text.split(
'{}-Data-START## ; '.format(prefix_split_string)
)[1].split(' ; {}-Data-END##'.format(prefix_split_string))[0]
for info_to_check, field_in_yaml in zip(
['Assembly Method', 'Coverage', 'Sequencing Technology'],
['alignment_protocol', 'sequencing_coverage', 'sample_sequencing_technology']
):
if info_to_check in GBSeq_comment_text:
tech_info_to_parse = GBSeq_comment_text.split('{} :: '.format(info_to_check))[1].split(' ;')[0]
if field_in_yaml == 'sequencing_coverage':
# A regular expression would be better!
try:
technology[field_in_yaml] = [
float(tech_info_to_parse.replace('(average)', '').replace("reads/nt", '').
replace('(average for 6 sequences)', '').replace(',', '.').strip(' xX>'))
]
except ValueError:
print(accession_version, "Couldn't make sense of Coverage '%s'" % tech_info_to_parse)
pass
elif field_in_yaml == 'sample_sequencing_technology':
new_seq_tec_list = []
for seq_tec in tech_info_to_parse.split(';'):
seq_tec = seq_tec.strip()
if seq_tec in field_to_term_to_uri_dict['ncbi_sequencing_technology']:
seq_tec = field_to_term_to_uri_dict['ncbi_sequencing_technology'][seq_tec]
new_seq_tec_list.append(seq_tec)
else:
missing_value_list.append('\t'.join([accession_version, 'sample_sequencing_technology', seq_tec]))
if len(new_seq_tec_list) > 0:
technology['sample_sequencing_technology'] = [x for x in new_seq_tec_list]
else:
technology[field_in_yaml] = tech_info_to_parse
for GBFeature in GBSeq.iter('GBFeature'):
if GBFeature.find('GBFeature_key').text != 'source':
continue
for GBQualifier in GBFeature.iter('GBQualifier'):
GBQualifier_value = GBQualifier.find('GBQualifier_value')
if GBQualifier_value is None:
continue
GBQualifier_value_text = GBQualifier_value.text
GBQualifier_name_text = GBQualifier.find('GBQualifier_name').text
if GBQualifier_name_text == 'host':
GBQualifier_value_text = GBQualifier_value_text.split(';')[0] # For case like Homo sapiens;sex:female
if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_host_species']:
# Cases like 'Felis catus; Domestic Shorthair'
host['host_species'] = field_to_term_to_uri_dict['ncbi_host_species'][GBQualifier_value_text]
else:
GBQualifier_value_text_list = GBQualifier_value_text.split('; ')
if GBQualifier_value_text_list[0] in field_to_term_to_uri_dict['ncbi_host_species']:
host['host_species'] = field_to_term_to_uri_dict['ncbi_host_species'][GBQualifier_value_text_list[0]]
elif GBQualifier_value_text_list[0] and ('MT215193' in accession_version or 'MT270814' in accession_version):
# Information checked manually from NCBI Virus
host['host_species'] = field_to_term_to_uri_dict['ncbi_host_species']['Canis lupus familiaris']
else:
missing_value_list.append('\t'.join([accession_version, 'host_species', GBQualifier_value_text_list[0]]))
# Possible cases:
# - Homo sapiens --> ['Homo sapiens']
# - Homo sapiens; female --> ['Homo sapiens', 'female']
# - Homo sapiens; female 63 --> ['Homo sapiens', 'female 63']
# - Homo sapiens; female; age 40 --> ['Homo sapiens', 'female', 'age 40']
# - Homo sapiens; gender: F; age: 61 --> ['Homo sapiens', 'gender: F', 'age: 61']
# - Homo sapiens; gender: M; age: 68 --> ['Homo sapiens', 'gender: M', 'age: 68']
# - Homo sapiens; hospitalized patient --> ['Homo sapiens', 'hospitalized patient']
# - Homo sapiens; male --> ['Homo sapiens', 'male']
# - Homo sapiens; male; 63 --> ['Homo sapiens', 'male', '63']
# - Homo sapiens; male; age 29 --> ['Homo sapiens', 'male', 'age 29']
# - Homo sapiens; symptomatic --> ['Homo sapiens', 'symptomatic']
if len(GBQualifier_value_text_list) > 1:
host_sex = ''
if 'female' in GBQualifier_value_text_list[1]:
host_sex = 'female'
elif 'male' in GBQualifier_value_text_list[1]:
host_sex = 'male'
elif 'gender' in GBQualifier_value_text_list[1]:
host_sex_one_lecter = GBQualifier_value_text_list[1].split(':')[-1].strip()
if host_sex_one_lecter in ['F', 'M']:
host_sex = 'female' if host_sex_one_lecter == 'F' else 'male'
if host_sex in ['male', 'female']:
host['host_sex'] = "http://purl.obolibrary.org/obo/PATO_0000384" if host_sex == 'male' else "http://purl.obolibrary.org/obo/PATO_0000383"
elif GBQualifier_value_text_list[1] in field_to_term_to_uri_dict['ncbi_host_health_status']:
host['host_health_status'] = field_to_term_to_uri_dict['ncbi_host_health_status'][GBQualifier_value_text_list[1]]
else:
missing_value_list.append('\t'.join([accession_version, 'host_sex or host_health_status', GBQualifier_value_text_list[1]]))
# Host age
host_age = -1
if len(GBQualifier_value_text_list[1].split(' ')) > 1 and is_integer(GBQualifier_value_text_list[1].split(' ')[-1]):
host_age = int(GBQualifier_value_text_list[1].split(' ')[-1])
elif len(GBQualifier_value_text_list) > 2 and is_integer(GBQualifier_value_text_list[2].split(' ')[-1]):
host_age = int(GBQualifier_value_text_list[2].split(' ')[-1])
if host_age >= 0 and host_age < 110:
host['host_age'] = host_age
host['host_age_unit'] = 'http://purl.obolibrary.org/obo/UO_0000036'
elif len(GBQualifier_value_text_list) > 2:
missing_value_list.append('\t'.join([accession_version, 'host_age', GBQualifier_value_text_list[2]]))
elif GBQualifier_name_text == 'collected_by':
if any([x in GBQualifier_value_text.lower() for x in ['institute', 'hospital', 'city', 'center']]):
sample['collecting_institution'] = GBQualifier_value_text
else:
sample['collector_name'] = GBQualifier_value_text
elif GBQualifier_name_text == 'isolation_source':
if GBQualifier_value_text.upper() in field_to_term_to_uri_dict['ncbi_speciesman_source']:
GBQualifier_value_text = GBQualifier_value_text.upper() # For example, in case of 'usa: wa'
# Little cleaning
GBQualifier_value_text = GBQualifier_value_text.strip("/'")
if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_speciesman_source']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source'][GBQualifier_value_text]]
else:
if GBQualifier_value_text.lower() in ['np/op', 'np-op', 'np/op swab', 'np/np swab', 'nasopharyngeal and oropharyngeal swab', 'nasopharyngeal/oropharyngeal swab', 'combined nasopharyngeal and oropharyngeal swab', 'naso and/or oropharyngeal swab']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
elif GBQualifier_value_text.lower() in ['nasopharyngeal swab/throat swab', 'nasopharyngeal/throat swab', 'nasopharyngeal swab and throat swab', 'nasal swab and throat swab', 'nasopharyngeal aspirate/throat swab', 'Nasopharyngeal/Throat']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasopharyngeal aspirate & throat swab', 'nasopharyngeal aspirate and throat swab']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasopharyngeal aspirate'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasal swab and throat swab']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['throat swab']]
elif GBQualifier_value_text.lower() in ['nasal-swab and oro-pharyngeal swab']:
sample['specimen_source'] = [field_to_term_to_uri_dict['ncbi_speciesman_source']['nasal swab'], field_to_term_to_uri_dict['ncbi_speciesman_source']['oropharyngeal swab']]
else:
missing_value_list.append('\t'.join([accession_version, 'specimen_source', GBQualifier_value_text]))
elif GBQualifier_name_text == 'collection_date':
# TO_DO: which format we will use?
date_to_write = GBQualifier_value_text
if len(GBQualifier_value_text.split('-')) == 1:
if int(GBQualifier_value_text) < 2020:
date_to_write = "{}-12-15".format(GBQualifier_value_text)
else:
date_to_write = "{}-01-15".format(GBQualifier_value_text)
if 'additional_collection_information' in sample:
sample['additional_collection_information'] += "; The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
else:
sample['additional_collection_information'] = "The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
elif len(GBQualifier_value_text.split('-')) == 2:
date_to_write = parse(GBQualifier_value_text).strftime('%Y-%m') + '-15'
if 'additional_collection_information' in sample:
sample['additional_collection_information'] += "; The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
else:
sample['additional_collection_information'] = "The 'collection_date' is estimated (the original date was: {})".format(GBQualifier_value_text)
elif len(GBQualifier_value_text.split('-')) == 3:
GBQualifier_value_text_list = GBQualifier_value_text.split('-')
if GBQualifier_value_text_list[1].isalpha():
date_to_write = parse(GBQualifier_value_text).strftime('%Y-%m-%d')
sample['collection_date'] = date_to_write
elif GBQualifier_name_text in ['lat_lon', 'country']:
if GBQualifier_name_text == 'country' and ': ' in GBQualifier_value_text:
GBQualifier_value_text = GBQualifier_value_text.replace(': ', ':')
if GBQualifier_value_text in field_to_term_to_uri_dict['ncbi_countries']:
sample['collection_location'] = field_to_term_to_uri_dict['ncbi_countries'][GBQualifier_value_text]
else:
missing_value_list.append('\t'.join([accession_version, GBQualifier_name_text, GBQualifier_value_text]))
elif GBQualifier_name_text == 'note':
if 'additional_collection_information' in sample:
sample['additional_collection_information'] += '; ' + GBQualifier_value_text
else:
sample['additional_collection_information'] = GBQualifier_value_text
elif GBQualifier_name_text == 'isolate':
virus['virus_strain'] = GBQualifier_value_text
elif GBQualifier_name_text == 'db_xref':
virus['virus_species'] = "http://purl.obolibrary.org/obo/NCBITaxon_"+GBQualifier_value_text.split('taxon:')[1]
# Check if mandatory fields are missing
if 'sample_sequencing_technology' not in technology:
# print(accession_version, ' - technology not found')
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('sample_sequencing_technology not found')
if 'collection_location' not in sample:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('collection_location not found')
if 'collection_date' not in sample:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('collection_date not found')
else:
year, month, day = [int(x) for x in sample['collection_date'].split('-')]
collection_date_in_yaml = datetime(year, month, day)
if collection_date_in_yaml < min_acceptable_collection_date:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('collection_date too early')
if 'authors' not in submitter:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('authors not found')
if 'host_species' not in host:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('host_species not found')
if len(GBSeq_sequence.text) < min_len_to_count:
if accession_version not in not_created_accession_dict:
not_created_accession_dict[accession_version] = []
not_created_accession_dict[accession_version].append('sequence shorter than {} bp'.format(min_len_to_count))
if accession_version not in not_created_accession_dict:
num_seq_with_len_ge_X_bp += 1
# with open(os.path.join(dir_fasta_and_yaml, '{}.fasta'.format(accession_version)), 'w') as fw:
# fw.write('>{}\n{}'.format(accession_version, GBSeq_sequence.text.upper()))
with open(os.path.join(dir_fasta_and_yaml, '{}.yaml'.format(accession_version)), 'w') as fw:
json.dump(info, fw, indent=2)
except:
print("Unexpected error for the ID {}: {}".format(accession_version, sys.exc_info()[0]))
accession_with_errors_list.append(accession_version)
continue
if len(missing_value_list) > 0:
path_missing_terms_tsv = 'missing_terms.genbank.tsv'
print('Written missing terms in {}'.format(path_missing_terms_tsv))
with open(path_missing_terms_tsv, 'w') as fw:
fw.write('\n'.join(missing_value_list))
if len(accession_with_errors_list) > 0:
path_accession_with_errors_tsv = 'accession_with_errors.genbank.tsv'
print('Written the accession with errors in {}'.format(path_accession_with_errors_tsv))
with open(path_accession_with_errors_tsv, 'w') as fw:
fw.write('\n'.join(accession_with_errors_list))
if len(not_created_accession_dict) > 0:
path_not_created_accession_tsv = 'not_created_accession.genbank.tsv'
print('Written not created accession in {}'.format(path_not_created_accession_tsv))
with open(path_not_created_accession_tsv, 'w') as fw:
fw.write('\n'.join(['\t'.join([accession_version, ','.join(missing_info_list)]) for accession_version, missing_info_list in not_created_accession_dict.items()]))
print('Num. new sequences with length >= {} bp: {}'.format(min_len_to_count, num_seq_with_len_ge_X_bp))