
Commit 6c892cf

more parallel than evergit add methylFASTQ.py sequencing.py 1! load balancing + fragment fragmentation (wtf?) implemented
cursecatcher committed Sep 28, 2018
1 parent 9223d7d commit 6c892cf
Showing 2 changed files with 74 additions and 54 deletions.
41 changes: 12 additions & 29 deletions src/methylFASTQ.py
@@ -6,49 +6,34 @@
import random, string
import csv
from Bio import SeqIO
from sequencing import ChromosomeSequencer, Stats
import sequencing as seq
#from sequencing import ChromosomeSequencer, Stats
from timeit import default_timer as timer


class MethylFASTQ(object):
def __init__(self, args):
self.params = self.__parse_args(args)
self.regions = self.__load_regions(args.regions)
self.__stats = Stats()
self.__stats = seq.Stats()



def run(self):
print("Chosen mode: {}".format("targeted" if self.regions else "WGBS"))

start = timer()

self.__run_targeted() if self.regions else self.__run_wgbs()

elapsed = timer() - start

print("Num reads: {}\nNum C: {}\nNum bases: {}\n".format(self.__stats.nreads, self.__stats.ncytosines, self.__stats.nbases))

#remove the temporary files directory
# print("Removing directory {}".format(args.temp_dir))
# os.rmdir(args.temp_dir)
print("Elapsed time: {}".format(seq.format_time(elapsed)))

def __parse_args(self, args):
# if args.temp_dir is None:
# temp_dir = os.fspath("{}/.cache/methylfastq/".format(pathlib.Path.home()))
#
# #create the main directory, if not present
# if not os.path.exists(temp_dir):
# print("MethylFASTQ temporary files will be written in {}".format(temp_dir))
# os.makedirs(temp_dir)
#
# #create a directory for the current run
# while True:
# random_stuff = "".join(random.choices(string.ascii_lowercase, k=2))
# if not os.path.exists(temp_dir + random_stuff):
# temp_dir += random_stuff + "/"
# os.makedirs(temp_dir)
# print("Temporary files of current run will be written in {}".format(temp_dir))
# break
#
# args.temp_dir = temp_dir

def __parse_args(self, args):
#check that the output directory exists
if not os.path.exists(args.output_path):
print("Output directory {} does not exist.".format(args.output_path), end=" ")
@@ -70,7 +55,7 @@ def __run_wgbs(self):
if self.params.chr is None or fasta_record.id in self.params.chr:
print("Sequencing {}: {} bp".format(fasta_record.id, len(fasta_record)), flush=True)

stats = ChromosomeSequencer(fasta_record).sequencing(self.params)
stats = seq.ChromosomeSequencer(fasta_record).sequencing(self.params)
self.__stats.update(stats)


@@ -81,7 +66,7 @@ def __run_targeted(self):
curr_chr = self.regions[fasta_record.id]
print("{} --> {}".format(fasta_record.id, curr_chr))

stats = ChromosomeSequencer(fasta_record, target_regions=curr_chr).sequencing(self.params)
stats = seq.ChromosomeSequencer(fasta_record, target_regions=curr_chr).sequencing(self.params)
self.__stats.update(stats)

def __load_regions(self, filename):
@@ -117,16 +102,14 @@ def __load_regions(self, filename):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="methyl-fastq")

#I/O parameters: input, output and temporary directories
#I/O parameters: input, output
parser.add_argument("-i", "--in", dest="fasta_file", metavar="fasta-file",
action="store", type=str, required=True,
help="Path of the FASTA file containing the genome to be sequenced.")
parser.add_argument("-o", "--out", dest="output_path", metavar="output-file-path",
action="store", type=str, required=True,
help="Path of output files (.fastq & .cpg)")

# parser.add_argument("-t", "--temp", dest="temp_dir", metavar="temporary-directory",
# action="store", type=str, help="Path to store temporary files")
#sequencing mode and library mode
parser.add_argument("--seq", dest="seq_mode",
choices=["single_end", "paired_end"], default="single_end",
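A hedged usage sketch, based only on the arguments visible in this diff (the file paths are made up, not from this commit): a single-end run would be launched roughly as

    python src/methylFASTQ.py -i /path/to/genome.fa -o /path/to/output_dir --seq single_end

with -i/--in pointing at the FASTA genome to sequence, -o/--out at the path for the .fastq and .cpg output, and --seq either left at its single_end default or switched to paired_end.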
87 changes: 62 additions & 25 deletions src/sequencing.py
@@ -9,6 +9,7 @@
from timeit import default_timer as timer
import enum
import sys
import math

import dna

@@ -136,35 +137,54 @@ def __init__(self, chromosome, target_regions=list()):
print("{} fragments found: {} bp. Elapsed time {}".format(len(self.__fragments), self.__stats.nbases, tot_time), flush=True)


def sequencing(self, params):
queue = mp.Queue() #communication between child processes and the parent
def load_balancing(self, num_workers):

#sort fragments in decreasing order of length
self.__fragments = [{"seq": x[0], "from": x[1], "to": x[2], "params": params} for x in self.__fragments]
self.__fragments.sort(key=lambda el: el["to"] - el["from"], reverse=True)
totsize = sum([e-b for _, b, e in self.__fragments])
average = int(totsize / num_workers)

print([x["to"] - x["from"] for x in self.__fragments])
fragments = list()

single_end = params.seq_mode == "single_end"
# print("Total size: {}\nExpected size per process: {}\n".format(totsize, average))

#initialize the processes with their inputs: various parameters (fragment and offsets) + queue + index for the join
processes = [mp.Process(target=self.create_reads, name="methylPIPPO-{}".format(index), args=(param, params.seq_mode, (index, queue))) \
for index, param in enumerate(self.__fragments)]
for i, (sequence, begin, end) in enumerate(self.__fragments, 1):
size = end-begin
print("Fragment [{} - {}] of size {} bp".format(begin, end, size))

if size > average: #split the work into several pieces
num_pieces = math.ceil(size / average)
prev, curr = 0, average #begin and end offsets

num_input = len(self.__fragments) #number of inputs to process
num_process = params.num_processes #maximum number of processes to use
print(">> {} pieces: ".format(num_pieces), end="")

curr = 0 #index of the next process to start
curr_exec = 0 #number of processes currently running
for n in range(num_pieces):
subseq = (sequence[prev:curr], begin+prev, begin+curr)
fragments.append(subseq)

#start the first batch of processes
while curr_exec < min(num_input, num_process):
processes[curr].start()
curr += 1
curr_exec += 1
print("[{} - {}] ".format(subseq[1], subseq[2]), end="")

# print("\tsubjob {} from {} to {}".format(n+1, begin+prev, begin+curr))
prev, curr = curr, curr + average

if curr > size:
curr = size
print()
else:
fragments.append((sequence, begin, end))

self.__fragments = sorted(fragments, key=lambda x: x[2]-x[1], reverse=True)
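# Worked example of the splitting above (hypothetical sizes, not taken from
# this commit): with num_workers = 4 and fragments of 9000, 3000 and 2000 bp,
# totsize = 14000 and average = 3500. Only the 9000 bp fragment exceeds the
# average, so it is cut into ceil(9000 / 3500) = 3 pieces covering [0 - 3500],
# [3500 - 7000] and [7000 - 9000] (offsets then shifted by the fragment's
# begin position); the two smaller fragments are kept whole. The resulting
# list is sorted by decreasing size, so the largest pieces are dispatched to
# worker processes first.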

def sequencing(self, params):
self.load_balancing(params.num_processes)

self.__fragments = [{"seq": seq, "from": begin, "to": end, "params": params} \
for seq, begin, end in self.__fragments]

queue = mp.Queue() #communication between child processes and the parent
num_input = len(self.__fragments) #number of inputs to process
num_process = params.num_processes #maximum number of processes to use
single_end = params.seq_mode == "single_end" #crappy variable, but whatever

#open the output files
output_filename = self.__get_output_filename(params)

if single_end:
@@ -176,6 +196,20 @@ def sequencing(self, params):
meth_file = open("{}.ch3".format(output_filename), "a")
csv_meth = csv.writer(meth_file, delimiter="\t")

#initialize the processes with their inputs:
#parameters: (fragment and offsets) + queue + index for the join
processes = [mp.Process(target=self.create_reads, name="methylPIPPO-{}".format(index), args=(param, params.seq_mode, (index, queue))) \
for index, param in enumerate(self.__fragments)]

curr = 0 #index of the next process to start
curr_exec = 0 #number of processes currently running

#start the first batch of processes
while curr_exec < min(num_input, num_process):
processes[curr].start()
curr += 1
curr_exec += 1
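# Note on the pattern above (partly inferred, since the rest of the loop is
# collapsed in this view): a Process is created for every fragment, but only
# min(num_input, num_process) of them are started up front. The shared queue
# carries the workers' output and, per the "index for the join" comment, each
# worker's index once it finishes, which presumably lets the parent start the
# next pending process and keep at most params.num_processes workers running
# at any time.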

#until all processes have been started and have finished...
while not (curr == num_input and curr_exec == 0):
val = queue.get()
@@ -211,7 +245,7 @@ def sequencing(self, params):
csv_meth.writerow(record)
else:
self.__stats.increment_cytosines(dsize)
else: #end of while -> close files
else: #end of while -> close the files
meth_file.close()

if single_end:
@@ -220,9 +254,6 @@
fastq_file1.close()
fastq_file2.close()

for p in processes:
p.join()

return self.__stats

def __get_output_filename(self, params):
@@ -237,13 +268,19 @@ def create_reads(self, data, seq_mode, index_queue):
sequence = data["seq"]
params = data["params"]
index, queue = index_queue
pid = os.getpid()

print("<Process {}>: starting sequencing [{} - {}]".format(pid, offset_begin, offset_end), flush=True)
start = timer()

print("processing fragment from {} to {}".format(data["from"], data["to"]))
fs = FragmentSequencer(self.__chromoId, sequence, offset_begin, offset_end, \
params, queue=queue)

fs.single_end_sequencing() if seq_mode == "single_end" else fs.paired_end_sequencing()

elapsed = format_time(timer() - start)

print("<Process {}>: sequencing [{} - {}] terminated in {}".format(pid, offset_begin, offset_end, elapsed), flush=True)

queue.put(index)
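# Assumption based on the surrounding code (the consumer side is partly
# collapsed above): the FragmentSequencer streams reads and methylation calls
# back to the parent through the shared queue, and queue.put(index) is the
# worker's final "done" signal, letting sequencing() account for the finished
# child and move on.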



0 comments on commit 6c892cf
