Message Queue API

With version 10.0 we introduced a new way to integrate Product 360 with external applications. The Message Queue API.
The Message Queue API provides drastic advantages in terms of stability and resiliency and offers new ways of optimizing performance sensitive tasks.Currently this API only supports the Apache Active MQ implementation. Please refer to our installation section and the official installation manuals from Apache Active MQ for details.

Overview

The Message Queue API has been designed with the intention of 3rd party application integration and high volume processing. One of these 3rd parties is our own Business Process Engine. The Message Queue API has been tested with the Informatica BPM system, but is open to any 3rd party application which can read and write to/from an Apache MQ Server.

In order to be most flexible we tried to minimize any efforts in setting up queues and linking applications together.

Queue Communication Overview

Product 360 produces messages as well as receives messages from Apache MQ. This picture shows the general communication streams between an external application and the Product 360 Application Servers by using Message Queues.

Each queue in this picture is an own queue within the Apache MQ Server instance, so a single server will host all those queues.

images/download/attachments/487621524/MQApi_-_Overview.png

The queues allow us to decouple the external applications from the product 360 servers and therefor they form also a buffer for peak load scenarios. Each side of the equation can process the messages in their own speed, typically as fast as possible without crashing. If there are more messages within the queue, the queue will safely have them persisted as long as it's needed. In case the Apache Message queue would reach it's limit it will automatically make sure that the producer will slow down. All this making sure that no module is overloaded at any time.

Of course, this holds only true in case there are no synchronous REST api calls. None, or just a very minimum. Each rest call is a synchronous call which uses a http thread within the application server. Client users also need those http threads and they are a limited resource. So by executing huge amounts of synchronous requests which might execute some performance intensive logic, it can happen that a server becomes unresponsive for users. This is something which should be avoided. Using the Message Queue API instead of the synchronous REST api bypasses this.

Let's follow the basic flow:

  1. A user modifies something, either manually or with a job.

    1. A BPM trigger is configured for this modification. A new trigger event message is created with the configured payload. This event is sent to the queue which is configured in the trigger configuration.
      There can be multiple queues configured for the trigger framework - if more than one, the user can actually chose in which queue the event message should be sent! The setting is within the server.properties file, it's called: infa.bpm.trigger.queue.ids. It defaults to "bpm".
      Also multiple triggers which send events to multiple queues is possible. One for the BPM server, another one for a 3rd party system - even with different payloads.

    2. A DQ trigger is configured for object created or changed. This trigger will write directly a corresponding DQ message to the batch api queue. As the server which changed the item is not necessarily the same server which processes the Message Queue API. Also this way it's guaranteed that all our batching capabilities are used

  2. Informatica BPM (or a 3rd party system) consumes the Trigger Event messages and starts the corresponding workflow process.

  3. Based on the workflows logic, a batch api request is sent to the batch api queue. Currently only Data Quality and Merge requests are supported for the batch api.

  4. The batch api consumer consumes the message from the queue.

  5. The batch api framework groups and accumulates multiple messages for a specific amount of time or number of messages in order to optimize the data quality and merge executions. Please refer to Batch API Queue for details on the algorithm and the possible configurations.

  6. The results of the data quality and merge executions are split into multiple responses so that every requests receives his own response. This renders the batch processing fully transparent to the external application. The responses are sent to the queue which is defined in the requests as "responseQueue". In this picture the "bpm_response" - but it could be a different one!

  7. Informatica BPM or a 3rd party application reads the response and processes it by continuing or starting a new workflow process instance.

  8. The workflow might also execute other "service api" calls by means of request message. For example enter/leave a workflow status, assigning an item into a task etc. Please have a look at Service API Queue for details on how to compose those messages.

  9. The Service API Consumer reads the service api request message and executes it against our standard service api. As this consumer is implemented in a fully generic way he basically is able to execute most of the available service api calls. Not all of them are fully tested and supported. The ones which are are listed on the Service API Queue page

  10. The response from the Service api is sent to the response queue, in our example also "bpm_response". This can also be a custom queue!

  11. The workflow might also call the object api to obtain a full picture of an object, or to modify the object. In terms of single item processing the object api should be used instead of the list api as the object api is backed by an in memory cache.

  12. The object api consumer directly calls the internal object api, bypassing the Service API completely. As the object api is optimized for high throughput the extra marshalling/unmarshalling can be avoided this way. When the object api queue is used, also no further thread pools are involved in the object api - the consumer threads do all the work. In case the object api is executed via rest, an internal thread pool is processing the request. This is to prevent overload scenarios of the application server.

  13. The response from the object api is being sent to the response queue, in our example also "bpm_response". This can also be a custom queue!

  14. Although discouraged and potentially dangerous, the workflow might also still call the service or object api directly. Please be aware that this will block an http thread during the time of the execution. Depending on the request and the number of concurrent workflows this might lead to an overload in the Jetty Server which will render the server unresponsive for web-clients! As there is a limited number of consumer threads for the Service API Consumer the way via message queue can not overload the system.

General Queue Configuration

All queues which are summarized in "Message Queue API" are configured in the server.properties configuration file.

Each queue has an unique id, the queue id. The ID is name after the queue. prefix. So in this example it's "batchapi".

queue.batchapi.type = ${queue.default.type}
queue.batchapi.writer.count = ${queue.default.writer.count}
queue.batchapi.consumer.count = ${queue.default.consumer.count}
queue.batchapi.url = ${queue.default.url}
queue.batchapi.username = ${queue.default.username}
queue.batchapi.password = ${queue.default.password}
queue.batchapi.message.format = XML
queue.batchapi.name = P360_BATCH_API
queue.batchapi.label = BatchAPI
  • type: The type can not be changed as we currently only support ActiveMQ.

  • writer.count: The number of threads which are used to write messages into the queue

  • consumer.count: The number of threads which are used to consume messages from the queue

  • url: the url to the Apache MQ Server

  • username: the username which should be used for the apache active mq server

  • password: the password which should be used for the apache active mq server

  • message.format: Either XML or JSON. Communication with the Informatica BPM Server requires here XML. If other scenarios allow JSON, use it.

  • name: the name of the Queue within the Apache Active MQ server. If the queue is not available in Apache Active MQ, it will automatically be created

  • label: the human readable label of the queue. It's used in the trigger configuration UI to chose the the target queue of the trigger event.

Queue Purposes

The application allows to configure multiple queues to listen to for service api requests and trigger events. By default the "bpm" queue is used for the trigger events, and the "serviceapi" queue is used for service api requests. Users can configure multiple queues by using a comma seperated list of queue IDs

infa.bpm.trigger.queue.ids = bpm
#infa.bpm.consumer.serviceapi.queue.ids = serviceapi
In case there are multiple trigger queues configured, the user can chose into which one the trigger event should be sent when he configures the trigger.

Custom Queues

Customers can easily create own queues by just providing the configuration like described above in the server.properties file. Let's assume we have a downstream syndication system which needs to receive all modifications a user does.

  1. Add the "syndication" queue to the server.properties file:

    queue.syndication.type = ${queue.default.type}
    queue.syndication.writer.count = ${queue.default.writer.count}
    queue.syndication.consumer.count = ${queue.default.consumer.count}
    queue.syndication.url = ${queue.default.url}
    queue.syndication.username = ${queue.default.username}
    queue.syndication.password = ${queue.default.password}
    queue.syndication.message.format = JSON
    queue.syndication.name = CUSTOM_SYNDICATION
    queue.syndication.label = Syndication

    We chose JSON as format since it's more efficient than xml and the syndication system can handle it just well.

  2. Add the "syndication" queue to the trigger queues:
    infa.bpm.trigger.queue.ids = bpm,syndication

  3. Chose the "Syndication" queue in the trigger configuration of the "Business Process management" perspective

    images/download/attachments/487621524/image2022-6-9_10-41-48.png
  4. Open the Apache Active MQ Console and see the content of the new CUSTOM_SYNDICATION queue

    images/download/attachments/487621524/image2022-6-9_10-39-25.png
  5. Have a look at the content of the message. It looks something like this

    Payload in new Syndication Queue when configured with
    {
    "entityId": "1185@1",
    "entity": "Article",
    "data": {
    "catalog.label": "MASTER",
    "triggerConfiguration": "ItemChanged",
    "triggerIdentifier": "hlr.persistence.trigger.entityChanged",
    "user.label": "Administrator",
    "userGroup.label": ""
    },
    "entityItemChange": {
    "_module": "UI",
    "_entity": "Article",
    "_identifier": "Item_A",
    "_user": {
    "_internalId": "1",
    "_entityId": 2600,
    "_externalId": "'Administrator'"
    },
    "_revision": {
    "_internalId": "1",
    "_entityId": 5600,
    "_externalId": "'root'"
    },
    "_container": {
    "_internalId": "1",
    "_entityId": 2900,
    "_externalId": "'MASTER'"
    },
    "_entityItem": {
    "_internalId": "1185@1",
    "_entityId": 1000,
    "_externalId": "'Item_A'@'MASTER'"
    },
    "_eventTimestamp": "2022-06-09T10:42:25.420Z",
    "_changeType": "CHANGED",
    "_changedFields": [
    "Article.ManufacturerName"
    ],
    "_changedEntities": [
    "Article"
    ],
    "_changeSummary": {
    "article": {
    "_changeType": "CHANGED",
    "_mainSupplierProxy": {
    "_entityId": 2800,
    "_internalId": "3",
    "_externalId": "'Heiler Product Manager'"
    },
    "manufacturerName": {
    "_old": "Mercedes",
    "_current": "BMW"
    }
    }
    }
    }
    }

General Headers

These headers apply to all queues, unless stated otherwise on the queue page.

Request Headers

Headers for request to Product 360

Header name

Type

Mandatory

Stored in

Values

Value example

Purpose

User

general

true

JMS Property

<String>

User to be used for authenticating within Product 360
The user needs to have the needed permissions for the request and of course permission for the Message Queue communication.

Password

general

true

JMS Property

<String>

Password
The password of the user. We recommend to use a secure communication to and from Apache Active MQ.
The password is in plain text.

MessageFormat

general

true

JMS Property

JSON, XML

XML

Defines the format of the request body.
Can either be XML or JSON.

ResponseQueueID

general

false

JMS Property

<String>

bpm_response

Queue id from in server.properties (see above)

JMSCorrelationID

general

false

JMS message header

Identifies the communication series that the message belongs to.
The provided correlation id will be sent back in the response so the client link responses to requests.

Origin

general

false

JMS Property

<String>

Test system

Identifies the P360 server environment (DEV/QA/PROD) the message was originates from. This should be taken from a previous trigger message and reflects normally the system.name property in the server.properties.

SuccessTargetService

specific

false

JMS Property

<String>

The name of the target consumer service within the target workflow for success responses.

ErrorTargetService

specific

false

JMS Property

<String>

The target consumer service within the target workflow for error responses

Response Headers

These generic response headers are always added to all responses Product 360 consumers send out.

Header name

Type

Stored in

Values

Value example

Purpose

MessageFormat

general

JMS Property

JSON, XML

XML

Defines the content type of the response payload. Can either be XML or JSON.

The message format of the response is defined by the response queue's preferred format (as configured in the server.properties file)! It might be that a client provides a request in JSON, but when the response queue prefers xml, the response will be XML!

Informatica BPM (ActiveVOS) only supports XML.

JMSCorrelationID

general

JMS Message header

Identifies the communication series that the message belongs to.

Status

general

JMS Property

Integer (Http like status code)

200

Response status code

200 = OK
201 = CREATED

400 = BAD REQUEST
401 = UNAUTHORIZED
500 = INTERNAL SERVER ERROR

(Additional ones might have been defined by the actual queue consumer implementation)

Origin

general

JMS Property

<String>

Identifies the P360 server environment (DEV/QA/PROD) the message was sent by. Per default this is the value of system.name in the server.properties

P360TargetService

specific

JMS Property

Specifies target endpoint (this is either the SuccessTargetService or the ErrorTargetService from the request header. Depending on the Status of the response)

Default Queues

BPM Queue

The main queue to which all trigger events are passed and which should trigger workflow instances initially or by correlation id

Queue ID

Format

Active MQ Name

bpm

XML

P360_BPM

Batch API Queue

Some of our Message Queue APIs profit tremendously from our new batching algorithm. The most important ones have been implemented with the Version 10.1 release. Data Quality and Merge.

Queue ID

Format

Active MQ Name

batchapi

XML

P360_BATCH_API

Service API Queue

General queue providing functionality of the .

Queue ID

Format

Active MQ Name

serviceapi

XML

P360_SERVICE_API

ObjectAPI Queue

A dedicated message queue for interacting with the Object APIs CRUD operations. The Object API supports both, JSON and XML payloads. We recommend using JSON where possible.

Queue ID

Format

Active MQ Name

objectapi

JSON/XML

P360_OBJECT_API