aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-06 02:29:08 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-06 02:29:08 -0300
commita4ef80ed7a40db9eacdfa7d237db625c17909c20 (patch)
tree8de64bcb19a3cb2120eb43c77a78e37c3d72ff94
parented4676be2ae7d1c7cce30931d2bbd8289406acef (diff)
parente94894c5ec60a1102884894c4e73b441c79c70bc (diff)
downloadKamon-a4ef80ed7a40db9eacdfa7d237db625c17909c20.tar.gz
Kamon-a4ef80ed7a40db9eacdfa7d237db625c17909c20.tar.bz2
Kamon-a4ef80ed7a40db9eacdfa7d237db625c17909c20.zip
Merge branch 'master' of github.com:kamon-io/Kamon
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml3
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala58
2 files changed, 61 insertions, 0 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 180d905b..27a047b0 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -16,6 +16,9 @@
<!-- Patterns -->
<aspect name="akka.instrumentation.AskPatternTracing"/>
+
+ <!-- UnboundedMailbox -->
+ <aspect name="akka.instrumentation.UnboundedMailboxInstrumentation"/>
</aspects>
<weaver options="-XmessageHandlerClass:kamon.weaver.logging.KamonWeaverMessageHandler">
diff --git a/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala
new file mode 100644
index 00000000..6a335dfa
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala
@@ -0,0 +1,58 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent.atomic.AtomicInteger
+import org.aspectj.lang.ProceedingJoinPoint
+
+/**
+ * This aspect adds a counter to the message queue to get O(1) time in the .numberOfMessages method implementation
+ */
+@Aspect
+class UnboundedMailboxInstrumentation {
+
+ @DeclareMixin("akka.dispatch.UnboundedMailbox$MessageQueue")
+ def mixinMailboxToMailboxSizeAware: MessageQueueSizeCounting = new MessageQueueSizeCounting {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.enqueue(..)) && this(queue)")
+ def enqueuePointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.dequeue()) && this(queue)")
+ def dequeuePointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.numberOfMessages()) && this(queue)")
+ def numberOfMessagesPointcut(queue: MessageQueueSizeCounting): Unit = {}
+
+ @After("dequeuePointcut(queue)")
+ def afterDequeuePointcut(queue: MessageQueueSizeCounting): Unit = {
+ if (queue.queueSize.get() > 0) queue.queueSize.decrementAndGet()
+ }
+
+ @After("enqueuePointcut(queue)")
+ def afterEnqueue(queue: MessageQueueSizeCounting): Unit = {
+ queue.queueSize.incrementAndGet()
+ }
+
+ @Around("numberOfMessagesPointcut(queue)")
+ def aroundNumberOfMessages(pjp: ProceedingJoinPoint, queue: MessageQueueSizeCounting): Any = {
+ queue.queueSize.get()
+ }
+}
+
+trait MessageQueueSizeCounting {
+ val queueSize = new AtomicInteger
+}