From 923b88e8adef2f66b43e551fa4a0a1bbae5af7ff Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 7 Aug 2013 19:06:33 -0300 Subject: upgrading to akka 2.2 --- kamon-core/src/main/scala/kamon/Kamon.scala | 132 +++++++++++ kamon-core/src/main/scala/kamon/TraceContext.scala | 67 ++++++ .../src/main/scala/kamon/TraceContextSwap.scala | 26 +++ .../main/scala/kamon/TransactionPublisher.scala | 15 ++ .../src/main/scala/kamon/executor/eventbus.scala | 103 +++++++++ .../ActorRefTellInstrumentation.scala | 89 ++++++++ .../scala/kamon/instrumentation/AspectJPimps.scala | 23 ++ .../instrumentation/ExecutorServiceMetrics.scala | 245 +++++++++++++++++++++ .../instrumentation/MessageQueueMetrics.scala | 73 ++++++ .../instrumentation/RunnableInstrumentation.scala | 61 +++++ .../instrumentation/SampleInstrumentation.scala | 49 +++++ .../metric/ExecutorServiceMetricCollector.scala | 67 ++++++ .../main/scala/kamon/metric/GaugeGenerator.scala | 12 + .../src/main/scala/kamon/metric/MetricFilter.scala | 6 + .../src/main/scala/kamon/metric/Metrics.scala | 146 ++++++++++++ .../src/main/scala/kamon/metric/MetricsUtils.scala | 51 +++++ .../main/scala/kamon/metric/NewRelicReporter.scala | 51 +++++ .../src/main/scala/spraytest/ClientTest.scala | 55 +++++ .../src/main/scala/spraytest/FutureTesting.scala | 81 +++++++ kamon-core/src/main/scala/test/PingPong.scala | 34 +++ 20 files changed, 1386 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/Kamon.scala create mode 100644 kamon-core/src/main/scala/kamon/TraceContext.scala create mode 100644 kamon-core/src/main/scala/kamon/TraceContextSwap.scala create mode 100644 kamon-core/src/main/scala/kamon/TransactionPublisher.scala create mode 100644 kamon-core/src/main/scala/kamon/executor/eventbus.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricFilter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala create mode 100644 kamon-core/src/main/scala/spraytest/ClientTest.scala create mode 100644 kamon-core/src/main/scala/spraytest/FutureTesting.scala create mode 100644 kamon-core/src/main/scala/test/PingPong.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala new file mode 100644 index 00000000..c3080909 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -0,0 +1,132 @@ +package kamon + +import akka.actor.{Actor, Props, ActorSystem} +import scala.collection.JavaConverters._ +import java.util.concurrent.ConcurrentHashMap +import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} +import scala.concurrent.duration.{FiniteDuration, Duration} +import com.newrelic.api.agent.NewRelic + +object Kamon { + + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + implicit lazy val actorSystem = ActorSystem("kamon") + + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + def newTraceContext(): TraceContext = TraceContext() + + + val publisher = actorSystem.actorOf(Props[TransactionPublisher]) + + def publish(tx: FullTransaction) = publisher ! tx + + + + object Metric { + val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + def actorSystemNames: List[String] = actorSystems.keys.toList + def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) + + def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) + } + + + + val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + +} + + + + + + + + + +object Tracer { + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = ??? //set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + //def newTraceContext(): TraceContext = TraceContext() +} + + +class MetricManager extends Actor { + implicit val ec = context.system.dispatcher + + def receive = { + case RegisterForAllDispatchers(frequency) => { + val subscriber = sender + context.system.scheduler.schedule(frequency, frequency) { + Kamon.Metric.actorSystems.foreach { + case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { + case (dispatcherName, dispatcherMetrics) => { + val activeThreads = dispatcherMetrics.activeThreadCount.snapshot + val poolSize = dispatcherMetrics.poolSize.snapshot + val queueSize = dispatcherMetrics.queueSize.snapshot + + subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) + + } + } + } + } + } + } +} + +case class RegisterForAllDispatchers(frequency: FiniteDuration) +case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) + + + + + + +class NewrelicReporterActor extends Actor { + import scala.concurrent.duration._ + + Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + + def receive = { + case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { + /*println("PUBLISHED DISPATCHER STATS") + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/ + + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala new file mode 100644 index 00000000..6b32550f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -0,0 +1,67 @@ +package kamon + +import java.util.UUID +import akka.actor.{ActorSystem, ActorPath} +import akka.agent.Agent +import java.util.concurrent.TimeUnit +import scala.util.{Failure, Success} +import akka.util.Timeout + + +case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + implicit val as = Kamon.actorSystem.dispatcher + + def append(entry: TraceEntry) = entries send (entry :: _) + def close = entries.future.onComplete({ + case Success(list) => Kamon.publish(FullTransaction(id, list)) + case Failure(t) => println("WTF!") + }) +} + +object TraceContext { + implicit val as2 = Kamon.actorSystem.dispatcher + def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) +} + + + +trait TraceEntry + +case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry + + + +case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) + + + + + +object Collector { + +} + +trait TraceEntryStorage { + def store(entry: TraceEntry): Boolean +} + +class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { + def store(entry: TraceEntry) = storage.store(entry) +} + +object ThreadLocalTraceEntryStorage extends TraceEntryStorage { + + private val storage = new ThreadLocal[List[TraceEntry]] { + override def initialValue(): List[TraceEntry] = Nil + } + + def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) + + def store(entry: TraceEntry): Boolean = { + update(entry :: _) + true + } +} + + diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala new file mode 100644 index 00000000..68ee808b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala @@ -0,0 +1,26 @@ +package kamon + +/** + * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. + */ +trait TraceContextSwap { + + def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) + + def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { + ctx match { + case Some(context) => { + Kamon.set(context) + val bodyResult = primary + Kamon.clear + + bodyResult + } + case None => fallback + } + + } + +} + +object TraceContextSwap extends TraceContextSwap diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala new file mode 100644 index 00000000..0626b91d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala @@ -0,0 +1,15 @@ +package kamon + +import akka.actor.Actor +import java.util.UUID + +class TransactionPublisher extends Actor { + + def receive = { + case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") + } + +} + + +case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala new file mode 100644 index 00000000..599f2a7a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -0,0 +1,103 @@ +package kamon.executor + +import akka.event.ActorEventBus +import akka.event.LookupClassification +import akka.actor._ +import java.util.concurrent.TimeUnit + +import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import akka.util.Timeout +import scala.util.{Random, Success, Failure} +import scala.concurrent.Future + +trait Message + +case class PostMessage(text:String) extends Message + +case class MessageEvent(val channel:String, val message:Message) + +class AppActorEventBus extends ActorEventBus with LookupClassification{ + type Event = MessageEvent + type Classifier=String + protected def mapSize(): Int={ + 10 + } + + protected def classify(event: Event): Classifier={ + event.channel + } + + protected def publish(event: Event, subscriber: Subscriber): Unit={ + subscriber ! event + } +} +case class Ping() +case class Pong() + +class PingActor extends Actor with ActorLogging { + + val pong = context.actorOf(Props[PongActor]) + val random = new Random() + def receive = { + case Pong() => { + //Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() + } + } +} + +class PongActor extends Actor with ActorLogging { + def receive = { + case Ping() => { + sender ! Pong() + } + } +} + + +object TryAkka extends App{ + val system = ActorSystem("MySystem") + val appActorEventBus=new AppActorEventBus + val NEW_POST_CHANNEL="/posts/new" + val subscriber = system.actorOf(Props(new Actor { + def receive = { + case d: MessageEvent => println(d) + } + })) + + Kamon.start + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } + + + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + + /* + val newRelicReporter = new NewRelicReporter(registry) + newRelicReporter.start(1, TimeUnit.SECONDS) + +*/ + import akka.pattern.ask + implicit val timeout = Timeout(10, TimeUnit.SECONDS) + implicit def execContext = system.dispatcher + + + + Kamon.start + + Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + threadPrintln("Before doing it") + val f = Future { threadPrintln("This is happening inside the future body") } + + Kamon.stop + + + //Thread.sleep(3000) + //system.shutdown() + +/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) + appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..82915ce9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.{MessageDispatcher, Envelope} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + var processingTimeTimer: Timer = _ + var shouldTrack = false + + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") + shouldTrack = true + //} + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + println("ENVELOPE --------------------->"+envelope) + envelope match { + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() + + val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Kamon.set(c) + println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..84c20c52 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,23 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } + + def proceedWithTarget(args: AnyRef*) = { + pjp.proceed(args.toArray) + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..b4f8a475 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -0,0 +1,245 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import com.typesafe.config.Config +import kamon.Kamon +import scala.concurrent.forkjoin.ForkJoinPool +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool + + +@Aspect +class ActorSystemInstrumentation { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") + def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} + + @After("actorSystemInstantiation(name, applicationConfig, classLoader)") + def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { + + Kamon.Metric.registerActorSystem(name) + } +} + +@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") +class ForkJoinPoolInstrumentation { + var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _ + + @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") + def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} + + @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") + def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { + val (actorSystemName, dispatcherName) = threadFactory match { + case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) + case _ => ("Unknown", "Unknown") + } + + val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) + for(m <- metrics) { + activeThreadsHistogram = m.activeThreadCount + poolSizeHistogram = m.poolSize + println(s"Registered $dispatcherName for actor system $actorSystemName") + } + } + + def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { + knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) + } + + + + + @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") + def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} + + @After("forkJoinScan(fjp)") + def updateMetrics(fjp: AkkaForkJoinPool): Unit = { + activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize) + } + + + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/** + * ExecutorService monitoring base: + */ +trait ExecutorServiceCollector { + def updateActiveThreadCount(diff: Int): Unit + def updateTotalThreadCount(diff: Int): Unit + def updateQueueSize(diff: Int): Unit +} + +trait WatchedExecutorService { + def collector: ExecutorServiceCollector +} + + + + + + + + + + + + + + +trait ExecutorServiceMonitoring { + def dispatcherMetrics: DispatcherMetricCollector +} + +class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { + @volatile var dispatcherMetrics: DispatcherMetricCollector = _ +} + + + + + + + + + + + + + + + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") + def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { + true + } + + @Around("factoryMethodCall(dispatcherName, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed().asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..c21502ac --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala new file mode 100644 index 00000000..e75a638f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -0,0 +1,61 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.{Kamon, TraceContext} +import org.aspectj.lang.ProceedingJoinPoint +import scala.Some + +/** + * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. + */ +trait TraceContextAwareRunnable extends Runnable {} + + +@Aspect("perthis(instrumentedRunnableCreation())") +class RunnableInstrumentation { + + /** + * These are the Runnables that need to be instrumented and make the TraceContext available + * while their run method is executed. + */ + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null + + + /** + * Pointcuts + */ + + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))") + def instrumentedRunnableCreation(): Unit = {} + + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())") + def runnableExecution() = {} + + + /** + * Aspect members + */ + + private val traceContext = Kamon.context + + + /** + * Advices + */ + import kamon.TraceContextSwap.withContext + + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + + @Around("runnableExecution()") + def around(pjp: ProceedingJoinPoint) = { + import pjp._ + + withContext(traceContext, proceed()) + } + +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala new file mode 100644 index 00000000..74261403 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala @@ -0,0 +1,49 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} + +class ActorCage(val name: String, val size: Int) { + + def doIt: Unit = println("name") +} + +trait CageMonitoring { + def histogram: Histogram + def count(value: Int): Unit +} + +class CageMonitoringImp extends CageMonitoring{ + final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) + + def count(value: Int) = histogram.update(value) + +} + + +@Aspect +class InceptionAspect { + + @DeclareMixin("kamon.instrumentation.ActorCage") + def mixin: CageMonitoring = new CageMonitoringImp + + + @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") + def theActorCageDidIt(actorCage: CageMonitoring) = {} + + @After("theActorCageDidIt(actorCage)") + def afterDoingIt(actorCage: CageMonitoring) = { + actorCage.count(1) + actorCage.histogram.getSnapshot.dump(System.out) + } + + + +} + + +object Runner extends App { + val cage = new ActorCage("ivan", 10) + + cage.doIt +} diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..54a13f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "threads/poolSize" + val activeThreads = "threads/activeThreads" +} + + + + diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala new file mode 100644 index 00000000..fb117968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala @@ -0,0 +1,6 @@ +package kamon.metric + +object MetricFilter { + def actorSystem(system: String): Boolean = !system.startsWith("kamon") + def actor(path: String, system: String): Boolean = true +} diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala new file mode 100644 index 00000000..cdc0a334 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -0,0 +1,146 @@ +package kamon.metric + +import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import akka.actor.ActorRef +import com.codahale.metrics +import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} + + +object Metrics { + val registry: MetricRegistry = new MetricRegistry + + val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) + //consoleReporter.build().start(45, TimeUnit.SECONDS) + + //val newrelicReporter = NewRelicReporter(registry) + //newrelicReporter.start(5, TimeUnit.SECONDS) + + def include(name: String, metric: Metric) = { + //registry.register(name, metric) + } + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + + def deregister(fullName: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + +object Watched { + case object Actor + case object Dispatcher +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" + + def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" + + def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") + + def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") + + + def shouldInstrumentActor(actorPath: String): Boolean = { + !(actorPath.isEmpty || actorPath.startsWith("system")) + } + + +} + + + + + + + + + + + + +case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) + + + + +trait Histogram { + def update(value: Long): Unit + def snapshot: HistogramSnapshot +} + +trait HistogramSnapshot { + def median: Double + def max: Double + def min: Double +} + + +case class ActorSystemMetrics(actorSystemName: String) { + import scala.collection.JavaConverters._ + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + + private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) + + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { + val stats = createDispatcherCollector + dispatchers.put(dispatcherName, stats) + Some(stats) + } + +} + + +case class CodahaleHistogram() extends Histogram { + private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) + + def update(value: Long) = histogram.update(value) + def snapshot: HistogramSnapshot = { + val snapshot = histogram.getSnapshot + + CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) + } +} + +case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot + + + + + + + +/** + * Dispatcher Metrics that we care about currently with a histogram-like nature: + * - Work Queue Size + * - Total/Active Thread Count + */ + + + +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference + +object Atomic { + def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) +} + +class Atomic[T](val atomic : AtomicReference[T]) { + @tailrec + final def update(f: T => T) : T = { + val oldValue = atomic.get() + val newValue = f(oldValue) + if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) + } + + def get() = atomic.get() +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala new file mode 100644 index 00000000..5b4ceaf4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics._ + +object MetricsUtils { + + def markMeter[T](meter:Meter)(f: => T): T = { + meter.mark() + f + } +// +// def incrementCounter(key: String) { +// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count +// } +// +// def markMeter(key: String) { +// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() +// } +// +// def trace[T](key: String)(f: => T): T = { +// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) +// timer.time(f) +// } + +// def markAndCountMeter[T](key: String)(f: => T): T = { +// markMeter(key) +// f +// } +// +// def traceAndCount[T](key: String)(f: => T): T = { +// incrementCounter(key) +// trace(key) { +// f +// } + //} + +// implicit def runnable(f: () => Unit): Runnable = +// new Runnable() { def run() = f() } +// +// +// import java.util.concurrent.Callable +// +// implicit def callable[T](f: () => T): Callable[T] = +// new Callable[T]() { def call() = f() } + +// private val actorCounter:Counter = new Counter +// private val actorTimer:Timer = new Timer +// +// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) +// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala new file mode 100644 index 00000000..70f3e54a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics +import metrics._ +import java.util.concurrent.TimeUnit +import java.util +import com.newrelic.api.agent.NewRelic +import scala.collection.JavaConverters._ + + +class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { + + + + private[NewRelicReporter] def processMeter(name: String, meter: Meter) { + NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) + } + + private[NewRelicReporter] def processCounter(name:String, counter:Counter) { + println(s"Logging to NewRelic: ${counter.getCount}") + + } + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + //Process Meters + meters.asScala.map{case(name, meter) => processMeter(name, meter)} + + //Process Meters + counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} + } + + +} + +object NewRelicReporter { + def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala new file mode 100644 index 00000000..07532d0a --- /dev/null +++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala @@ -0,0 +1,55 @@ +package spraytest + +import akka.actor.ActorSystem +import spray.client.pipelining._ +import spray.httpx.SprayJsonSupport +import spray.json._ +import scala.concurrent.Future +import spray.can.Http +import akka.io.IO + +/** + * BEGIN JSON Infrastructure + */ +case class Container(data: List[PointOfInterest]) +case class Geolocation(latitude: Float, longitude: Float) +case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) + +object GeoJsonProtocol extends DefaultJsonProtocol { + implicit val geolocationFormat = jsonFormat2(Geolocation) + implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) + implicit val containerFormat = jsonFormat1(Container) +} +/** END-OF JSON Infrastructure */ + + + + + + +class ClientTest extends App { + implicit val actorSystem = ActorSystem("spray-client-test") + import actorSystem.dispatcher + + + import GeoJsonProtocol._ + import SprayJsonSupport._ + + + val actor = IO(Http) + + val pipeline = sendReceive ~> unmarshal[Container] + + val response = pipeline { + Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") + } onSuccess { + case a => { + println(a) + } + } +} + + + + + diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala new file mode 100644 index 00000000..b864d6d6 --- /dev/null +++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala @@ -0,0 +1,81 @@ +package spraytest +/* +import akka.actor.ActorSystem +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Try, Success} +import kamon.actor.TransactionContext + +object FutureTesting extends App { + + val actorSystem = ActorSystem("future-testing") + implicit val ec = actorSystem.dispatcher + implicit val tctx = TransactionContext(11, Nil) + + threadPrintln("In the initial Thread") + + + val f = TraceableFuture { + threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") + } + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") + }) + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") + }) + + + + + + + + + def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") + +} + + + + +trait TransactionContextWrapper { + def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { + TransactionContext.current.set(tranContext.fork) + println(s"SetContext to: ${tranContext}") + val result = f + + TransactionContext.current.remove() + result + } + +} + +class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { + def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { + future.onComplete(wrap(func, transactionContext)) + } +} + +object TraceableFuture { + + implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future + + def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) + + new TraceableFuture(Future { wrappedBody }) + } + + + + + def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { + TransactionContext.current.set(transactionContext) + val result = body + TransactionContext.current.remove() + result + } +}*/ + diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala new file mode 100644 index 00000000..f9d6869c --- /dev/null +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -0,0 +1,34 @@ +package test + +import akka.actor.{Props, Actor, ActorSystem} + +object PingPong extends App { + + val as = ActorSystem("ping-pong") + + val pinger = as.actorOf(Props[Pinger]) + val ponger = as.actorOf(Props[Ponger]) + + pinger.tell(Pong, ponger) + + + Thread.sleep(30000) + as.shutdown() + + +} + +case object Ping +case object Pong + +class Pinger extends Actor { + def receive = { + case Pong => sender ! Ping + } +} + +class Ponger extends Actor { + def receive = { + case Ping => sender ! Pong + } +} -- cgit v1.2.3