パンが主食

学んだことのアウトプット+ポエム

RabbitMQに入門した

インストール

こちらに沿って進める。  

https://www.rabbitmq.com/install-homebrew.html

HomeBrewのアップデート

$ brew update

RabbitMQのインストール

$ brew install rabbitmq

RabbitMQがインストールされていることを確認

$ ls /usr/local/sbin
cuttlefish      rabbitmq-diagnostics    rabbitmq-plugins    rabbitmqadmin
rabbitmq-defaults   rabbitmq-env        rabbitmq-server     rabbitmqctl

インストール先のディレクトリをPATHへ追加

$ export PATH=$PATH:/usr/local/sbin

HelloWorld

以下を沿って進めていく。内容等は私の解釈のため妥当性は保証できません。

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

Introduction

RabbitMQは郵便局と考えることができる。もっとも転送するのはデータであって紙でない。 以下に用語を定義する。

  • プロデューサー・・・メッセージを送るプログラム
  • キュー    ・・・RabbitMQ中にある郵便箱
  • コンシューマー・・・メッセージを受け取るプログラム

クライアントライブラリの準備

以下の3つのjarをダウンロードし、作業ディレクトリにコピーする。

AMQPクライアント  

http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.5.1/amqp-client-5.5.1.jar

※以下の2つはAMQOクライアントの依存パッケージ

SLF4J

http://central.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar

SLF4J-Sample

http://central.maven.org/maven2/org/slf4j/slf4j-simple/1.7.25/slf4j-simple-1.7.25.jar

キューにメッセージを送る

$ touch Send.java

  

Send.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      String message = "Hello World!";
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

      System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

キューからメッセージを受け取る

$ touch Recv.java

   Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
    });
  }
}

コンシューマーがメッセージの到着を非同期的にlistenしている間、プロセスを有効にしておく必要があるため、try-with-resourceを使用していない。

RabbitMQサーバの起動

$ rabbitmq-server

5672ポートでlistenされているはず。

$ lsof -i:5672
COMMAND    PID  USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
beam.smp 60151  USER   98u  IPv4 0xdf0b0e86a7f35edf      0t0  TCP localhost:amqp (LISTEN)

コンパイル

$ javac -cp amqp-client-5.5.1.jar Send.java Recv.java

コンシューマーの起動

$ java -cp .:amqp-client-5.5.1.jar:slf4j-api-1.7.25.jar:slf4j-simple-1.7.25.jar Recv
 [*] Waiting for messages. To exit press CTRL+C

プロデューサーの起動

$ java -cp .:amqp-client-5.5.1.jar:slf4j-api-1.7.25.jar:slf4j-simple-1.7.25.jar Send
 [x] Sent 'Hello World!'

  

コンシューマー

$ java -cp .:amqp-client-5.5.1.jar:slf4j-api-1.7.25.jar:slf4j-simple-1.7.25.jar Recv
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

おまけ

キューのリストを確認

$ sudo rabbitmqctl list_queues
Password:
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
hello   0