From 8a9e6c4ba6c0ad0db973d002269183b51cb739c8 Mon Sep 17 00:00:00 2001 From: Lennart Koopmann Date: Thu, 14 Mar 2013 21:55:29 +0100 Subject: [PATCH 1/5] Set processor_wait_strategy to blocking by default --- misc/graylog2.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/graylog2.conf b/misc/graylog2.conf index 15c77072799b..665915600f8c 100644 --- a/misc/graylog2.conf +++ b/misc/graylog2.conf @@ -73,7 +73,7 @@ outputbuffer_processors = 5 # High throughput, low latency, higher CPU usage. # - busy_spinning # Avoids syscalls which could introduce latency jitter. Best when threads can be bound to specific CPU cores. -processor_wait_strategy = sleeping +processor_wait_strategy = blocking # Size of internal ring buffers. Raise this if raising outputbuffer_processors does not help anymore. # For optimum performance your LogMessage objects in the ring buffer should fit in your CPU L3 cache. From 821dbf40603bdb201393b18cbce9f926c1acf78a Mon Sep 17 00:00:00 2001 From: Lennart Koopmann Date: Thu, 4 Apr 2013 14:19:42 +0200 Subject: [PATCH 2/5] correctly handle broken AMQP messages --- src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java index 52ed17382915..cc476af19f89 100644 --- a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java +++ b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java @@ -223,6 +223,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp channel.basicAck(envelope.getDeliveryTag(), false); handledMessages.mark(); } catch(Exception e) { + // If something breaks here it is extremely likely that it won't work next time. Ack the message. + channel.basicAck(envelope.getDeliveryTag(), false); LOG.error("Could not handle message from AMQP.", e); } } From 9d3ad3960a9f24102f2d945011817b3bac443b0b Mon Sep 17 00:00:00 2001 From: Lennart Koopmann Date: Thu, 4 Apr 2013 14:27:31 +0200 Subject: [PATCH 3/5] zomg there was a test for that --- .../org/graylog2/inputs/amqp/AMQPConsumerTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java b/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java index f778aeb75611..5b5faff5f25c 100644 --- a/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java +++ b/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java @@ -46,13 +46,19 @@ public void testAutoAckFalse() throws IOException, IllegalAccessException, NoSuc } @Test - public void testConsumerDoesNotAcknowledgeOnException() throws IOException { + public void testConsumerDoesAcknowledgeOnException() throws IOException { + final long deliveryTag = 3l; + byte[] body = null; // invalid payload so that an Exception is thrown Mockery context = new Mockery(); final Channel channel = context.mock(Channel.class); - + + context.checking(new Expectations() {{ + oneOf (channel).basicAck(deliveryTag, false); + }}); + Consumer consumer = _amqpConsumer.createConsumer(channel); - consumer.handleDelivery("consumerTag", new Envelope(35l, true, "myexchange", "myroutingkey"), null, body); + consumer.handleDelivery("consumerTag", new Envelope(deliveryTag, true, "myexchange", "myroutingkey"), null, body); context.assertIsSatisfied(); } From f0b20db54c1b25c28a5799c79673b5d82152a275 Mon Sep 17 00:00:00 2001 From: Lennart Koopmann Date: Tue, 9 Apr 2013 16:07:28 +0200 Subject: [PATCH 4/5] use blockingwaitstrategy as default in config too --- src/main/java/org/graylog2/Configuration.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/graylog2/Configuration.java b/src/main/java/org/graylog2/Configuration.java index 6b18f5b09c6d..39d0bff00f6a 100644 --- a/src/main/java/org/graylog2/Configuration.java +++ b/src/main/java/org/graylog2/Configuration.java @@ -121,7 +121,7 @@ public class Configuration { private int outputBufferProcessorThreadsCorePoolSize = 3; @Parameter(value = "processor_wait_strategy", required = true) - private String processorWaitStrategy = "sleeping"; + private String processorWaitStrategy = "blocking"; @Parameter(value = "ring_size", required = true, validator = PositiveIntegerValidator.class) private int ringSize = 1024; @@ -421,8 +421,8 @@ public WaitStrategy getProcessorWaitStrategy() { } LOG.warn("Invalid setting for [processor_wait_strategy]:" - + " Falling back to default: SleepingWaitStrategy."); - return new SleepingWaitStrategy(); + + " Falling back to default: BlockingWaitStrategy."); + return new BlockingWaitStrategy(); } public int getRingSize() { From 9b15de57d8d89798a8b2e480313a1e9d43d54ff0 Mon Sep 17 00:00:00 2001 From: Lennart Koopmann Date: Wed, 10 Apr 2013 21:37:54 +0200 Subject: [PATCH 5/5] use nack, not ack for unprocessable amqp messages --- src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java index cc476af19f89..f33a88b0ee72 100644 --- a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java +++ b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2012 Lennart Koopmann + * Copyright 2012, 2013 Lennart Koopmann * * This file is part of Graylog2. * @@ -223,8 +223,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp channel.basicAck(envelope.getDeliveryTag(), false); handledMessages.mark(); } catch(Exception e) { - // If something breaks here it is extremely likely that it won't work next time. Ack the message. - channel.basicAck(envelope.getDeliveryTag(), false); + // If something breaks here it is extremely likely that it won't work next time. Not-Ack the message and do not requeue. + channel.basicNack(envelope.getDeliveryTag(), false, false); // YOLO LOG.error("Could not handle message from AMQP.", e); } }