Batch API Queue

Some of our Message Queue APIs profit tremendously from our new batching algorithm. The most important ones, Data Quality and Merge, have been implemented with the Version 10.1 release.

| Queue ID | Format | Active MQ Name  |
|----------|--------|-----------------|
| batchapi | XML    | P360_BATCH_API  |

Architecture Design

To achieve generic batching, we made the following changes to our Message Queue framework.

  • A new batch requests queue (P360_BATCH_API) has been introduced for all batching requests.

  • The DQ requests queue (P360_DATA_QUALITY) has been removed as it is no longer needed. All DQ requests from the Trigger Framework or from BPM are written to the P360_BATCH_API queue.

  • The batch framework now has its own consumer, which consumes all messages from the batch queue (regardless of whether they are DQ, Merge or something else in the future).

  • The DQ MQ Consumer has been removed as it is no longer needed.

  • Merge requests which need batching should be written to the batch requests queue.

  • Merge requests on the batch queue are processed synchronously (immediate merge).

Batching Concepts

There is no way around the fact that, for a single message considered in isolation, batching introduces a small delay in processing. Across many requests, however, the overall throughput is considerably higher.

Incoming requests on the batch queue are grouped together (based on an internal key) to form a batch. Two parameters decide whether a batch is ready for execution: threshold and timeout.
Each service using the batch framework can have its own customized values for these parameters. They can be found in the plugin_customization.ini file.

  • Threshold
    Specifies the number of items in a batch at which the batch is ready for execution. A single request with more items than the threshold becomes a batch containing only that request. Similarly, many small requests whose combined item count is below the threshold are grouped into a single batch.
    Default: dataquality.message.batch.threshold=500

  • Timeout
    Specifies the time (in milliseconds) after its creation at which a batch qualifies for processing, even if it contains fewer items than the threshold.
    Default: dataquality.message.batch.timeout=3000
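
The interplay of the two parameters can be sketched as follows. This is a minimal illustration in Python, not the product's implementation: the class BatchAccumulator and its methods are hypothetical names, grouping by internal key and thread safety are left out, and the exact comparison (reaching versus exceeding the threshold) is an assumption.

```python
import time


class BatchAccumulator:
    """Hypothetical sketch: collects requests into one batch until the item
    threshold is reached or the batch is older than the timeout."""

    def __init__(self, threshold=500, timeout_ms=3000, clock=time.monotonic):
        self.threshold = threshold      # counted in items, not in requests
        self.timeout_ms = timeout_ms    # measured from creation of the batch
        self._clock = clock
        self._requests = []
        self._item_count = 0
        self._created_at = None

    def add(self, request_items):
        """Add one request (a list of item ids) to the open batch."""
        if self._created_at is None:
            # The batch is created by the first request that arrives.
            self._created_at = self._clock()
        self._requests.append(list(request_items))
        self._item_count += len(request_items)

    def is_ready(self):
        """A batch is ready once either parameter is satisfied."""
        if self._created_at is None:
            return False
        age_ms = (self._clock() - self._created_at) * 1000
        # Assumption: "reached" means >=; the product may compare differently.
        return self._item_count >= self.threshold or age_ms >= self.timeout_ms

    def drain(self):
        """Return the collected requests and start a new, empty batch."""
        requests = self._requests
        self._requests, self._item_count, self._created_at = [], 0, None
        return requests
```

With the default values of 500 and 3000, a batch in this sketch becomes ready as soon as 500 items have accumulated or 3 seconds after its first request arrived, whichever happens first.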

These parameters can be changed at runtime through MXBeans, which allow the values to be updated from JConsole without a server restart.


Threshold and timeout values should be neither too small nor too large. If the values are too small, a batch will contain very few items or will be executed before several requests could be combined. If the values are too large, execution of messages is delayed because the framework tries to collect too many items in a batch or waits a long time for the timeout to expire.

Batching can be disabled by setting these properties to 1.

Notes for 10.0 upgrade

The new batching framework is backward compatible; existing functionality continues to work as before.

  • Data Quality requests from the trigger framework or BPM are batched as before. The only change is that these requests are written to the batch queue instead of the Data Quality queue (which no longer exists after this upgrade).

  • Merge requests via the Service API queue or direct REST API calls from BPM are not batched and are processed asynchronously (through the Job Framework) as before.

  • BPM workflows for merge do not require any change apart from writing the request to the new batch queue.

  • Workflows for Data Quality calls can have an extra P360Url parameter (with the Data Quality REST endpoint as value), but it is optional. All requests without this parameter are treated as Data Quality requests by default.

Merge requests written to the Service API queue or sent as direct REST calls are still processed as single requests (as before). Only merge requests sent via the batch queue are batched.

Request Headers

Please see Message Queue API for the list of generic request headers. The table below lists only the additional headers specific to this queue.

| Header name | Type | Stored in | Values | Value example | Purpose |
|-------------|------|-----------|--------|---------------|---------|
| P360Url | optional | JMS Property | rest/V1.0/manage/merge or rest/V1.0/manage/dataQuality | For merge: rest/V1.0/manage/merge. For data quality (default): rest/V1.0/manage/dataQuality | The REST endpoint of the service to be invoked (DQ or Merge as of now). It is optional; any message without this header is treated as a DQ request by default. |
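
The defaulting rule for P360Url can be illustrated with a short Python sketch. The function name resolve_endpoint and the use of a plain dict in place of real JMS message properties are assumptions made for illustration; rejecting unknown values is likewise an assumption, not documented behaviour.

```python
DQ_ENDPOINT = "rest/V1.0/manage/dataQuality"
MERGE_ENDPOINT = "rest/V1.0/manage/merge"


def resolve_endpoint(jms_properties):
    """Return the REST endpoint a batch queue message is addressed to.

    `jms_properties` is a plain dict standing in for the JMS message
    properties (hypothetical representation for this sketch).
    """
    url = jms_properties.get("P360Url")
    if not url:
        # The header is optional: messages without it are treated as Data Quality.
        return DQ_ENDPOINT
    if url not in (DQ_ENDPOINT, MERGE_ENDPOINT):
        # Assumption: this sketch rejects anything but the two documented values.
        raise ValueError(f"Unsupported P360Url: {url!r}")
    return url
```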

Response Headers

Please see Message Queue API for the list of generic response headers. No additional headers are provided for this queue.

Protocol in Responses

Protocol entries in a result refer to the whole batch in which the items of the message were executed, not to the individual message. Protocol entries are therefore not consistent with the request when batching is used: the result of a message containing 2 items might contain protocol entries that cover more than 2 items. The item results themselves, however, are limited to the items that were part of the request.

Data Quality

Request Body

A DQ request on the queue is initiated either by the Trigger framework or by a BPM workflow.

DQ Request Message Body
<dataQualityProfile>
  <entityIdentifier>Article</entityIdentifier>
  <reportQuery>
    <identifier>byItems</identifier>
    <parameters>
      <entry>
        <key>items</key>
        <value>1@1</value>
      </entry>
    </parameters>
  </reportQuery>
  <rules>IsEmptyManufacturerRule</rules>
</dataQualityProfile>
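
A request body of this shape could be generated programmatically, for example with Python's standard xml.etree.ElementTree module. The function build_dq_request is a hypothetical helper; it only reproduces the elements shown in the sample and passes the items value and the rule name through unchanged, since the format for multiple items or rules is not described here.

```python
import xml.etree.ElementTree as ET


def build_dq_request(entity, items_value, rule):
    """Serialize a <dataQualityProfile> body with the same elements as the sample."""
    root = ET.Element("dataQualityProfile")
    ET.SubElement(root, "entityIdentifier").text = entity
    query = ET.SubElement(root, "reportQuery")
    ET.SubElement(query, "identifier").text = "byItems"
    parameters = ET.SubElement(query, "parameters")
    entry = ET.SubElement(parameters, "entry")
    ET.SubElement(entry, "key").text = "items"
    ET.SubElement(entry, "value").text = items_value  # passed through unchanged
    ET.SubElement(root, "rules").text = rule
    return ET.tostring(root, encoding="unicode")


body = build_dq_request("Article", "1@1", "IsEmptyManufacturerRule")
```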

Response Body

DQ Response Message Body
{
  "ruleIds": {
    "CheckGtinMED": 12
  },
  "numberOfSuccessfulItems": 5,
  "numberOfFailedItems": 0,
  "items": [
    {
      "entityItem": {
        "id": "1@1"
      },
      "status": "SUCCESSFUL",
      "failedRuleIds": [],
      "successfulRuleIds": [
        12
      ]
    }
  ],
  "protocol": {
    "infoCounter": 3,
    "warningCounter": 0,
    "errorCounter": 0,
    "entries": [
      {
        "severity": "INFO",
        "category": "SUMMARY",
        "message": "1 rule is applied to 5 objects of type 'Article'",
        "logDate": "2016-01-04",
        "logTime": "14:52:00"
      },
      {
        "severity": "INFO",
        "category": "SUMMARY",
        "message": "Executed rules: CheckGtinMED",
        "logDate": "2016-01-04",
        "logTime": "14:52:00"
      },
      {
        "severity": "INFO",
        "category": "SUMMARY",
        "message": "Rule processing finished.",
        "logDate": "2016-01-04",
        "logTime": "14:52:00"
      }
    ]
  }
}
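
Because the protocol section describes the whole batch, a consumer that is only interested in its own items may prefer to read the items array. The following Python sketch shows one way to do that; item_statuses is a hypothetical helper, and the sample string is a shortened copy of the response above.

```python
import json


def item_statuses(response_body):
    """Map each returned item id to its status, ignoring the batch-wide protocol."""
    result = json.loads(response_body)
    return {
        item["entityItem"]["id"]: item["status"]
        for item in result.get("items", [])
    }


sample = """
{
  "numberOfSuccessfulItems": 5,
  "numberOfFailedItems": 0,
  "items": [
    {"entityItem": {"id": "1@1"}, "status": "SUCCESSFUL",
     "failedRuleIds": [], "successfulRuleIds": [12]}
  ]
}
"""

statuses = item_statuses(sample)
```

Here the counter reports 5 successful items while the items array contains only the single item from the request, which matches the behaviour described under Protocol in Responses.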

Preferences

Default values for Data Quality preferences:

| Preference                          | Default Value |
|-------------------------------------|---------------|
| dataquality.message.batch.threshold | 500           |
| dataquality.message.batch.timeout   | 3000          |

Merge

Request Body

| Field | Required | Default | Description |
|-------|----------|---------|-------------|
| defaultAction | no | SUPPLEMENT | Default merge mode for all fields in case the corresponding supplier catalog has no persisted merge profile. Allowed values: "SUPPLEMENT", "OVERWRITE", "CLEAN_COPY", "NO_MERGE". This parameter doubles as the default action for the entityQualification parameter object. |
| entityQualification | no | | Optional parameter to fine-tune the merge actions down to the field level. It also provides the ability to filter based on qualifications (for example: merge only the short description in English). See also the special chapter below. More details can be found in REST Merge API. |
| entityReportQuery | yes | | An entity report query. Only the 'byItems' report can be used. |

Merge Request Body
<mergeProfile>
  <defaultAction>SUPPLEMENT</defaultAction>
  <entityQualification>null</entityQualification>
  <entityReportQuery>
    <entityIdentifier>Article</entityIdentifier>
    <identifier>byItems</identifier>
    <parameters>
      <entry>
        <key>items</key>
        <value>1@10</value>
      </entry>
    </parameters>
  </entityReportQuery>
</mergeProfile>

Response Body

If the number of items in the batch does not exceed the response threshold (merge.message.response.threshold), the supplier-to-master item mappings are returned in the response.

Merge Response Body With Supplier Master Map
<mergeResultWithMappings>
  <entity>Article</entity>
  <protocol>
  </protocol>
  <total>
    <supplierMasterMapping supplierId="147@1024" masterId="47764@1"/>
  </total>
  <new>
    <supplierMasterMapping supplierId="147@1024" masterId="47764@1"/>
  </new>
  <newAndUpdated>
    <supplierMasterMapping supplierId="147@1024" masterId="47764@1"/>
  </newAndUpdated>
</mergeResultWithMappings>

If the batch contains more items than the response threshold (merge.message.response.threshold), the report result is returned in the response instead of the supplier-to-master item mappings.

Merge Response Body With Report Result
<mergeResultWithReport>
  <entity>Article</entity>
  <protocol>
  </protocol>
  <total>
    <reportResult>
      <id>10204</id>
      <dataSource>PCM_MASTER</dataSource>
      <type>1</type>
      <purpose>1</purpose>
      <resultTableName>ReportStoreTempA4</resultTableName>
      <count>1</count>
    </reportResult>
  </total>
  <new>
    <reportResult>
      <id>10204</id>
      <dataSource>PCM_MASTER</dataSource>
      <type>1</type>
      <purpose>1</purpose>
      <resultTableName>ReportStoreTempA4</resultTableName>
      <count>1</count>
    </reportResult>
  </new>
  <newAndUpdated>
    <reportResult>
      <id>10204</id>
      <dataSource>PCM_MASTER</dataSource>
      <type>1</type>
      <purpose>1</purpose>
      <resultTableName>ReportStoreTempA4</resultTableName>
      <count>1</count>
    </reportResult>
  </newAndUpdated>
</mergeResultWithReport>
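
Since a merge response can arrive in either of the two shapes, a consumer has to check the root element before reading the content. The following Python sketch shows one possible way to do this with the standard xml.etree.ElementTree module; parse_merge_response and its return format are hypothetical and not part of the product API.

```python
import xml.etree.ElementTree as ET

SECTIONS = ("total", "new", "newAndUpdated")


def parse_merge_response(body):
    """Return ("mappings", {section: [(supplierId, masterId), ...]}) or
    ("report", {section: report id, or None if the section is absent})."""
    root = ET.fromstring(body)
    if root.tag == "mergeResultWithMappings":
        return "mappings", {
            name: [
                (m.get("supplierId"), m.get("masterId"))
                for m in root.findall(f"{name}/supplierMasterMapping")
            ]
            for name in SECTIONS
        }
    if root.tag == "mergeResultWithReport":
        return "report", {
            name: root.findtext(f"{name}/reportResult/id")
            for name in SECTIONS
        }
    raise ValueError(f"Unexpected root element: {root.tag}")
```

Returning the kind alongside the data keeps the caller explicit about which of the two response shapes it is handling.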

Preferences

Apart from the threshold and timeout preferences defined for batching of message queue requests, two more preferences are defined for merge.

  • merge.callMergeFinishTrigger: Merge requests through the message queue do not call the merge finish trigger by default. Set this preference to true explicitly if the merge finish trigger needs to be called for merge requests through the message queue.

  • merge.message.response.threshold: Decides whether the merge response contains a supplier master mapping or a report result. If the number of items in a batch is greater than this threshold, the merge result contains a report result. Otherwise it contains the supplier master mapping.
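
The rule for merge.message.response.threshold can be written down as a small Python sketch. The function merge_response_kind is a hypothetical illustration of the documented rule and not product code; the default of 10000 is the documented default value of the preference.

```python
def merge_response_kind(items_in_batch, response_threshold=10000):
    """Name the response root element expected for a batch of the given size."""
    # More items than the threshold yield a report result;
    # otherwise the supplier master mappings are returned.
    if items_in_batch > response_threshold:
        return "mergeResultWithReport"
    return "mergeResultWithMappings"
```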

Default values for merge preferences:

| Preference                       | Default Value |
|----------------------------------|---------------|
| merge.message.batch.threshold    | 500           |
| merge.message.batch.timeout      | 3000          |
| merge.message.response.threshold | 10000         |
| merge.callMergeFinishTrigger     | false         |