RabbitMQ:快速生产者和慢消费者

时间:2023-04-17
本文介绍了RabbitMQ:快速生产者和慢消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我有一个应用程序,它使用 RabbitMQ 作为消息队列在两个组件之间发送/接收消息:发送者和接收者.发件人以非常快的方式发送消息.接收方接收到消息,然后执行一些非常耗时的任务(主要是为非常大的数据量写入数据库).由于接收者需要很长时间才能完成任务然后检索队列中的下一条消息,因此发送者将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?

消息消费者如下所示:

public void onMessage() throws IOException, InterruptedException {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");QueueingConsumer 消费者 = 新 QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);而(真){QueueingConsumer.Delivery 交付 = consumer.nextDelivery();字符串消息 = new String(delivery.getBody());System.out.println(" [x] 收到'" + 消息 + "'");JSONObject json = new JSONObject(message);字符串 caseID = json.getString("caseID");//跟随需要很长时间dao.saveToDB(caseID);}}

消费者收到的每条消息都包含一个 caseID.对于每个caseID,都会将大量的数据保存到数据库中,这需要很长时间.目前只为 RabbitMQ 设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅 caseID.那么如何加快消费者的吞吐量,让消费者赶上生产者,避免队列中的消息溢出呢?是否应该在消费者部分使用多线程来加快消费速度?或者我应该使用多个消费者同时消费传入的消息?或者有什么异步方式让消费者异步消费消息而不等待它完成?欢迎任何建议.

解决方案

这会导致消息队列溢出吗?"

是的.RabbitMQ 会进入流控"状态,以防止随着队列长度的增加而过度消耗内存.它还将开始将消息持久化到磁盘,而不是将它们保存在内存中.

<块引用>

"那么如何才能加快消费者的吞吐量,让消费者可以赶上生产者,避免消息溢出排队"

你有两个选择:

  1. 添加更多消费者.请记住,如果您选择此选项,您的数据库现在将由多个并发进程操作.确保数据库能够承受额外的压力.
  2. 提高消费渠道的QOS值.这将从队列中提取更多消息并将它们缓冲在消费者上.这将增加整体处理时间;如果缓冲了 5 条消息,则第 5 条消息将花费消息 1...5 的处理时间来完成.

<块引用>

我应该在消费者部分使用多线程来加速消费率?"

除非您有精心设计的解决方案,否则不会.向应用程序添加并行性将在消费者端增加大量开销.您最终可能会耗尽 ThreadPool 或限制内存使用.

在处理 AMQP 时,您确实需要考虑每个流程的业务需求,以便设计出最优的解决方案.您收到的消息对时间有多敏感?它们是否需要尽快持久化到数据库中,或者这些数据是否立即可用对您的用户来说很重要?

如果数据不需要立即持久化,您可以修改您的应用程序,以便消费者只需从队列中删除消息并将它们保存到缓存集合中,例如在 Redis 中.引入第二个进程,然后依次读取和处理缓存的消息.这将确保您的队列长度不会增长到足以导致流控制,同时防止您的数据库被写入请求轰炸,这些写入请求通常比读取请求更昂贵.您的消费者现在只需从队列中删除消息,稍后由另一个进程处理.

I have an application that uses RabbitMQ as the message queue to send/receive message between two components: sender and receiver. The sender sends message in a very fast way. The receiver receives the message and then does some very time-consuming task (mainly database writing for very large data size). Since the receiver takes a very long time to finish the task and then retrieve the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?

The message consumer looks like the following:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

Each message received by the consumer contains a caseID. For each caseID, it will save large amount of data to the database, which takes very long time. Currently only one consumer is set up for the RabbitMQ since producer/consumer use the same queue for the publish/subscribe of caseID. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming message simutaneously? Or is there any asynchronous way to let the consumer consume the message asynchronously without waiting it to finish? Any suggestions are welcome.

解决方案

"Will this cause the message queue to overflow?"

Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.

"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"

You have 2 options:

  1. Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
  2. Increase the QOS value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete.

"Should I use multithreading in the consumer part to speed up the consumption rate?"

Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.

When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?

If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.

这篇关于RabbitMQ:快速生产者和慢消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持html5模板网!

上一篇:嵌入式 AMQP Java 代理 下一篇:Spring异步MessageListener用例发生业务异常时如何让

相关文章

最新文章