博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ的介绍及使用实例.
阅读量:7010 次
发布时间:2019-06-28

本文共 16554 字,大约阅读时间需要 55 分钟。

今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思. 

一: 使用场景: 

消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。
那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式:
我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中.  那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.

消息队列可以接收消息和 发送消息

消息队列类型:

队列:一对一聊天  私聊  QQ

主题(订阅模式):一对多聊天  群聊  QQ

名词解释: 

 

 二, 代码原型

ActiveMQ需要部署到Linux系统下, 这里就不再做概述.
这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.
上架代码原型:
项目构件图:
未使用ActiveMQ前ProductServiceImpl.cs:

1 //上架 2     public void isShow(Long[] ids){ 3         Product product = new Product(); 4         product.setIsShow(true); 5         for (final Long id : ids) { 6             //上下架状态 7             product.setId(id); 8             productDao.updateByPrimaryKeySelective(product); 9             10             //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.11             //TODO 保存商品信息到Solr服务器12             SolrInputDocument doc = new SolrInputDocument();13             //ID14             doc.setField("id", id);15             //名称16             Product p = productDao.selectByPrimaryKey(id);17             doc.setField("name_ik", p.getName());18             //图片URL19             doc.setField("url", p.getImgUrls()[0]);20             //品牌 ID21             doc.setField("brandId", p.getBrandId());22             //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 123             SkuQuery skuQuery = new SkuQuery();24             skuQuery.createCriteria().andProductIdEqualTo(id);25             skuQuery.setOrderByClause("price asc");26             skuQuery.setPageNo(1);27             skuQuery.setPageSize(1);28             List
skus = skuDao.selectByExample(skuQuery);29 doc.setField("price", skus.get(0).getPrice());30 //...时间等 剩下的省略31 32 try {33 solrServer.add(doc);34 solrServer.commit();35 } catch (Exception e) {36 // TODO Auto-generated catch block37 e.printStackTrace();38 }39 40 41 42 43 //TODO 静态化44 }45 }

上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造: 

使用ActiveMQ后的ProductServiceImpl.cs:

1 //上架 2     public void isShow(Long[] ids){ 3         Product product = new Product(); 4         product.setIsShow(true); 5         for (final Long id : ids) { 6             //上下架状态 7             product.setId(id); 8             productDao.updateByPrimaryKeySelective(product); 9             10             //发送商品ID到ActiveMQ即可.11             jmsTemplate.send(new MessageCreator() {12                 13                 @Override14                 public Message createMessage(Session session) throws JMSException {15                     16                     return session.createTextMessage(String.valueOf(id));17                 }18             });19             20             //TODO 静态化21         }22     }

接着就是配置消息发送方(JMS生产者) mq.xml:

1 
22 23 24
25
26
27
30
31
32
33
34
35 36
37
38
39
40
41
42
43 44
45
46
47
48
49 50
51
52
53
54
55
56
57

配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.

接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 
接着就是再将连接池交由Spring管理. 
最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应  就是依赖于这个管道.
接下来我们就看下消息的接收方(JMS消费者)的一些东西:
消费者的目录结构:(Solr)
Solr项目中的ActiveMQ配置文件mq.xml:

1 
22 23 24
25
26
27
30
31
32
33
34
35 36
37
38
39
40
41
42
43 44
45
46
47
48
49 50
51
52
53
54
55
56
57 58
59
60 61
62
63
64
65
66
67
68
69
70

我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.

接下来看看监听器的处理方法做了些什么事情: 
CustomMessageListener.java:

1 /* 2  * 接收MQ中的消息 3  */ 4 public class CustomMessageListener implements MessageListener{ 5     @Autowired 6     private SearchService searchService; 7      8     @Override 9     public void onMessage(Message message) {10         //先将接收到的消息强转为ActiveMQ类型的消息11         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage12         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;13         try {14             String id = amtm.getText();15             System.out.println("接收到的ID:"+id);16             searchService.insertProductToSolr(Long.parseLong(id));17         } catch (JMSException e) {18             // TODO Auto-generated catch block19             e.printStackTrace();20         }21     }

因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了. 

接下来就看保存商品信息到Solr服务器的逻辑:
SearchServiceImpl.java:

1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2     public void insertProductToSolr(Long productId){ 3         //TODO 保存商品信息到Solr服务器 4         SolrInputDocument doc = new SolrInputDocument(); 5         //ID 6         doc.setField("id", productId); 7         //名称 8         Product p = productDao.selectByPrimaryKey(productId); 9         doc.setField("name_ik", p.getName());10         //图片URL11         doc.setField("url", p.getImgUrls()[0]);12         //品牌 ID13         doc.setField("brandId", p.getBrandId());14         //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115         SkuQuery skuQuery = new SkuQuery();16         skuQuery.createCriteria().andProductIdEqualTo(productId);17         skuQuery.setOrderByClause("price asc");18         skuQuery.setPageNo(1);19         skuQuery.setPageSize(1);20         List
skus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }

这样就比较明朗了, ActiveMQ 队列就是这样来实现的. 

====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.

 2016/09/04 20:32 更新

上面已经说了 消息的队列模式, 及点对点发送消息, 那么接下来就来说下 消息的一对多模式, 也就是 发布/订阅模式.
项目原型: 当商品上架后(babasport-product), 发送消息id给solr(babasport-solr)来将商品信息保存到solr服务器和cms(babasport-cms)来对商品详情页面做页面静态化.
===================
babasport-product:
结构图:
babasport-product下的项目结构图:
ProductServiceImpl.java中的上架:

1 @Autowired 2     private JmsTemplate jmsTemplate; 3      4     //上架 5     public void isShow(Long[] ids){ 6         Product product = new Product(); 7         product.setIsShow(true); 8         for (final Long id : ids) { 9             //上下架状态10             product.setId(id);11             productDao.updateByPrimaryKeySelective(product);12             13             //发送商品ID到ActiveMQ即可.14             jmsTemplate.send(new MessageCreator() {15                 16                 @Override17                 public Message createMessage(Session session) throws JMSException {18                     19                     return session.createTextMessage(String.valueOf(id));20                 }21             });22         }23     }
View Code

mq.xml:

1 
22 23 24
25
26
27
30
31
32
33
34
35 36
37
38
39
40
41
42
43 44
45
46
47
48
49 50
51
52
53
54
55
56
57
58
59
View Code

这里面的最大的变化就是将消息发布模式改为了: publish subject.

============================================
babasport-solr:
mq.xml配置文件:

1 
22 23 24
25
26
27
30
31
32
33
34
35 36
37
38
39
40
41
42
43 44
45
46
47
48
49 50
51
52
53
54
55
56
57 58
59
60 61
62
63
64
65
66
67
68
69
70
71
72
View Code

SearchServiceImpl.java: 保存商品信息到Solr服务器中, 通过ActiveMQ

1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2     public void insertProductToSolr(Long productId){ 3         //TODO 保存商品信息到Solr服务器 4         SolrInputDocument doc = new SolrInputDocument(); 5         //ID 6         doc.setField("id", productId); 7         //名称 8         Product p = productDao.selectByPrimaryKey(productId); 9         doc.setField("name_ik", p.getName());10         //图片URL11         doc.setField("url", p.getImgUrls()[0]);12         //品牌 ID13         doc.setField("brandId", p.getBrandId());14         //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115         SkuQuery skuQuery = new SkuQuery();16         skuQuery.createCriteria().andProductIdEqualTo(productId);17         skuQuery.setOrderByClause("price asc");18         skuQuery.setPageNo(1);19         skuQuery.setPageSize(1);20         List
skus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }
View Code

CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

1 public class CustomMessageListener implements MessageListener{ 2     @Autowired 3     private SearchService searchService; 4      5     @Override 6     public void onMessage(Message message) { 7         //先将接收到的消息强转为ActiveMQ类型的消息 8         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage 9         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;10         try {11             String id = amtm.getText();12             System.out.println("接收到的ID:"+id);13             searchService.insertProductToSolr(Long.parseLong(id));14         } catch (JMSException e) {15             // TODO Auto-generated catch block16             e.printStackTrace();17         }18     }19 }
View Code

===============================
babasport-cms:
mq.xml:

1 
2
3
4
7
8
9
10
11
12 13
14
15
16
17
18
19
20 21
22
23
24
25
26 27
28
29
30
31
32
33
34 35
36
37 38
39
40
41
42
43
44
45
46
47
48
View Code

CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

1 public class CustomMessageListener implements MessageListener{ 2     @Autowired 3     private StaticPageService staticPageService; 4     @Autowired 5     private CMSService cmsService; 6      7     @Override 8     public void onMessage(Message message) { 9         //先将接收到的消息强转为ActiveMQ类型的消息10         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage11         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;12         try {13             String id = amtm.getText();14             System.out.println("CMS接收到的ID:"+id);15             Map
root = new HashMap
();16 17 Product product = cmsService.selectProductById(Long.parseLong(id));18 List
skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));19 //去掉重复的颜色20 Set
colors = new HashSet
();21 for (Sku sku : skus) {22 colors.add(sku.getColor());23 }24 root.put("colors", colors);25 root.put("product", product);26 root.put("skus", skus);27 28 staticPageService.index(root, id);29 } catch (JMSException e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }33 }34 }
View Code

StaticPageServiceImpl.java: 静态化页面的核心类:

1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{ 2     //SpringMvc 管理 conf 3     private Configuration conf; 4     public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) { 5         this.conf = freeMarkerConfig.getConfiguration(); 6     } 7  8     //静态化页面的方法 9     public void index(Map
root, String id){10 //输出目录: 通过getPath方法获取的是绝对路径11 String path = getPath("/html/product/" + id +".html");12 File f = new File(path);13 File parentFile = f.getParentFile();14 if(!parentFile.exists()){15 parentFile.mkdirs();16 }17 18 //spring中已经设置了模板路径:
19 Writer out = null;20 21 try {22 //读23 Template template = conf.getTemplate("product.html");24 25 //设置输出的位置26 //写27 out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");28 template.process(root, out);29 } catch (Exception e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }finally {33 if (out != null)34 {35 try {36 out.close();37 } catch (IOException e) {38 // TODO Auto-generated catch block39 e.printStackTrace();40 }41 }42 43 }44 45 }46 47 //获取webapp下的html文件夹所在的位置48 //将相对路径转换为绝对路径49 public String getPath(String path){50 return servletContext.getRealPath(path);51 }52 53 private ServletContext servletContext;54 @Override55 public void setServletContext(ServletContext servletContext) {56 this.servletContext = servletContext;57 }58 }
View Code

Spring管理 freemarkerConfig配置类:

1 
2
3
4
5
6
7
8
9
10
11
View Code

更多关于freemarker的讲解请关注我以后的博客...

关于ActiveMQ的内容就更新到这么多.

转载地址:http://jqttl.baihongyu.com/

你可能感兴趣的文章
tensorflow 在加载大型的embedding模型参数时,会遇到cannot be larger than 2GB
查看>>
Flutter的教程:ListView
查看>>
xxl-job安装教程
查看>>
SpringBoot(十八)@value、@Import、@ImportResource、@PropertySource
查看>>
[dubbo] Dubbo API 笔记——配置参考
查看>>
The last access date is not changed even after reading the file on Windows 7
查看>>
SQL Server 字符串处理函数
查看>>
恢复系统管理员密码的五大奇招
查看>>
英语形容“漂亮女孩”知多少
查看>>
GridView 获取当前行的索引值
查看>>
PHPCMS V9二次开发:内容模块PC标签调用详解
查看>>
Virtual-Key Codes
查看>>
Azure China (3) 使用Visual Studio 2013证书发布Cloud Service至Azure China
查看>>
xmapp 404设置
查看>>
SQL Server中, DateTime (日期)型操作的 SQL语法
查看>>
iisweb服务器完美解决方案
查看>>
请教SQL Server登录失败的问题
查看>>
对象和流
查看>>
简单的兼容的login页面
查看>>
jquery之超简单的div显示和隐藏特效demo
查看>>