博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ入门实例Demo
阅读量:6427 次
发布时间:2019-06-23

本文共 9587 字,大约阅读时间需要 31 分钟。

  前面我们已经搭建和配置好了ActiveMQ,下面来看一个Demo,体验一下MQ。

JMS 消息模型

  JMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。  

  (1)点对点模型(Queue)

    一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。

    模型特点:只有一个消费者获得消息。

  (2)发布者/订阅者模型(Topic)

    0个或多个订阅者可以接受特定主题的消息。

    模型特点:多个消费者可获得消息。

    Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

JMS消息格式

  • MapMessage -- key-value键值对
  • TextMessage -- 字符串对象
  • ObjcetMessage -- 一个序列化的Java对象
  • ByteMessage -- 一个未解释字节的数据流
  • StreamMessage -- Java原始值的数据流

点对点模型Demo

public class Constants {    public static final String MQ_NAME = "parry";        public static final String MQ_PASSWORD = "parry123";        public static final String MQ_BROKETURL = "tcp://192.168.56.129:61616";}
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import com.parry.demo.constant.Constants;/** *  * 

* MSProduct 点对点模型-消息生产者 *

*/public class MSProduct { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; // 消息创建者 MessageProducer messageProducer; try { factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createQueue("parryQuene"); // 创建消息生产者 messageProducer = session.createProducer(destination); // 创建TextMessage消息实体 TextMessage message = session.createTextMessage("我是parry,这是我的第一个消息!"); messageProducer.send(message); session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }}

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import com.parry.demo.constant.Constants;/** * 

* MQConsumer 点对点--消息消费者 *

*/public class MQConsumer { public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 获取连接实例 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(消费者就不需要开启事务了) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createQueue("parryQuene"); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); //注册消息监听 consumer.setMessageListener(new MQListerner()); } catch (JMSException e) { e.printStackTrace(); } }}

import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 

* MQListerner 生产者监听器 *

*/public class MQListerner implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } }}

 发布者/订阅者模型Demo

  (1)发布者

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import com.parry.demo.constant.Constants;/** * 

* MQProducer 订阅消息的发送者 *

*/public class MQProducer { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createTopic("parryTopic"); // 创建消息发布者 MessageProducer producer = session.createProducer(destination); // 创建TextMessage消息 TextMessage message = session.createTextMessage("你好,这是我发布的第一条消息!"); // 发布消息 producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }}

  (2)订阅者01

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import com.parry.demo.constant.Constants;/** * 

* MQCousumer01 订阅-发布模式 订阅者01 *

*/public class MQCousumer01 { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createTopic("parryTopic"); // 创建消息订阅者 MessageConsumer consumer = session.createConsumer(destination); // 消息发布者添加监听器 consumer.setMessageListener(new Listerner01()); } catch (JMSException e) { e.printStackTrace(); } }}

import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 

* Listerner01 订阅者01的监听器 *

*/public class Listerner01 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者01接收到消息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } }}

  (3)订阅者02

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import com.parry.demo.constant.Constants;/** * 

* MQCousumer02 订阅-发布模式 订阅者02 *

*/public class MQCousumer02 { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createTopic("parryTopic"); // 创建消息订阅者 MessageConsumer consumer = session.createConsumer(destination); // 消息发布者添加监听器 consumer.setMessageListener(new Listerner02()); } catch (JMSException e) { e.printStackTrace(); } }}

import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 

* Listerner02 订阅者02的监听器 *

*/public class Listerner02 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者02接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } }}

 

转载地址:http://tqyga.baihongyu.com/

你可能感兴趣的文章
RSD和wlwmanifest是什么
查看>>
Linkedin工程师是如何优化他们的Java代码的(转)
查看>>
winfrom 如何保存datagridview中的某一行数据
查看>>
面向领域驱动的应用开发框架Apworks 2.0发布
查看>>
开发自己的Web服务处理程序(以支持Ajax框架异步调用Web服务方法)
查看>>
ref和out
查看>>
黑客教父详解账号泄露全过程:1亿用户已泄露
查看>>
程序员必须软件
查看>>
Canvas里的globalCompositeOperation
查看>>
解决Unable to locate theme engine in module_path: "pixmap"
查看>>
贝叶斯文本分类c#版
查看>>
Centos安装KDE或GNOME
查看>>
Eclipse & IDEA 中常用的快捷键
查看>>
javascript ---IPhone滑动解锁
查看>>
table固定行和表头
查看>>
<每天读一点职场心理学>读书笔记
查看>>
Android权限大全代码
查看>>
android 判断SIM卡是哪个运营商
查看>>
删除N天前的M(天)个目录 、删除N天前最后修改的文件 ForFiles, dos command 批处理命令cmd/bat...
查看>>
十进制数1~n中1出现的次数
查看>>