java - RabbitMQ messages delivered after x-message-ttl has expired -
i try create queue x-message-ttl , x-max-length prevent queue growing big when consumer slow. have not been able make work. when run test below, consumer continues receive messages long time after publisher has finished publish messages if ttl set 10ms , max number of messages on queue set 2. can see doing wrong?
i using rabbitmq amqp-client 4.2.0 running rabbitmq 3.5.4.
@test public void testrabbitmq() throws exception { publish(); consume(); system.out.println(thread.currentthread().tostring() + " - sleep"); thread.sleep(60000); } public void consume() throws ioexception, timeoutexception { connectionfactory factory = new connectionfactory(); factory.setusername("name"); factory.setpassword("pwd"); factory.setautomaticrecoveryenabled(true); factory.setconnectiontimeout(1000); channel channel = factory.newconnection("localhost:5672").createchannel(); map<string, object> args = new hashmap<>(); args.put("x-message-ttl", 10); args.put("x-max-length", 2); amqp.queue.declareok declareok = channel.queuedeclare("test-queue", false, false, true, args); string queuename = declareok.getqueue(); channel.queuebind(queuename, "test-exchange", "*.*"); channel.basicconsume(queuename, true, new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { system.out.println(thread.currentthread().tostring() + " - " + new string(body)); try { thread.sleep(300); } catch (interruptedexception ignore) { } } }); } public void publish() throws ioexception, timeoutexception { connectionfactory factory = new connectionfactory(); factory.setusername("name"); factory.setpassword("pwd"); factory.setautomaticrecoveryenabled(true); factory.setconnectiontimeout(1000); channel channel = factory.newconnection("localhost:5672").createchannel(); channel.exchangedeclare("test-exchange", "topic"); executors.newsinglethreadexecutor().execute(() -> { try { (int = 0; < 4000; i++) { testmessage message = new testmessage("test", i); channel.basicpublish("test-exchange", message.getroutingkey(), null, new gson().tojson(message).getbytes()); } system.out.println(thread.currentthread().tostring() + " - done"); } catch (exception e) { e.printstacktrace(); } }); }
Comments
Post a Comment