multithreading - Python threading in a loop


I have a project that requires a bunch of large matrices, stored in ~200 MB files, to be cross-correlated (i.e. fft * conj(fft)) with each other. The number of files is such that I can't just load them all and then do the processing. On the other hand, reading in each file as I need it is slower than I'd like.
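For reference, the FFT-based cross-correlation the question describes can be sketched with NumPy; the function name, array sizes, and the self-correlation check below are illustrative, not part of the original post:

```python
import numpy as np

def cross_correlate(a, b):
    # Circular cross-correlation via the FFT: ifft(fft(a) * conj(fft(b)))
    fa = np.fft.fft2(a)
    fb = np.fft.fft2(b)
    return np.fft.ifft2(fa * np.conj(fb)).real

rng = np.random.default_rng(0)
m = rng.standard_normal((64, 64))
cc = cross_correlate(m, m)
# A matrix cross-correlated with itself peaks at zero lag
peak = np.unravel_index(np.argmax(cc), cc.shape)
print(peak)  # → (0, 0)
```

The real matrices in the question come from ~200 MB files, so the interesting problem is not this arithmetic but overlapping the file reads with it.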

What I have so far is something like:

result = 0
for i in xrange(n_files):
    f1 = file_reader(file_list[i])

    ############################################################################
    # here I want to have file_reader go start reading the next file I'll need #
    ############################################################################

    in_place_processing(f1)
    for j in xrange(i+1, n_files):
        f2 = file_reader(file_list[j])

        ##################################################################
        # here I want to have file_reader go start reading the next file #
        ##################################################################

        in_place_processing(f2)
        result += processing_function(f1, f2)

So basically, I want to have two threads that each read a file, give it to me when I ask for it (or have it ready as soon as I ask), and then go start reading the next file when I ask for it. The object file_reader returns is rather large and complicated, though, so I'm not sure if multiprocessing is the way to go here...

I've read up on threading and queues but can't seem to figure out the part where I ask a thread to go read the file and then let the program proceed while it does. I don't want the threads to simply go about their business in the background -- am I missing a detail here, or is threading not the way to go?
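The missing piece is usually a bounded queue: a background thread keeps calling the reader, and queue.put blocks once the buffer is full, so the thread reads ahead by exactly the buffer depth and then waits until you consume an item. A minimal Python 3 sketch of that idea (the filenames, file_reader placeholder, and prefetch helper are mine, not from the original post):

```python
import queue
import threading

def file_reader(filename):
    return (filename, 'processed')  # placeholder for the real (slow) read

def prefetch(filenames, depth=2):
    """Yield file_reader results while a background thread reads ahead."""
    q = queue.Queue(maxsize=depth)  # put() blocks once `depth` items are waiting
    SENTINEL = object()

    def worker():
        for name in filenames:
            q.put(file_reader(name))
        q.put(SENTINEL)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is SENTINEL:
            return
        yield item

results = list(prefetch(['file %d' % i for i in range(4)]))
print(results[0])  # → ('file 0', 'processed')
```

The consumer just iterates; the read-ahead happens automatically whenever the queue has free slots. The answer below builds the same producer/consumer shape with processes as well as threads.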

Below is an example of using the multiprocessing module to spawn off child processes that call the file_reader method and queue up the results. The queue blocks when full, so you can control the number of read-aheads you'd like to perform with the QUEUE_SIZE constant.

This utilizes the standard producer/consumer model of multiprocess communication, with the child processes acting as producers and the main thread being the consumer. The join method call in the class destructor ensures the child process resources are cleaned up properly. There are print statements interspersed for demonstration purposes.

Additionally, I added the ability for the QueuedFileReader class to offload the work to a worker thread, or to run it in the main thread rather than using a child process, for comparison. This is done by specifying the mode parameter at class initialization as MODE_THREADS or MODE_SYNCHRONOUS, respectively.

import multiprocessing as mp
import Queue
import threading
import time

QUEUE_SIZE = 2  # buffer size of the queue

## placeholder functions and variables
N_FILES = 10
file_list = ['file %d' % i for i in range(N_FILES)]

def file_reader(filename):
    time.sleep(.1)
    result = (filename, 'processed')
    return result

def in_place_processing(f):
    time.sleep(.2)

def processing_function(f1, f2):
    print f1, f2
    return id(f1) & id(f2)

MODE_SYNCHRONOUS = 0  # file_reader called in the main thread synchronously
MODE_THREADS = 1      # file_reader executed in a worker thread
MODE_PROCESS = 2      # file_reader executed in a child process

##################################################
## class to encapsulate the multiprocessing objects
class QueuedFileReader():
    def __init__(self, idlist, mode=MODE_PROCESS):
        self.mode = mode
        self.idlist = idlist
        if mode == MODE_PROCESS:
            self.queue = mp.Queue(QUEUE_SIZE)
            self.process = mp.Process(target=QueuedFileReader.worker,
                                      args=(self.queue, idlist))
            self.process.start()
        elif mode == MODE_THREADS:
            self.queue = Queue.Queue(QUEUE_SIZE)
            self.thread = threading.Thread(target=QueuedFileReader.worker,
                                           args=(self.queue, idlist))
            self.thread.start()

    @staticmethod
    def worker(queue, idlist):
        for i in idlist:
            queue.put((i, file_reader(file_list[i])))
            print id(queue), 'queued', file_list[i]
        queue.put('done')

    def __iter__(self):
        if self.mode == MODE_SYNCHRONOUS:
            self.index = 0
        return self

    def next(self):
        if self.mode == MODE_SYNCHRONOUS:
            if self.index == len(self.idlist): raise StopIteration
            q = (self.idlist[self.index],
                 file_reader(file_list[self.idlist[self.index]]))
            self.index += 1
        else:
            q = self.queue.get()
            if q == 'done': raise StopIteration
        return q

    def __del__(self):
        if self.mode == MODE_PROCESS:
            self.process.join()
        elif self.mode == MODE_THREADS:
            self.thread.join()

#mode = MODE_PROCESS
mode = MODE_THREADS
#mode = MODE_SYNCHRONOUS
result = 0
for i, f1 in QueuedFileReader(range(N_FILES), mode):
    in_place_processing(f1)
    for j, f2 in QueuedFileReader(range(i+1, N_FILES), mode):
        in_place_processing(f2)
        result += processing_function(f1, f2)

If the intermediate values are too large to pass through the queue, you can instead execute each iteration of the outer loop in its own process. A handy way to do that is using the Pool class in multiprocessing, as in the example below.

import multiprocessing as mp
import time

## placeholder functions and variables
N_FILES = 10
file_list = ['file %d' % i for i in range(N_FILES)]

def file_reader(filename):
    time.sleep(.1)
    result = (filename, 'processed')
    return result

def in_place_processing(f):
    time.sleep(.2)

def processing_function(f1, f2):
    print f1, f2
    return id(f1) & id(f2)

def file_task(file_index):
    print file_index
    f1 = file_reader(file_list[file_index])
    in_place_processing(f1)
    task_result = 0
    for j in range(file_index+1, N_FILES):
        f2 = file_reader(file_list[j])
        in_place_processing(f2)
        task_result += processing_function(f1, f2)
    return task_result


pool = mp.Pool(processes=None)  # processes=None defaults to mp.cpu_count()
result = 0
for file_result in pool.map(file_task, range(N_FILES)):
    result += file_result
print 'result', result

# or
# result = sum(pool.map(file_task, range(N_FILES)))
