今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思.
一: 使用场景:消息队列可以接收消息和 发送消息
消息队列类型:
队列:一对一聊天 私聊 QQ
主题(订阅模式):一对多聊天 群聊 QQ
名词解释:
二, 代码原型ActiveMQ需要部署到Linux系统下, 这里就不再做概述.这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.上架代码原型:项目构件图:未使用ActiveMQ前ProductServiceImpl.cs:
1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.11 //TODO 保存商品信息到Solr服务器12 SolrInputDocument doc = new SolrInputDocument();13 //ID14 doc.setField("id", id);15 //名称16 Product p = productDao.selectByPrimaryKey(id);17 doc.setField("name_ik", p.getName());18 //图片URL19 doc.setField("url", p.getImgUrls()[0]);20 //品牌 ID21 doc.setField("brandId", p.getBrandId());22 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 123 SkuQuery skuQuery = new SkuQuery();24 skuQuery.createCriteria().andProductIdEqualTo(id);25 skuQuery.setOrderByClause("price asc");26 skuQuery.setPageNo(1);27 skuQuery.setPageSize(1);28 Listskus = skuDao.selectByExample(skuQuery);29 doc.setField("price", skus.get(0).getPrice());30 //...时间等 剩下的省略31 32 try {33 solrServer.add(doc);34 solrServer.commit();35 } catch (Exception e) {36 // TODO Auto-generated catch block37 e.printStackTrace();38 }39 40 41 42 43 //TODO 静态化44 }45 }
上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造:
使用ActiveMQ后的ProductServiceImpl.cs:1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //发送商品ID到ActiveMQ即可.11 jmsTemplate.send(new MessageCreator() {12 13 @Override14 public Message createMessage(Session session) throws JMSException {15 16 return session.createTextMessage(String.valueOf(id));17 }18 });19 20 //TODO 静态化21 }22 }
接着就是配置消息发送方(JMS生产者) mq.xml:
122 23 24 25 26 27 30 35 36 3731 32 33 34 38 39 43 44 4540 41 42 46 47 49 50 5148 52 53 5754 55 56
配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.
接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 接着就是再将连接池交由Spring管理. 最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应 就是依赖于这个管道.接下来我们就看下消息的接收方(JMS消费者)的一些东西:消费者的目录结构:(Solr)Solr项目中的ActiveMQ配置文件mq.xml:122 23 24 25 26 27 30 35 36 3731 32 33 34 38 39 43 44 4540 41 42 46 47 49 50 5148 52 53 57 58 59 60 61 6254 55 56 63 64 7065 66 67 68 69
我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.
接下来看看监听器的处理方法做了些什么事情: CustomMessageListener.java:1 /* 2 * 接收MQ中的消息 3 */ 4 public class CustomMessageListener implements MessageListener{ 5 @Autowired 6 private SearchService searchService; 7 8 @Override 9 public void onMessage(Message message) {10 //先将接收到的消息强转为ActiveMQ类型的消息11 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage12 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;13 try {14 String id = amtm.getText();15 System.out.println("接收到的ID:"+id);16 searchService.insertProductToSolr(Long.parseLong(id));17 } catch (JMSException e) {18 // TODO Auto-generated catch block19 e.printStackTrace();20 }21 }
因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了.
接下来就看保存商品信息到Solr服务器的逻辑:SearchServiceImpl.java:1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服务器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名称 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName());10 //图片URL11 doc.setField("url", p.getImgUrls()[0]);12 //品牌 ID13 doc.setField("brandId", p.getBrandId());14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115 SkuQuery skuQuery = new SkuQuery();16 skuQuery.createCriteria().andProductIdEqualTo(productId);17 skuQuery.setOrderByClause("price asc");18 skuQuery.setPageNo(1);19 skuQuery.setPageSize(1);20 Listskus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }
这样就比较明朗了, ActiveMQ 队列就是这样来实现的.
====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.2016/09/04 20:32 更新上面已经说了 消息的队列模式, 及点对点发送消息, 那么接下来就来说下 消息的一对多模式, 也就是 发布/订阅模式.项目原型: 当商品上架后(babasport-product), 发送消息id给solr(babasport-solr)来将商品信息保存到solr服务器和cms(babasport-cms)来对商品详情页面做页面静态化.===================babasport-product:结构图:babasport-product下的项目结构图:ProductServiceImpl.java中的上架:
1 @Autowired 2 private JmsTemplate jmsTemplate; 3 4 //上架 5 public void isShow(Long[] ids){ 6 Product product = new Product(); 7 product.setIsShow(true); 8 for (final Long id : ids) { 9 //上下架状态10 product.setId(id);11 productDao.updateByPrimaryKeySelective(product);12 13 //发送商品ID到ActiveMQ即可.14 jmsTemplate.send(new MessageCreator() {15 16 @Override17 public Message createMessage(Session session) throws JMSException {18 19 return session.createTextMessage(String.valueOf(id));20 }21 });22 }23 }
mq.xml:
122 23 24 25 26 27 30 35 36 3731 32 33 34 38 39 43 44 4540 41 42 46 47 49 50 5148 52 53 5954 55 56 57 58
这里面的最大的变化就是将消息发布模式改为了: publish subject.
============================================babasport-solr:mq.xml配置文件:122 23 24 25 26 27 30 35 36 3731 32 33 34 38 39 43 44 4540 41 42 46 47 49 50 5148 52 53 57 58 59 60 61 6254 55 56 63 64 7265 66 67 68 69 70 71
SearchServiceImpl.java: 保存商品信息到Solr服务器中, 通过ActiveMQ
1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服务器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名称 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName());10 //图片URL11 doc.setField("url", p.getImgUrls()[0]);12 //品牌 ID13 doc.setField("brandId", p.getBrandId());14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 115 SkuQuery skuQuery = new SkuQuery();16 skuQuery.createCriteria().andProductIdEqualTo(productId);17 skuQuery.setOrderByClause("price asc");18 skuQuery.setPageNo(1);19 skuQuery.setPageSize(1);20 Listskus = skuDao.selectByExample(skuQuery);21 doc.setField("price", skus.get(0).getPrice());22 //...时间等 剩下的省略23 24 try {25 solrServer.add(doc);26 solrServer.commit();27 } catch (Exception e) {28 // TODO Auto-generated catch block29 e.printStackTrace();30 }31 }
CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:
1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private SearchService searchService; 4 5 @Override 6 public void onMessage(Message message) { 7 //先将接收到的消息强转为ActiveMQ类型的消息 8 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage 9 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;10 try {11 String id = amtm.getText();12 System.out.println("接收到的ID:"+id);13 searchService.insertProductToSolr(Long.parseLong(id));14 } catch (JMSException e) {15 // TODO Auto-generated catch block16 e.printStackTrace();17 }18 }19 }
1 2 34 7 12 13 148 9 10 11 15 16 20 21 2217 18 19 23 24 26 27 2825 29 30 34 35 36 37 38 3931 32 33 40 41 42 43 44 45 46 47 48
CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:
1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private StaticPageService staticPageService; 4 @Autowired 5 private CMSService cmsService; 6 7 @Override 8 public void onMessage(Message message) { 9 //先将接收到的消息强转为ActiveMQ类型的消息10 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage11 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;12 try {13 String id = amtm.getText();14 System.out.println("CMS接收到的ID:"+id);15 Maproot = new HashMap ();16 17 Product product = cmsService.selectProductById(Long.parseLong(id));18 List skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));19 //去掉重复的颜色20 Set colors = new HashSet ();21 for (Sku sku : skus) {22 colors.add(sku.getColor());23 }24 root.put("colors", colors);25 root.put("product", product);26 root.put("skus", skus);27 28 staticPageService.index(root, id);29 } catch (JMSException e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }33 }34 }
StaticPageServiceImpl.java: 静态化页面的核心类:
1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{ 2 //SpringMvc 管理 conf 3 private Configuration conf; 4 public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) { 5 this.conf = freeMarkerConfig.getConfiguration(); 6 } 7 8 //静态化页面的方法 9 public void index(Maproot, String id){10 //输出目录: 通过getPath方法获取的是绝对路径11 String path = getPath("/html/product/" + id +".html");12 File f = new File(path);13 File parentFile = f.getParentFile();14 if(!parentFile.exists()){15 parentFile.mkdirs();16 }17 18 //spring中已经设置了模板路径: 19 Writer out = null;20 21 try {22 //读23 Template template = conf.getTemplate("product.html");24 25 //设置输出的位置26 //写27 out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");28 template.process(root, out);29 } catch (Exception e) {30 // TODO Auto-generated catch block31 e.printStackTrace();32 }finally {33 if (out != null)34 {35 try {36 out.close();37 } catch (IOException e) {38 // TODO Auto-generated catch block39 e.printStackTrace();40 }41 }42 43 }44 45 }46 47 //获取webapp下的html文件夹所在的位置48 //将相对路径转换为绝对路径49 public String getPath(String path){50 return servletContext.getRealPath(path);51 }52 53 private ServletContext servletContext;54 @Override55 public void setServletContext(ServletContext servletContext) {56 this.servletContext = servletContext;57 }58 }
Spring管理 freemarkerConfig配置类:
1 23 4 115 6 107 8 9
更多关于freemarker的讲解请关注我以后的博客...
关于ActiveMQ的内容就更新到这么多.