MQ with Media Manager
This article provides code snippets that show how the Media Manager integration connects to the JMS server.
Role of PIM components in regard to JMS server
Message producer: Media Manager, which contains several workflows, may send corresponding messages (TextMessage only) to the JMS server.
Message consumer: the PIM server, which receives the corresponding messages from the JMS server and processes the business logic.
Since PIM 7.0.03, the JMS handling of the Media Manager integration has been fundamentally refactored (for details on stable server restart handling, see the page JMS asset event processing):
The PIM server can receive messages from the corresponding queue in asynchronous or synchronous mode.
Asynchronous: the JMS provider delivers messages to a client (here the PIM server) as they arrive; the client does not have to request messages in order to receive them. This is implemented with a MessageListener on the consumer side.
public class AsynchronousNotificationQueue extends MediaManagerNotificationQueue implements MessageListener
{
  public AsynchronousNotificationQueue( Session session, String queueName )
  {
    super( session, queueName );
    try
    {
      if ( this.consumer != null )
      {
        this.consumer.setMessageListener( this );
      }
    }
    catch ( Throwable throwable )
    {
      String message = "Creating JMS queue '" + queueName + "' failed'"; //$NON-NLS-1$ //$NON-NLS-2$
      throw new RuntimeException( message, throwable );
    }
  }

  @Override
  public void onMessage( Message msg )
  {
    try
    {
      handleMessage( msg );
    }
    catch ( JMSException e )
    {
      log.error( "Internal JMSException while processing notification message", e ); //$NON-NLS-1$
    }
    catch ( CoreException ce )
    {
      log.error( "Internal CoreException while processing notification message", ce ); //$NON-NLS-1$
    }
  }
}
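The push-style delivery described above can be tried out without a JMS broker. The following is a minimal sketch and only an analogy: the class PushDeliverySketch and its methods are hypothetical and not part of PIM or the JMS API. It also delivers on the caller's thread, whereas a real JMS provider invokes onMessage on a thread of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, JMS-free illustration of push-style (asynchronous) delivery.
class PushDeliverySketch
{
    private Consumer<String> listener;
    // Messages that arrived before any listener was registered.
    private final List<String> backlog = new ArrayList<>();

    // Plays the role of MessageConsumer.setMessageListener: waiting messages are pushed at once.
    void setListener( Consumer<String> listener )
    {
        this.listener = listener;
        for ( String message : backlog )
        {
            listener.accept( message );
        }
        backlog.clear();
    }

    // The "provider" pushes each message to the listener; the client never asks for it.
    void send( String message )
    {
        if ( listener == null )
        {
            backlog.add( message );
        }
        else
        {
            listener.accept( message );
        }
    }

    public static void main( String[] args )
    {
        PushDeliverySketch provider = new PushDeliverySketch();
        provider.send( "AssetModified" );
        provider.setListener( message -> System.out.println( "received: " + message ) );
        provider.send( "NewDerivative" );
    }
}
```

The listener is registered once and is then called for every message, which mirrors how the constructor above registers the queue object itself as the MessageListener.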
Synchronous: the PIM server must explicitly call the consumer's receive or receiveNoWait method to get the next message.
public class SynchronousNotificationQueue extends MediaManagerNotificationQueue
{
  /**
   * Timeout value passed to MessageConsumer.receive(timeout), which is called in
   * {@link #processNextMessage(IProgressMonitor, ProblemLog)} to get the next message.
   */
  private static long TIMEOUT_RECEIVE_MESSAGE = 3000;

  public SynchronousNotificationQueue( Session session, String queueName )
  {
    super( session, queueName );
  }

  /**
   * Receives the next message using the defined {@link #TIMEOUT_RECEIVE_MESSAGE}. If a message is received, it is
   * then handled by all registered listeners.
   */
  public OpasGNotificationMessage processNextMessage( IProgressMonitor progressMonitor, ProblemLog problemLog )
    throws JMSException, CoreException
  {
    Message message = this.consumer.receive( TIMEOUT_RECEIVE_MESSAGE );
    if ( message != null )
    {
      handleMessage( message, progressMonitor, problemLog );
      return this.currentNotification;
    }
    return null;
  }
}
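The receive-with-timeout pattern can also be tried out without a JMS broker. The following is a minimal sketch and only an analogy: it uses java.util.concurrent.BlockingQueue.poll(timeout, unit), which, like MessageConsumer.receive(timeout), returns null when no message arrives in time. The class PollingSketch and the method drain are hypothetical and not part of PIM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, JMS-free illustration of synchronous consumption with a timeout.
class PollingSketch
{
    // Pulls messages one by one until a poll times out, i.e. nothing arrived in time.
    static List<String> drain( BlockingQueue<String> queue, long timeoutMillis )
    {
        List<String> processed = new ArrayList<>();
        try
        {
            while ( true )
            {
                // Comparable to MessageConsumer.receive( timeout ): null means "no message".
                String message = queue.poll( timeoutMillis, TimeUnit.MILLISECONDS );
                if ( message == null )
                {
                    break;
                }
                processed.add( message );
            }
        }
        catch ( InterruptedException e )
        {
            // Restore the interrupt flag and return what was processed so far.
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main( String[] args )
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add( "AssetModified" );
        queue.add( "NewDerivative" );
        System.out.println( drain( queue, 100 ) );
    }
}
```

A finite timeout keeps the caller from blocking forever on an empty queue, which is presumably why the class above passes TIMEOUT_RECEIVE_MESSAGE rather than calling receive() without arguments.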
Each message queue is processed in its own session to improve performance. Each session processes its messages in ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode, which means that the acknowledge method of a message must always be called explicitly to acknowledge that message. If this method is not called because an exception occurred, the message is delivered again after the HPM server restarts.
private void handleTextMessage( TextMessage textMsg, IProgressMonitor progressMonitor, ProblemLog problemLog )
  throws JMSException, CoreException
{
  logMessage( textMsg );
  this.currentNotification = parseTextMessage( textMsg );
  String topic = this.currentNotification.getTopic();
  List< OpasGNotificationListener > listeners = this.listenerMap.get( topic );
  if ( listeners != null )
  {
    for ( OpasGNotificationListener listener : listeners )
    {
      notificationLog.debug( "Sending notification to listener: " + listener.getClass() //$NON-NLS-1$
                                                                            .getName() );
      // Inform the topic listener, which handles the message further
      listener.onMessage( this.currentNotification, progressMonitor, problemLog );
    }
    // Acknowledge the original JMS message after all listeners have finished their tasks without an exception.
    textMsg.acknowledge();
  }
  else
  {
    notificationLog.debug( "No listener registered for topic '" + topic + "'" ); //$NON-NLS-1$ //$NON-NLS-2$
  }
}
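The effect of acknowledging each message individually can be illustrated with a small in-memory model. This is a sketch under assumptions, not ActiveMQ code: the class IndividualAckSketch and its methods are hypothetical, and the "broker" is just a map of messages that have not been acknowledged yet.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of individual acknowledgement; not ActiveMQ code.
class IndividualAckSketch
{
    // Messages the "broker" still holds, keyed by message id, in send order.
    private final Map<Integer, String> unacknowledged = new LinkedHashMap<>();
    private int nextId = 1;

    // Stores a message and returns its id.
    int send( String body )
    {
        int id = nextId++;
        unacknowledged.put( id, body );
        return id;
    }

    // Acknowledges only the message with this id; all other messages stay pending.
    void acknowledge( int id )
    {
        unacknowledged.remove( id );
    }

    // The messages a consumer would receive again after a restart.
    List<String> redeliverAfterRestart()
    {
        return new ArrayList<>( unacknowledged.values() );
    }

    public static void main( String[] args )
    {
        IndividualAckSketch broker = new IndividualAckSketch();
        int first = broker.send( "AssetModified #1" );
        broker.send( "AssetModified #2" ); // a listener fails here, so no acknowledge call
        broker.acknowledge( first );
        System.out.println( broker.redeliverAfterRestart() );
    }
}
```

In the model, only the second message remains for redelivery, because acknowledging the first one has no effect on any other message.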
Standard PIM has three predefined message queues with corresponding message topics:
JMS messages with the topic "AssignDocument" or "NewDerivative" are sent to the message queue heiler.hmm.backend.event.assignment
JMS messages with the topic "AssetModified" are sent to the message queue heiler.hmm.backend.event.assetModified
All other JMS messages are sent to the message queue heiler.hmm.backend.event
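The mapping from topic to queue can be written down as a small lookup. This is a minimal sketch: the topic and queue names are taken from the list above, while the class QueueRoutingSketch and the method queueForTopic are hypothetical and do not show how Media Manager actually selects the queue.

```java
// Hypothetical helper that mirrors the topic-to-queue list; not PIM code.
class QueueRoutingSketch
{
    static final String ASSIGNMENT_QUEUE = "heiler.hmm.backend.event.assignment";
    static final String ASSET_MODIFIED_QUEUE = "heiler.hmm.backend.event.assetModified";
    static final String DEFAULT_QUEUE = "heiler.hmm.backend.event";

    // Returns the queue name for a topic; unknown or null topics fall back to the default queue.
    static String queueForTopic( String topic )
    {
        if ( "AssignDocument".equals( topic ) || "NewDerivative".equals( topic ) )
        {
            return ASSIGNMENT_QUEUE;
        }
        if ( "AssetModified".equals( topic ) )
        {
            return ASSET_MODIFIED_QUEUE;
        }
        return DEFAULT_QUEUE;
    }

    public static void main( String[] args )
    {
        System.out.println( queueForTopic( "NewDerivative" ) );
        System.out.println( queueForTopic( "AssetModified" ) );
        System.out.println( queueForTopic( "SomethingElse" ) );
    }
}
```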