I am using RabbitMQ in my project.
My consumer contains the RabbitMQ client code, and the connection requires TLS 1.1 to reach the real broker.
I want to test this code in a JUnit test and mock the delivery of messages to my consumer.
I have found several examples online that use other tools, such as Camel RabbitMQ or ActiveMQ, but those tools work with AMQP 1.0, whereas RabbitMQ only works with AMQP 0.9.
Has anyone run into this problem?
Thanks!
UPDATE
This is the code I want to test; it receives a JSON message from the queue.
package com.foo.foo.queue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class BrokerThreadHLConsumer extends Thread {

    private static BrokerThreadHLConsumer instance;
    private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);
    private Channel channel;
    private String queueName;
    private PolicyFinder PolicyFinder;
    private Connection connection;
    private QueueingConsumer consumer;
    private boolean loop;

    private BrokerThreadHLConsumer() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        char[] keyPassphrase = "clientrabbit".toCharArray();
        KeyStore keyStoreCacerts;
        ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
        String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
        String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
        try {
            /* Public key cacerts to connect to message queue*/
            keyStoreCacerts = KeyStore.getInstance("PKCS12");
            URL resourcePublicKey = this.getClass().getClassLoader().getResource("certs/client.keycert.p12");
            File filePublicKey = new File(resourcePublicKey.toURI());
            keyStoreCacerts.load(new FileInputStream(filePublicKey), keyPassphrase);
            KeyManagerFactory keyManager;
            keyManager = KeyManagerFactory.getInstance("SunX509");
            keyManager.init(keyStoreCacerts, keyPassphrase);
            char[] trustPassphrase = "changeit".toCharArray();
            KeyStore tks;
            tks = KeyStore.getInstance("JCEKS");
            URL resourceCacerts = this.getClass().getClassLoader().getResource("certs/cacerts");
            File fileCacerts = new File(resourceCacerts.toURI());
            tks.load(new FileInputStream(fileCacerts), trustPassphrase);
            TrustManagerFactory tmf;
            tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(tks);
            SSLContext c = SSLContext.getInstance("TLSv1.1");
            c.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);
            factory.setUri(rabbitHost);
            factory.useSslProtocol(c);
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "fanout");
            queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, exchangeName, "");
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (CertificateException e) {
            e.printStackTrace();
        } catch (KeyStoreException e) {
            e.printStackTrace();
        } catch (UnrecoverableKeyException e) {
            e.printStackTrace();
        } catch (KeyManagementException e1) {
            e1.printStackTrace();
        } catch (Exception e) {
            log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost);
            log.error(e.getStackTrace());
            e.printStackTrace();
        }
    }

    public static BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        if (instance == null)
            instance = new BrokerThreadHLConsumer();
        return instance;
    }

    public void run() {
        if (PolicyFinder != null) {
            try {
                consumer = new QueueingConsumer(channel);
                channel.basicConsume(queueName, true, consumer);
                log.info("Consumer broker started and waiting for messages");
                loop = true;
                while (loop) {
                    try {
                        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                        String message = new String(delivery.getBody());
                        JSONObject obj = new JSONObject(message);
                        log.info("Message received from broker " + obj);
                        if (StringUtils.isNotEmpty(message) && !PolicyFinder.managePolicySet(obj)) {
                            log.error("PolicySet error: error upgrading the policySet");
                        }
                    } catch (Exception e) {
                        log.error("Receiving message error");
                        log.error(e);
                    }
                }
            } catch (IOException e) {
                log.error("Consumer couldn't start");
                log.error(e.getStackTrace());
            }
        } else {
            log.error("Consumer couldn't start cause of PolicyFinder is null");
        }
    }

    public void close() {
        loop = false;
        try {
            consumer.getChannel().basicCancel(consumer.getConsumerTag());
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setLuxPolicyFinder(PolicyFinder PolicyFinder) {
        this.PolicyFinder = PolicyFinder;
    }
}
As I understand it, the question is trying to test two things: the TLS connection to RabbitMQ, and the delivery of messages to the consumer.
For the first one, since TLS itself is under test, only connecting to a real RabbitMQ instance with the correct truststore configured will prove that the configuration works.
For the second one, however, for tests that demonstrate features of the application (with tools like Cucumber for readability), you may try a library I'm working on: rabbitmq-mock (which is why I'm digging up an old post).
Just include it as a dependency:
<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>
And replace new ConnectionFactory() with new MockConnectionFactory() in your unit test.
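To make the swap concrete, here is a minimal sketch of what such a test helper might look like. It assumes rabbitmq-mock and a 5.x amqp-client are on the test classpath. The class name MockDeliverySketch, the exchange name demo-exchange, and the JSON payload are all hypothetical and exist only for illustration. It mirrors the fanout exchange and server-named queue from the question, but reads the message back with basicGet rather than the QueueingConsumer used there, so treat it as a starting point rather than a drop-in test for BrokerThreadHLConsumer.

```java
import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of rabbitmq-mock or of the question's code.
class MockDeliverySketch {

    // Publishes a JSON string through the in-memory broker and reads it back.
    static String publishAndReceive(String json) throws Exception {
        // In-memory stand-in for new ConnectionFactory(); no real broker or TLS is involved.
        ConnectionFactory factory = new MockConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Same topology as the question: a fanout exchange bound to a server-named queue.
            channel.exchangeDeclare("demo-exchange", "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, "demo-exchange", "");

            // Simulate the producer side.
            channel.basicPublish("demo-exchange", "", null, json.getBytes(StandardCharsets.UTF_8));

            // Pull one message with auto-ack; basicGet returns null when the queue is empty.
            GetResponse response = channel.basicGet(queueName, true);
            return response == null ? null : new String(response.getBody(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(publishAndReceive("{\"policy\":\"demo\"}"));
    }
}
```

For the consumer in the question to benefit from this, the ConnectionFactory would have to be passed in (for example through the constructor) instead of being created inside it, so that the test can supply the mock while production code supplies the TLS-configured factory.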
Examples are available in the project: https://github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java