MQ with Media Manager

This article provides code snippets that show how the Media Manager integration connects to the JMS server.

Roles of the PIM components with regard to the JMS server

Message producer: Media Manager. Its workflows may send corresponding messages (TextMessage only) to the JMS server.

Message consumer: PIM server. It receives the corresponding messages from the JMS server and processes the business logic.

Since PIM 7.0.03, the JMS handling of the Media Manager integration has been fundamentally refactored (for details, see the page JMS asset event processing regarding stable server restart handling):

  1. The PIM server can receive messages from the corresponding queue in asynchronous or synchronous mode.

    1. Asynchronous: the JMS provider delivers messages to a client (here, the PIM server) as they arrive; the client does not have to request messages in order to receive them. This is implemented with a MessageListener on the consumer side.

      public class AsynchronousNotificationQueue extends MediaManagerNotificationQueue implements MessageListener
      {
          public AsynchronousNotificationQueue( Session session, String queueName )
          {
              super( session, queueName );
              try
              {
                  // Register this queue as listener so that the JMS provider pushes messages to onMessage().
                  if ( this.consumer != null )
                  {
                      this.consumer.setMessageListener( this );
                  }
              }
              catch ( Throwable throwable )
              {
                  String message = "Creating JMS queue '" + queueName + "' failed"; //$NON-NLS-1$ //$NON-NLS-2$
                  throw new RuntimeException( message, throwable );
              }
          }

          @Override
          public void onMessage( Message msg )
          {
              try
              {
                  handleMessage( msg );
              }
              catch ( JMSException e )
              {
                  log.error( "Internal JMSException while processing notification message", e ); //$NON-NLS-1$
              }
              catch ( CoreException ce )
              {
                  log.error( "Internal CoreException while processing notification message", ce ); //$NON-NLS-1$
              }
          }
      }
    2. Synchronous: the PIM server must explicitly call the consumer's “receive” or “receiveNoWait” method to get the next message.

      public class SynchronousNotificationQueue extends MediaManagerNotificationQueue
      {
          /**
           * Timeout in milliseconds passed to MessageConsumer.receive(timeout), which is called in
           * {@link #processNextMessage(IProgressMonitor, ProblemLog)} to get the next message.
           */
          private static final long TIMEOUT_RECEIVE_MESSAGE = 3000;

          public SynchronousNotificationQueue( Session session, String queueName )
          {
              super( session, queueName );
          }

          /**
           * Receives the next message, waiting at most {@link #TIMEOUT_RECEIVE_MESSAGE}. If a message is received,
           * it is passed on to all registered listeners.
           *
           * @return the processed notification, or null if no message arrived within the timeout
           */
          public OpasGNotificationMessage processNextMessage( IProgressMonitor progressMonitor, ProblemLog problemLog )
              throws JMSException, CoreException
          {
              Message message = this.consumer.receive( TIMEOUT_RECEIVE_MESSAGE );
              if ( message != null )
              {
                  handleMessage( message, progressMonitor, problemLog );
                  return this.currentNotification;
              }
              return null;
          }
      }
  2. Each message queue is processed in its own session to improve performance. Each session processes messages in ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode, which means that the message's acknowledge method must always be called explicitly to acknowledge the message. If this method is not called because an exception occurred, the message is delivered again after the HPM server restarts.

    private void handleTextMessage( TextMessage textMsg, IProgressMonitor progressMonitor, ProblemLog problemLog )
        throws JMSException, CoreException
    {
        logMessage( textMsg );
        this.currentNotification = parseTextMessage( textMsg );
        String topic = this.currentNotification.getTopic();
        List< OpasGNotificationListener > listeners = this.listenerMap.get( topic );
        if ( listeners != null )
        {
            for ( OpasGNotificationListener listener : listeners )
            {
                notificationLog.debug( "Sending notification to listener: " + listener.getClass() //$NON-NLS-1$
                    .getName() );
                // Inform the topic listener, which handles the message further.
                listener.onMessage( this.currentNotification, progressMonitor, problemLog );
            }
            // Acknowledge the original JMS message only after all listeners have finished their tasks without exception.
            textMsg.acknowledge();
        }
        else
        {
            notificationLog.debug( "No listener registered for topic '" + topic + "'" ); //$NON-NLS-1$ //$NON-NLS-2$
        }
    }
  3. Standard PIM has three predefined message queues with corresponding message topics:

    1. JMS messages with the topic "AssignDocument" or "NewDerivative" are sent to the message queue heiler.hmm.backend.event.assignment

    2. JMS messages with the topic "AssetModified" are sent to the message queue heiler.hmm.backend.event.assetModified

    3. All other JMS messages are sent to the message queue heiler.hmm.backend.event
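
The topic-to-queue assignment listed above can be summarized as a small lookup. The following is only a sketch for illustration: the class NotificationQueueRouter and its method queueFor are hypothetical names and not part of the PIM or Media Manager API, and this article does not state which component performs the routing. Only the topic names and queue names are taken from the list above.

```java
/**
 * Hypothetical helper (not part of the PIM API): returns the predefined
 * message queue that a notification with the given topic is sent to.
 */
final class NotificationQueueRouter
{
    // Queue names as listed in this article.
    static final String ASSIGNMENT_QUEUE = "heiler.hmm.backend.event.assignment";
    static final String ASSET_MODIFIED_QUEUE = "heiler.hmm.backend.event.assetModified";
    static final String DEFAULT_QUEUE = "heiler.hmm.backend.event";

    private NotificationQueueRouter()
    {
        // static helper, no instances
    }

    static String queueFor( String topic )
    {
        // Assumption of this sketch: a missing topic is treated like any other unlisted topic.
        if ( topic == null )
        {
            return DEFAULT_QUEUE;
        }
        switch ( topic )
        {
            case "AssignDocument":
            case "NewDerivative":
                return ASSIGNMENT_QUEUE;
            case "AssetModified":
                return ASSET_MODIFIED_QUEUE;
            default:
                // All other topics share the general event queue.
                return DEFAULT_QUEUE;
        }
    }

    public static void main( String[] args )
    {
        // "SomeOtherTopic" is a made-up topic name used to show the fallback.
        for ( String topic : new String[] { "AssignDocument", "NewDerivative", "AssetModified", "SomeOtherTopic" } )
        {
            System.out.println( topic + " -> " + queueFor( topic ) );
        }
    }
}
```

A lookup like this can help when deciding which queue to inspect for a given notification, for example when checking why an "AssetModified" event was not processed.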