aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-27 22:47:44 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-27 22:47:44 -0300
commit1f6a3f3bb9c59da198df302193ddf5c29c4e42d6 (patch)
tree88252a9db6a4ec8f6b7c5f7ac7f2d9c979df32c7
parentfb604dca60458f553acf1aa0929693adcf91225d (diff)
downloadKamon-1f6a3f3bb9c59da198df302193ddf5c29c4e42d6.tar.gz
Kamon-1f6a3f3bb9c59da198df302193ddf5c29c4e42d6.tar.bz2
Kamon-1f6a3f3bb9c59da198df302193ddf5c29c4e42d6.zip
remove legacy unused aspects for mailbox and dispatcher monitoring
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala162
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala77
2 files changed, 0 insertions, 239 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
deleted file mode 100644
index 90d2b270..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import java.util.concurrent._
-import org.aspectj.lang.ProceedingJoinPoint
-import java.util
-import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory }
-import com.typesafe.config.Config
-import scala.concurrent.forkjoin.ForkJoinPool
-import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
-
-@Aspect
-class ActorSystemInstrumentation {
-
- @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
- def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
-
- @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
- def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
-
- //Kamon.Metric.registerActorSystem(name)
- }
-}
-
-@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
-class ForkJoinPoolInstrumentation {
- /* var activeThreadsHistogram: Histogram = _
- var poolSizeHistogram: Histogram = _*/
-
- @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
- def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
-
- @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
- def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
- /*val (actorSystemName, dispatcherName) = threadFactory match {
- case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
- case _ => ("Unknown", "Unknown")
- }
-
- val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
- for(m <- metrics) {
- activeThreadsHistogram = m.activeThreadCount
- poolSizeHistogram = m.poolSize
- println(s"Registered $dispatcherName for actor system $actorSystemName")
- }*/
- }
-
- def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
- knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName ⇒ (asName, threadFactoryName.substring(asName.length + 1))).getOrElse(("Unkown", "Unkown"))
- }
-
- @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
- def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
-
- @After("forkJoinScan(fjp)")
- def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
- /*activeThreadsHistogram.update(fjp.getActiveThreadCount)
- poolSizeHistogram.update(fjp.getPoolSize)*/
- }
-
-}
-
-/**
- * ExecutorService monitoring base:
- */
-trait ExecutorServiceCollector {
- def updateActiveThreadCount(diff: Int): Unit
- def updateTotalThreadCount(diff: Int): Unit
- def updateQueueSize(diff: Int): Unit
-}
-
-trait WatchedExecutorService {
- def collector: ExecutorServiceCollector
-}
-
-/*
-trait ExecutorServiceMonitoring {
- def dispatcherMetrics: DispatcherMetricCollector
-}
-
-class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
- @volatile var dispatcherMetrics: DispatcherMetricCollector = _
-}
-*/
-
-case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
- def createExecutorService: ExecutorService = delegate.createExecutorService
-}
-
-@Aspect
-class ExecutorServiceFactoryProviderInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
- def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
- true
- }
-
- @Around("factoryMethodCall(dispatcherName, threadFactory)")
- def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
- val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
-
- val actorSystemName = threadFactory match {
- case m: MonitorableThreadFactory ⇒ m.name
- case _ ⇒ "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
- }
-
- new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
- }
-
-}
-
-@Aspect
-class NamedExecutorServiceFactoryDelegateInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
- def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
-
- @Around("factoryMethodCall(namedFactory)")
- def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
- val delegate = pjp.proceed().asInstanceOf[ExecutorService]
- val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
-
- //ExecutorServiceMetricCollector.register(executorFullName, delegate)
-
- new NamedExecutorServiceDelegate(executorFullName, delegate)
- }
-}
-
-case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
- def shutdown() = {
- //ExecutorServiceMetricCollector.deregister(fullName)
- delegate.shutdown()
- }
- def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
- def isShutdown: Boolean = delegate.isShutdown
- def isTerminated: Boolean = delegate.isTerminated
- def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
- def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
- def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
- def submit(task: Runnable): Future[_] = delegate.submit(task)
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
- def execute(command: Runnable) = delegate.execute(command)
-}
-
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
deleted file mode 100644
index 44eb8c43..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.instrumentation
-
-import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue }
-import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect }
-import akka.actor.{ ActorSystem, ActorRef }
-import org.aspectj.lang.ProceedingJoinPoint
-
-/**
- * For Mailboxes we would like to track the queue size and message latency. Currently the latency
- * will be gathered from the ActorCellMetrics.
- */
-/*
-
-@Aspect
-class MessageQueueInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
- def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
-
- @Around("messageQueueCreation(owner, system)")
- def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
- val delegate = pjp.proceed.asInstanceOf[MessageQueue]
-
- // We are not interested in monitoring mailboxes if we don't know where they belong to.
- val monitoredMailbox = for (own ← owner; sys ← system) yield {
- val systemName = sys.name
- val ownerName = MetricDirectory.nameForActor(own)
- val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
-
- val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
- Metrics.include(mailBoxName, queueSizeHistogram)
-
- new MonitoredMessageQueue(delegate, queueSizeHistogram)
- }
-
- monitoredMailbox match {
- case None ⇒ delegate
- case Some(mmb) ⇒ mmb
- }
- }
-}
-
-class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics {
-
- def enqueue(receiver: ActorRef, handle: Envelope) = {
- delegate.enqueue(receiver, handle)
- //queueSizeHistogram.update(numberOfMessages)
- }
-
- def dequeue(): Envelope = {
- val envelope = delegate.dequeue()
- //queueSizeHistogram.update(numberOfMessages)
-
- envelope
- }
-
- def numberOfMessages: Int = delegate.numberOfMessages
- def hasMessages: Boolean = delegate.hasMessages
- def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
-}
-*/
-