Spring JMS Session Acknowledge Mode

Spradzimy dziś możliwości jakie mamy używając Spring JMS jesli chodzi o ponowne dostarczenie wiadomości w przypadku gdy coś poszło nie tak. Tak zwane redelivery jest supportowane przez ustawienie SessionAcknowledgeMode. 

Do wyboru mamy cztery tryby powiadomień:

  • Session.AUTO_ACKNOWLEDGE – DEFAULT
    • Potwierdzenie wysłane zaraz po otrzymaniu wiadomości i przed procesowniem jej przez warstwę aplikacji.
  • Session.DUPS_OK_ACKNOWLEDGE
    • Potwierdzenie obsługiwane jest przez JMS automatycznie. Może być wysłane zbiorczo dopiero po kilku otrzymanych wiadomościach. Możliwe jest otrzymanie duplikatów.
    • Jest to bardzo wydajna metoda potwierdzania wiadomości jednak aplikacja musi być przygotowana na radzenie sobie z duplikatami – np. poprzez rozpoznawanie ich i odrzucanie.
  • Session.CLIENT_ACKNOWLEDGE
    • Odbiorca może zdecydować kiedy potwierdza otrzymanie wiadomości. 
    • Np.: Jeśli zakładamy, że dobrze sparsowana wiadomość jest wystarczająca do bezproblemowego dalszego procesowania to warto wysłać acknowlege tuż po etapie parsowania. 
  • Session.SESSION_TRANSACTED
    • Tryb ten oznacza ze sesja jest tranzakcyjna – jest to wartość informacyjna. Aktywacja tego trybu i tak wymaga ustawienia flagi setSessionTransacted na true. W tym trybie wywołanie commit na obiekcie session jest równoznaczne z wysłaniem potwierdzenia.

Ustawienia czy procesowanie ma być tranzakcyjne:

	factory.setSessionTransacted(true);

Ustawienie tej zmiennej na true powoduje użycie lokalnej tranzakcji oraz ignorowanie powyższych trybów sessionAcknowledgeMode!

Ostatnia opcja to ustawienie managera tranzakcji:

	factory.setTransactionManager(jmsTransactionManager);

ma to najwyższy priorytet i powoduje zignorowanie poprzednich ustawień:
sessionAcknowledgeMode oraz sessionTransacted.

Kiedy używać tranzakcyjności?

Tranzakcje JMS warto stosować jeśli zamierzamy zatwierdzać grupę otrzymanych wiadomości a nie pojedyńcze sztuki. Tranzakcyjność użyta przy każdej pojedyńczej wiadomości wprowadzi nam niepotrzebny narzut związany z zatwierdzaniem (commit) i cofaniem (rollback) tranzakcji.

Przygotowanie środowiska:

Konfiugracja JMS:

@Configuration
@EnableJms
public class JmsConfig implements JmsListenerConfigurer {

    @Value("${hostname}")
    private String hostname;
    @Value("${port}")
    private int port;
    @Value("${queueManager}")
    private String queueManager;
    @Value("${channel}")
    private String channel;

    @Value("${destination}")
    private String destination;

    @Autowired
    private JmsMessageListener jmsMessageListener;
    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId(destination);
        endpoint.setDestination(destination);
        endpoint.setMessageListener(jmsMessageListener);
        DefaultJmsListenerContainerFactory factory =
                applicationContext.getBean("jmsFactory", DefaultJmsListenerContainerFactory.class);
        registrar.registerEndpoint(endpoint, factory);
    }

    @Bean
    public ConnectionFactory connectionFactory() throws JMSException {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName(hostname);
        factory.setPort(port);
        factory.setQueueManager(queueManager);
        factory.setChannel(channel);
        factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        return factory;
    }

    @Bean(name = "jmsFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                          JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        factory.setReceiveTimeout(60000L);
        return factory;
    }

  	// Used for Producer only:
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }

    @Bean
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}

Konfiugracja Listener-a:

@Slf4j
@Component
public class JmsMessageListener implements MessageListener {

    @Autowired
    private JmsTransactionManager jmsTransactionManager;
    
    @Override
    public void onMessage(Message message) {
        try {
            String textMessage = ((TextMessage) message).getText();
            log.info("Received message: " + textMessage);
            JmsMessageListener.counter++;
        } catch (JMSException e) {
            log.error("Failed: ", e);
        }
    }
    
    // for verification
    static int counter = 0;
}

Konfiguracja nadawcy:

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = SpringjmsApplication.class)
@ContextConfiguration(classes = JmsConfig.class)
class JmsProducerTest {

    @Autowired
    private JmsTemplate jmsTemplate;
    @Value("${destination}")
    private String destination;

    @Test
    public void sendMessageTest() throws InterruptedException {
        jmsTemplate.send(destination, messageCreator -> messageCreator.createTextMessage(createMessage()));

        TimeUnit.SECONDS.sleep(10);

        Assert.assertEquals(1, JmsMessageListener.counter);
    }

    private String createMessage() {
        String sentTime = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT).withZone(ZoneOffset.UTC)
                .format(Instant.now());
        return "Message content. Sent at: " + sentTime;
    }
}

Poprawne wykonanie

Po odpaleniu powyższego kodu, wiadomość zostanie wysłana jeden raz i test zakończy się sukcesem. 

1. AUTO_ACKNOWLEDGE

        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

1.1 Rzucenie wyjątku w onMessage – AUTO ACK

W przypadku auto acknowledge mamy ograniczone możliwości jeśli chodzi o obsługę powiadomień JMS. Spróbojmy rzucić exception w Message Listenerze:

    public void onMessage(Message message) {
        try {
            String textMessage = ((TextMessage) message).getText();
            log.info("Received message: " + textMessage);
            JmsMessageListener.counter++;
            throw new RuntimeException("Message has not been processed correctly");
        } catch (JMSException e) {
            log.error("Failed: ", e);
        }
    }
2019-01-04 12:12:24.985  INFO 1724 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 11:12 AM
java.lang.RuntimeException: Message has not been processed correctly

Potwierdzenie zostało wysłana automatycznie. Exception nie spowodował wysłanie wiadomoście ponownie.

2. CLIENT_ACKNOWLEDGE

Zmieńmy teraz rodzaj potwierdzenia na CLIENT_ACKNOWLEDGE:

        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

2.1 Rzucenie wyjątku w onMessage – CLIENT ACK

Tym razem test sfailował gdyż ilość odebranych message bedzie dużo wieksza. Rzucamy exception w onMessage więc widomość jest uznawana za niedostarczoną i następuje redelivery:

2019-01-04 12:21:27.376  INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 11:21 AM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 12:21:28.820  INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 11:21 AM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 12:21:29.174  INFO 17696 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 11:21 AM
java.lang.RuntimeException: Message has not been processed correctly
...
  java.lang.AssertionError: expected:<1> but was:<332>
  Expected :1
  Actual   :332

2.2 Potwierdzenie odebrania przed rzuceniem wyjątku:

Spróbujmy to naprawić dodając wywołanie message.acknowledge() przed rzuceniem wyjątku:

    @Override
    public void onMessage(Message message) {
        try {
            String textMessage = ((TextMessage) message).getText();
            log.info("Received message: " + textMessage);
            JmsMessageListener.counter++;
            message.acknowledge();
            throw new RuntimeException("Message has not been processed correctly");
        } catch (JMSException e) {
            log.error("Failed: ", e);
        }
    }

Tym razem widomość otrzymamy tylko raz. Wywołanie potwierdzenia odebrania wiadomości spowodowało, że nawet jeśli jest potem rzucony wyjątek to i tak nie ma to wpływu na redelivery tej widomości ponownie.

3. DUPS_OK_ACKNOWLEDGE

W przypadku zmiany na DUPS_OK_ACKNOWLEDGE zgadzamy sie na zarządzanie powiadomieniami przez JMS i nie mamy wpływu na to kiedy zostanie potwierdzenie wysłane. Z tą opcją uruchamiając test kilka razy pod rząd otrzymamy duplikaty. Dopiero po pewnym czasie otrzymanie wiadomości zostanie potwierdzone – leniwe potwierdzenie.

4. SESSION_TRANSACTED

Sama zmiana na SESSION TRANSACTED nie jest wystarczająca. Przy próbie odebrania wiadomości dostaniemy WARNING oraz wiadomości nie będą odbierane:

2019-01-04 13:07:46.069  WARN 7896 --- [nerContainer-18] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: JMSCC0097: Bad acknowlegement mode '0'. Connection.createSession() was asked to create a non-transacted session (transacted was false) but the acknowledgement mode was not one of AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE.

Niezbędne tutaj jest włączenie tranzakcyjności.

5. Lokalna tranzakcyjność

Włączenie lokalnej tranzakcyjności:

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(true);

5.1 Rzucenie wyjątku w onMessage – TRANSACTED=true

Przy rzuceniu wyjątku tutaj będziemy miec taka samą sytuacje jak w punkcie 2.1. Wiadomość nie zostanie potwierdzona i JMS będzie nam próbował ją wysłać w kółko aż do skutku.

5.2 Potwierdzenie odebrania przed rzuceniem wyjątku:

Niestety tym razem (w przeciwieństwie do punktu 2.2) potwierdzenie przez message.acknowledge() nam  nie pomoże:

2019-01-04 13:39:00.974  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 13:39:01.329  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 13:39:01.689  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly

6. Użycie managera tranzakcji a cache level.

Przy użyciu JmsTransactionManager możemy ustawić odpowiedni CacheLevel. Mamy następujące opcje do wyboru:

  • CACHE_AUTO
  • CACHE_NONE
  • CACHE_CONNECTION
  • CACHE_SESSION
  • CACHE_CONSUMER

Ustawienie cache level:
Dopóki nie używamy managera tranzakcji cache level ustawiamy na CACHE_CONSUMER – jest to default dla lokalnych zasobów JMS.

Jeśli natomiast używamy zewnętrznego managera tranzakcji default to CACHE_NONE. Możemy jednak bez obaw zmienić tą wartość na conajmniej CACHE_CONNECTION.

    @Bean(name = "jmsFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                          JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setTransactionManager(jmsTransactionManager);
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
        factory.setReceiveTimeout(60000L);
        return factory;
    }

6.1 Rzucenie wyjątku w onMessage – JmsTransactionManager

Cache Level: CACHE_NONE, CACHE_CONNECTION

Podobniej jak wczesniej oraz w punkcie 2.1. Wiadomość nie zostanie potwierdzona i JMS będzie nam próbował ją wysłać w kółko aż do skutku.

2019-01-04 13:39:00.974  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 13:39:01.329  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly
2019-01-04 13:39:01.689  INFO 16640 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 12:38 PM
java.lang.RuntimeException: Message has not been processed correctly

6.2 Potwierdzenie odebrania przed rzuceniem wyjątku:

Niestety potwierdzenie przez message.acknowledge() i zacommitowanie transakcji przed rzuceniem wyjątku też nam nic nie pomoże. Message będzie na kolejce.

6.3 Rollback na jmsTransactionManager

Dodatkowo w tym przypadku możemy zawołać rollback na JmsTransactionManager. Dzięki temu wycofamy tranzakcje i wiadomość bedzie dostarczona ponownie:

public class JmsMessageListener implements MessageListener {

    @Autowired
    private JmsTransactionManager jmsTransactionManager;
    
    @Override
    public void onMessage(Message message) {
        try {
            TransactionStatus transactionStatus = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());
            
            String textMessage = ((TextMessage) message).getText();
            log.info("Received message: " + textMessage);
            JmsMessageListener.counter++;
          
            log.info("Is tx completed: " + transactionStatus.isCompleted());
            jmsTransactionManager.rollback(transactionStatus);
            log.info("Is tx completed: " + transactionStatus.isCompleted());
          
        } catch (JMSException e) {
            log.error("Failed: ", e);
        }
    }
2019-01-04 14:43:09.425  INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 1:41 PM
2019-01-04 14:43:09.425  INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Is tx completed: false
2019-01-04 14:43:09.425  INFO 15100 --- [enerContainer-1] pl.devrev.springjms.JmsMessageListener   : Is tx completed: true
2019-01-04 14:43:15.280  WARN 15100 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'TEST' - trying to recover. Cause: Transaction rolled back because it has been marked as rollback-only
2019-01-04 14:43:20.319  INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener   : Received message: Message content. Sent at: 1/4/19 1:41 PM
2019-01-04 14:43:20.319  INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener   : Is tx completed: false
2019-01-04 14:43:20.319  INFO 15100 --- [enerContainer-2] pl.devrev.springjms.JmsMessageListener   : Is tx completed: true
...

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *