• <tfoot id='xYIJF'></tfoot>

        <legend id='xYIJF'><style id='xYIJF'><dir id='xYIJF'><q id='xYIJF'></q></dir></style></legend>

      1. <small id='xYIJF'></small><noframes id='xYIJF'>

      2. <i id='xYIJF'><tr id='xYIJF'><dt id='xYIJF'><q id='xYIJF'><span id='xYIJF'><b id='xYIJF'><form id='xYIJF'><ins id='xYIJF'></ins><ul id='xYIJF'></ul><sub id='xYIJF'></sub></form><legend id='xYIJF'></legend><bdo id='xYIJF'><pre id='xYIJF'><center id='xYIJF'></center></pre></bdo></b><th id='xYIJF'></th></span></q></dt></tr></i><div id='xYIJF'><tfoot id='xYIJF'></tfoot><dl id='xYIJF'><fieldset id='xYIJF'></fieldset></dl></div>
          <bdo id='xYIJF'></bdo><ul id='xYIJF'></ul>

        多处理 - 生产者/消费者设计

        时间:2023-05-25
        • <i id='GrKTK'><tr id='GrKTK'><dt id='GrKTK'><q id='GrKTK'><span id='GrKTK'><b id='GrKTK'><form id='GrKTK'><ins id='GrKTK'></ins><ul id='GrKTK'></ul><sub id='GrKTK'></sub></form><legend id='GrKTK'></legend><bdo id='GrKTK'><pre id='GrKTK'><center id='GrKTK'></center></pre></bdo></b><th id='GrKTK'></th></span></q></dt></tr></i><div id='GrKTK'><tfoot id='GrKTK'></tfoot><dl id='GrKTK'><fieldset id='GrKTK'></fieldset></dl></div>
            <tbody id='GrKTK'></tbody>

            <legend id='GrKTK'><style id='GrKTK'><dir id='GrKTK'><q id='GrKTK'></q></dir></style></legend>

              <small id='GrKTK'></small><noframes id='GrKTK'>

                • <bdo id='GrKTK'></bdo><ul id='GrKTK'></ul>

                  <tfoot id='GrKTK'></tfoot>
                  本文介绍了多处理 - 生产者/消费者设计的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

                  问题描述

                  限时送ChatGPT账号..

                  我正在使用多处理模块来拆分一个非常大的任务.它在大多数情况下都有效,但我的设计肯定遗漏了一些明显的东西,因为这样我很难有效地判断何时处理了所有数据.

                  I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious with my design, because this way it's very hard for me to effectively tell when all of the data has been processed.

                  我有两个单独的任务在运行;一个喂另一个.我想这是一个生产者/消费者问题.我在所有进程之间使用共享队列,生产者填满队列,消费者从队列中读取并进行处理.问题是数据量是有限的,所以在某些时候每个人都需要知道所有数据都已处理,以便系统可以正常关闭.

                  I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.

                  使用 map_async() 函数似乎很有意义,但由于生产者正在填满队列,我不知道前面的所有项目,所以我必须进入一个 while 循环并使用apply_async() 并尝试检测何时一切都完成了某种超时......丑陋.

                  It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.

                  我觉得我错过了一些明显的东西.如何设计得更好?

                  I feel like I'm missing something obvious. How can this be better designed?

                  制片人

                  class ProducerProcess(multiprocessing.Process):
                      def __init__(self, item, consumer_queue):
                          self.item = item
                          self.consumer_queue = consumer_queue
                          multiprocessing.Process.__init__(self)
                  
                      def run(self):
                          for record in get_records_for_item(self.item): # this takes time
                              self.consumer_queue.put(record)
                  
                  def start_producer_processes(producer_queue, consumer_queue, max_running):
                      running = []
                  
                      while not producer_queue.empty():
                          running = [r for r in running if r.is_alive()]
                          if len(running) < max_running:
                              producer_item = producer_queue.get()
                              p = ProducerProcess(producer_item, consumer_queue)
                              p.start()
                              running.append(p)
                          time.sleep(1)
                  

                  消费者

                  def process_consumer_chunk(queue, chunksize=10000):
                      for i in xrange(0, chunksize):
                          try:
                              # don't wait too long for an item
                              # if new records don't arrive in 10 seconds, process what you have
                              # and let the next process pick up more items.
                  
                              record = queue.get(True, 10)
                          except Queue.Empty:                
                              break
                  
                          do_stuff_with_record(record)
                  

                  主要

                  if __name__ == "__main__":
                      manager = multiprocessing.Manager()
                      consumer_queue = manager.Queue(1024*1024)
                      producer_queue = manager.Queue()
                  
                      producer_items = xrange(0,10)
                  
                      for item in producer_items:
                          producer_queue.put(item)
                  
                      p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
                      p.start()
                  
                      consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
                  

                  这就是它变得俗气的地方.我不能使用地图,因为要消费的列表同时被填满.所以我必须进入一个while循环并尝试检测超时.当生产者仍在尝试填充时,consumer_queue 可能会变为空,因此我不能只检测到空队列并退出.

                  Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue an quit on that.

                      timed_out = False
                      timeout= 1800
                      while 1:
                          try:
                              result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
                              if timed_out:
                                  timed_out = False
                  
                          except Queue.Empty:
                              if timed_out:
                                  break
                  
                              timed_out = True
                              time.sleep(timeout)
                          time.sleep(1)
                  
                      consumer_queue.join()
                      consumer_pool.close()
                      consumer_pool.join()
                  

                  我认为也许我可以在主线程中获取()记录并将它们传递给消费者而不是传递队列,但我认为我最终会遇到同样的问题.我仍然需要运行一个 while 循环并使用 apply_async() 提前感谢您的任何建议!

                  I thought that maybe I could get() the records in the main thread and pass those into the consumer instead of passing the queue in, but I think I end up with the same problem that way. I still have to run a while loop and use apply_async() Thank you in advance for any advice!

                  推荐答案

                  您可以使用 manager.Event 来表示工作的结束.此事件可以在您的所有进程之间共享,然后当您从主进程发出信号时,其他工作人员可以正常关闭.

                  You could use a manager.Event to signal the end of the work. This event can be shared between all of your processes and then when you signal it from your main process the other workers can then gracefully shutdown.

                  while not event.is_set():
                   ...rest of code...
                  

                  因此,您的消费者将等待事件设置并在设置后处理清理.

                  So, your consumers would wait for the event to be set and handle the cleanup once it is set.

                  要确定何时设置此标志,您可以在生产者线程上执行 join,当这些都完成后,您可以在消费者线程上加入.

                  To determine when to set this flag you can do a join on the producer threads and when those are all complete you can then join on the consumer threads.

                  这篇关于多处理 - 生产者/消费者设计的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持html5模板网!

                  上一篇:如何在 Python 中使用多处理并行求和循环 下一篇:多处理 - 共享数组

                  相关文章

                  最新文章

                  <legend id='WT1ey'><style id='WT1ey'><dir id='WT1ey'><q id='WT1ey'></q></dir></style></legend>

                • <small id='WT1ey'></small><noframes id='WT1ey'>

                    <tfoot id='WT1ey'></tfoot>

                      • <bdo id='WT1ey'></bdo><ul id='WT1ey'></ul>

                      <i id='WT1ey'><tr id='WT1ey'><dt id='WT1ey'><q id='WT1ey'><span id='WT1ey'><b id='WT1ey'><form id='WT1ey'><ins id='WT1ey'></ins><ul id='WT1ey'></ul><sub id='WT1ey'></sub></form><legend id='WT1ey'></legend><bdo id='WT1ey'><pre id='WT1ey'><center id='WT1ey'></center></pre></bdo></b><th id='WT1ey'></th></span></q></dt></tr></i><div id='WT1ey'><tfoot id='WT1ey'></tfoot><dl id='WT1ey'><fieldset id='WT1ey'></fieldset></dl></div>