java远程连接调用Rabbitmq的实例代码

发布时间 - 2026-01-11 02:22:34    点击率:

本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。

打开IDEA创建一个maven工程(Java就可以了)。

 

pom.xml文件如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.zhenqi</groupId>
 <artifactId>rabbitmq-study</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>rabbitmq-study</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
    </exclusion>
   </exclusions>
  </dependency>

  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>

  <dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
  </dependency>

 </dependencies>
</project>

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

package com.zhenqi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 连接
    connection = factory.newConnection();

    //创建 channel实例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生产者Producer.java

生产者类的任务是向队列里写一条消息

package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

 测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


# java远程调用Rabbitmq  # java远程调用  # Java RabbitMQ的工作队列与消息应答详解  # Java RabbitMQ的持久化和发布确认详解  # Java面试高频问题之RabbitMQ系列全面解析  # Java搭建RabbitMq消息中间件过程详解  # JAVA获取rabbitmq消息总数过程详解  # Java编程rabbitMQ实现消息的收发  # java中RabbitMQ高级应用  # 创建一个  # 的是  # 给大家  # 最主要  # 回调  # 大家多多  # 就可以  # 取走  # 则需  # 远程访问  # 新消息  # exclusions  # client  # api  # slf4j  # exclusion  # amqp  # junit  # dependency  # UTF 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251811 】 【 AI营销90571


相关推荐: 网站制作软件免费下载安装,有哪些免费下载的软件网站?  轻松掌握MySQL函数中的last_insert_id()  微信公众帐号开发教程之图文消息全攻略  高防服务器如何保障网站安全无虞?  laravel怎么实现图片的压缩和裁剪_laravel图片压缩与裁剪方法  Windows家庭版如何开启组策略(gpedit.msc)?(安装方法)  googleplay官方入口在哪里_Google Play官方商店快速入口指南  如何登录建站主机?访问步骤全解析  如何在IIS中新建站点并配置端口与IP地址?  网站制作大概多少钱一个,做一个平台网站大概多少钱?  如何在IIS7中新建站点?详细步骤解析  Python文件操作最佳实践_稳定性说明【指导】  Laravel如何配置中间件Middleware_Laravel自定义中间件拦截请求与权限校验【步骤】  php在windows下怎么调试_phpwindows环境调试操作说明【操作】  Laravel怎么集成Vue.js_Laravel Mix配置Vue开发环境  google浏览器怎么清理缓存_谷歌浏览器清除缓存加速详细步骤  Edge浏览器提示“由你的组织管理”怎么解决_去除浏览器托管提示【修复】  如何在阿里云香港服务器快速搭建网站?  香港服务器网站搭建教程-电商部署、配置优化与安全稳定指南  rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted  Laravel项目结构怎么组织_大型Laravel应用的最佳目录结构实践  Laravel请求验证怎么写_Laravel Validator自定义表单验证规则教程  linux top下的 minerd 木马清除方法  Laravel怎么进行浏览器测试_Laravel Dusk自动化浏览器测试入门  javascript和jQuery中的AJAX技术详解【包含AJAX各种跨域技术】  Laravel怎么生成URL_Laravel路由命名与URL生成函数详解  如何在景安服务器上快速搭建个人网站?  详解免费开源的.NET多类型文件解压缩组件SharpZipLib(.NET组件介绍之七)  如何在阿里云完成域名注册与建站?  Laravel如何处理CORS跨域问题_Laravel项目CORS配置与解决方案  Laravel怎么导出Excel文件_Laravel Excel插件使用教程  悟空识字如何进行跟读录音_悟空识字开启麦克风权限与录音  如何快速搭建高效WAP手机网站吸引移动用户?  Laravel Eloquent访问器与修改器是什么_Laravel Accessors & Mutators数据处理技巧  如何快速生成高效建站系统源代码?  佐糖AI抠图怎样调整抠图精度_佐糖AI精度调整与放大细化操作【攻略】  BootStrap整体框架之基础布局组件  Laravel如何实现多对多模型关联?(Eloquent教程)  Swift中swift中的switch 语句  php结合redis实现高并发下的抢购、秒杀功能的实例  Python面向对象测试方法_mock解析【教程】  Laravel如何使用Guzzle调用外部接口_Laravel发起HTTP请求与JSON数据解析【详解】  Laravel如何使用模型观察者?(Observer代码示例)  Laravel怎么返回JSON格式数据_Laravel API资源Response响应格式化【技巧】  Laravel如何实现URL美化Slug功能_Laravel使用eloquent-sluggable生成别名【方法】  Python企业级消息系统教程_KafkaRabbitMQ高并发应用  如何在阿里云虚拟主机上快速搭建个人网站?  百度浏览器网页无法复制文字怎么办 百度浏览器复制修复  如何在云虚拟主机上快速搭建个人网站?  EditPlus中的正则表达式实战(5)