Message Queue Framework

Since Product 360 10.0 there is a generic implementation for interacting with message queues.

Currently only the ActiveMQ Message Queue is supported.

Message queue definition in the server.properties file:

Configuring an existing queue or adding a new queue for Product 360 is done by extending the section "Message Queue Settings"

An explanation of each property can be found here: Server configuration

Example: Add a new queue with the identifier custom_queue in the server.properties

server.properties
queue.custom_queue.type = ActiveMQ
queue.custom_queue.writer.count = <thread_count_for_writing>
queue.custom_queue.consumer.count = <thread_count_for_consuming>
queue.custom_queue.url = <queue_URL>
queue.custom_queue.username = <username_for_the_queue>
queue.custom_queue.password = <password_for_the_username>
queue.custom_queue.message.format = <message_format_for_writing>
queue.custom_queue.name = P360_CUSTOM_QUEUE
queue.custom_queue.label = Custom queue for new stuff

Reading from a message queue:

To read from our new custom queue we will need the MessageQueueService from the package com.heiler.ppm.messagequeue.server.v2

Depending on the object you want to read from the queue you can use the appropriate getMessageQueue method

  • Case 1: There is only one type of objects to read from the queue

only read the same object type
MessageQueueService messageQueueService = MessageQueueComponent.getMessageQueueService();
MessageQueue< CustomObject> queue = messageQueueService.getMessageQueue( "custom_queue",CustomObject.class );
MessageQueueReader< CustomObject> customReader = queue.getReader();
  • Case 2: There can be different types of objects to read from the queue

read different objects
MessageQueueService messageQueueService = MessageQueueComponent.getMessageQueueService();
MessageQueue< Object> queue = messageQueueService.getMessageQueue( "custom_queue", new MultiCustomObjectConverter() );
MessageQueueReader< Object> customReader = queue.getReader();

In all cases it is necessary to specify a function which describes what happens to the message when it gets consumed from the message queue. This function has to be the parameter for the method call onMessage

onMessage function
customReader.onMessage( this::processMessage );
 
[...]
 
private void processMessage( QueueMessage< CustomObject> message )
{
try
{
//custom code
}
finally
{
message.acknowledge();
}
}

Each session processes the message with the ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode, it means, that the acknowledge method of the message should be always called explicitly to acknowledge message. If this method is not called, the message will be delivered again after the Product 360 server restarts.

Only messages which fulfill the following criteria can be accessed via the function of the onMessage method. If one of the criteria is not fulfilled, an error will be logged in the Product 360 server and the message will be acknowledged automatically.

  • In the JMS message there must be a JMS property with the key MessageFormat

  • In the JMS message there must be a JMS property with the key User

  • In the JMS message there must be a JMS property with the key Password or a JMS property with the key Signature

  • The user which is specified in the JMS property has the action right to use queue communications

  • The JMS message body can be successfully converted to the specified object

Authentication methods

Only messages which can successfully validate a user against the Product 360 Server are processed.

  1. username / password
    The given JMS properties for User and Password are used to authenticate against the Product 360 Server. Additionally this user has to have the action right for queue communication.

  2. username / signature

    Alternatively to a password, a signature can be filled as JMS message property which will then be checked by the Product 360 Server automatically and the user will be authenticated.

    The signature approach can only be used when messages are written by the Product 360 server itself and also consumed by a Product 360 server.

    To use this signature for other purposes it is recommended to use the SignatureService class.

Writing to a message queue:

For writing to a message queue it is also first needed to specify to which queue the messages will be written.

Get a MessageQueue object from the MessageQueueService of the package com.heiler.ppm.messagequeue.server.v2.

From the MessageQueue object you can get the MessageQueueWriter object.

only write the same object type
MessageQueueService messageQueueService = MessageQueueComponent.getMessageQueueService();
MessageQueue< CustomObject> queue = this.messageQueueService.getMessageQueue( "custom_queue",CustomObject.class );
MessageQueueWriter< CustomObject> customWriter = queue.getWriter();

The MessageQueueWriter can write any implementation of QueueMessage to the queue. By default we provide the JMSMessage implementation of the QueueMessage interface.

Example to write a sample Message of type CustomObject

write JMS message
CustomObject payload = new CustomObject();
QueueMessage< CustomObject> message = new JMSMessage( payload );
customWriter.write( message );

It is also possible to write custom JMS properties to the message by using the addHeader method.

add custom header to JMS message
QueueMessage< CustomObject> message = new JMSMessage( payload );
message.addHeader( "customMessageProperty", "value" );
customWriter.write( message );

For setting the JMS Properties Username, Password, JMSCorrelationID and priority there are predefined methods available.

write predefined properties to message
QueueMessage< CustomObject> message = new JMSMessage( payload );
message.setCorrelationId(5);
message.setUser("TinaTen");
message.setPassword("Infa123");
customWriter.write( message );

When writing messages from the P360 Server to the queue and also reading from the same queue with a P360 Server the authentication via signing can be used like described in section "Authentication methods".

To use this feature only the username has to be set and additionally the method setSignMessage has to be set to true.

use signing
QueueMessage< CustomObject> message = new JMSMessage( payload );
message.setCorrelationId(5);
message.setUser("TinaTen");
message.setSignMessage(true);
customWriter.write( message );

Answering to a specific message queue:

When reading from a queue the generic approach will not send a response automatically. If it should be possible to send a response to a specific queue, the message which got read needs to have a JMS header property with the key ResponseQueueID. The value has to be the identifier of the message queue where the response should be sent to. This identifier can be found in the appropriate queue defined in the server.properties.

Example:

Send a message which has a ResponseQueueID set. Afterwards we read the message from the queue with the identifier custom_queue and want to send a response to the queue with the identifier response_queue.

server.properties setup

server.properties
queue.custom_queue.type = ${queue.default.type}
queue.custom_queue.writer.count = ${queue.default.writer.count}
queue.custom_queue.consumer.count = ${queue.default.consumer.count}
queue.custom_queue.url = ${queue.default.url}
queue.custom_queue.username = ${queue.default.username}
queue.custom_queue.password = ${queue.default.password}
queue.custom_queue.message.format = JSON
queue.custom_queue.name = P360_CUSTOM_QUEUE
queue.custom_queue.label = Custom queue for new stuff
 
queue.response_queue.type = ${queue.default.type}
queue.response_queue.writer.count = ${queue.default.writer.count}
queue.response_queue.consumer.count = ${queue.default.consumer.count}
queue.response_queue.url = ${queue.default.url}
queue.response_queue.username = ${queue.default.username}
queue.response_queue.password = ${queue.default.password}
queue.response_queue.message.format = JSON
queue.response_queue.name = P360_RESPONSE_QUEUE
queue.response_queue.label = Queue to respond to
 
 
 
 
 
 
queue.dq.type = ${queue.default.type}
queue.dq.writer.count = ${queue.default.writer.count}
queue.dq.consumer.count = ${queue.default.consumer.count}
queue.dq.url = ${queue.default.url}
queue.dq.username = ${queue.default.username}
queue.dq.password = ${queue.default.password}
queue.dq.message.format = XML
queue.dq.name = P360_DATA_QUALITY
queue.dq.label = Data Quality

Writing the message with the JMS header ResponseQueueID with the value response_queue. This value indicates the identifier of the queue where we want to send the response to.

set ResponseQueueID as JMS property
QueueMessage< CustomObject> message = new JMSMessage( payload );
message.setCorrelationId(5);
message.setUser("TinaTen");
message.setSignMessage(true);
message.setResponseQueueID("response_queue");
customWriter.write( message );

Processing the message we just sent and sending a response message to a queue

Objects for Response

Note that all objects which will be send as a response object need the annotation @XmlRootElement

The method sendResponseMessage will first check if the initial message has a JMS header ResponseQueueID if it does not have this property, nothing will be send.If it has this property it will resolve the identifier to the correct queue. The following JMS properties will be automatically filtered out when creating the response message:

  • User

  • Password

  • MessageFormat

  • ResponseQueueID

  • SuccessTargetService

  • ErrorTargetService

If the initial message contained the following JMS properties ResponseUser, ResponsePassword they will then be transferred to the JMS properties User and Password of the new response message.

Finally the response object will be converted and send to the queue.

Example: Send an error message with status code 500 to the response queue

send error response
customReader.onMessage( this::processMessage );
 
MessageQueueService messageQueueService = MessageQueueComponent.getMessageQueueService()
[...]
 
private void processMessage( QueueMessage< CustomObject> message )
{
try
{
//custom code
}
catch(SomeException e)
{
ErrorObject errorObject = new ErrorObject( "Exception occurred" );
Response< String > response = new Response<>( QueueMessageStatus.INTERNAL_SERVER_ERROR, errorObject );
messageQueueService.sendResponseMessage(message ,response);
}
finally
{
message.acknowledge();
}
}

Example: Send a custom object to the response queue

send response
customReader.onMessage( this::processMessage );
 
MessageQueueService messageQueueService = MessageQueueComponent.getMessageQueueService()
[...]
 
private void processMessage( QueueMessage< CustomObject> message )
{
try
{
//custom code
MyObject myObject = new MyObject( [...] );
Response< String > response = new Response<>( QueueMessageStatus.OK, myObject );
messageQueueService.sendResponseMessage(message ,response);
}
finally
{
message.acknowledge();
}
}