-
Notifications
You must be signed in to change notification settings - Fork 0
/
TEST_MODULE.py
105 lines (83 loc) · 3.99 KB
/
TEST_MODULE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TEST_MODULE
Generates a random reference panel as well as a random observations table
for a single LD block.
Daniel Ariad (daniel@ariad.org)
Sep 30, 2021
"""
import collections, random, pickle, os
random.seed(a=2021,version=2)
leg_tuple = collections.namedtuple('leg_tuple', ('chr_id', 'pos', 'ref', 'alt')) #Encodes the rows of the legend table
sam_tuple = collections.namedtuple('sam_tuple', ('sample_id', 'group1', 'group2', 'sex')) #Encodes the rows of the samples table
obs_tuple = collections.namedtuple('obs_tuple', ('pos', 'read_id', 'base')) #Encodes the rows of the observations table
def generate_legned(SNPs=24,depth=0.1):
chr_id = 'chrTEST'
POS = sorted(random.sample(range(int(SNPs/depth)), SNPs))
REF, ALT = zip(*(random.sample('ATCG',k=2) for i in range(SNPs)))
legend = tuple(leg_tuple(chr_id,pos,ref,alt) for pos,ref,alt in zip(POS,REF,ALT))
return legend
def generate_haplotype(groups=1,SNPs=24,number_of_haplotypes=10):
max_int = 1 << number_of_haplotypes
hap_tab_per_group = {'group'+str(i): tuple(random.randrange(1,max_int+1) for i in range(SNPs)) for i in range(groups)}
number_of_haplotypes_per_group = {'group'+str(i): number_of_haplotypes for i in range(groups)}
return hap_tab_per_group, number_of_haplotypes_per_group
def generate_obs(legend,alleles_per_read=4):
reads_generator = (i for i in range(len(legend)//alleles_per_read) for j in range(alleles_per_read))
obs_tab = tuple(obs_tuple(pos,'read'+str(read_id),random.choice(alleles))
for read_id,(chr_id,pos,*alleles) in zip(reads_generator,legend))
return obs_tab
def generate_sam(groups=1,number_of_haplotypes=10):
sam_tab = {}
for i in range(groups):
sample_ids = ('sample'+str(i) for j in range(number_of_haplotypes//2))
groups2 = ['group'+str(i)]*(number_of_haplotypes//2)
sex = random.choices(range(2),k=number_of_haplotypes//2)
sam_tab['group'+str(i)] = tuple(sam_tuple(i,'N/A',j,k) for i,j,k in zip(sample_ids,groups2,sex))
return sam_tab
def print_haplotypes(hap_tab,number_of_haplotypes):
return tuple(tuple(bin(h)[2:].zfill(number_of_haplotypes)) for h in hap_tab)
leg_tab = generate_legned(SNPs=1000)
hap_tab, number_of_haplotypes = generate_haplotype(groups=2,SNPs=1000,number_of_haplotypes=500) ### number of haplotypes must be even, because diploids are simulated.
obs_tab = generate_obs(leg_tab,alleles_per_read=1)
sam_tab = generate_sam(groups=2,number_of_haplotypes=500)
if not os.path.exists('test'): os.makedirs('test')
info = {'redo-BAQ': False,
'handle-multiple-observations' : 'all',
'min-bq': 30,
'min-mq' : 30,
'max-depth' : 0,
'chr_id': 'chrTEST',
'depth': 0.1}
with open('test/test.obs.p','wb') as f:
pickle.dump(obs_tab,f)
pickle.dump(info,f)
with open('test/test.leg.p','wb') as f:
pickle.dump(leg_tab,f)
with open('test/test.hap.p','wb') as f:
pickle.dump(hap_tab,f)
pickle.dump(number_of_haplotypes,f)
with open('test/test.sam.p','wb') as f:
pickle.dump(sam_tab,f)
"""
operator, itertools
def get_joint_freq(hap_tab,number_of_haplotypes,invert,*include):
nt = lambda p: 1-p
new_hap_tab = [ ([*map(int,i)] if not j else [*map(nt,map(int,i))])
for i,j in zip(print_haplotypes(hap_tab,number_of_haplotypes),invert)]
#print(z)
ig = operator.itemgetter(*include)
t = [all(i) for i in zip(*ig(new_hap_tab))]
return sum(t)/len(t)
def print_freq(leg_tab,obs_tab,hap_tab,number_of_haplotypes):
inv = [base==ref for (pos, read_id, base),(chr_id, pos, ref, alt) in zip(obs_tab,leg_tab)]
for i in range(1,2**4):
X = []
for j in bin(i)[2:]:
X.extend([j=='1']*4)
#print(bin(i)[2:])
#print(''.join(map(str,map(int,X))).zfill(16))
print(bin(i)[2:], get_joint_freq(hap_tab,number_of_haplotypes,inv,*[k for k,l in enumerate(X[::-1]) if l]))
print_freq(leg_tab,obs_tab,hap_tab,number_of_haplotypes)
"""