Merge pull request #8 from CamiloPosso/main
Updated logic in FASTA aggregation, leading to smaller FASTA files. Release 1.0.
CamiloPosso authored May 26, 2023
2 parents 8fc509c + 6b9ac76 commit 28e1219
Showing 5 changed files with 141 additions and 60 deletions.
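The substance of the change: the pipeline no longer keeps a fixed number of top taxa (`ntops`); the tally now carries a `running_coverage` column, and the FASTA step keeps taxa only until a coverage target is met. A minimal sketch of that selection rule, assuming a tally frame shaped like the one Kaiko_3.py writes (the data here is made up):

import pandas as pd

# Hypothetical tally, already sorted by hits as Kaiko_3.py writes it.
tally = pd.DataFrame({'taxid': [562, 1280, 287, 1423],
                      'hits':  [120,   60,  15,    5]})
tally['running_coverage'] = tally['hits'].cumsum() / tally['hits'].sum()
# running_coverage: 0.60, 0.90, 0.975, 1.00

coverage_target = 0.66
# Snap the target up to the first step that reaches it (as Kaiko_4.py does),
# then keep every taxon through that step.
snapped = tally[tally['running_coverage'] > coverage_target]['running_coverage'].min()
selected = tally[tally['running_coverage'] <= snapped]
print(selected['taxid'].tolist())  # [562, 1280] -- 90% coverage reached, tail dropped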
68 changes: 51 additions & 17 deletions Kaiko_3.py
@@ -9,10 +9,9 @@


# @profile
def run_diamond_tally(diamond_output, ntops, ncbi_taxa_folder, mode, fout, pident, n_protein_cutoff):
nprot = '{:.5e}'.format(n_protein_cutoff)
def run_diamond_tally(diamond_output, n_strain_select, ncbi_taxa_folder, mode, fout, pident, n_protein_cutoff):
detailed_fout = fout.parent / (re.sub("_kaiko_prediction_.*.csv$", "", fout.name) + '_detailed.csv')
taxa_stats_path = Path(PureWindowsPath('Kaiko_volume/Kaiko_stationary_files/uniref100_stats.txt'))
taxa_stats_path = Path(PureWindowsPath('Kaiko_volume/Kaiko_stationary_files/uniref100_member_stats_with_lineage.txt'))
taxa_stats = pd.read_csv(taxa_stats_path, sep = '\t')

if mode=="member":
@@ -95,10 +94,12 @@ def run_diamond_tally(diamond_output, ntops, ncbi_taxa_folder, mode, fout, pident, n_protein_cutoff):
else:
print("Loading |scan|protein|taxa| table" + detailed_fout.name + "\n")
detailed_output = pd.read_csv(detailed_fout)
detailed_output = detailed_output[['scan', 'uid', 'protein', 'member_taxa', 'common_taxa']]
detailed_output = detailed_output.merge(taxa_stats, left_on = taxa_col, right_on = 'taxid', how = 'left')

detailed_output = detailed_output[detailed_output['genus'] == detailed_output['genus']]
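# NaN != NaN, so this self-comparison keeps only rows with a known genus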
detailed_output = detailed_output[['scan', 'uid', 'protein', 'member_taxa', 'common_taxa']]
detailed_output = detailed_output.merge(taxa_stats, left_on = taxa_col, right_on = 'taxid', how = 'left')

detailed_output = detailed_output[detailed_output['n_protein'] < n_protein_cutoff]
# detailed_output = detailed_output[detailed_output['n_protein'] < n_protein_cutoff]
unique_pepseq_taxa = detailed_output.drop_duplicates(subset=['scan',taxa_col])
pepcount_taxid = unique_pepseq_taxa[taxa_col].value_counts()

@@ -112,24 +113,53 @@ def besthit(row):
for besthit in unique_pepseq_taxa.groupby(by='scan').apply(besthit):
besthits += besthit
besthits = pd.Series(besthits).value_counts()

pepcount_taxid = pepcount_taxid.to_frame('hits')
pepcount_taxid['taxid'] = pepcount_taxid.index
pepcount_taxid = pepcount_taxid.merge(taxa_stats[['taxid', 'tax_name', 'rank', 'species', 'n_protein']], left_on = 'taxid', right_on = 'taxid', how = 'left')

if ntops > 0:
_ntops = ntops
if n_strain_select > 0 & n_strain_select <= 5:
_n_strain_select = 5
elif n_strain_select > 5:
_n_strain_select = n_strain_select
else:
_ntops = besthits.shape[0]
_n_strain_select = -1

top_taxa = besthits.nlargest(_ntops, keep='all').to_frame(name='hits')
# top_taxa = besthits.nlargest(n_strain_select, keep='all').to_frame(name='hits')
top_taxa = besthits.to_frame(name='hits')
top_taxa['taxid'] = top_taxa.index

############################################################
# save top-rank taxa info
############################################################
print("Saving top-rank taxa info...")
# df = pd.concat([top_taxa, taxa_stats], join='inner', axis=1)
df = top_taxa.merge(taxa_stats[['taxid', 'tax_name', 'rank', 'n_protein']], left_on = 'taxid', right_on = 'taxid')
df = df[['taxid', 'tax_name', 'rank', 'hits', 'n_protein']]
df.index.name = 'taxid'
if fout: df.sort_values(by="hits", ascending=False).to_csv(fout, index = False)
df = top_taxa.merge(taxa_stats[['taxid', 'tax_name', 'rank', 'species', 'n_protein']], left_on = 'taxid', right_on = 'taxid', how = 'left')
df = df[['taxid', 'tax_name', 'species', 'rank', 'hits', 'n_protein']]
df['running_coverage'] = df['hits'].cumsum()/df['hits'].sum()
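# cumulative share of hits; the FASTA step in Kaiko_4.py cuts at a coverage target using this column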
append = pd.DataFrame({'taxid' : [],
'tax_name' : [],
'species' : [],
'rank' : [],
'hits' : [],
'n_protein' : [],
'running_coverage' : [],
'notes' : []})
df['notes'] = 'Primary taxa identified by Kaiko. \'hits\' denotes tally2 hits'
pepcount_taxid = pepcount_taxid[pepcount_taxid['n_protein'] < n_protein_cutoff]
for index in range(len(df)):
if df.iloc[index].n_protein > n_protein_cutoff:
if n_strain_select == -1:
subcount = pepcount_taxid[pepcount_taxid['species'] == df['tax_name'][index]].copy()
else:
subcount = pepcount_taxid[pepcount_taxid['species'] == df['tax_name'][index]].copy().iloc[0:_n_strain_select]
subcount['running_coverage'] = df['running_coverage'][index]
subcount['notes'] = 'Strain of primary taxa ' + df['tax_name'][index] + ' below proteome size cutoff. \'hits\' denotes tally1 hits'
append = pd.concat([append, subcount])

df = pd.concat([df, append])
df = df.sort_values(by = ['running_coverage', 'notes'])
if fout: df.to_csv(fout, index = False)

def read_dmd(diamond_output):
dmd_colnames = ['scans','uniref_seq','pident','evalue','mismatch']
@@ -169,7 +199,10 @@ def collect_taxid(filtered_dmd):
# 9606 # Homo sapiens
# 412755 # marine sediment metagenome
# 408172 # marine metagenome
drop_taxids = set(['1','2','9606','412755','408172'])
# 9823 # Sus scrofa (pig)
drop_taxids = set(['1','2','9606','412755','408172', '9823'])
print("Dropping the following taxids\n")
print(drop_taxids)

_filtered_dmd = filtered_dmd[~((filtered_dmd.taxid=='')|(filtered_dmd.taxid.isin(drop_taxids)))].copy()
_filtered_dmd.taxid = _filtered_dmd.taxid.astype(int)
@@ -203,11 +236,12 @@ def get_taxa_members(member_tbl_file, unique_unirefs):

# prefix = "S1_NM0001_NMDC_MixedCommunities_R3_mgf"
# diamond_search_out = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_diamond_search_output.dmd")
# kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction_top_taxa.csv")
# nprot = '{:.5e}'.format(int(69000))
# kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction" + f'_top_taxa_nprot_{nprot}_top_{5}_strains.csv')
# ncbi_taxa_folder = Path(PureWindowsPath("Kaiko_volume/Kaiko_stationary_files/ncbi_taxa").as_posix())

# run_diamond_tally(diamond_search_out,
# -1,
# 1,
# ncbi_taxa_folder,
# "member",
# kaiko_tally,
91 changes: 63 additions & 28 deletions Kaiko_4.py
@@ -6,27 +6,45 @@


# @profile
def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):
def aggregate_fasta(ref_fasta, diamon_tally, fout, coverage_target, top_strains, ncbi_taxa_folder, taxa_key, kingdom_list = []):
"""
python -u ExtractProteinSequenceByTaxID.py --ref_fasta uniref100.fasta.gz --taxafile Weintraub_kaiko_25p_UniRef100_toptaxa.csv --fout Weintraub_kaiko_25p_UniRef100_Bacteria_top100.fasta --ntops 100 -l Bacteria --key TaxID
"""
df = pd.read_csv(diamon_tally)
df, _ = rank_to_lineage(df)
# df, _ = rank_to_lineage(df)

rank_conditions = ~df['rank'].isin(EXCLUDED_RANKS)

if len(kingdom_list) > 0:
conditions = df.superkingdom.isin(kingdom_list)
conditions |= df.kingdom.isin(kingdom_list)

tdf = df[rank_conditions & conditions].nlargest(ntops, 'hits', keep="all")
# tdf = df[rank_conditions & conditions].nlargest(coverage_target, 'hits', keep="all")
tdf = df[rank_conditions & conditions]
else:
tdf = df[rank_conditions].nlargest(ntops, 'hits', keep="all")
# tdf = df[rank_conditions].nlargest(coverage_target, 'hits', keep="all")
tdf = df[rank_conditions]

coverage_target = min(tdf[tdf['running_coverage'] > coverage_target]['running_coverage'])
tdf = tdf[tdf['running_coverage'] <= coverage_target]
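# the two lines above snap the target up to the first achieved coverage step, then keep all taxa through that step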

print(tdf.head(20))

taxids = set(tdf.taxid.tolist())
coverage_steps = [*set(tdf['running_coverage'].values)]
taxids = []

for fasta_addition in coverage_steps:
fasta_addition = tdf[tdf['running_coverage'] == fasta_addition]
primary_species = ['Primary' in str for str in fasta_addition['notes']]
secondary_strains = ['Strain' in str for str in fasta_addition['notes']]
if any(secondary_strains):
fasta_addition = fasta_addition[secondary_strains].nlargest(top_strains, 'hits')
else:
fasta_addition = fasta_addition[primary_species].nlargest(top_strains, 'hits')
taxids = taxids + [int(fasta_addition['taxid'].iloc[0])]

fout_proteome = fout.parent / (fout.stem + '_proteome.txt')
print("NCBI-TaxIDs:", taxids)
proteome = get_taxa_proteome(ncbi_taxa_folder / 'uniref100_member_taxa_tbl.csv', taxids, fout_proteome)

key_for_taxid = '{}='.format(taxa_key)
print("Key for parsing Tax IDs:", key_for_taxid)
@@ -35,6 +53,8 @@ def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):

num_seqs = 0
start_time = time.time()
proteome = pd.read_csv(fout_proteome, sep = "\t", header = None)
proteome = set(proteome[0].values)
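# sequences are now selected by UniRef protein UID (from the member-taxa proteome list) instead of by header TaxID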

with gzip.open(ref_fasta, 'rb') as file:
try:
@@ -45,10 +65,11 @@ def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):
if bline[0] == 62: # to find `>`
num_seqs += 1
line = bline.decode("utf-8")
uid = line.split('>')[1].split(' ')[0]
taxid = line.split(key_for_taxid)[1].split(' ')[0]
if taxid in ["", "N/A"]:
if uid in ["", "N/A"]:
is_selected = False
elif int(taxid) in taxids:
elif uid in proteome:
is_selected = True
ofile.write(line)
else:
@@ -57,13 +78,43 @@ def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):
if is_selected:
line = bline.decode("utf-8")
ofile.write(line)
if (num_seqs % 10000000) == 0:
if (num_seqs % 1000000) == 0:
print("{}M sequences has been parsed. {:.1f}min".format(num_seqs//1e6, (time.time()-start_time)/60))
except Exception as e:
print(line, e)

ofile.close()


def get_taxa_proteome(member_tbl_file, unique_taxids, fout_proteome):
chunksize = 1000000
unique_taxids = set([str(x) for x in unique_taxids])

print('Collecting protein names in ' + fout_proteome.name + '\n')
nchunks = 1
output = fout_proteome.open('w')
for chunk in pd.read_csv(member_tbl_file, chunksize=chunksize):
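# keep rows whose colon-delimited member taxa intersect the requested taxids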
proteome_index = [any([x in set(member_taxa.split(':')) for x in unique_taxids]) for member_taxa in chunk['members']]
proteome = chunk[proteome_index]['uid'].values.tolist()
for protein in proteome:
output.write(f'{protein}\n')

nchunks = nchunks + 1
return proteome


def rank_to_lineage(df):
coverage = {}
for c in ['species','genus','family','order','class','phylum','kingdom','superkingdom']:
idx = df['rank']==c
df.loc[idx, c] = df.loc[idx, 'tax_name']
for c in ['species','genus','family','order','class','phylum','superkingdom']:
coverage[c] = 100*df[c].dropna().shape[0]/df.shape[0]
print("{}: {:.2f}%".format(c, coverage[c]))
return df, coverage

EXCLUDED_RANKS = ['family','order','class','phylum','kingdom','superkingdom']

# def write_proteins(database_file, taxa, fout):
# index_s_path = Path('Kaiko_volume/Kaiko_stationary_files/uniref100_index_s.txt')
# index_path = Path('Kaiko_volume/Kaiko_stationary_files/uniref100_index.txt')
@@ -90,7 +141,6 @@ def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):
# reading_protein = False
# else:
# line = line.decode("utf-8")


# def write_single_protein(database_file, pos, output_fasta):
# database_file.seek(pos)
@@ -107,35 +157,20 @@ def aggregate_fasta(ref_fasta, diamon_tally, fout, ntops, taxa_key, kingdom_list = []):
# else:
# line = line.decode("utf-8")

def rank_to_lineage(df):
coverage = {}
for c in ['species','genus','family','order','class','phylum','kingdom','superkingdom']:
idx = df['rank']==c
df.loc[idx, c] = df.loc[idx, 'tax_name']
for c in ['species','genus','family','order','class','phylum','superkingdom']:
coverage[c] = 100*df[c].dropna().shape[0]/df.shape[0]
print("{}: {:.2f}%".format(c, coverage[c]))
return df, coverage

EXCLUDED_RANKS = ['family','order','class','phylum','kingdom','superkingdom']


# from pathlib import Path, PureWindowsPath

# prefix = "S1_NM0001_NMDC_MixedCommunities_R3_mgf"
# diamond_search_out = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_diamond_search_output.dmd")
# kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction_top_taxa.csv")
# ncbi_taxa_folder = Path(PureWindowsPath("Kaiko_volume/Kaiko_stationary_files/ncbi_taxa").as_posix())
# nprot = '{:.5e}'.format(int(300000))
# kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction" + f'_top_taxa_nprot_{nprot}.csv')
# nprot = '{:.5e}'.format(int(150000))
# kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction" + f'_top_taxa_nprot_{nprot}_top_{5}_strains.csv')
# ref_fasta = Path(PureWindowsPath('Kaiko_volume/Kaiko_stationary_files/uniref100.fasta.gz').as_posix())
# kaiko_final_output = Path("Kaiko_volume/Kaiko_output/" + prefix + "_kaiko_output.fasta")



# aggregate_fasta(ref_fasta,
# kaiko_tally,
# kaiko_final_output,
# 5,
# 0.66,
# 'TaxID',
# [])
2 changes: 1 addition & 1 deletion Kaiko_parse_uniref.py
@@ -182,7 +182,7 @@ def gather_taxa_stats_5(stats_output, ncbi_taxa_folder):
taxa_stats = pd.read_csv(stats_output, sep = '\t')
print(len(taxa_stats))
taxa_stats = taxa_stats.merge(taxa_table, left_on="taxid", right_on="tax_id", how="left")
taxa_stats = taxa_stats[['taxid', 'tax_name', 'rank', 'n_protein', 'n_AA']]
taxa_stats = taxa_stats[['taxid', 'tax_name', 'rank', 'n_protein', 'n_AA', 'species', 'genus', 'family', 'order', 'class', 'phylum', 'kingdom', 'superkingdom']]
taxa_stats.to_csv(stats_output, sep = '\t', index = False)


34 changes: 23 additions & 11 deletions Kaiko_pipeline_main.py
@@ -3,6 +3,7 @@
import yaml
import os
import re
import argparse

from pathlib import Path, PureWindowsPath
from s3path import S3Path
@@ -11,18 +12,25 @@
from Kaiko_3 import run_diamond_tally
from Kaiko_2 import combine_denovo_output

## Parsing
parser = argparse.ArgumentParser()

parser.add_argument("-config", "--config", help="YAML file with parameters for pipeline, and input files path. Any found values in config replace the default values.")
args = parser.parse_args()


## Parsing
kaiko_defaults_path = Path('kaiko_defaults.yaml')
config = yaml.safe_load(kaiko_defaults_path.open())
user_config_path = Path('Kaiko_volume/config.yaml')

if user_config_path.exists():
config_user = yaml.safe_load(user_config_path.open())
assert args.config is not None, "Please provide a config file with the spectrum path (mgf files). Use flag --config to pass the path. See the tutorial for more information."
user_config_path = Path(args.config)
assert user_config_path.exists(), "File " + str(user_config_path.absolute()) + " does not exist."
config_user = yaml.safe_load(user_config_path.open())

for section in config_user.keys():
for param in config_user[section].keys():
config[section][param] = config_user[section][param]
## Overriding defaults if values found in user config.
for section in config_user.keys():
for param in config_user[section].keys():
config[section][param] = config_user[section][param]


## handling any backslashes with this. All final paths used here have forward slashes,
@@ -95,11 +103,12 @@
os.chdir(working_dir)

nprot = '{:.5e}'.format(int(config['diamond tally']['n_protein_cutoff']))
kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction" + f'_top_taxa_nprot_{nprot}.csv')
top_strains = str(config['taxa to fasta']['top_strains'])
kaiko_tally = Path("Kaiko_volume/Kaiko_intermediate/" + prefix + "_kaiko_prediction" + f'_top_taxa_nprot_{nprot}_top_{top_strains}_strains.csv')

# Step 4. Tallying the diamond results
run_diamond_tally(diamond_search_out,
int(config['diamond tally']['ntops']),
int(config['taxa to fasta']['top_strains']),
ncbi_taxa_folder,
config['diamond tally']['mode'],
kaiko_tally,
@@ -115,12 +124,15 @@
else:
kingdom_list = []

kaiko_final_output = Path("Kaiko_volume/Kaiko_output/" + prefix + "_kaiko_output.fasta")
coverage_target = str(config['taxa to fasta']['coverage_target'])
kaiko_final_output = Path("Kaiko_volume/Kaiko_output/" + prefix + f'_kaiko_fasta_coverage_{coverage_target}_nprot_{nprot}_top{top_strains}_strains.fasta')

aggregate_fasta(ref_fasta,
kaiko_tally,
kaiko_final_output,
int(config['taxa to fasta']['ntops']),
config['taxa to fasta']['coverage_target'],
int(config['taxa to fasta']['top_strains']),
ncbi_taxa_folder,
config['taxa to fasta']['taxa_key'],
kingdom_list)

6 changes: 3 additions & 3 deletions Parsing.py
@@ -19,8 +19,7 @@
}


config['diamond tally'] = {'ntops' : -1,
'diamond_folder' : 'Kaiko_volume/Kaiko_stationary_files/diamond',
config['diamond tally'] = {'diamond_folder' : 'Kaiko_volume/Kaiko_stationary_files/diamond',
'ncbi_taxa_folder' : "Kaiko_volume/Kaiko_stationary_files/ncbi_taxa",
'mode' : 'member',
# 'fout' : 'Kaiko_volume/Kaiko_intermediate/kaiko_prediction_top_taxa.csv',
@@ -33,7 +32,8 @@
config['taxa to fasta'] = {'ref_fasta' : "Kaiko_volume/Kaiko_stationary_files/uniref100.fasta.gz",
# 'diamond_tally' : "Kaiko_volume/Kaiko_intermediate/kaiko_prediction_top_taxa.csv",
# 'fout' : "Kaiko_volume/Kaiko_output/kaiko_output.fasta",
'ntops' : 5,
'coverage_target' : 0.66,
'top_strains' : 1,
'taxa_key' : "TaxID",
'kingdom_list' : ""}

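With these defaults, a user config passed via `python Kaiko_pipeline_main.py --config Kaiko_volume/config.yaml` only needs to override what differs. A hedged sketch of such a YAML file (values mirror the defaults above; the spectrum-path entry the pipeline asserts on is omitted because its key is not shown in this diff):

# Hypothetical Kaiko_volume/config.yaml -- keys mirror kaiko_defaults.yaml / Parsing.py
diamond tally:
  mode: member
  n_protein_cutoff: 150000

taxa to fasta:
  coverage_target: 0.66
  top_strains: 1
  taxa_key: TaxID
  kingdom_list: ""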
