[置顶] Apache ActiveMQ实战(1)-基本安装配置与消息类型
activemq,mq,activemq spring,消息应答,消息事务2016-07-06
ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。
ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。这些附加的特性包括,JMX管理(Java Management Extensions,即Java管理扩展),主从管理(master/slave,这是集群模式的一种,主要体现在可靠性方面,当主中介(代理)出现故障,那么从代理会替代主代理的位置,不至于使消息系统瘫痪)、消息组通信(同一组的消息,仅会提交给一个客户进行处理)、有序消息管理(确保消息能够按照发送的次序被接收者接收)。
ActiveMQ 支持JMS规范,ActiveMQ完全实现了JMS1.1规范。
JMS规范提供了同步消息和异步消息投递方式、有且仅有一次投递语义(指消息的接收者对一条消息必须接收到一次,并且仅有一次)、订阅消息持久接收等。如果仅使用JMS规范,表明无论您使用的是哪家厂商的消息代理,都不会影响到您的程序。
ActiveMQ主要涉及到5个方面:
<destinationPolicy>
<policyMap>
<policyEntries>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="50000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit="2mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
找到下面这一段
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="90" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
# Set jvm memory configuration (minimal/maximum amount of memory)
ACTIVEMQ_OPTS_MEMORY="-Xms2048M -Xmx2048M"
$ cd $ACTIVEMQ_HOME
$ ./activemq console
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.101:61616" /> <property name="useAsyncSend" value="true" /> <property name="alwaysSessionAsync" value="true" /> <property name="useDedicatedTaskRunner" value="true" /> </bean> <!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> </beans>
。。。。。。
<properties>
<activemq_version>5.13.3</activemq_version>
</properties>
。。。。。。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq_version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq_version}</version>
</dependency>
package webpoc;
public class AMQSender {
public static void sendWithAuto(ApplicationContext context) {
ActiveMQConnectionFactory factory = null;
Connection conn = null;
Destination destination = null;
Session session = null;
MessageProducer producer = null;
try {
destination = (Destination) context.getBean("destination");
factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
conn = factory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
Message message = session.createTextMessage("...Hello JMS!");
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
producer.close();
producer = null;
} catch (Exception e) {
}
try {
session.close();
session = null;
}
} catch (Exception e) {
}
try {
conn.stop();
} catch (Exception e) {
}
try {
conn.close();
} catch (Exception e) {
}
}
}
public static void main(String[] args) {
final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");
sendWithAuto(context);
}
}
package webpoc;
public class TranQConsumer extends Thread implements MessageListener {
private Connection conn = null;
private Destination destination = null;
private Session session = null;
public void run() {
receive();
}
public void receive() {
ConnectionFactory factory = null;
Connection conn = null;
try {
final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml");
factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
conn = factory.createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = (Destination) context.getBean("destination");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (Exception e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
TextMessage tm = (TextMessage) message;
System.out.println("TranQConsumer receive message: " + tm.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
TranQConsumer tranConsumer = new TranQConsumer();
tranConsumer.start();
}
}
// Excerpt: session created with the transacted flag set to false.
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Excerpt: the same call with the transacted flag set to true
// (presumably the replacement for the line above in the transactional variant — confirm against the article).
// NOTE(review): the JMS API is understood to ignore the acknowledge-mode argument for transacted sessions — confirm.
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Excerpt (producer side): send the message, print the id of the sending thread, then commit the session.
producer.send(message);
System.out.println("send......" + Thread.currentThread().getId());
session.commit();
// Excerpt: the `try {` that this catch belongs to is not part of the visible fragment.
// On any failure the stack trace is printed and the session is rolled back;
// a failure of the rollback itself is ignored.
catch (Exception e) {
e.printStackTrace();
try {
session.rollback();
} catch (Exception ex) {
}
}
// Excerpt (presumably the consumer side): session created with the transacted flag set to true.
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Excerpt (presumably the body of the consumer's onMessage): print the text of the received
// message and commit the session; on any failure print the stack trace and roll back,
// ignoring a failure of the rollback itself.
try {
TextMessage tm = (TextMessage) message;
System.out.println("TranQConsumer receive message: " + tm.getText());
session.commit();
} catch (Exception e) {
e.printStackTrace();
try {
session.rollback();
} catch (Exception ex) {
}
}
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.101:61616" /> <property name="useAsyncSend" value="true" /> <property name="alwaysSessionAsync" value="true" /> <property name="useDedicatedTaskRunner" value="true" /> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> </bean> <!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#destination" redeliveryDelay="100" maximumRedeliveries="1" />
<!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> <bean id="replyDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.reply.queue" /> </bean>
String correlationId = RandomStringUtils.randomNumeric(5); consumer = session.createConsumer(replyDest); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); consumer.setMessageListener(this);
/**
 * Prints the text of a reply received from the server.
 *
 * <p>The cast is performed inside the {@code try} so that a message which is
 * not a {@link TextMessage} is reported like any other failure (stack trace
 * on standard error) instead of throwing out of the listener callback.
 *
 * @param message the delivered reply message
 */
public void onMessage(Message message) {
    try {
        TextMessage tm = (TextMessage) message;
        System.out.println("Client接收Server端消息:" + tm.getText());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
producer.send(message);
package webpoc.mq.dual;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang.RandomStringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Client implements MessageListener {
/**
 * Prints the text of a reply received from the server.
 *
 * <p>The cast is performed inside the {@code try} so that a message which is
 * not a {@link TextMessage} is reported like any other failure (stack trace
 * on standard error) instead of throwing out of the listener callback.
 *
 * @param message the delivered reply message
 */
public void onMessage(Message message) {
    try {
        TextMessage tm = (TextMessage) message;
        System.out.println("Client接收Server端消息:" + tm.getText());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Starts listening for replies: opens and starts a connection, creates a
 * non-transacted {@code AUTO_ACKNOWLEDGE} session and registers this object
 * as the listener on the {@code replyDestination} bean.
 *
 * <p>The connection is deliberately left open so that replies keep being
 * delivered to {@link #onMessage(Message)}; this method keeps no reference to
 * it, so it cannot be closed later. Any failure is printed to standard error.
 *
 * @param context Spring context that defines the {@code replyDestination} and
 *                {@code connectionFactory} beans
 */
public void start(ApplicationContext context) {
    try {
        Destination replyDest = (Destination) context.getBean("replyDestination");
        ConnectionFactory factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Only a consumer is needed here; requests are produced by send().
        MessageConsumer consumer = session.createConsumer(replyDest);
        consumer.setMessageListener(this);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends one text request ({@code "...Hello JMS!"}) to the {@code destination}
 * bean, with {@code JMSReplyTo} set to the {@code replyDestination} bean and a
 * random five-digit {@code JMSCorrelationID}.
 *
 * <p>The connection opened for sending is closed before the method returns.
 * Any failure is printed to standard error; the method never throws.
 *
 * @param context Spring context that defines the {@code destination},
 *                {@code replyDestination} and {@code connectionFactory} beans
 */
public void send(ApplicationContext context) {
    Connection conn = null;
    try {
        Destination destination = (Destination) context.getBean("destination");
        Destination replyDest = (Destination) context.getBean("replyDestination");
        ConnectionFactory factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
        conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("...Hello JMS!");
        String correlationId = RandomStringUtils.randomNumeric(5);
        message.setJMSReplyTo(replyDest);
        message.setJMSCorrelationID(correlationId);
        producer.send(message);
        System.out.println("send 1 message");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (conn != null) {
            try {
                // Closing the connection also closes the session and producer created from it.
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Entry point: loads {@code classpath:/spring/activemq_dual.xml}, registers
 * the reply listener and then sends a single request.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final ApplicationContext springContext =
            new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");
    final Client client = new Client();
    // Subscribe to the reply queue first, then send the request.
    client.start(springContext);
    client.send(springContext);
}
// NOTE(review): this method reads this.session and producer, neither of which is
// declared in the surrounding Client class, and it has the same signature as the
// onMessage defined earlier in that class. It looks like an excerpt of
// Server.onMessage that ended up here — confirm against the original article.
//
// Builds a text reply for the received message and sends it to the destination
// named by the message's JMSReplyTo header.
public void onMessage(Message message) {
System.out.println("on message");
try {
// Create an empty text message that will carry the reply.
TextMessage response = this.session.createTextMessage();
// Only text messages contribute a body to the reply; for other message types
// the reply text is left unset.
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText("服务器收到消息:" + messageText);
System.out.println(response.getText());
}
// Copy the correlation id of the request onto the reply.
response.setJMSCorrelationID(message.getJMSCorrelationID());
// Send the reply to the destination the request named in JMSReplyTo.
producer.send(message.getJMSReplyTo(), response);
} catch (Exception e) {
// Any failure is only reported on standard error.
e.printStackTrace();
}
}
}
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(replyDest); consumer = session.createConsumer(destination); consumer.setMessageListener(this);
package webpoc.mq.dual;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang.RandomStringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Request/reply server example: listens on the {@code destination} bean and
 * answers every request on the destination named by the request's
 * {@code JMSReplyTo} header, echoing the request's {@code JMSCorrelationID}.
 */
public class Server implements MessageListener {
    private ConnectionFactory factory = null;
    private Connection conn = null;
    private Destination destination = null;
    Destination replyDest = null;
    private Session session = null;
    private MessageProducer producer = null;
    private MessageConsumer consumer = null;

    /**
     * Handles one request: builds a text reply and sends it back.
     *
     * <p>For a {@link TextMessage} the reply text is the request text prefixed
     * with {@code "服务器收到消息:"}; for other message types the reply text is
     * left unset. The reply goes to the request's {@code JMSReplyTo}
     * destination, or to the configured {@code replyDestination} bean when the
     * request carries none. Any failure is printed to standard error.
     *
     * @param message the delivered request
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("on message");
        try {
            // Create the reply message up front.
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                // Echo the request text back, with a prefix, as the reply body.
                response.setText("服务器收到消息:" + messageText);
                System.out.println(response.getText());
            }
            // The reply carries the correlation id supplied by the client.
            response.setJMSCorrelationID(message.getJMSCorrelationID());
            System.out.println("replyto===" + message.getJMSReplyTo());
            // Prefer the destination chosen by the client; fall back to the
            // configured reply queue when the request has no JMSReplyTo.
            Destination replyTarget = message.getJMSReplyTo();
            if (replyTarget == null) {
                replyTarget = replyDest;
            }
            producer.send(replyTarget, response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the broker and starts serving requests: creates a
     * non-transacted {@code AUTO_ACKNOWLEDGE} session, a producer for replies
     * and a consumer on the {@code destination} bean with this object as its
     * listener. The connection is left open so that requests keep arriving.
     * Any failure is printed to standard error.
     *
     * @param context Spring context that defines the {@code destination},
     *                {@code replyDestination} and {@code connectionFactory} beans
     */
    public void receive(ApplicationContext context) {
        try {
            destination = (Destination) context.getBean("destination");
            replyDest = (Destination) context.getBean("replyDestination");
            factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory");
            conn = factory.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Unidentified producer (no fixed destination): the target is
            // supplied per send, so replies can go to whatever JMSReplyTo the
            // client chose, not only to the configured reply queue.
            producer = session.createProducer(null);
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point: loads {@code classpath:/spring/activemq_dual.xml} and starts
     * the server.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml");
        Server s = new Server();
        s.receive(context);
    }
}