From 1f6a3f3bb9c59da198df302193ddf5c29c4e42d6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 27 Mar 2014 22:47:44 -0300 Subject: remove legacy unused aspects for mailbox and dispatcher monitoring --- .../instrumentation/ExecutorServiceMetrics.scala | 162 --------------------- .../instrumentation/MessageQueueMetrics.scala | 77 ---------- 2 files changed, 239 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala deleted file mode 100644 index 90d2b270..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import java.util.concurrent._ -import org.aspectj.lang.ProceedingJoinPoint -import java.util -import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } -import com.typesafe.config.Config -import scala.concurrent.forkjoin.ForkJoinPool -import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool - -@Aspect -class ActorSystemInstrumentation { - - @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") - def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} - - @After("actorSystemInstantiation(name, applicationConfig, classLoader)") - def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { - - //Kamon.Metric.registerActorSystem(name) - } -} - -@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") -class ForkJoinPoolInstrumentation { - /* var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _*/ - - @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") - def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} - - @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") - def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { - /*val (actorSystemName, dispatcherName) = threadFactory match { - case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) - case _ => ("Unknown", "Unknown") - } - - val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) - for(m <- metrics) { - activeThreadsHistogram = m.activeThreadCount - poolSizeHistogram = m.poolSize - println(s"Registered $dispatcherName for actor system $actorSystemName") - }*/ - } - - def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { - knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName ⇒ (asName, threadFactoryName.substring(asName.length + 1))).getOrElse(("Unkown", "Unkown")) - } - - @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") - def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} - - @After("forkJoinScan(fjp)") - def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - /*activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize)*/ - } - -} - -/** - * ExecutorService monitoring base: - */ -trait ExecutorServiceCollector { - def updateActiveThreadCount(diff: Int): Unit - def updateTotalThreadCount(diff: Int): Unit - def updateQueueSize(diff: Int): Unit -} - -trait WatchedExecutorService { - def collector: ExecutorServiceCollector -} - -/* -trait ExecutorServiceMonitoring { - def dispatcherMetrics: DispatcherMetricCollector -} - -class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { - @volatile var dispatcherMetrics: DispatcherMetricCollector = _ -} -*/ - -case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = delegate.createExecutorService -} - -@Aspect -class ExecutorServiceFactoryProviderInstrumentation { - - @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") - def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { - true - } - - @Around("factoryMethodCall(dispatcherName, threadFactory)") - def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast - - val actorSystemName = threadFactory match { - case m: MonitorableThreadFactory ⇒ m.name - case _ ⇒ "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. - } - - new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) - } - -} - -@Aspect -class NamedExecutorServiceFactoryDelegateInstrumentation { - - @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") - def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} - - @Around("factoryMethodCall(namedFactory)") - def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - - //ExecutorServiceMetricCollector.register(executorFullName, delegate) - - new NamedExecutorServiceDelegate(executorFullName, delegate) - } -} - -case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { - def shutdown() = { - //ExecutorServiceMetricCollector.deregister(fullName) - delegate.shutdown() - } - def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() - def isShutdown: Boolean = delegate.isShutdown - def isTerminated: Boolean = delegate.isTerminated - def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) - def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) - def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) - def submit(task: Runnable): Future[_] = delegate.submit(task) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) - def execute(command: Runnable) = delegate.execute(command) -} - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala deleted file mode 100644 index 44eb8c43..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation - -import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } -import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } -import akka.actor.{ ActorSystem, ActorRef } -import org.aspectj.lang.ProceedingJoinPoint - -/** - * For Mailboxes we would like to track the queue size and message latency. Currently the latency - * will be gathered from the ActorCellMetrics. - */ -/* - -@Aspect -class MessageQueueInstrumentation { - - @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") - def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} - - @Around("messageQueueCreation(owner, system)") - def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { - val delegate = pjp.proceed.asInstanceOf[MessageQueue] - - // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for (own ← owner; sys ← system) yield { - val systemName = sys.name - val ownerName = MetricDirectory.nameForActor(own) - val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) - - val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) - Metrics.include(mailBoxName, queueSizeHistogram) - - new MonitoredMessageQueue(delegate, queueSizeHistogram) - } - - monitoredMailbox match { - case None ⇒ delegate - case Some(mmb) ⇒ mmb - } - } -} - -class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics { - - def enqueue(receiver: ActorRef, handle: Envelope) = { - delegate.enqueue(receiver, handle) - //queueSizeHistogram.update(numberOfMessages) - } - - def dequeue(): Envelope = { - val envelope = delegate.dequeue() - //queueSizeHistogram.update(numberOfMessages) - - envelope - } - - def numberOfMessages: Int = delegate.numberOfMessages - def hasMessages: Boolean = delegate.hasMessages - def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) -} -*/ - -- cgit v1.2.3