Message Queue API
Overview
The Message Queue API has been designed with the intention of 3rd party application integration and high volume processing. One of these 3rd parties is our own Business Process Engine. The Message Queue API has been tested with the Informatica BPM system, but is open to any 3rd party application which can read and write to/from an Apache MQ Server.
In order to be most flexible we tried to minimize any efforts in setting up queues and linking applications together.
Queue Communication Overview
Product 360 produces messages as well as receives messages from Apache MQ. This picture shows the general communication streams between an external application and the Product 360 Application Servers by using Message Queues.
Each queue in this picture is an own queue within the Apache MQ Server instance, so a single server will host all those queues.
The queues allow us to decouple the external applications from the product 360 servers and therefor they form also a buffer for peak load scenarios. Each side of the equation can process the messages in their own speed, typically as fast as possible without crashing. If there are more messages within the queue, the queue will safely have them persisted as long as it's needed. In case the Apache Message queue would reach it's limit it will automatically make sure that the producer will slow down. All this making sure that no module is overloaded at any time.
Of course, this holds only true in case there are no synchronous REST api calls. None, or just a very minimum. Each rest call is a synchronous call which uses a http thread within the application server. Client users also need those http threads and they are a limited resource. So by executing huge amounts of synchronous requests which might execute some performance intensive logic, it can happen that a server becomes unresponsive for users. This is something which should be avoided. Using the Message Queue API instead of the synchronous REST api bypasses this.
Let's follow the basic flow:
A user modifies something, either manually or with a job.
A BPM trigger is configured for this modification. A new trigger event message is created with the configured payload. This event is sent to the queue which is configured in the trigger configuration.
There can be multiple queues configured for the trigger framework - if more than one, the user can actually chose in which queue the event message should be sent! The setting is within the server.properties file, it's called: infa.bpm.trigger.queue.ids. It defaults to "bpm".
Also multiple triggers which send events to multiple queues is possible. One for the BPM server, another one for a 3rd party system - even with different payloads.A DQ trigger is configured for object created or changed. This trigger will write directly a corresponding DQ message to the batch api queue. As the server which changed the item is not necessarily the same server which processes the Message Queue API. Also this way it's guaranteed that all our batching capabilities are used
Informatica BPM (or a 3rd party system) consumes the Trigger Event messages and starts the corresponding workflow process.
Based on the workflows logic, a batch api request is sent to the batch api queue. Currently only Data Quality and Merge requests are supported for the batch api.
The batch api consumer consumes the message from the queue.
The batch api framework groups and accumulates multiple messages for a specific amount of time or number of messages in order to optimize the data quality and merge executions. Please refer to Batch API Queue for details on the algorithm and the possible configurations.
The results of the data quality and merge executions are split into multiple responses so that every requests receives his own response. This renders the batch processing fully transparent to the external application. The responses are sent to the queue which is defined in the requests as "responseQueue". In this picture the "bpm_response" - but it could be a different one!
Informatica BPM or a 3rd party application reads the response and processes it by continuing or starting a new workflow process instance.
The workflow might also execute other "service api" calls by means of request message. For example enter/leave a workflow status, assigning an item into a task etc. Please have a look at Service API Queue for details on how to compose those messages.
The Service API Consumer reads the service api request message and executes it against our standard service api. As this consumer is implemented in a fully generic way he basically is able to execute most of the available service api calls. Not all of them are fully tested and supported. The ones which are are listed on the Service API Queue page
The response from the Service api is sent to the response queue, in our example also "bpm_response". This can also be a custom queue!
The workflow might also call the object api to obtain a full picture of an object, or to modify the object. In terms of single item processing the object api should be used instead of the list api as the object api is backed by an in memory cache.
The object api consumer directly calls the internal object api, bypassing the Service API completely. As the object api is optimized for high throughput the extra marshalling/unmarshalling can be avoided this way. When the object api queue is used, also no further thread pools are involved in the object api - the consumer threads do all the work. In case the object api is executed via rest, an internal thread pool is processing the request. This is to prevent overload scenarios of the application server.
The response from the object api is being sent to the response queue, in our example also "bpm_response". This can also be a custom queue!
Although discouraged and potentially dangerous, the workflow might also still call the service or object api directly. Please be aware that this will block an http thread during the time of the execution. Depending on the request and the number of concurrent workflows this might lead to an overload in the Jetty Server which will render the server unresponsive for web-clients! As there is a limited number of consumer threads for the Service API Consumer the way via message queue can not overload the system.
General Queue Configuration
All queues which are summarized in "Message Queue API" are configured in the server.properties configuration file.
Each queue has an unique id, the queue id. The ID is name after the queue. prefix. So in this example it's "batchapi".
queue.batchapi.type = ${queue.
default
.type}
queue.batchapi.writer.count = ${queue.
default
.writer.count}
queue.batchapi.consumer.count = ${queue.
default
.consumer.count}
queue.batchapi.url = ${queue.
default
.url}
queue.batchapi.username = ${queue.
default
.username}
queue.batchapi.password = ${queue.
default
.password}
queue.batchapi.message.format = XML
queue.batchapi.name = P360_BATCH_API
queue.batchapi.label = BatchAPI
type: The type can not be changed as we currently only support ActiveMQ.
writer.count: The number of threads which are used to write messages into the queue
consumer.count: The number of threads which are used to consume messages from the queue
url: the url to the Apache MQ Server
username: the username which should be used for the apache active mq server
password: the password which should be used for the apache active mq server
message.format: Either XML or JSON. Communication with the Informatica BPM Server requires here XML. If other scenarios allow JSON, use it.
name: the name of the Queue within the Apache Active MQ server. If the queue is not available in Apache Active MQ, it will automatically be created
label: the human readable label of the queue. It's used in the trigger configuration UI to chose the the target queue of the trigger event.
Queue Purposes
The application allows to configure multiple queues to listen to for service api requests and trigger events. By default the "bpm" queue is used for the trigger events, and the "serviceapi" queue is used for service api requests. Users can configure multiple queues by using a comma seperated list of queue IDs
infa.bpm.trigger.queue.ids = bpm
#infa.bpm.consumer.serviceapi.queue.ids = serviceapi
In case there are multiple trigger queues configured, the user can chose into which one the trigger event should be sent when he configures the trigger.
Custom Queues
Customers can easily create own queues by just providing the configuration like described above in the server.properties file. Let's assume we have a downstream syndication system which needs to receive all modifications a user does.
Add the "syndication" queue to the server.properties file:
queue.syndication.type = ${queue.
default
.type}
queue.syndication.writer.count = ${queue.
default
.writer.count}
queue.syndication.consumer.count = ${queue.
default
.consumer.count}
queue.syndication.url = ${queue.
default
.url}
queue.syndication.username = ${queue.
default
.username}
queue.syndication.password = ${queue.
default
.password}
queue.syndication.message.format = JSON
queue.syndication.name = CUSTOM_SYNDICATION
queue.syndication.label = Syndication
We chose JSON as format since it's more efficient than xml and the syndication system can handle it just well.
Add the "syndication" queue to the trigger queues:
infa.bpm.trigger.queue.ids = bpm,syndicationChose the "Syndication" queue in the trigger configuration of the "Business Process management" perspective
Open the Apache Active MQ Console and see the content of the new CUSTOM_SYNDICATION queue
Have a look at the content of the message. It looks something like this
Payload in new Syndication Queue when configured with{
"entityId"
:
"1185@1"
,
"entity"
:
"Article"
,
"data"
: {
"catalog.label"
:
"MASTER"
,
"triggerConfiguration"
:
"ItemChanged"
,
"triggerIdentifier"
:
"hlr.persistence.trigger.entityChanged"
,
"user.label"
:
"Administrator"
,
"userGroup.label"
:
""
},
"entityItemChange"
: {
"_module"
:
"UI"
,
"_entity"
:
"Article"
,
"_identifier"
:
"Item_A"
,
"_user"
: {
"_internalId"
:
"1"
,
"_entityId"
: 2600,
"_externalId"
:
"'Administrator'"
},
"_revision"
: {
"_internalId"
:
"1"
,
"_entityId"
: 5600,
"_externalId"
:
"'root'"
},
"_container"
: {
"_internalId"
:
"1"
,
"_entityId"
: 2900,
"_externalId"
:
"'MASTER'"
},
"_entityItem"
: {
"_internalId"
:
"1185@1"
,
"_entityId"
: 1000,
"_externalId"
:
"'Item_A'@'MASTER'"
},
"_eventTimestamp"
:
"2022-06-09T10:42:25.420Z"
,
"_changeType"
:
"CHANGED"
,
"_changedFields"
: [
"Article.ManufacturerName"
],
"_changedEntities"
: [
"Article"
],
"_changeSummary"
: {
"article"
: {
"_changeType"
:
"CHANGED"
,
"_mainSupplierProxy"
: {
"_entityId"
: 2800,
"_internalId"
:
"3"
,
"_externalId"
:
"'Heiler Product Manager'"
},
"manufacturerName"
: {
"_old"
:
"Mercedes"
,
"_current"
:
"BMW"
}
}
}
}
}
General Headers
These headers apply to all queues, unless stated otherwise on the queue page.
Request Headers
Headers for request to Product 360
Header name |
Type |
Mandatory |
Stored in |
Values |
Value example |
Purpose |
User |
general |
true |
JMS Property |
<String> |
User to be used for authenticating within Product 360 |
|
Password |
general |
true |
JMS Property |
<String> |
Password |
|
MessageFormat |
general |
true |
JMS Property |
JSON, XML |
XML |
Defines the format of the request body. |
ResponseQueueID |
general |
false |
JMS Property |
<String> |
bpm_response |
Queue id from in server.properties (see above) |
JMSCorrelationID |
general |
false |
JMS message header |
Identifies the communication series that the message belongs to. |
||
Origin |
general |
false |
JMS Property |
<String> |
Test system |
Identifies the P360 server environment (DEV/QA/PROD) the message was originates from. This should be taken from a previous trigger message and reflects normally the system.name property in the server.properties. |
SuccessTargetService |
specific |
false |
JMS Property |
<String> |
The name of the target consumer service within the target workflow for success responses. |
|
ErrorTargetService |
specific |
false |
JMS Property |
<String> |
The target consumer service within the target workflow for error responses |
Response Headers
These generic response headers are always added to all responses Product 360 consumers send out.
Header name |
Type |
Stored in |
Values |
Value example |
Purpose |
MessageFormat |
general |
JMS Property |
JSON, XML |
XML |
Defines the content type of the response payload. Can either be XML or JSON. The message format of the response is defined by the response queue's preferred format (as configured in the server.properties file)! It might be that a client provides a request in JSON, but when the response queue prefers xml, the response will be XML! Informatica BPM (ActiveVOS) only supports XML. |
JMSCorrelationID |
general |
JMS Message header |
Identifies the communication series that the message belongs to. |
||
Status |
general |
JMS Property |
Integer (Http like status code) |
200 |
Response status code 200 = OK 400 = BAD REQUEST (Additional ones might have been defined by the actual queue consumer implementation) |
Origin |
general |
JMS Property |
<String> |
Identifies the P360 server environment (DEV/QA/PROD) the message was sent by. Per default this is the value of system.name in the server.properties |
|
P360TargetService |
specific |
JMS Property |
Specifies target endpoint (this is either the SuccessTargetService or the ErrorTargetService from the request header. Depending on the Status of the response) |
Default Queues
BPM Queue
The main queue to which all trigger events are passed and which should trigger workflow instances initially or by correlation id
Queue ID |
Format |
Active MQ Name |
bpm |
XML |
P360_BPM |
Batch API Queue
Some of our Message Queue APIs profit tremendously from our new batching algorithm. The most important ones have been implemented with the Version 10.1 release. Data Quality and Merge.
Queue ID |
Format |
Active MQ Name |
batchapi |
XML |
P360_BATCH_API |
Service API Queue
General queue providing functionality of the .
Queue ID |
Format |
Active MQ Name |
serviceapi |
XML |
P360_SERVICE_API |
ObjectAPI Queue
A dedicated message queue for interacting with the Object APIs CRUD operations. The Object API supports both, JSON and XML payloads. We recommend using JSON where possible.
Queue ID |
Format |
Active MQ Name |
objectapi |
JSON/XML |
P360_OBJECT_API |