python - Using a shared queue that workers can add tasks to
I'm pretty new to Python (I write code in Java). I have a Python script that's a crawler: it calls PhantomJS, which loads the page and returns its source along with a list of URLs found on the page.
I've been trying to use Python 3's multiprocessing
module for this, but I can't figure out how to use a shared queue that the workers can add to. I keep getting unpredictable results.
My previous approach used a global list of URLs, out of which I extracted a chunk and sent to the workers using map_async. At the end, I would gather the returned URLs and append them to the global list. The problem is that each "chunk" takes as long as the slowest worker.
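To illustrate, the chunked pattern I mean is roughly this (a simplified sketch, not my exact code; phantomjs(url) is my wrapper that loads a page and returns the URLs it finds):

import multiprocessing

def worker(url):
    # phantomjs(url) loads the page and returns the list of urls found on it
    return phantomjs(url)

if __name__ == '__main__':
    all_urls = [<some-url>]
    chunk_size = 8
    with multiprocessing.Pool() as pool:
        while all_urls:
            chunk, all_urls = all_urls[:chunk_size], all_urls[chunk_size:]
            # the whole chunk has to finish before the next one starts,
            # so each chunk is only as fast as its slowest worker
            for returned_urls in pool.map_async(worker, chunk).get():
                all_urls.extend(returned_urls)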
I'm trying to modify it so that whenever a worker is done, it can pick up the next URL. However, I don't think I'm doing it correctly. Here's what I have so far:
import multiprocessing

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " urls")
    for returned_url in returned_urls:
        urls.put(returned_url, block=True)
    print("There are " + str(urls.qsize()) + " urls in total.\n")

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    # seed the queue with the starting url
    urls.put(<some-url>)
    pool = multiprocessing.Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()
If there is a better way to do this, please let me know. I'm crawling a known site, and the eventual terminating condition is when there are no more URLs to process. Right now it looks like it will just keep running forever. I'm not sure if I should use queue.empty(),
because it's not reliable.
Here is what I would do:
import multiprocessing

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " urls")
    for returned_url in returned_urls:
        urls.put(returned_url, block=True)
    # signal that this url has finished processing
    urls.put('no-url')
    print("There are " + str(urls.qsize()) + " urls in total.\n")

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool()
    urls = manager.Queue()

    # kick off the first url before entering the loop
    counter = 1
    pool.apply_async(worker, (<some-url>, urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            # a url has finished processing
            counter -= 1
        else:
            # a new url needs to be processed
            counter += 1
            pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()
Whenever a URL is popped off the queue, increment the counter. Think of it as a "currently processing URL" counter. When a 'no-url' is popped off the queue, a "currently processing URL" has finished, so decrement the counter. As long as the counter is greater than 0, there are URLs that haven't finished processing and returned 'no-url' yet.
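If you want to see the termination logic in isolation, here is a small self-contained toy sketch you can run without PhantomJS; fake_crawl and the LINKS graph are made-up stand-ins for phantomjs and the real site:

import multiprocessing

# a tiny fake link graph standing in for the real site
LINKS = {
    'a': ['b', 'c'],
    'b': ['d'],
    'c': [],
    'd': [],
}

def fake_crawl(url, urls):
    # stand-in for phantomjs: "find" the links on this page
    for found in LINKS.get(url, []):
        urls.put(found, block=True)
    # signal that this url is done
    urls.put('no-url')

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    pool = multiprocessing.Pool()

    counter = 1
    pool.apply_async(fake_crawl, ('a', urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            counter -= 1
        else:
            counter += 1
            pool.apply_async(fake_crawl, (url, urls))

    pool.close()
    pool.join()
    print("done - counter reached 0, nothing left to process")

Every page eventually pushes exactly one 'no-url', so the counter drains to 0 once the whole graph has been visited and the loop exits cleanly.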
Edit

As I said in the comment (I'm putting it here in case anyone else reads it), when using a multiprocessing.Pool,
instead of thinking of individual processes, it's best to think of it as a single construct that executes your function each time it gets some data (concurrently when possible). This is most useful for data-driven problems where you don't track or care about the individual worker processes, only the data being processed.
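A minimal illustration of that data-driven mindset (a toy sketch, nothing crawler-specific):

import multiprocessing

def handle(item):
    # which process picks this up is irrelevant; only the data matters
    return item * item

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        # results arrive as they finish, in no particular worker order
        for result in pool.imap_unordered(handle, range(10)):
            print(result)

You hand the Pool an iterable of data and consume results as they become available; the Pool decides which worker handles which item.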