消息中间件RabbitMq

消息中间件RabbitMq

1.hello world

Snipaste_2019-08-17_07-41-15 这是RabbitMq最简单的模式,P代表生产者,中间的代表消息队列,C代表消费者。

生产者:



public class Producer01 {

    private static final String QUEUE_NAME = "TEST_HELLO";

    public static void main(String[] args) throws IOException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            //创建会话通道 生产者和mq的所有通信都在channel通道中完成
            channel = connection.createChannel();
            //声明队列
            //String queue,boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
            /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            //String exchange, String routingKey, BasicProperties props, byte[] body
            /*参数明细
              exchange 交换机,如果不指定将使用默认的交换机(设置为"")
              routingKey 路由key 交换机根据路由key来将消息发送给指定的队列,如果使用默认的交换机,该属性应该设置为队列的名称
              pros: 消息的属性(不常用)
              body: 消息的内容
             */
            String message = "hello amos!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("send to mq " + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}






消费者:
public class Consumer01 {

    private static final String QUEUE_NAME = "TEST_HELLO";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道 消费者和生产者所有通信都需要在通道中完成
        Channel channel = connection.createChannel();
        //声明队列  消费者中声明的队列名称要和生产者的队列一致 不然会接受不到消息
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 当收到消息后此方法被调用
             * @param consumerTag 消费者标识 用来标识消费者 在监听队列时设置channel.basicConsume
             * @param envelope 信封
             * @param properties 消息属性
             * @param body 消息体
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                //消息id ,mq在channel使用该id标识消息,可以用于确定消息已经接收
                long deliveryTag = envelope.getDeliveryTag();
                String content = new String(body, "UTF-8");
                System.out.println("接收到消息: " + content);
            }
        };

         /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);


        //监听队列
        /*
            String queue, boolean autoAck, Consumer callback
            queue:队列名称
            autoAck: 自动回复 当消费者接收到消息后要告诉mq消息已经接收到了,如果将该值设置为true则自动回复mq,如果设置为false则要通过编程实现回复
            callback: 消费方法,当消费者接收到消息 要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        //消费者需要一直保持连接 时刻监听生产者的消息
    }
}

总结: 1.发送端操作流程: 1).创建连接 2).创建通道 3).声明队列 4).发送消息 2.接收端操作流程: 1).创建连接 2).创建通道 3).声明队列 4).监听队列 5).接受消息 为什么生产者和消费者都需要声明队列? 如果消费者先启动 这时候生产者还没有创建队列,此时消费者就会监听一个不存在的队列。所以生产者和消费者最好都声明队列。



2.工作模式

RabbitMQ有以下几种工作模式

  • Work queues
  • publish/subscribe 发布订阅模式
  • Routing 路由模式
  • Topics 通配符模式
  • Header
  • RPC


2.1 工作队列模式(work queues)

Snipaste_2019-08-17_07-50-39

该模式和入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中中的消息。 应用场景:对于任务过重或者任务较多情况使用工作队列可以提高任务处理的速度。 测试: 1.使用入门程序(hello world),启动多个消费者 2.生产者发送多个消息 结果: 1.一条消息只会被一个消费者接收 2.rabbit采用轮询的方式将消息平均发送给消费者 3.消费者在处理完某条消息后才会收到下一条消息。

2.2 发布订阅模式(publish/subscribe)

  • 每个消费者监听自己的队列。
  • 生产者将消息发送给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
  • 应用场景:用户通知,当用户充值成功系统会通过邮件和短信的方式通知用户。

生产者: 定义了两个队列QUEUE_INFORM_EMAIL和QUEUE_INFORM_SMS这两个队列都绑定到了交换机EXCHANGE_FANOUT_INFORM 上

public class Producer02_Publish {

    private final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    private final static String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            //创建会话通道 生产者和mq的所有通信都在channel通道中完成
            channel = connection.createChannel();


            //声明队列
            //String queue,boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
            /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            /*  声明交换机
                String exchange, String type
                参数详情:交换机名称
                交换机类型:fanout: 发布订阅模式 对应publish/subscribe
                           topic: 对应topics模式 通配符模式
                           direct: 对应routing工作模式
                           headers: 对应headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            //将交换机和队列绑定 routingKey作用是交换机会根据key的值将消息发送到指定队列中去。在发布订阅模式中设置为空串
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM,"");


            //发送消息
            //String exchange, String routingKey, BasicProperties props, byte[] body
            /*参数明细
              exchange 交换机,如果不指定将使用默认的交换机(设置为"")
              routingKey 路由key 交换机根据路由key来将消息发送给指定的队列,如果使用默认的交换机,该属性应该设置为队列的名称
              pros: 消息的属性(不常用)
              body: 消息的内容
             */
            for (int i = 0; i < 5; i ++){
                String message = "inform to user!";
                channel.basicPublish(EXCHANGE_FANOUT_INFORM,"", null, message.getBytes());
                System.out.println("send to mq " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}


2.3 路由模式(routing)

Snipaste_2019-08-17_08-22-29 路由模式:

  • 每个消费者监听自己的队列,并且设置routingKey。
  • 生产者将消息发送给交换机,由交换机根据routingKey来转发消息到指定的队列。


生产者: 声明exchange_routing_infrom交换机 声明两个队列并且绑定到此交换机,绑定时需要指定routingKey 发送消息时需要指定routingKey
public class Producer03_Routing {

    private final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    private final static String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private final static String ROUTING_KEY_EMAIL = "routing_key_email";
    private final static String ROUTING_KEY_SMS = "routing_key_sms";

    public static void main(String[] args) throws IOException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            //创建会话通道 生产者和mq的所有通信都在channel通道中完成
            channel = connection.createChannel();


            //声明队列
            //String queue,boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
            /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            /*  声明交换机
                String exchange, String type
                参数详情:交换机名称
                交换机类型:fanout: 发布订阅模式 对应publish/subscribe
                           topic: 对应topics模式 通配符模式
                           direct: 对应routing工作模式
                           headers: 对应headers工作模式
             */

            //❤ 修改
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            //将交换机和队列绑定 routingKey作用是交换机会根据key的值将消息发送到指定队列中去。并设置routing key
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_KEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_KEY_SMS);
            //❤ 修改

            //发送消息
            //String exchange, String routingKey, BasicProperties props, byte[] body
            /*参数明细
              exchange 交换机,如果不指定将使用默认的交换机(设置为"")
              routingKey 路由key 交换机根据路由key来将消息发送给指定的队列,如果使用默认的交换机,该属性应该设置为队列的名称
              pros: 消息的属性(不常用)
              body: 消息的内容
             */
            for (int i = 0; i < 5; i ++){
                String message = "inform to routing key sms!";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_KEY_SMS, null, message.getBytes());
                System.out.println("send to mq " + message);
            }
            for (int i = 0; i < 5; i ++){
                String message = "inform to routing key EMAIL!";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_KEY_EMAIL, null, message.getBytes());
                System.out.println("send to mq " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}


发送邮件的消费者

public class Consumer03_RoutingEmail {

    private final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private final static String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private final static String ROUTING_KEY_EMAIL = "routing_key_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道 消费者和生产者所有通信都需要在通道中完成
        Channel channel = connection.createChannel();
        //声明队列  消费者中声明的队列名称要和生产者的队列一致 不然会接受不到消息
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 当收到消息后此方法被调用
             * @param consumerTag 消费者标识 用来标识消费者 在监听队列时设置channel.basicConsume
             * @param envelope 信封
             * @param properties 消息属性
             * @param body 消息体
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                //消息id ,mq在channel使用该id标识消息,可以用于确定消息已经接收
                long deliveryTag = envelope.getDeliveryTag();
                String content = new String(body, "UTF-8");
                System.out.println("接收到消息: " + content);
            }
        };
         /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */



        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //绑定交换机
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_KEY_EMAIL);



        //监听队列
        /*
            String queue, boolean autoAck, Consumer callback
            queue:队列名称
            autoAck: 自动回复 当消费者接收到消息后要告诉mq消息已经接收到了,如果将该值设置为true则自动回复mq,如果设置为false则要通过编程实现回复
            callback: 消费方法,当消费者接收到消息 要执行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
        //消费者需要一直保持连接 时刻监听生产者的消息
    }
}


发送短信的消费者

public class Consumer03_RoutingSms {

    private final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    private final static String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private final static String ROUTING_KEY_SMS = "routing_key_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道 消费者和生产者所有通信都需要在通道中完成
        Channel channel = connection.createChannel();
        //声明队列  消费者中声明的队列名称要和生产者的队列一致 不然会接受不到消息
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 当收到消息后此方法被调用
             * @param consumerTag 消费者标识 用来标识消费者 在监听队列时设置channel.basicConsume
             * @param envelope 信封
             * @param properties 消息属性
             * @param body 消息体
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                //消息id ,mq在channel使用该id标识消息,可以用于确定消息已经接收
                long deliveryTag = envelope.getDeliveryTag();
                String content = new String(body, "UTF-8");
                System.out.println("接收到消息: " + content);
            }
        };
         /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */



        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //绑定交换机
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_KEY_SMS);



        //监听队列
        /*
            String queue, boolean autoAck, Consumer callback
            queue:队列名称
            autoAck: 自动回复 当消费者接收到消息后要告诉mq消息已经接收到了,如果将该值设置为true则自动回复mq,如果设置为false则要通过编程实现回复
            callback: 消费方法,当消费者接收到消息 要执行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
        //消费者需要一直保持连接 时刻监听生产者的消息
    }
}


2.4 Topics(通配符)模式

Snipaste_2019-08-17_08-28-06

  • 每个消费者监听自己的队列,并且设置带通配符的routingKey
  • 生产者将消息发送给broker,由交换机根据routingKey来转发消息到指定的队列。

生产者:

public class Producer04_Topic {

    private final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    private final static String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";
    //inform.email前者接受 inform.sms后者接受 inform.email.sms两者均可接收
    private final static String ROUTING_KEY_EMAIL = "inform.#.email.#";
    private final static String ROUTING_KEY_SMS = "inform.#.sms.#";

    public static void main(String[] args) throws IOException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            //创建会话通道 生产者和mq的所有通信都在channel通道中完成
            channel = connection.createChannel();


            //声明队列
            //String queue,boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
            /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            /*  声明交换机
                String exchange, String type
                参数详情:交换机名称
                交换机类型:fanout: 发布订阅模式 对应publish/subscribe
                           topic: 对应topics模式 通配符模式
                           direct: 对应routing工作模式
                           headers: 对应headers工作模式
             */

            //❤ 修改
            channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
            //将交换机和队列绑定 routingKey作用是交换机会根据key的值将消息发送到指定队列中去。并设置routing key
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, ROUTING_KEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPIC_INFORM, ROUTING_KEY_SMS);
            //❤ 修改

            //发送消息
            //String exchange, String routingKey, BasicProperties props, byte[] body
            /*参数明细
              exchange 交换机,如果不指定将使用默认的交换机(设置为"")
              routingKey 路由key 交换机根据路由key来将消息发送给指定的队列,如果使用默认的交换机,该属性应该设置为队列的名称
              pros: 消息的属性(不常用)
              body: 消息的内容
             */
           /* //只有sms接收
            for (int i = 0; i < 5; i ++){
                String message = "inform to routing key sms!";
                channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms", null, message.getBytes());
                System.out.println("send to mq " + message);
            }*/

            //只有email接收
            for (int i = 0; i < 5; i ++){
                String message = "inform to routing key EMAIL!";
                channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.email", null, message.getBytes());
                System.out.println("send to mq " + message);
            }

           //email和sms均可接收
            for (int i = 0; i < 5; i ++){
                String message = "inform to routing key EMAIL & sms";
                channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms.email", null, message.getBytes());
                System.out.println("send to mq " + message);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}


消费者:


public class Consumer04_TopicEmail {

    private final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private final static String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";
    private final static String ROUTING_KEY_EMAIL = "inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道 消费者和生产者所有通信都需要在通道中完成
        Channel channel = connection.createChannel();
        //声明队列  消费者中声明的队列名称要和生产者的队列一致 不然会接受不到消息
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 当收到消息后此方法被调用
             * @param consumerTag 消费者标识 用来标识消费者 在监听队列时设置channel.basicConsume
             * @param envelope 信封
             * @param properties 消息属性
             * @param body 消息体
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                //消息id ,mq在channel使用该id标识消息,可以用于确定消息已经接收
                long deliveryTag = envelope.getDeliveryTag();
                String content = new String(body, "UTF-8");
                System.out.println("接收到消息: " + content);
            }
        };
         /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */



        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
        //绑定交换机
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, ROUTING_KEY_EMAIL);



        //监听队列
        /*
            String queue, boolean autoAck, Consumer callback
            queue:队列名称
            autoAck: 自动回复 当消费者接收到消息后要告诉mq消息已经接收到了,如果将该值设置为true则自动回复mq,如果设置为false则要通过编程实现回复
            callback: 消费方法,当消费者接收到消息 要执行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
        //消费者需要一直保持连接 时刻监听生产者的消息
    }
}


public class Consumer04_TopicSms {

    private final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    private final static String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";
    //inform.email前者接受 inform.sms后者接受 inform.email.sms两者均可接收
    private final static String ROUTING_KEY_SMS = "inform.#.sms.#";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂 通过工厂创建和mq的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接工厂的参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("amos");
        connectionFactory.setPassword("1");
        //后台管理页面的端口是15672 这里服务端的接口是5672
        connectionFactory.setPort(5672);
        //设置虚拟机 一个mq服务可以设置多个虚拟机 每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/amos");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道 消费者和生产者所有通信都需要在通道中完成
        Channel channel = connection.createChannel();
        //声明队列  消费者中声明的队列名称要和生产者的队列一致 不然会接受不到消息
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 当收到消息后此方法被调用
             * @param consumerTag 消费者标识 用来标识消费者 在监听队列时设置channel.basicConsume
             * @param envelope 信封
             * @param properties 消息属性
             * @param body 消息体
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String exchange = envelope.getExchange();
                //消息id ,mq在channel使用该id标识消息,可以用于确定消息已经接收
                long deliveryTag = envelope.getDeliveryTag();
                String content = new String(body, "UTF-8");
                System.out.println("接收到消息: " + content);
            }
        };
         /*
               参数明细:
               queue: 队列名称
               durable: 是否持久化,如果持久化,mq重启后队列还会存在
               exclusive 是否独占连接,队列只允许在该连接中存在,如果connection连接关闭则该连接删除 如果该参数设置为true可用于创建临时队列
               autoDelete: 自动删除,队列不再使用时 是否自动删除该队列,如果将此参数和exclusive设置为true可用于临时队列的创建(队列不使用会自动删除)
               arguments: 可以设置一个队列的扩展参数,比如:设置存活时间
             */



        channel.queueDeclare(EXCHANGE_TOPIC_INFORM, true, false, false, null);
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
        //绑定交换机
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPIC_INFORM, ROUTING_KEY_SMS);



        //监听队列
        /*
            String queue, boolean autoAck, Consumer callback
            queue:队列名称
            autoAck: 自动回复 当消费者接收到消息后要告诉mq消息已经接收到了,如果将该值设置为true则自动回复mq,如果设置为false则要通过编程实现回复
            callback: 消费方法,当消费者接收到消息 要执行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
        //消费者需要一直保持连接 时刻监听生产者的消息
    }
}


3.Springboot整合rabbitMq

1).编写rabbitmq的配置类 用于声明队列和交换机

@Configuration
public class RabbitmqConfig {

    public final static String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public final static String QUEUE_INFORM_SMS = "queue_inform_sms";
    public final static String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";
    //inform.email前者接受 inform.sms后者接受 inform.email.sms两者均可接收
    public final static String ROUTING_KEY_EMAIL = "inform.#.email.#";
    public final static String ROUTING_KEY_SMS = "inform.#.sms.#";

    //声明交换机 注入Bean并且指定名称
    @Bean(EXCHANGE_TOPIC_INFORM)
    public Exchange buildExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
    }

    //声明email队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue buildEmailQueue(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }

    //声明sms队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue buildSmsQueue(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //将交换机和email队列和routing key绑定 @Qualifier注入上面的类
    @Bean
    public Binding bindEmail(@Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange, @Qualifier(QUEUE_INFORM_EMAIL) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_EMAIL).noargs();
    }

    //将交换机和email队列和routing key绑定 @Qualifier注入上面的类
    @Bean
    public Binding bindSms(@Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange, @Qualifier(QUEUE_INFORM_SMS) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_SMS).noargs();
    }
}


2)定义生产者

public class Producer05_Springboot {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testProducer(){
        //exchange:交换机名称 routingKey 路由路径 object:消息体
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPIC_INFORM, "inform.email", "springboot test email");
    }
}


3)定义消费者:

@Component
public class ReceiveHandler {
    //监听的队列
    @RabbitListener(queues = RabbitmqConfig.QUEUE_INFORM_EMAIL)
    //字符类型msg就可以直接接收生产者发送的消息
    public void receiveMessage(String msg, Message message, Channel channel){
        System.out.println("EMAIL 收到消息了" + msg);
    }

}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×