I am aware of multiprocessing.Manager() and how it can be used to create shared objects, in particular queues which can be shared between workers. There is this question, this question, this question and even one of my own questions.
However, I need to define a great many queues, each of which is linking a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable key.
I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With multiprocessing imported as mp:
Defining a dict like for key in all_keys: DICT[key] = mp.Queue() in a config file which is imported by the multiprocessing module (call it multi.py) does not return errors, but the queue DICT[key] is not shared between the processes; each one seems to have its own copy of the queue, and thus no communication happens.
If I try to define the DICT at the beginning of the main multiprocessing function that defines the processes and starts them, like
DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Queue()
I get the error
RuntimeError: Queue objects should only be shared between processes through inheritance
Changing this to
DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
only makes everything worse. Trying similar definitions at the head of multi.py rather than inside the main function returns similar errors.
There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?
EDIT
Here is the basic architecture of the program:
1 - Load the first module, which defines some variables, imports multi, launches multi.main(), and loads another module which starts a cascade of module loads and code execution. Meanwhile...
2 - multi.main looks like this:
def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()
    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))   # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))
Rather than use pool and manager, I was also launching processes with the following:
mp.Process(target=targ1, args=(DICT[key],))
3 - The function targ1 takes input data that is coming in (sorted by key) from the main process. It is meant to pass the result to DICT[key] so targ2 can do its work. This is the part that is not working. There are an arbitrary number of targ1s, targ2s, etc. and therefore an arbitrary number of queues.
4 - The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by key, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)
It sounds like your issues started when you tried to share a multiprocessing.Queue() by passing it as an argument. You can get around this by creating a managed queue instead:
import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is copied, it will still point at the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.
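To see the proxy behaviour in isolation, here is a minimal sketch; the worker function and the message are invented for the demo, not taken from the question:

import multiprocessing as mp

def worker(q):
    # q is a proxy; this put lands in the same underlying
    # queue that main reads from
    q.put("hello from the worker")

if __name__ == "__main__":
    manager = mp.Manager()
    q = manager.Queue()   # a proxy object, safe to pass as an argument
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())        # prints "hello from the worker"
    p.join()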
Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won't need access to any mappings.
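As a quick sketch of that idea (all_keys, the body of targ1, and the queues dict here are placeholders standing in for your real code):

import multiprocessing as mp

def targ1(q_out):
    # placeholder worker: it only ever sees the one queue it was given
    q_out.put("result")

if __name__ == "__main__":
    all_keys = ["a", "b", "c"]   # stand-ins for the real keys
    manager = mp.Manager()
    pool = mp.Pool()
    # an ordinary dict, kept in main only, mapping key -> queue proxy
    queues = {key: manager.Queue() for key in all_keys}
    # each worker receives just its own queue, never the dict
    for key in all_keys:
        pool.apply_async(targ1, (queues[key],))
    for key in all_keys:
        print(key, queues[key].get())
    pool.close()
    pool.join()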
I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create the queues that connect the workers like a pipeline: by giving them only the queues they need, there's no need for them to know about any mappings:
import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get() + "Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):
    q_out.put(q_in.get() + "Stage 2 did some work.\n")
    return

def main():
    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get() + "Main finished the job.")

    pool.close()
    pool.join()
    return

if __name__ == "__main__":
    main()
The code produces this output:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
I didn't include an example of storing the queues or AsyncResults objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.
In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.
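Putting the pieces together, here is a rough sketch of per-key pipelines, reusing stage1 and stage2 from the example above; the dictionaries (inputs, outputs, results) and the keys are placeholders, not code from the question:

import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get() + "Stage 1 did some work.\n")

def stage2(q_in, q_out):
    q_out.put(q_in.get() + "Stage 2 did some work.\n")

if __name__ == "__main__":
    all_keys = ["a", "b"]         # stand-ins for the real keys
    manager = mp.Manager()
    pool = mp.Pool()
    inputs, outputs, results = {}, {}, {}
    for key in all_keys:
        q_in = manager.Queue()    # main -> stage1
        q_mid = manager.Queue()   # stage1 -> stage2; handed to the workers only
        q_out = manager.Queue()   # stage2 -> main
        results[key] = (pool.apply_async(stage1, (q_in, q_mid)),
                        pool.apply_async(stage2, (q_mid, q_out)))
        # main keeps only the endpoint queues; q_mid is not stored anywhere,
        # so the inter-worker queue can be garbage collected when done
        inputs[key] = q_in
        outputs[key] = q_out
    for key in all_keys:
        inputs[key].put("Main started job " + key + ".\n")
    for key in all_keys:
        print(outputs[key].get() + "Main finished job " + key + ".")
    pool.close()
    pool.join()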