aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-02 18:09:53 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-13 17:37:20 -0300
commit7a10c0ef2a6566229e8571f6d385ca2ff794cc20 (patch)
treececd7ce6eb7a71f967eaa1605615780fa94d346c /kamon-core/src/main/scala
parent54143e4af6182b967736abc60a7fb20c88dd6587 (diff)
downloadKamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.gz
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.bz2
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.zip
integrate trace and metrics into the base project
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala93
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala65
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala37
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala74
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala132
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala)27
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala148
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/package.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Segments.scala38
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Trace.scala114
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala82
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala (renamed from kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala)15
18 files changed, 784 insertions, 247 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
new file mode 100644
index 00000000..6cede344
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -0,0 +1,93 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{ Cell, Props, ActorSystem, ActorRef }
+import akka.dispatch.{ Envelope, MessageDispatcher }
+import kamon.trace.{ TraceContext, ContextAware, Trace }
+import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics }
+import kamon.Kamon
+
+@Aspect("perthis(actorCellCreation(*, *, *, *, *))")
+class BehaviourInvokeTracing {
+ var path: Option[String] = None
+ var actorMetrics: Option[HdrActorMetricsRecorder] = None
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val metricsExtension = Kamon(ActorMetrics)(system)
+ val simplePathString = ref.path.elements.mkString("/")
+
+ if (metricsExtension.shouldTrackActor(simplePathString)) {
+ path = Some(ref.path.toString)
+ actorMetrics = Some(metricsExtension.registerActor(simplePathString))
+ }
+ }
+
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ val timestampBeforeProcessing = System.nanoTime()
+ val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware]
+
+ Trace.withContext(contextAndTimestamp.traceContext) {
+ pjp.proceed()
+ }
+
+ actorMetrics.map { am ⇒
+ am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing)
+ am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp)
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+ def actorStop(cell: Cell): Unit = {}
+
+ @After("actorStop(cell)")
+ def afterStop(cell: Cell): Unit = {
+ path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p))
+ }
+}
+
+@Aspect
+class EnvelopeTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.Envelope")
+ def mixin: ContextAndTimestampAware = new ContextAndTimestampAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ val timestamp: Long = System.nanoTime()
+ }
+
+ @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+trait ContextAndTimestampAware extends ContextAware {
+ def timestamp: Long
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
new file mode 100644
index 00000000..7d26016e
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
@@ -0,0 +1,65 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.trace.{ Trace, ContextAware }
+import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
+import org.aspectj.lang.ProceedingJoinPoint
+
+@Aspect
+class SystemMessageTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+@Aspect
+class RepointableActorRefTraceContextMixin {
+
+ @DeclareMixin("akka.actor.RepointableActorRef")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+
+ @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)")
+ def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {}
+
+ @Around("repointableActorRefCreation(repointableActorRef)")
+ def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = {
+ Trace.withContext(repointableActorRef.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+}
+
+@Aspect
+class ActorSystemMessagePassingTracing {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
+ def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
+
+ @Around("systemMessageProcessing(messages)")
+ def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
+ if (messages.nonEmpty) {
+ val ctx = messages.head.asInstanceOf[ContextAware].traceContext
+ Trace.withContext(ctx)(pjp.proceed())
+
+ } else pjp.proceed()
+ }
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
new file mode 100644
index 00000000..b5b23e61
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
@@ -0,0 +1,49 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect }
+import akka.event.Logging.Warning
+import scala.compat.Platform.EOL
+import akka.actor.ActorRefProvider
+import akka.pattern.{ AskTimeoutException, PromiseActorRef }
+
+@Aspect
+class AskPatternTracing {
+
+ class StackTraceCaptureException extends Throwable
+
+ @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider")
+ def promiseActorRefApply(provider: ActorRefProvider): Unit = {
+ provider.settings.config.getBoolean("kamon.trace.ask-pattern-tracing")
+ }
+
+ @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor")
+ def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = {
+ val future = promiseActor.result.future
+ val system = promiseActor.provider.guardian.underlying.system
+ implicit val ec = system.dispatcher
+ val stack = new StackTraceCaptureException
+
+ future onFailure {
+ case timeout: AskTimeoutException ⇒
+ val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
+
+ system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing],
+ "Timeout triggered for ask pattern registered at: " + stackString))
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index b5c3d552..b72e8fea 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -18,10 +18,8 @@ package kamon
import akka.actor._
object Kamon {
- trait Extension extends akka.actor.Extension {
- def manager: ActorRef
- }
+ trait Extension extends akka.actor.Extension
- def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager
+ def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): T = key(system)
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
new file mode 100644
index 00000000..297017cf
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
@@ -0,0 +1,37 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect }
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.trace.{ ContextAware, Trace }
+
+@Aspect
+class ActorLoggingTracing {
+
+ @DeclareMixin("akka.event.Logging.LogEvent+")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
+ def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {}
+
+ @Around("withMdcInvocation(logSource, logEvent, logStatement)")
+ def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {
+ Trace.withContext(logEvent.traceContext) {
+ pjp.proceed()
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index a3da76f7..90d2b270 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._
import java.util.concurrent._
import org.aspectj.lang.ProceedingJoinPoint
import java.util
-import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector }
import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory }
import com.typesafe.config.Config
-import kamon.Kamon
import scala.concurrent.forkjoin.ForkJoinPool
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
@@ -41,8 +39,8 @@ class ActorSystemInstrumentation {
@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
class ForkJoinPoolInstrumentation {
- var activeThreadsHistogram: Histogram = _
- var poolSizeHistogram: Histogram = _
+ /* var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _*/
@Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
@@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinScan(fjp)")
def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
- activeThreadsHistogram.update(fjp.getActiveThreadCount)
- poolSizeHistogram.update(fjp.getPoolSize)
+ /*activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)*/
}
}
@@ -90,6 +88,7 @@ trait WatchedExecutorService {
def collector: ExecutorServiceCollector
}
+/*
trait ExecutorServiceMonitoring {
def dispatcherMetrics: DispatcherMetricCollector
}
@@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring {
class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
@volatile var dispatcherMetrics: DispatcherMetricCollector = _
}
+*/
case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = delegate.createExecutorService
@@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation {
@Around("factoryMethodCall(namedFactory)")
def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
val delegate = pjp.proceed().asInstanceOf[ExecutorService]
- val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+ val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
- ExecutorServiceMetricCollector.register(executorFullName, delegate)
+ //ExecutorServiceMetricCollector.register(executorFullName, delegate)
new NamedExecutorServiceDelegate(executorFullName, delegate)
}
@@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation {
case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
def shutdown() = {
- ExecutorServiceMetricCollector.deregister(fullName)
+ //ExecutorServiceMetricCollector.deregister(fullName)
delegate.shutdown()
}
def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
new file mode 100644
index 00000000..5600d582
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
@@ -0,0 +1,47 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.trace.{ ContextAware, TraceContext, Trace }
+
+@Aspect
+class FutureTracing {
+
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
+ def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {}
+
+ @After("futureRelatedRunnableCreation(runnable)")
+ def afterCreation(runnable: ContextAware): Unit = {
+ // Force traceContext initialization.
+ runnable.traceContext
+ }
+
+ @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)")
+ def futureRelatedRunnableExecution(runnable: ContextAware) = {}
+
+ @Around("futureRelatedRunnableExecution(runnable)")
+ def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = {
+ Trace.withContext(runnable.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index da797fa1..44eb8c43 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -15,17 +15,16 @@
* ========================================================== */
package kamon.instrumentation
-import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram }
import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue }
import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect }
import akka.actor.{ ActorSystem, ActorRef }
-import kamon.metric.{ Metrics, MetricDirectory }
import org.aspectj.lang.ProceedingJoinPoint
/**
* For Mailboxes we would like to track the queue size and message latency. Currently the latency
* will be gathered from the ActorCellMetrics.
*/
+/*
@Aspect
class MessageQueueInstrumentation {
@@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram:
def hasMessages: Boolean = delegate.hasMessages
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
}
+*/
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
deleted file mode 100644
index 4c4b93e9..00000000
--- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.metric
-
-import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService }
-import scala.concurrent.forkjoin.ForkJoinPool
-import com.codahale.metrics.{ Metric, MetricFilter }
-
-object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
-
- def register(fullName: String, executorService: ExecutorService) = executorService match {
- case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp)
- case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe)
- case _ ⇒ // If it is a unknown Executor then just do nothing.
- }
-
- def deregister(fullName: String) = {
- Metrics.registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
- })
- }
-}
-
-trait ForkJoinPoolMetricCollector {
- import GaugeGenerator._
- import BasicExecutorMetricNames._
-
- def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
- val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
-
- val allMetrics = Map(
- fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
- fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
- fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount))
-
- allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) }
- }
-}
-
-trait ThreadPoolExecutorMetricCollector {
- import GaugeGenerator._
- import BasicExecutorMetricNames._
-
- def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
- val tpeGauge = newNumericGaugeFor(tpe) _
-
- val allMetrics = Map(
- fullName + queueSize -> tpeGauge(_.getQueue.size()),
- fullName + poolSize -> tpeGauge(_.getPoolSize),
- fullName + activeThreads -> tpeGauge(_.getActiveCount))
-
- allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) }
- }
-}
-
-object BasicExecutorMetricNames {
- val queueSize = "queueSize"
- val poolSize = "threads/poolSize"
- val activeThreads = "threads/activeThreads"
-}
-
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
deleted file mode 100644
index b904ec56..00000000
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.metric
-
-import java.util.concurrent.TimeUnit
-import akka.actor.ActorRef
-import com.codahale.metrics
-import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry }
-import scala.collection.concurrent.TrieMap
-
-object Metrics {
- val registry: MetricRegistry = new MetricRegistry
-
- val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
- //consoleReporter.build().start(45, TimeUnit.SECONDS)
-
- //val newrelicReporter = NewRelicReporter(registry)
- //newrelicReporter.start(5, TimeUnit.SECONDS)
-
- def include(name: String, metric: Metric) = {
- //registry.register(name, metric)
- }
-
- def exclude(name: String) = {
- registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(name)
- })
- }
-
- def deregister(fullName: String) = {
- registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
- })
- }
-}
-
-object Watched {
- case object Actor
- case object Dispatcher
-}
-
-object MetricDirectory {
- def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
-
- def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
-
- def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
-
- def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
-
- def shouldInstrumentActor(actorPath: String): Boolean = {
- !(actorPath.isEmpty || actorPath.startsWith("system"))
- }
-
-}
-
-case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
-
-trait Histogram {
- def update(value: Long): Unit
- def snapshot: HistogramSnapshot
-}
-
-trait HistogramSnapshot {
- def median: Double
- def max: Double
- def min: Double
-}
-
-case class ActorSystemMetrics(actorSystemName: String) {
- val dispatchers = TrieMap.empty[String, DispatcherMetricCollector]
-
- private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
-
- def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
- val stats = createDispatcherCollector
- dispatchers.put(dispatcherName, stats)
- Some(stats)
- }
-
-}
-
-case class CodahaleHistogram() extends Histogram {
- private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
-
- def update(value: Long) = histogram.update(value)
- def snapshot: HistogramSnapshot = {
- val snapshot = histogram.getSnapshot
-
- CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
- }
-}
-
-case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
-
-/**
- * Dispatcher Metrics that we care about currently with a histogram-like nature:
- * - Work Queue Size
- * - Total/Active Thread Count
- */
-
-import annotation.tailrec
-import java.util.concurrent.atomic.AtomicReference
-
-object Atomic {
- def apply[T](obj: T) = new Atomic(new AtomicReference(obj))
- implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref)
-}
-
-class Atomic[T](val atomic: AtomicReference[T]) {
- @tailrec
- final def update(f: T ⇒ T): T = {
- val oldValue = atomic.get()
- val newValue = f(oldValue)
- if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
- }
-
- def get() = atomic.get()
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
index 252e5220..72e473e8 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -13,26 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.instrumentation
-import org.aspectj.lang.ProceedingJoinPoint
+package kamon.metrics
-trait ProceedingJoinPointPimp {
- import language.implicitConversions
+import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor
+import kamon.Kamon
- implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
-}
-
-object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
+object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics
+ def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system)
-case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
- def proceedWith(newUniqueArg: AnyRef) = {
- val args = pjp.getArgs
- args.update(0, newUniqueArg)
- pjp.proceed(args)
- }
+}
- def proceedWithTarget(args: AnyRef*) = {
- pjp.proceed(args.toArray)
- }
+class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps {
+ lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics")
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
new file mode 100644
index 00000000..dc4abde0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
@@ -0,0 +1,148 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram }
+import kamon.util.GlobPathFilter
+import scala.collection.concurrent.TrieMap
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import akka.actor._
+import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics }
+import kamon.Kamon
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import kamon.metrics.ActorMetricsDispatcher.Subscribe
+
+trait ActorMetricsOps {
+ self: ActorMetricsExtension ⇒
+
+ val config = system.settings.config.getConfig("kamon.metrics.actors")
+ val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]()
+
+ val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector
+ val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector
+
+ val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = {
+ val settings = config.getConfig("hdr-settings")
+ val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time"))
+ val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox"))
+ val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size"))
+
+ () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig)
+ }
+
+ import scala.concurrent.duration._
+ system.scheduler.schedule(0.seconds, 10.seconds)(
+ actorMetrics.collect {
+ case (name, recorder: HdrActorMetricsRecorder) ⇒
+ println(s"Actor: $name")
+ recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D)
+ })(system.dispatcher)
+
+ def shouldTrackActor(path: String): Boolean =
+ trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path))
+
+ def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory())
+
+ def unregisterActor(path: String): Unit = actorMetrics.remove(path)
+}
+
+class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration,
+ mailboxSizeHdrConfig: HdrConfiguration) {
+
+ val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits)
+ val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits)
+ val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits)
+
+ def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime)
+
+ def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime)
+
+ def snapshot(): HdrActorMetricsSnapshot = {
+ HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy())
+ }
+
+ def reset(): Unit = {
+ processingTimeHistogram.reset()
+ timeInMailboxHistogram.reset()
+ mailboxSizeHistogram.reset()
+ }
+}
+
+case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram,
+ mailboxSizeHistogram: AbstractHistogram)
+
+class ActorMetricsDispatcher extends Actor {
+ val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var lastTick = System.currentTimeMillis()
+
+ def receive = {
+ case Subscribe(path, true) ⇒ subscribeForever(path, sender)
+ case Subscribe(path, false) ⇒ subscribeOneOff(path, sender)
+ case FlushMetrics ⇒ flushMetrics()
+ }
+
+ def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever)
+
+ def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne)
+
+ def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = {
+ val pathFilter = new GlobPathFilter(path)
+ val oldReceivers = target.get(pathFilter).getOrElse(Nil)
+ target.updated(pathFilter, receiver :: oldReceivers)
+ }
+
+ def flushMetrics(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map {
+ case (path, metrics) ⇒
+ val snapshot = metrics.snapshot()
+ metrics.reset()
+
+ (path, snapshot)
+ }.toMap
+
+ dispatchMetricsTo(subscribedForOne, snapshots, currentTick)
+ dispatchMetricsTo(subscribedForever, snapshots, currentTick)
+
+ subscribedForOne = Map.empty
+ lastTick = currentTick
+ }
+
+ def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot],
+ currentTick: Long): Unit = {
+
+ for ((subscribedPath, receivers) ← subscribers) {
+ val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath))
+ val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics)
+
+ receivers.foreach(ref ⇒ ref ! actorMetrics)
+ }
+ }
+}
+
+object ActorMetricsDispatcher {
+ case class Subscribe(path: String, forever: Boolean = false)
+ case class UnSubscribe(path: String)
+
+ case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot])
+ case object FlushMetrics
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala
new file mode 100644
index 00000000..d6359ead
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/package.scala
@@ -0,0 +1,31 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+
+import scala.concurrent.duration._
+import com.typesafe.config.Config
+
+package object metrics {
+ val OneHour = 1.hour.toNanos
+
+ case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int)
+ case object HdrConfiguration {
+ def fromConfig(config: Config): HdrConfiguration = {
+ HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits"))
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala
new file mode 100644
index 00000000..0bc68ee7
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala
@@ -0,0 +1,38 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+
+package kamon.trace
+
+import kamon.trace.Trace.SegmentCompletionHandle
+
+object Segments {
+
+ trait Category
+ case object HttpClientRequest extends Category
+
+ case class Start(category: Category, description: String = "",
+ attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
+
+ case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
+
+ case class Segment(start: Start, end: End)
+
+ trait SegmentCompletionHandleAware {
+ var completionHandle: Option[SegmentCompletionHandle]
+ }
+
+ trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala
new file mode 100644
index 00000000..31e8185a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala
@@ -0,0 +1,114 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import kamon.Kamon
+import akka.actor._
+import scala.Some
+import kamon.trace.Trace.Register
+import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicLong
+
+object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: Extension] = Trace
+ def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
+
+ /*** Protocol */
+ case object Register
+
+ /** User API */
+ //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None)
+ private[trace] val traceContext = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue(): Option[TraceContext] = None
+ }
+ private[trace] val tranid = new AtomicLong()
+
+ def context() = traceContext.get
+ private def set(ctx: Option[TraceContext]) = traceContext.set(ctx)
+
+ def clear: Unit = traceContext.remove()
+ def start(name: String)(implicit system: ActorSystem): TraceContext = {
+ val ctx = newTraceContext(name)
+ ctx.start(name)
+ set(Some(ctx))
+
+ ctx
+ }
+
+ def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = {
+ val oldval = context
+ set(ctx)
+
+ try thunk
+ finally set(oldval)
+ }
+
+ def transformContext(f: TraceContext ⇒ TraceContext): Unit = {
+ context.map(f).foreach(ctx ⇒ set(Some(ctx)))
+ }
+
+ def finish(): Option[TraceContext] = {
+ val ctx = context()
+ ctx.map(_.finish)
+ clear
+ ctx
+ }
+
+ // TODO: FIX
+ def newTraceContext(name: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name)
+
+ def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = {
+ val start = Segments.Start(category, description, attributes)
+ SegmentCompletionHandle(start)
+ }
+
+ def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start)
+
+ case class SegmentCompletionHandle(start: Segments.Start) {
+ def complete(): Unit = {
+ val end = Segments.End()
+ println(s"Completing the Segment: $start - $end")
+ }
+ def complete(end: Segments.End): Unit = {
+ println(s"Completing the Segment: $start - $end")
+ }
+ }
+}
+
+class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace")
+}
+
+class TraceManager extends Actor with ActorLogging {
+ var listeners: Seq[ActorRef] = Seq.empty
+
+ def receive = {
+ case Register ⇒
+ listeners = sender +: listeners
+ log.info("Registered [{}] as listener for Kamon traces", sender)
+
+ case segment: UowSegment ⇒
+ val tracerName = segment.id.toString
+ context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment
+
+ case trace: UowTrace ⇒
+ listeners foreach (_ ! trace)
+ }
+
+ def newTracer(name: String): ActorRef = {
+ context.actorOf(UowTraceAggregator.props(self, 30 seconds), name)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
new file mode 100644
index 00000000..3e68a816
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -0,0 +1,51 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import java.util.UUID
+import akka.actor._
+import java.util.concurrent.atomic.AtomicLong
+import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.trace.UowTracing.{ Finish, Start }
+
+// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
+case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) {
+
+ def start(name: String) = {
+ collector ! Start(id, name)
+ }
+
+ def finish: Unit = {
+ collector ! Finish(id)
+ }
+
+}
+
+trait ContextAware {
+ def traceContext: Option[TraceContext]
+}
+
+object ContextAware {
+ def default: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ }
+}
+
+trait TimedContextAware {
+ def timestamp: Long
+ def traceContext: Option[TraceContext]
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..20cce830
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,82 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package kamon.trace
+
+import akka.actor._
+import scala.concurrent.duration.Duration
+import kamon.trace.UowTracing._
+
+sealed trait UowSegment {
+ def id: Long
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start(id: Long, name: String) extends AutoTimestamp
+ case class Finish(id: Long) extends AutoTimestamp
+ case class Rename(id: Long, name: String) extends AutoTimestamp
+ case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
+ case class WebExternalFinish(id: Long) extends AutoTimestamp
+ case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) {
+ def elapsed: Long = end - start
+}
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging {
+ context.setReceiveTimeout(aggregationTimeout)
+
+ var name: String = "UNKNOWN"
+ var segments: Seq[UowSegment] = Nil
+
+ var pendingExternal = List[WebExternalStart]()
+
+ var start = 0L
+ var end = 0L
+
+ def receive = {
+ case start: Start ⇒
+ this.start = start.timestamp
+ segments = segments :+ start
+ name = start.name
+ case finish: Finish ⇒
+ end = finish.timestamp
+ segments = segments :+ finish; finishTracing()
+ case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes
+ case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ {
+ segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host)
+ })
+ case Rename(id, newName) ⇒ name = newName
+ case segment: UowSegment ⇒ segments = segments :+ segment
+ case ReceiveTimeout ⇒
+ log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments)
+ context.stop(self)
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name, "", start, end, segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala
index 9eff2739..add47fdf 100644
--- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackUowConverter.scala
@@ -13,15 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
* ========================================================== */
-package kamon.metric
+package kamon.trace.logging
-import com.codahale.metrics.Gauge
+import ch.qos.logback.classic.pattern.ClassicConverter
+import ch.qos.logback.classic.spi.ILoggingEvent
+import kamon.trace.Trace
-trait GaugeGenerator {
-
- def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] {
- def getValue: V = generator(target)
- }
+class LogbackUowConverter extends ClassicConverter {
+ def convert(event: ILoggingEvent): String = Trace.context().map(_.uow).getOrElse("undefined")
}
-
-object GaugeGenerator extends GaugeGenerator