java - ActiveMQ: why doesn't RedeliveryCounter increase when the consumer is shut down before ack?
My consumer is running in an application server. ActiveMQ config:
optimizeAcknowledge=true optimizeAcknowledgeTimeOut=5000 destination=queue.test.msg?consumer.prefetchSize=10
After sending 10 messages, the web console (http://localhost:8161/admin) shows 3 messages that have not been acked (because optimizeAcknowledge is in use). When I restarted the consumer server at that point, it received those 3 messages again with redeliveryCounter=0. So I cannot tell whether a message is a redelivery or was newly sent by the producer.
The same problem appears with transactions. Run the test class org\apache\activemq\RedeliveryPolicyTest.java, method testRepeatedRedeliveryReceiveNoCommit. Break in the debugger at "connection.close();" and stop the process there. The message's redeliveryCounter is "0".
Question: to determine whether a message is a redelivery, must I use a store to save each message's ID after the consumer has processed it, and check whether that ID already exists when the consumer receives a message?
The code is below:
public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
    connection.start();
    Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    ActiveMQQueue destination = new ActiveMQQueue("test");
    MessageProducer producer = dlqSession.createProducer(destination);

    // Send the messages
    producer.send(dlqSession.createTextMessage("1st"));
    dlqSession.commit();

    MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));

    final int maxRedeliveries = 4;
    for (int i = 0; i <= maxRedeliveries + 1; i++) {
        connection = (ActiveMQConnection) factory.createConnection(userName, password);
        connections.add(connection);

        // Receive a message with the JMS API
        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(0);
        policy.setUseExponentialBackOff(false);
        policy.setMaximumRedeliveries(maxRedeliveries);

        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(destination);

        ActiveMQTextMessage m = ((ActiveMQTextMessage) consumer.receive(4000));
        if (i <= maxRedeliveries) {
            m.setRedeliveryCounter(1);
            System.out.println("text:" + m.getText());
            System.out.println("redeliveryCounter:" + m.getRedeliveryCounter());
        } else {
            System.out.println("null on exceeding redelivery count:" + m);
        }
        connection.close();
        connections.remove(connection);
    }

    // We should be able to get the message off the DLQ now.
    TextMessage m = (TextMessage) dlqConsumer.receive(1000);
    System.out.println("Got message from DLQ:" + m);
    System.out.println("text:" + m.getText());
    String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
    System.out.println("cause exception has policy ref:" + cause.contains("RedeliveryPolicy"));
    dlqSession.commit();
}
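The duplicate check described in the question (save each processed message's ID, then look it up on receipt) could be sketched roughly as follows. This is only an illustration: ProcessedMessageStore, markProcessed and isDuplicate are hypothetical names, not part of ActiveMQ or JMS, and the store here is in memory and bounded, so it would not survive the consumer restart described above without being backed by something persistent.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not an ActiveMQ class): remembers IDs of processed messages. */
final class ProcessedMessageStore {
    private final Map<String, Boolean> seen;

    ProcessedMessageStore(final int capacity) {
        // Insertion-ordered map that drops the oldest ID once capacity is exceeded,
        // wrapped so that it can be shared between consumer threads.
        this.seen = Collections.synchronizedMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /** Records the ID; returns true if it was new, false if it was already stored. */
    boolean markProcessed(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    /** Returns true if a message with this ID has already been processed. */
    boolean isDuplicate(String messageId) {
        return seen.containsKey(messageId);
    }
}
```

In a consumer, the ID would presumably come from message.getJMSMessageID(): call isDuplicate before processing and markProcessed after processing succeeds. For the restart case, the same two operations would need to run against a database table or another durable store.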