aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-04-05 19:48:34 -0300
committerDiego <diegolparra@gmail.com>2014-04-05 19:48:34 -0300
commite94894c5ec60a1102884894c4e73b441c79c70bc (patch)
tree2204aac5ee5a65b88d552cf858c7be5f25d02c3e
parent181b08d970be02b6afc4a2552c49fe3d16ce01f9 (diff)
downloadKamon-e94894c5ec60a1102884894c4e73b441c79c70bc.tar.gz
Kamon-e94894c5ec60a1102884894c4e73b441c79c70bc.tar.bz2
Kamon-e94894c5ec60a1102884894c4e73b441c79c70bc.zip
+ kamon-core: added UnboundedMailboxInstrumentation to avoid O(n) time in .numberOfMessages
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml3
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala58
2 files changed, 61 insertions, 0 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 180d905b..27a047b0 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -16,6 +16,9 @@
<!-- Patterns -->
<aspect name="akka.instrumentation.AskPatternTracing"/>
+
+ <!-- UnboundedMailbox -->
+ <aspect name="akka.instrumentation.UnboundedMailboxInstrumentation"/>
</aspects>
<weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler">
diff --git a/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala
new file mode 100644
index 00000000..6a335dfa
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala
@@ -0,0 +1,58 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent.atomic.AtomicInteger
+import org.aspectj.lang.ProceedingJoinPoint
+
+/**
+ * This aspect adds a counter to the message queue to get O(1) time in the .numberOfMessages method implementation
+ */
+@Aspect
+class UnboundedMailboxInstrumentation {
+
+ @DeclareMixin("akka.dispatch.UnboundedMailbox$MessageQueue")
+ def mixinMailboxToMailboxSizeAware: MessageQueueSizeCounting = new MessageQueueSizeCounting {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.enqueue(..)) && this(queue)")
+ def enqueuePointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.dequeue()) && this(queue)")
+ def dequeuePointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.numberOfMessages()) && this(queue)")
+ def numberOfMessagesPointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @After("dequeuePointcut(queue)")
+ def afterDequeuePointcut(queue: MessageQueueSizeCounting): Unit = {
+ if (queue.queueSize.get() > 0) queue.queueSize.decrementAndGet()
+ }
+
+ @After("enqueuePointcut(queue)")
+ def afterEnqueue(queue: MessageQueueSizeCounting): Unit = {
+ queue.queueSize.incrementAndGet()
+ }
+
+ @Around("numberOfMessagesPointcut(queue)")
+ def aroundNumberOfMessages(pjp: ProceedingJoinPoint, queue: MessageQueueSizeCounting): Any = {
+ queue.queueSize.get()
+ }
+}
+
+trait MessageQueueSizeCounting {
+ val queueSize = new AtomicInteger
+}