Skip to content

KumuluzEE AMQP provides support for Advanced Message Queueing Protocol, such as RabbitMQ and ActiveMQ.

License

Notifications You must be signed in to change notification settings

kumuluz/kumuluzee-amqp

Repository files navigation

KumuluzEE AMQP

Build Status

KumuluzEE AMQP project for development of messaging applications.

KumuluzEE AMQP enables you to easily send and recieve messages over the AMQP protocol. Currently, it includes the RabbitMQ-based implementation. The RabbitMQ documentation can be found here.

Usage

You can enable KumuluzEE AMQP RabbitMQ support by adding the following dependency to pom.xml:

<dependency>
    <groupId>com.kumuluz.ee.amqp</groupId>
    <artifactId>kumuluzee-amqp-rabbitmq</artifactId>
    <version>${kumuluzee-amqp-rabbitmq.version}</version>
</dependency>

Installing RabbitMQ

In order to use RabbitMQ, you first need to install the RabbitMQ broker. You can find installation guide here.

Configuration

To configure RabbitMQ, create configuration in resources/config.yaml. Here you can put your RabbitMQ hosts and their configurations.

kumuluzee:
  amqp:
    rabbitmq:
      hosts: List
        name: String (required)
        url: String (required) - null
        password: String - null
        username: String - null
        port: Integer - null
        automaticRecoveryEnabled: Boolean - null
        channelRpcTimeout: Integer - null
        channelShouldCheckRpcResponseType: Boolean - null
        connectionTimeout: Integer - null
        enableHostnameVerification: Boolean - null
        handshakeTimeout: Integer - null
        networkRecoveryInterval: Integer - null
        requestedChannelMax: Integer - null
        requestedFrameMax: Integer - null
        requestedHeartbeat: Integer - null
        shutdownTimeout: Integer - null
        topologyRecoveryEnabled: Boolean - null
        uri: String - null
        virtualHost: String - null
        automaticRecoveryEnabled: Boolean - null
        clientProperties: Map<String, Object> - null
        exchanges: List
          name: String (required)
          type: String [fanout, direct, topic, headers] - fanout
          durable: Boolean - false
          autoDelete: Boolean - false
          arguments: Map<String, Object> - null
        queues:
          name: String (required)
          exclusive: Boolean - false
          durable: Boolean - false
          autoDelete: Boolean - false
          arguments: Map<String, Object> - null
      properties: List
        name: String (required)
        contentType: String - null
        contentEncoding: String - null
        headers: Map<String, Object> - null
        deliveryMode: Integer - null
        priority: Integer - null
        corelationId: String - null
        replyTo: String - null
        expiration: String - null
        messageId: String - null
        timestamp: Boolean - null
        type: String - null
        userId: String - null
        appId: String - null
        clusterId: String - null

Connection

You can create a connection to a server with parameters that are not available in the config.yaml.

Annotate a class (all methods which return a map will be considered) or a method (only a method which returns a map will be considered) with @AMQPConnection. In this method create a new connection to the broker using ConnectionFactory provided by RabbitMQ.

The method has to return a Map<String, Connection> object, where String is the name of the connection. You can then configure exchanges and queues in the config.yaml with the name you selected. All other parameters in the config.yaml are ignored.

@AMQPConnection
public Map<String, Connection> localhostConnection() {
    Map<String, Connection> localhost = new HashMap<>();

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");

    Connection connection = null;

    try {
        connection = connectionFactory.newConnection();
    } catch (IOException | TimeoutException  e) {
        log.severe("Connection could not be created");
    }

    localhost.put("MQtest", connection);
    return localhost;
}
kumuluzee:
  amqp:
    rabbitmq:
      hosts:
        - name: MQtest
          port: 9000 #ignored
          automaticRecoveryEnabled: true #ignored
          exchanges:
           - name: directExchange
             type: direct	

The connection to the server is managed by the framework and does not need to be closed. In case you want to manage connections yourself use RabbitConnection class where you can create and close connections.

Channel

You can obtain RabbitMQ Channel by using @AMQPChannel. Then you can use this channel to send and receive messages (Read more).

@AMQPChannel(host: String - "")
@Inject
@AMQPChannel("hostName")
private Channel channel;

Consuming messages

In order to listen to queues you can annotate your method with @AMQPConsumer. The first parameter of the annotated method is the object of type that is expected to be received, the second parameter is optional and must be of type MessageInfo which will allow you to obtain details about the received messages. If the message body cannot be deserialized to the expected type the method will not be invoked and an error will be logged.

@AMQPConsumer(
    host: String - "", 
    exchange: String - "", 
    key: String[] - {""}, 
    prefetch: int - 100, 
    autoAck: boolean - true
)
@AMQPConsumer(...)
public void listenToDirectExchange(
    message: any, 
    info: MessageInfo - optional
) {
    ...
}

Send messages

You can send messages by annotating a method with @AMQPProducer annotation. The method must return an object which will be then sent to the consumer.

@AMQPProducer(
    host: String - "", 
    exchange: String - "", 
    key: String[] - {""}, 
    properties: String - ""
)
@AMQPProducer(host="MQtest", exchange = "directExchange", key = "object")
    public ExampleObject sendObject() {
        ExampleObject exampleObject = new ExampleObject();
        exampleObject.setContent("I'm just an object.");
        return exampleObject;
    }

You can also send a Message object, where you can define host, exchange, keys, body and properties (which wouldn't be possible to define in the config.yml). Keep in mind that Message parameters will override annotation parameters.

@AMQPProducer
public Message sendFullMessage() {
    Message message = new Message();
    ExampleObject exampleObject = new ExampleObject();
    exampleObject.setContent("I'm an object in a special message");

    if (Math.random() < 0.5) {
        message.host("MQtest")
                .key(new String[]{"object"})
                .exchange("directExchange")
                .basicProperties(MessageProperties.BASIC);
    } else {
        message.host("MQtest2")
                .key(new String[]{"testQueue"})
                .basicProperties("testProperty");
    }

    return message.body(exampleObject);
}

To send a message to a specific queue, you just have to remove the exchange from the annotation and use key as a queue name

@AMQPProducer(host="MQtest2", key = "testQueue")
public Message sendToQueue(){
    Message message = new Message();
    ExampleObject exampleObject = new ExampleObject();
    exampleObject.setContent("I'm an object in a message");
    return message.body(exampleObject).basicProperties(MessageProperties.BASIC);
}

Sample

You can start by using the sample code.

Changelog

Recent changes can be viewed on Github on the Releases Page

Contribute

See the contributing docs

When submitting an issue, please follow the guidelines.

When submitting a bugfix, write a test that exposes the bug and fails before applying your fix. Submit the test alongside the fix.

When submitting a new feature, add tests that cover the feature.

License

MIT