diff --git a/misc/graylog2.conf b/misc/graylog2.conf index 15c77072799b..665915600f8c 100644 --- a/misc/graylog2.conf +++ b/misc/graylog2.conf @@ -73,7 +73,7 @@ outputbuffer_processors = 5 # High throughput, low latency, higher CPU usage. # - busy_spinning # Avoids syscalls which could introduce latency jitter. Best when threads can be bound to specific CPU cores. -processor_wait_strategy = sleeping +processor_wait_strategy = blocking # Size of internal ring buffers. Raise this if raising outputbuffer_processors does not help anymore. # For optimum performance your LogMessage objects in the ring buffer should fit in your CPU L3 cache. diff --git a/src/main/java/org/graylog2/Configuration.java b/src/main/java/org/graylog2/Configuration.java index 6b18f5b09c6d..39d0bff00f6a 100644 --- a/src/main/java/org/graylog2/Configuration.java +++ b/src/main/java/org/graylog2/Configuration.java @@ -121,7 +121,7 @@ public class Configuration { private int outputBufferProcessorThreadsCorePoolSize = 3; @Parameter(value = "processor_wait_strategy", required = true) - private String processorWaitStrategy = "sleeping"; + private String processorWaitStrategy = "blocking"; @Parameter(value = "ring_size", required = true, validator = PositiveIntegerValidator.class) private int ringSize = 1024; @@ -421,8 +421,8 @@ public WaitStrategy getProcessorWaitStrategy() { } LOG.warn("Invalid setting for [processor_wait_strategy]:" - + " Falling back to default: SleepingWaitStrategy."); - return new SleepingWaitStrategy(); + + " Falling back to default: BlockingWaitStrategy."); + return new BlockingWaitStrategy(); } public int getRingSize() { diff --git a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java index 52ed17382915..f33a88b0ee72 100644 --- a/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java +++ b/src/main/java/org/graylog2/inputs/amqp/AMQPConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2012 Lennart Koopmann + * Copyright 2012, 2013 Lennart Koopmann * * This file is part of Graylog2. * @@ -223,6 +223,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp channel.basicAck(envelope.getDeliveryTag(), false); handledMessages.mark(); } catch(Exception e) { + // If something breaks here it is extremely likely that it won't work next time. Not-Ack the message and do not requeue. + channel.basicNack(envelope.getDeliveryTag(), false, false); // YOLO LOG.error("Could not handle message from AMQP.", e); } } diff --git a/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java b/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java index f778aeb75611..5b5faff5f25c 100644 --- a/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java +++ b/src/test/java/org/graylog2/inputs/amqp/AMQPConsumerTest.java @@ -46,13 +46,19 @@ public void testAutoAckFalse() throws IOException, IllegalAccessException, NoSuc } @Test - public void testConsumerDoesNotAcknowledgeOnException() throws IOException { + public void testConsumerDoesAcknowledgeOnException() throws IOException { + final long deliveryTag = 3l; + byte[] body = null; // invalid payload so that an Exception is thrown Mockery context = new Mockery(); final Channel channel = context.mock(Channel.class); - + + context.checking(new Expectations() {{ + oneOf (channel).basicAck(deliveryTag, false); + }}); + Consumer consumer = _amqpConsumer.createConsumer(channel); - consumer.handleDelivery("consumerTag", new Envelope(35l, true, "myexchange", "myroutingkey"), null, body); + consumer.handleDelivery("consumerTag", new Envelope(deliveryTag, true, "myexchange", "myroutingkey"), null, body); context.assertIsSatisfied(); }