Sharing a dictionary of pandas DataFrames across multiprocessing in Python

Date: 2023-05-26

This article describes how to share a dictionary of pandas DataFrames across multiple Python processes, which may be a useful reference for anyone facing the same problem.

Problem description

I have a dictionary of pandas DataFrames. The total size of this dictionary is about 2 GB. However, when I share it across 16 worker processes (the subprocesses only read the dict's data, never modify it), it takes 32 GB of RAM. So I would like to ask whether it is possible to share this dictionary across multiprocessing without copying it. I tried converting it to manager.dict(), but that seems to take too long. What would be the most standard way to achieve this? Thank you.
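
For reference, the manager.dict() approach mentioned above would look roughly like this (a minimal sketch with toy data standing in for the real 2 GB dictionary; it is slow because every store and every lookup pickles the whole DataFrame and round-trips it through the manager process):

from multiprocessing import Manager, Pool
import numpy
import pandas

def worker(shared, key):
    df = shared[key]              # each lookup pickles the whole DataFrame
    return df['a'].sum()

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict()
        for k in range(10):       # populating 2 GB this way is the slow part
            shared[k] = pandas.DataFrame({'a': numpy.random.rand(1000)})
        with Pool(4) as pool:
            print(pool.starmap(worker, [(shared, k) for k in range(10)]))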

Recommended answer

The best solution I've found (and it only works for some types of problems) is a client/server setup using Python's BaseManager and SyncManager classes. To do this, you first set up a server that serves up a proxy class for the data.

DataServer.py

#!/usr/bin/python
from multiprocessing.managers import SyncManager
import numpy

# Global for storing the data to be served
gData = {}

# Proxy class to be shared with different processes.
# Don't put big data in here, since that will force it to be piped to the
# other process when instantiated there; instead just return a portion of
# the global data when requested.
class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        global gData
        return gData.get(key, default)

if __name__ == '__main__':
    port = 5000

    print('Simulate loading some data')
    for i in range(1000):
        gData[i] = numpy.random.rand(1000)

    # Start the server on address (host, port).
    # Note: authkey must be bytes on Python 3.
    print('Serving data. Press <ctrl>-c to stop.')
    class myManager(SyncManager): pass
    myManager.register('DataProxy', DataProxy)
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    server = mgr.get_server()
    server.serve_forever()

Run the above once and leave it running. Below is the client class you use to access the data.

DataClient.py

from multiprocessing.managers import BaseManager
import psutil   # 3rd-party module for process info (not strictly required)

# Grab the shared proxy class. All methods in that class will be available here.
class DataClient(object):
    def __init__(self, port):
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
        class myManager(BaseManager): pass
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # Verify the server is running (not required)
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            try:
                # Match on the command line: proc.name() is usually just
                # 'python', not the script name.
                if any(name in part for part in proc.cmdline()):
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return False

Below is the test code to try this with multiprocessing.

TestMP.py

#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from DataClient import *

# Confusing, but the "proxy" will be global to each subprocess;
# it's not shared across all processes.
gProxy = None
gMode  = None
gDummy = None

def init(port, mode):
    global gProxy, gMode, gDummy
    gProxy = DataClient(port).proxy
    gMode  = mode
    gDummy = numpy.random.rand(1000)  # Same as the dummy in the server
    #print('Init proxy', id(gProxy), 'in', mp.current_process())

def worker(key):
    global gProxy, gMode, gDummy
    if 0 == gMode:   # get from proxy
        array = gProxy.getData(key)
    elif 1 == gMode: # bypass retrieval to test the difference
        array = gDummy
    else:
        assert 0, 'unknown mode: %s' % gMode
    for i in range(1000):
        x = sum(array)
    return x

if __name__ == '__main__':
    port   = 5000
    maxkey = 1000
    numpts = 100

    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0 == mode: print('Using client/server and %d processes' % nprocs)
            if 1 == mode: print('Using local data and %d processes' % nprocs)
            keys = [numpy.random.randint(0, maxkey) for k in range(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            start = time.time()
            ret_data = pool.map(worker, keys, chunksize=1)
            print('   took %4.3f seconds' % (time.time() - start))
            pool.close()

When I run this on my machine I get...

Using local data and 16 processes
   took 0.695 seconds
Using local data and 1 processes
   took 5.849 seconds
Using client/server and 16 processes
   took 0.811 seconds
Using client/server and 1 processes
   took 5.956 seconds

Whether this works for you in your multiprocessing system depends on how often you have to grab the data. There's a small overhead associated with each transfer; you can see this if you turn down the number of iterations in the x = sum(array) loop. At some point you'll spend more time getting data than working on it.
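
To put a rough number on that per-transfer overhead, a quick sketch like the following could time a single proxy fetch (it assumes the DataServer above is already running; absolute numbers will vary by machine):

import time
from DataClient import DataClient

proxy = DataClient(5000).proxy
t0 = time.perf_counter()
for _ in range(100):
    proxy.getData(0)   # each call pickles a 1000-float array and ships it over
elapsed = time.perf_counter() - t0
print('about %.3f ms per fetch' % (elapsed / 100 * 1e3))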

Besides multiprocessing, I also like this pattern because I only have to load my big array data once in the server program and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for data to load.
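
For example, a one-off script against the running server might be as small as this (a sketch reusing the DataClient class above; the key 42 is arbitrary):

#!/usr/bin/python
# Ad-hoc query; starts instantly because the data stays resident in the server.
from DataClient import DataClient

proxy = DataClient(5000).proxy
array = proxy.getData(42)
print(len(array), array.mean())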

While the approach here is somewhat similar to using a database, it has the advantage of working with any type of Python object, not just simple DB tables of strings, ints, etc. I've found that a DB is a bit faster for those simple types, but for me it tends to be more work programmatically, and my data doesn't always port over easily to a database.

This concludes this article on sharing a dictionary of pandas DataFrames across multiprocessing in Python. We hope the answer above is helpful.
