我有一个非常大的(只读)数据数组,希望由多个进程并行处理.
我喜欢 Pool.map
函数,并希望使用它来并行计算该数据上的函数.
我看到可以使用 Value
或 Array
类在进程之间使用共享内存数据.但是当我尝试使用它时,我得到一个 RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance
when using the Pool.map function:
这是我正在尝试做的一个简化示例:
从系统导入标准输入从多处理导入池,数组def count_it(arr,键):计数 = 0对于 arr 中的 c:如果 c == 键:计数 += 1返回计数如果 __name__ == '__main__':testData =abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"# 想使用共享内存共享它toShare = Array('c', testData)# 这行得通打印 count_it(toShare, "a" )池 = 池()# 这里的运行时错误print pool.map(count_it, [(toShare,key) for key in ["a", "b", "s", "d"]])
谁能告诉我我在这里做错了什么?
所以我想做的是在进程池中创建进程后将有关新创建的共享内存分配数组的信息传递给进程.
刚看到赏金就再试一次;)
基本上我认为错误消息的意思是 - 多处理共享内存数组不能作为参数传递(通过酸洗).序列化数据没有意义——关键是数据是共享内存.所以你必须使共享数组成为全局的.我认为将它作为模块的属性更简洁,就像在我的第一个答案中一样,但是在您的示例中将其作为全局变量也可以很好地工作.考虑到您不想在分叉之前设置数据的观点,这是一个修改后的示例.如果您想拥有多个可能的共享数组(这就是您想将 toShare 作为参数传递的原因),您可以类似地创建一个共享数组的全局列表,并将索引传递给 count_it (这将成为 forc in toShare[i]:
).
从系统导入标准输入从多处理导入池、数组、进程def count_it(键):计数 = 0对于toShare中的c:如果 c == 键:计数 += 1返回计数如果 __name__ == '__main__':# 分配共享数组 - 在这种情况下希望 lock=False 因为我们# 没有写入它并希望允许多个进程访问# 同时 - 我认为 lock=True 会有很少或# 没有加速最大长度 = 50toShare = Array('c', maxLength, lock=False)# 叉池 = 池()# fork后可以设置数据testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"如果 len(testData) >最长长度:raise ValueError,共享数组太小而无法容纳数据"toShare[:len(testData)] = testData打印 pool.map(count_it, ["a", "b", "s", "d"])
= testData打印 pool.map(count_it, ["a", "b", "s", "d"])
不知道为什么 map 不会 Pickle 数组,但 Process 和 Pool 会 - 我想它可能已经在 windows 上的子进程初始化时转移了.请注意,数据仍然是在 fork 之后设置的.
I have a very large (read only) array of data that I want to be processed by multiple processes in parallel.
I like the Pool.map
function and would like to use it to calculate functions on that data in parallel.
I saw that one can use the Value
or Array
class to use shared memory data between processes. But when I try to use this I get a RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance
when using the Pool.map function:
Here is a simplified example of what I am trying to do:
from sys import stdin
from multiprocessing import Pool, Array
def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
# this works
print count_it( toShare, "a" )
pool = Pool()
# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Can anyone tell me what I am doing wrong here?
So what I would like to do is pass info about a newly created shared memory allocated array to the processes after they have been created in the process pool.
Trying again as I just saw the bounty ;)
Basically I think the error message means what it said - multiprocessing shared memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is the data is shared memory. So you have to make the shared array global. I think it's neater to put it as the attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point of not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (and that's why you wanted to pass toShare as an argument) you could similarly make a global list of shared arrays, and just pass the index to count_it (which would become for c in toShare[i]:
).
from sys import stdin
from multiprocessing import Pool, Array, Process
def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool()
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[EDIT: The above doesn't work on windows because of not using fork. However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule
def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count
def initProcess(share):
mymodule.toShare = share
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
Not sure why map won't Pickle the array but Process and Pool will - I think perhaps it has be transferred at the point of the subprocess initialization on windows. Note that the data is still set after the fork though.
这篇关于在 Python 多处理中将 Pool.map 与共享内存数组结合起来的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持html5模板网!