Commit 5359f7e

even more parallel than before! pool of workers + consumer thread
cursecatcher committed Sep 30, 2018
1 parent 6c892cf commit 5359f7e
Showing 2 changed files with 56 additions and 54 deletions.
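Editor's note: the change replaces hand-rolled process management (start N processes, join them as termination messages arrive) with a multiprocessing.Pool of workers plus a single consumer thread that owns all output files and drains a shared queue. A minimal, runnable sketch of that pattern (hypothetical names, not code from this repository):

import multiprocessing as mp
import threading

def worker(job):
    """Process one input and report back through the shared queue."""
    value, queue = job
    queue.put(("result", value * 2))   # typed message: (tag, payload)
    queue.put(0)                       # int sentinel: this job is done

def consumer(num_jobs, queue):
    """Single thread owns the output; workers never write files directly."""
    while num_jobs > 0:
        msg = queue.get()
        if type(msg) is int:           # a job finished
            num_jobs -= 1
        else:
            tag, payload = msg
            print(tag, payload)        # stand-in for writing the output files

if __name__ == "__main__":
    queue = mp.Manager().Queue()       # proxy queue: picklable, so Pool workers can receive it
    jobs = [(i, queue) for i in range(4)]

    thread = threading.Thread(target=consumer, args=(len(jobs), queue))
    thread.start()
    with mp.Pool(2) as pool:
        pool.map(worker, jobs)
    thread.join()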
2 changes: 1 addition & 1 deletion src/methylFASTQ.py
@@ -7,7 +7,7 @@
import csv
from Bio import SeqIO
import sequencing as seq
#from sequencing import ChromosomeSequencer, Stats

from timeit import default_timer as timer


108 changes: 55 additions & 53 deletions src/sequencing.py
@@ -6,6 +6,7 @@
import random, csv
import os, pickle
import multiprocessing as mp
import threading
from timeit import default_timer as timer
import enum
import sys
@@ -103,14 +104,12 @@ def __init__(self, chromosome, target_regions=list()):
        chromosome_sequence = str(chromosome.seq).lower()

        self.__chromoId = chromosome.id
        # self.__genome_size = len(chromosome.seq)
        self.__fragments = [(chromosome_sequence[begin:end], begin, end) for (begin, end) in target_regions]
        self.__stats = Stats()

        if len(target_regions) == 0:
            #WGBS option
            print("Parsing {} sequence... ".format(chromosome.id), end="", flush=True)
            # chromosome = str(chromosome.seq).lower()
            chr_size = len(chromosome.seq)

            #get salient fragments
@@ -138,7 +137,6 @@ def __init__(self, chromosome, target_regions=list()):


    def load_balancing(self, num_workers):

        totsize = sum([e-b for _, b, e in self.__fragments])
        average = int(totsize / num_workers)

@@ -173,79 +171,55 @@ def load_balancing(self, num_workers):

        self.__fragments = sorted(fragments, key=lambda x: x[2]-x[1], reverse=True)

    def sequencing(self, params):
        self.load_balancing(params.num_processes)

        self.__fragments = [{"seq": seq, "from": begin, "to": end, "params": params} \
                            for seq, begin, end in self.__fragments]
    def consumer(self, num_jobs, params, queue):
        """Consumer thread that reads from the queue and writes the data to the appropriate output file"""

        queue = mp.Queue() #communication between the child processes and the parent
        num_input = len(self.__fragments) #number of inputs to process
        num_process = params.num_processes #maximum number of processes to use
        single_end = params.seq_mode == "single_end" #ugly flag, but whatever

        #open the output files
        output_filename = self.__get_output_filename(params)
        filename = self.__get_output_filename(params)
        single_end = params.seq_mode == "single_end"

        ## open the output files
        if single_end:
            fastq_file = open("{}.fastq".format(output_filename), "a")
            fastq_file = open("{}.fastq".format(filename), "a")
        else:
            fastq_file1 = open("{}_R1.fastq".format(output_filename), "a")
            fastq_file2 = open("{}_R2.fastq".format(output_filename), "a")
            fastq_file1 = open("{}_R1.fastq".format(filename), "a")
            fastq_file2 = open("{}_R2.fastq".format(filename), "a")

        meth_file = open("{}.ch3".format(output_filename), "a")
        meth_file = open("{}.ch3".format(filename), "a")
        csv_meth = csv.writer(meth_file, delimiter="\t")

        #initialize the processes with their inputs:
        #parameters: (fragment and offset) + queue + index for the join
        processes = [mp.Process(target=self.create_reads, name="methylPIPPO-{}".format(index), args=(param, params.seq_mode, (index, queue))) \
                     for index, param in enumerate(self.__fragments)]

        curr = 0 #index of the next process to start
        curr_exec = 0 #number of processes currently running

        #start the first processes
        while curr_exec < min(num_input, num_process):
            processes[curr].start()
            curr += 1
            curr_exec += 1

        #until all processes have been started and have terminated...
        while not (curr == num_input and curr_exec == 0):
        #until every job has signalled completion
        while num_jobs > 0:
            val = queue.get()
            tval = type(val)

            if tval is int: #termination signal from a process
                processes[val].join()
            if tval is int: #termination signal of a finished job
                num_jobs -= 1

                if curr < num_input:
                    processes[curr].start()
                    curr += 1
                else:
                    curr_exec -= 1
            elif tval is tuple:
                tag, data = val
                datatype, data = val
                dsize = len(data)

                if tag == "fastq_se":
                if datatype == "fastq_se":
                    for record in data:
                        SeqIO.write(record, fastq_file, "fastq")
                    else:
                        self.__stats.increment_reads(dsize)

                elif tag == "fastq_pe":
                elif datatype == "fastq_pe":
                    for read1, read2 in data:
                        SeqIO.write(read1, fastq_file1, "fastq")
                        SeqIO.write(read2, fastq_file2, "fastq")
                    else:
                        self.__stats.increment_reads(dsize)

                elif tag == "ch3":
                elif datatype == "ch3":
                    for record in data:
                        csv_meth.writerow(record)
                    else:
                        self.__stats.increment_cytosines(dsize)
        else: #end of the while loop -> close the files
        else:
            #close the files
            meth_file.close()

            if single_end:
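Editor's note: the consumer multiplexes on the message type: an int is a per-job termination signal, while a tuple carries a tag ("fastq_se", "fastq_pe" or "ch3") and a batch of records. A more explicit variant of the same idea uses None as the end-of-job sentinel instead of dispatching on type() (a sketch of the alternative, not the author's code):

import queue

def drain(q, num_jobs):
    """Collect tagged payloads until every job has sent its None sentinel."""
    batches = []
    while num_jobs > 0:
        msg = q.get()
        if msg is None:          # explicit end-of-job sentinel
            num_jobs -= 1
        else:
            tag, payload = msg   # e.g. ("ch3", [row, row, ...])
            batches.append((tag, payload))
    return batches

q = queue.Queue()
q.put(("ch3", [("chr1", 1042, "+", "CG")]))
q.put(None)
print(drain(q, 1))               # [('ch3', [('chr1', 1042, '+', 'CG')])]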
@@ -254,34 +228,62 @@ def sequencing(self, params):
                fastq_file1.close()
                fastq_file2.close()

    def sequencing(self, params):
        self.load_balancing(params.num_processes)

        queue = mp.Manager().Queue()

        ###### TERRIBLE!!!!! (params and queue get copied into every job dict)
        input_data = [{
                "seq": seq,
                "offset": (begin, end),
                "params": params,
                "queue": queue
            }
            for seq, begin, end in self.__fragments
        ]

        num_jobs = len(self.__fragments)

        consumer_thread = threading.Thread(target=self.consumer, args=(num_jobs, params, queue))
        consumer_thread.start()

        with mp.Pool(params.num_processes) as pool:
            pool.map(self.create_reads, input_data)

        consumer_thread.join()

        return self.__stats



    def __get_output_filename(self, params):
        fasta = "".join(params.fasta_file.split("/")[-1].split(".")[:-1])
        se_pe = "se" if params.seq_mode == "single_end" else "pe"
        dir_nondir = "dir" if params.lib_mode == "directional" else "undir"
        return "{}/{}_{}_f{}r{}_{}".format(params.output_path, fasta, se_pe, params.fragment_size, params.read_length, dir_nondir)


    def create_reads(self, data, seq_mode, index_queue):
        offset_begin, offset_end = data["from"], data["to"]
        sequence = data["seq"]
        params = data["params"]
        index, queue = index_queue
    def create_reads(self, input_process):#, seq_mode, queue):
        sequence = input_process["seq"]
        offset_begin, offset_end = input_process["offset"]
        queue = input_process["queue"]
        params = input_process["params"]

        pid = os.getpid()

        print("<Process {}>: starting sequencing [{} - {}]".format(pid, offset_begin, offset_end), flush=True)
        start = timer()

        fs = FragmentSequencer(self.__chromoId, sequence, offset_begin, offset_end, \
                               params, queue=queue)
        fs.single_end_sequencing() if seq_mode == "single_end" else fs.paired_end_sequencing()
        fs.single_end_sequencing() if params.seq_mode == "single_end" else fs.paired_end_sequencing()

        elapsed = format_time(timer() - start)

        print("<Process {}>: sequencing [{} - {}] terminated in {}".format(pid, offset_begin, offset_end, elapsed), flush=True)

        queue.put(index)
        queue.put(0)



