I have very basic producer-consumer code written with the pika library in Python. The problem is that the consumer processes the messages in the queue too slowly. I ran some tests and found that I can speed up the workflow by up to 27 times with multiprocessing, but I don't know the right way to add multiprocessing to my code.
import json
from datetime import datetime

import pika

from functions import download_xmls


def callback(ch, method, properties, body):
    print('Got something')
    body = json.loads(body)
    # The last element carries the object type; the rest carry the cadnums.
    obj_type = body[-1]['Type']
    print('Object type in work currently ' + obj_type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))
    date_start = datetime.now()
    download_xmls(obj_type, cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end - date_start).total_seconds()))


def consume(queue_name='bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    # pika 0.x argument order; pika 1.x expects
    # basic_consume(queue=..., on_message_callback=...).
    channel.basic_consume(callback, queue=queue_name)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
How do I add multiprocessing functionality from here?
Pika has extensive example code that I recommend you check out. Note that this code is for example use only. If you do the work on threads, you will need a more robust way to manage those threads.
The goal is to not block the thread that runs Pika's IO loop, and to call back into the IO loop correctly from your worker threads. That's why add_callback_threadsafe exists and is used in that code.
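As a rough illustration of that idea (not the Pika example itself), here is a minimal sketch that assumes pika 1.x and a thread pool from the standard library. The helper names ack_message, do_work and on_message, the workers parameter, and the stand-in body of download_xmls are all hypothetical; in real code download_xmls would come from the question's functions module.

```python
import functools
import json
from concurrent.futures import ThreadPoolExecutor


def download_xmls(obj_type, cnums):
    """Hypothetical stand-in for the question's functions.download_xmls."""
    return len(cnums)


def ack_message(ch, delivery_tag):
    # Runs on the thread that owns the connection, so using the channel is safe.
    if ch.is_open:
        ch.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, ch, delivery_tag, body):
    # Runs on a worker thread: do the slow part, then hand the ack back to
    # the connection's thread instead of calling basic_ack directly.
    payload = json.loads(body)
    obj_type = payload[-1]['Type']
    cnums = [x['cadnum'] for x in payload[:-1]]
    download_xmls(obj_type, cnums)
    connection.add_callback_threadsafe(
        functools.partial(ack_message, ch, delivery_tag))


def on_message(ch, method, properties, body, connection, executor):
    # Return quickly so the IO loop is not blocked by the download.
    executor.submit(do_work, connection, ch, method.delivery_tag, body)


def consume(queue_name='bot-test', workers=4):
    import pika  # imported here so the helpers above run without pika installed

    # 'server@address' is the placeholder from the question, not a real URL.
    connection = pika.BlockingConnection(pika.URLParameters('server@address'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    # Allow as many unacknowledged messages in flight as there are workers.
    channel.basic_qos(prefetch_count=workers)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        callback = functools.partial(
            on_message, connection=connection, executor=executor)
        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    connection.close()
```

Calling consume() starts the consumer. Raising prefetch_count above 1 is what lets several messages be processed at once; with the value of 1 from the question, the broker would still deliver one message at a time. Messages whose ack is not processed before shutdown stay unacknowledged and should be redelivered by the broker.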
NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.