aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2013-10-25 23:02:56 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2013-10-25 23:02:56 -0300
commit491277a8e30353d5e4ed7e381ab2f5aba4e6f420 (patch)
tree6f2c718bb937fcfec9516ba434776d3ff82ce680 /kamon-core/src/main/scala
parent88af5cf513e44efcf84bc7f92e02deb3c7597686 (diff)
parent808846aaa931c2890016d7bb96ad22fd599f4104 (diff)
downloadKamon-491277a8e30353d5e4ed7e381ab2f5aba4e6f420.tar.gz
Kamon-491277a8e30353d5e4ed7e381ab2f5aba4e6f420.tar.bz2
Kamon-491277a8e30353d5e4ed7e381ab2f5aba4e6f420.zip
Merge branch 'simple-instrumentation'
Conflicts: kamon-core/src/main/resources/application.conf
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContext.scala40
-rw-r--r--kamon-core/src/main/scala/kamon/TraceContextSwap.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/TransactionPublisher.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala103
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala41
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala104
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala49
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala115
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricFilter.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowDirectives.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala8
-rw-r--r--kamon-core/src/main/scala/spraytest/ClientTest.scala55
-rw-r--r--kamon-core/src/main/scala/spraytest/FutureTesting.scala81
-rw-r--r--kamon-core/src/main/scala/test/PingPong.scala34
-rw-r--r--kamon-core/src/main/scala/test/SimpleRequestProcessor.scala97
21 files changed, 498 insertions, 568 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 298f43eb..fb1b2393 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -5,6 +5,13 @@ import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
import scala.concurrent.duration.FiniteDuration
import com.newrelic.api.agent.NewRelic
import scala.collection.concurrent.TrieMap
+import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration}
+import scala.util.DynamicVariable
+
+
+object Instrument {
+ val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation
+}
object Kamon {
implicit lazy val actorSystem = ActorSystem("kamon")
@@ -19,27 +26,20 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
- val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
- val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+ //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
}
object Tracer {
- val ctx = new ThreadLocal[Option[TraceContext]] {
- override def initialValue() = None
- }
+ val traceContext = new DynamicVariable[Option[TraceContext]](None)
- def context() = ctx.get()
- def clear = ctx.remove()
- def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
- def start = set(newTraceContext)
- def stop = ctx.get match {
- case Some(context) => context.close
- case None =>
- }
+ def context() = traceContext.value
+ def set(ctx: TraceContext) = traceContext.value = Some(ctx)
+ def start = set(newTraceContext)
def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
}
@@ -79,7 +79,7 @@ case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThre
class NewrelicReporterActor extends Actor {
import scala.concurrent.duration._
- Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+ //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
def receive = {
case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
index 62d7f57e..63cdb488 100644
--- a/kamon-core/src/main/scala/kamon/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -2,22 +2,26 @@ package kamon
import java.util.UUID
import akka.actor._
-import akka.agent.Agent
-import java.util.concurrent.TimeUnit
-import scala.util.{Failure, Success}
-import akka.util.Timeout
+import java.util.concurrent.atomic.AtomicLong
+import kamon.trace.UowTraceAggregator
+import scala.concurrent.duration._
+import kamon.newrelic.NewRelicReporting
+import kamon.trace.UowTracing.Start
+// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
+case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None)
-case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) {
- implicit val timeout = Timeout(30, TimeUnit.SECONDS)
- implicit val as = Kamon.actorSystem.dispatcher
+object TraceContext {
+ val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting])
+ val traceIdCounter = new AtomicLong
- def append(entry: TraceEntry) = entries ! entry
- def close = entries ! "Close" // TODO type this thing!.
-}
+ def apply()(implicit system: ActorSystem) = {
+ val n = traceIdCounter.incrementAndGet()
+ val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}")
+ actor ! Start()
-object TraceContext {
- def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
+ new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
+ }
}
@@ -30,20 +34,10 @@ class TraceAccumulator extends Actor {
trait TraceEntry
-
case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
-
-
-
case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry])
-
-
-
-
-object Collector {
-
-}
+object Collector
trait TraceEntryStorage {
def store(entry: TraceEntry): Boolean
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
deleted file mode 100644
index 24661445..00000000
--- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package kamon
-
-/**
- * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
- */
-trait TraceContextSwap {
-
- def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body)
-
- def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
- ctx match {
- case Some(context) => {
- Tracer.set(context)
- val bodyResult = primary
- Tracer.clear
-
- bodyResult
- }
- case None => fallback
- }
-
- }
-
-}
-
-object TraceContextSwap extends TraceContextSwap
diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
deleted file mode 100644
index 0626b91d..00000000
--- a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package kamon
-
-import akka.actor.Actor
-import java.util.UUID
-
-class TransactionPublisher extends Actor {
-
- def receive = {
- case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries")
- }
-
-}
-
-
-case class FullTransaction(id: UUID, entries: List[TraceEntry])
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
deleted file mode 100644
index a1c099d4..00000000
--- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-package kamon.executor
-
-import akka.event.ActorEventBus
-import akka.event.LookupClassification
-import akka.actor._
-import java.util.concurrent.TimeUnit
-
-import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext}
-import akka.util.Timeout
-import scala.util.{Random, Success, Failure}
-import scala.concurrent.Future
-
-trait Message
-
-case class PostMessage(text:String) extends Message
-
-case class MessageEvent(val channel:String, val message:Message)
-
-class AppActorEventBus extends ActorEventBus with LookupClassification{
- type Event = MessageEvent
- type Classifier=String
- protected def mapSize(): Int={
- 10
- }
-
- protected def classify(event: Event): Classifier={
- event.channel
- }
-
- protected def publish(event: Event, subscriber: Subscriber): Unit={
- subscriber ! event
- }
-}
-case class Ping()
-case class Pong()
-
-class PingActor extends Actor with ActorLogging {
-
- val pong = context.actorOf(Props[PongActor], "Pong")
- val random = new Random()
- def receive = {
- case Pong() => {
- //Thread.sleep(random.nextInt(2000))
- //log.info("Message from Ping")
- pong ! Ping()
- }
- }
-}
-
-class PongActor extends Actor with ActorLogging {
- def receive = {
- case Ping() => {
- sender ! Pong()
- }
- }
-}
-
-
-object TryAkka extends App{
- val system = ActorSystem("MySystem")
- val appActorEventBus=new AppActorEventBus
- val NEW_POST_CHANNEL="/posts/new"
- val subscriber = system.actorOf(Props(new Actor {
- def receive = {
- case d: MessageEvent => println(d)
- }
- }))
-
- Tracer.start
- for(i <- 1 to 4) {
- val ping = system.actorOf(Props[PingActor], "Ping" + i)
- ping ! Pong()
- }
-
-
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body")
-
- /*
- val newRelicReporter = new NewRelicReporter(registry)
- newRelicReporter.start(1, TimeUnit.SECONDS)
-
-*/
- import akka.pattern.ask
- implicit val timeout = Timeout(10, TimeUnit.SECONDS)
- implicit def execContext = system.dispatcher
-
-
-
- //Tracer.start
-
- Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
- threadPrintln("Before doing it")
- val f = Future { threadPrintln("This is happening inside the future body") }
-
- Tracer.stop
-
-
- //Thread.sleep(3000)
- //system.shutdown()
-
-/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
- appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..4e47c2a4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,41 @@
+package kamon.instrumentation
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import kamon.{Tracer, TraceContext}
+import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
+
+trait ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation
+}
+
+
+trait ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext])
+}
+
+object ActorReceiveInvokeInstrumentation {
+ val noopPreReceive = new ActorReceiveInvokeInstrumentation{
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None)
+ }
+}
+
+class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context)
+
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = {
+ new ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match {
+ case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx)
+ case anyOther => (anyOther, None)
+ }
+ }
+ }
+}
+
+object SimpleContextPassingInstrumentation {
+ case class SimpleTraceMessage(message: Any, context: Option[TraceContext])
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
new file mode 100644
index 00000000..47d1756f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
@@ -0,0 +1,31 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import kamon.{Tracer, TraceContext}
+import org.aspectj.lang.ProceedingJoinPoint
+import org.slf4j.MDC
+
+@Aspect
+class ActorLoggingInstrumentation {
+
+
+ @DeclareMixin("akka.event.Logging.LogEvent+")
+ def traceContextMixin: ContextAware = new ContextAware {
+ def traceContext: Option[TraceContext] = Tracer.context()
+ }
+
+ @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
+ def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {}
+
+ @Around("withMdcInvocation(logSource, logEvent, logStatement)")
+ def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {
+ logEvent.traceContext match {
+ case Some(ctx) =>
+ MDC.put("uow", ctx.uow)
+ pjp.proceed()
+ MDC.remove("uow")
+
+ case None => pjp.proceed()
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 7d3e36ca..7b5d5339 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -4,116 +4,46 @@ import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
import kamon.{Tracer, TraceContext}
-import akka.dispatch.{MessageDispatcher, Envelope}
+import akka.dispatch.{Envelope, MessageDispatcher}
import com.codahale.metrics.Timer
-import kamon.metric.{MetricDirectory, Metrics}
import scala.Some
+import kamon.trace.context.TracingAwareContext
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
-
+case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext
@Aspect
-class ActorRefTellInstrumentation {
- import ProceedingJoinPointPimp._
-
- val t2 = Metrics.registry.timer("some" + "LATENCY")
-
- @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)")
- def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
-
- @Around("sendingMessageToActorRef(actor, message, sender)")
- def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
-
- //val actorName = MetricDirectory.nameForActor(actor)
- //val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender)
-
- }
-}
-
-
-@Aspect("perthis(actorCellCreation(..))")
class ActorCellInvokeInstrumentation {
- var processingTimeTimer: Timer = _
- var shouldTrack = false
-
- // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
-
@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
- @After("actorCellCreation(system, ref, props, dispatcher, parent)")
- def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(ref)
- val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
-
- //println("=====> Created ActorCell for: "+ref.toString())
- /** TODO: Find a better way to filter the things we don't want to measure. */
- //if(system.name != "kamon" && actorName.startsWith("/user")) {
- processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
- shouldTrack = true
- //}
- }
-
-
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
- import ProceedingJoinPointPimp._
- //println("ENVELOPE --------------------->"+envelope)
- envelope match {
- case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- //timer.stop()
-
- val originalEnvelope = envelope.copy(message = msg)
+ //safe cast
+ val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext
- //println("PROCESSING TIME TIMER: "+processingTimeTimer)
- val pt = processingTimeTimer.time()
- ctx match {
- case Some(c) => {
- Tracer.set(c)
- //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
- pjp.proceedWith(originalEnvelope)
- Tracer.clear
- }
- case None => pjp.proceedWith(originalEnvelope)
- }
- pt.stop()
- }
- case _ => pjp.proceed
+ Tracer.traceContext.withValue(msgContext) {
+ pjp.proceed()
}
}
}
-
@Aspect
-class UnregisteredActorRefInstrumentation {
- @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)")
- def sprayResponderHandle(message: Any, sender: ActorRef) = {}
+class EnvelopeTracingContext {
- @Around("sprayResponderHandle(message, sender)")
- def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
- import ProceedingJoinPointPimp._
- println("Handling unregistered actor ref message: "+message)
- message match {
- case TraceableMessage(ctx, msg, timer) => {
- timer.stop()
+ @DeclareMixin("akka.dispatch.Envelope")
+ def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext()
- ctx match {
- case Some(c) => {
- Tracer.set(c)
- pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation.
- Tracer.clear
- }
- case None => pjp.proceedWith(msg.asInstanceOf[AnyRef])
- }
- }
- case _ => pjp.proceed
- }
+ @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+ def requestRecordInit(ctx: TracingAwareContext): Unit = {}
+
+ @After("requestRecordInit(ctx)")
+ def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = {
+ // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
+ ctx.traceContext
}
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index 30041321..992cfa82 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -8,10 +8,12 @@ import scala.Some
/**
* Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
*/
-trait TraceContextAwareRunnable extends Runnable {}
+trait TraceContextAwareRunnable {
+ def traceContext: Option[TraceContext]
+}
-@Aspect("perthis(instrumentedRunnableCreation())")
+@Aspect
class RunnableInstrumentation {
/**
@@ -19,43 +21,38 @@ class RunnableInstrumentation {
* while their run method is executed.
*/
@DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null
+ def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable {
+ val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ }
/**
* Pointcuts
*/
- @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))")
- def instrumentedRunnableCreation(): Unit = {}
-
- @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())")
- def runnableExecution() = {}
-
-
- /**
- * Aspect members
- */
+ @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)")
+ def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {}
- private val traceContext = Tracer.context
+ @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)")
+ def runnableExecution(runnable: TraceContextAwareRunnable) = {}
- /**
- * Advices
- */
- import kamon.TraceContextSwap.withContext
- @Before("instrumentedRunnableCreation()")
- def beforeCreation = {
- //println((new Throwable).getStackTraceString)
+ @After("instrumentedRunnableCreation(runnable)")
+ def beforeCreation(runnable: TraceContextAwareRunnable): Unit = {
+ // Force traceContext initialization.
+ runnable.traceContext
}
- @Around("runnableExecution()")
- def around(pjp: ProceedingJoinPoint) = {
+ @Around("runnableExecution(runnable)")
+ def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = {
import pjp._
- withContext(traceContext, proceed())
+ Tracer.traceContext.withValue(runnable.traceContext) {
+ proceed()
+ }
}
}
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
deleted file mode 100644
index 74261403..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package kamon.instrumentation
-
-import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
-
-class ActorCage(val name: String, val size: Int) {
-
- def doIt: Unit = println("name")
-}
-
-trait CageMonitoring {
- def histogram: Histogram
- def count(value: Int): Unit
-}
-
-class CageMonitoringImp extends CageMonitoring{
- final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
-
- def count(value: Int) = histogram.update(value)
-
-}
-
-
-@Aspect
-class InceptionAspect {
-
- @DeclareMixin("kamon.instrumentation.ActorCage")
- def mixin: CageMonitoring = new CageMonitoringImp
-
-
- @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
- def theActorCageDidIt(actorCage: CageMonitoring) = {}
-
- @After("theActorCageDidIt(actorCage)")
- def afterDoingIt(actorCage: CageMonitoring) = {
- actorCage.count(1)
- actorCage.histogram.getSnapshot.dump(System.out)
- }
-
-
-
-}
-
-
-object Runner extends App {
- val cage = new ActorCage("ivan", 10)
-
- cage.doIt
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
new file mode 100644
index 00000000..2239f382
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala
@@ -0,0 +1,115 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.{TraceContext, Tracer}
+import kamon.trace.UowTracing._
+import kamon.trace.context.TracingAwareContext
+import org.aspectj.lang.ProceedingJoinPoint
+import spray.http.HttpRequest
+import kamon.trace.UowTracing.Finish
+import kamon.trace.UowTracing.Rename
+import spray.http.HttpHeaders.Host
+
+//import spray.can.client.HttpHostConnector.RequestContext
+
+trait ContextAware {
+ def traceContext: Option[TraceContext]
+}
+
+trait TimedContextAware {
+ def timestamp: Long
+ def traceContext: Option[TraceContext]
+}
+
+@Aspect
+class SprayOpenRequestContextTracing {
+ @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
+ def mixinContextAwareToOpenRequest: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ }
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware {
+ val timestamp: Long = System.nanoTime()
+ val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ }
+}
+
+@Aspect
+class SprayServerInstrumentation {
+
+ @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
+ def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {}
+
+ @After("openRequestInit(openRequest, request)")
+ def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = {
+ Tracer.start
+ openRequest.traceContext
+
+ Tracer.context().map(_.tracer ! Rename(request.uri.path.toString()))
+ }
+
+ @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
+ def openRequestCreation(openRequest: ContextAware): Unit = {}
+
+ @After("openRequestCreation(openRequest)")
+ def afterFinishingRequest(openRequest: ContextAware): Unit = {
+ val original = openRequest.traceContext
+ Tracer.context().map(_.tracer ! Finish())
+
+ if(Tracer.context() != original) {
+ println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]")
+ }
+ }
+
+ @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
+ def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {}
+
+ @After("requestRecordInit(ctx, request)")
+ def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = {
+ // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
+ for{
+ tctx <- ctx.traceContext
+ host <- request.header[Host]
+ } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)
+ }
+
+
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
+ def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {}
+
+ @Around("dispatchToCommander(requestContext, message)")
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = {
+ println("Completing the request with context: " + requestContext.traceContext)
+
+ Tracer.traceContext.withValue(requestContext.traceContext) {
+ requestContext.traceContext.map {
+ tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp)
+ }
+ pjp.proceed()
+ }
+
+ }
+
+
+ @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
+ def copyingRequestContext(old: TimedContextAware): Unit = {}
+
+ @Around("copyingRequestContext(old)")
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = {
+ println("Instrumenting the request context copy.")
+ Tracer.traceContext.withValue(old.traceContext) {
+ pjp.proceed()
+ }
+ }
+}
+
+case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext
+
+@Aspect
+class SprayRequestContextTracing {
+
+ @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
+ def mixin: TracingAwareContext = DefaultTracingAwareRequestContext()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
deleted file mode 100644
index fb117968..00000000
--- a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package kamon.metric
-
-object MetricFilter {
- def actorSystem(system: String): Boolean = !system.startsWith("kamon")
- def actor(path: String, system: String): Boolean = true
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
deleted file mode 100644
index 5b4ceaf4..00000000
--- a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package kamon.metric
-
-import com.codahale.metrics._
-
-object MetricsUtils {
-
- def markMeter[T](meter:Meter)(f: => T): T = {
- meter.mark()
- f
- }
-//
-// def incrementCounter(key: String) {
-// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count
-// }
-//
-// def markMeter(key: String) {
-// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
-// }
-//
-// def trace[T](key: String)(f: => T): T = {
-// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) )
-// timer.time(f)
-// }
-
-// def markAndCountMeter[T](key: String)(f: => T): T = {
-// markMeter(key)
-// f
-// }
-//
-// def traceAndCount[T](key: String)(f: => T): T = {
-// incrementCounter(key)
-// trace(key) {
-// f
-// }
- //}
-
-// implicit def runnable(f: () => Unit): Runnable =
-// new Runnable() { def run() = f() }
-//
-//
-// import java.util.concurrent.Callable
-//
-// implicit def callable[T](f: () => T): Callable[T] =
-// new Callable[T]() { def call() = f() }
-
-// private val actorCounter:Counter = new Counter
-// private val actorTimer:Timer = new Timer
-//
-// metricsRegistry.register(s"counter-for-${actorName}", actorCounter)
-// metricsRegistry.register(s"timer-for-${actorName}", actorTimer)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
new file mode 100644
index 00000000..106f27e2
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
@@ -0,0 +1,52 @@
+package kamon.newrelic
+
+import akka.actor.Actor
+import kamon.trace.UowTrace
+import com.newrelic.api.agent.{Response, Request, Trace, NewRelic}
+import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart}
+import java.util
+import java.util.Date
+
+
+class NewRelicReporting extends Actor {
+ def receive = {
+ case trace: UowTrace => recordTransaction(trace)
+ }
+
+ def recordTransaction(uowTrace: UowTrace): Unit = {
+ val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9)
+
+ NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat )
+ NewRelic.recordMetric("WebTransaction", time.toFloat)
+ NewRelic.recordMetric("HttpDispatcher", time.toFloat)
+
+ uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace =>
+ val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat
+
+ println("Web External: " + webExternalTrace)
+ NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external)
+ NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external)
+ NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
+ }
+
+
+ val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp)
+
+
+ def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
+ case Nil => accum
+ case head :: tail =>
+ if(head.start > lastEnd)
+ measureExternal(accum + (head.finish-head.start), head.finish, tail)
+ else
+ measureExternal(accum + (head.finish-lastEnd), head.finish, tail)
+ }
+
+ val external = measureExternal(0, 0, allExternals) / 1E9
+
+
+ NewRelic.recordMetric(s"External/all", external.toFloat)
+ NewRelic.recordMetric(s"External/allWeb", external.toFloat)
+
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
new file mode 100644
index 00000000..392f53b8
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
@@ -0,0 +1,28 @@
+package kamon.trace
+
+import spray.routing.directives.BasicDirectives
+import spray.routing._
+import kamon.Tracer
+import java.util.concurrent.atomic.AtomicLong
+import scala.util.Try
+import java.net.InetAddress
+
+trait UowDirectives extends BasicDirectives {
+ def uow: Directive0 = mapRequest { request =>
+ val uowHeader = request.headers.find(_.name == "X-UOW")
+
+ val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow)
+ Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow))
+
+ request
+ }
+}
+
+object UowDirectives {
+ val uowCounter = new AtomicLong
+
+ val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+
+ def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..b09478cc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,57 @@
+package kamon.trace
+
+import akka.actor._
+import scala.concurrent.duration.Duration
+import kamon.trace.UowTracing._
+
+sealed trait UowSegment {
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start() extends AutoTimestamp
+ case class Finish() extends AutoTimestamp
+ case class Rename(name: String) extends AutoTimestamp
+ case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
+ case class WebExternalFinish(id: Long) extends AutoTimestamp
+ case class WebExternal(start: Long, finish: Long, host: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, segments: Seq[UowSegment])
+
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging {
+ context.setReceiveTimeout(aggregationTimeout)
+
+ var name: Option[String] = None
+ var segments: Seq[UowSegment] = Nil
+
+ var pendingExternal = List[WebExternalStart]()
+
+ def receive = {
+ case finish: Finish => segments = segments :+ finish; finishTracing()
+ case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes
+ case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => {
+ segments = segments :+ WebExternal(start.timestamp, finish.timestamp, start.host)
+ })
+ case Rename(newName) => name = Some(newName)
+ case segment: UowSegment => segments = segments :+ segment
+ case ReceiveTimeout =>
+ log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments)
+ context.stop(self)
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments)
+ println("Recorded Segments: " + segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
new file mode 100644
index 00000000..3766dd22
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
@@ -0,0 +1,8 @@
+package kamon.trace.context
+
+import kamon.TraceContext
+
+trait TracingAwareContext {
+ def traceContext: Option[TraceContext]
+ def timestamp: Long
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala
deleted file mode 100644
index 07532d0a..00000000
--- a/kamon-core/src/main/scala/spraytest/ClientTest.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package spraytest
-
-import akka.actor.ActorSystem
-import spray.client.pipelining._
-import spray.httpx.SprayJsonSupport
-import spray.json._
-import scala.concurrent.Future
-import spray.can.Http
-import akka.io.IO
-
-/**
- * BEGIN JSON Infrastructure
- */
-case class Container(data: List[PointOfInterest])
-case class Geolocation(latitude: Float, longitude: Float)
-case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
-
-object GeoJsonProtocol extends DefaultJsonProtocol {
- implicit val geolocationFormat = jsonFormat2(Geolocation)
- implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
- implicit val containerFormat = jsonFormat1(Container)
-}
-/** END-OF JSON Infrastructure */
-
-
-
-
-
-
-class ClientTest extends App {
- implicit val actorSystem = ActorSystem("spray-client-test")
- import actorSystem.dispatcher
-
-
- import GeoJsonProtocol._
- import SprayJsonSupport._
-
-
- val actor = IO(Http)
-
- val pipeline = sendReceive ~> unmarshal[Container]
-
- val response = pipeline {
- Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
- } onSuccess {
- case a => {
- println(a)
- }
- }
-}
-
-
-
-
-
diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
deleted file mode 100644
index b864d6d6..00000000
--- a/kamon-core/src/main/scala/spraytest/FutureTesting.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-package spraytest
-/*
-import akka.actor.ActorSystem
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Try, Success}
-import kamon.actor.TransactionContext
-
-object FutureTesting extends App {
-
- val actorSystem = ActorSystem("future-testing")
- implicit val ec = actorSystem.dispatcher
- implicit val tctx = TransactionContext(11, Nil)
-
- threadPrintln("In the initial Thread")
-
-
- val f = TraceableFuture {
- threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
- }
-
- f.onComplete({
- case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
- })
-
- f.onComplete({
- case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
- })
-
-
-
-
-
-
-
-
- def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
-
-}
-
-
-
-
-trait TransactionContextWrapper {
- def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
- TransactionContext.current.set(tranContext.fork)
- println(s"SetContext to: ${tranContext}")
- val result = f
-
- TransactionContext.current.remove()
- result
- }
-
-}
-
-class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
- def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
- future.onComplete(wrap(func, transactionContext))
- }
-}
-
-object TraceableFuture {
-
- implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
-
- def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
- val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil))
-
- new TraceableFuture(Future { wrappedBody })
- }
-
-
-
-
- def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
- TransactionContext.current.set(transactionContext)
- val result = body
- TransactionContext.current.remove()
- result
- }
-}*/
-
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
deleted file mode 100644
index f9d6869c..00000000
--- a/kamon-core/src/main/scala/test/PingPong.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package test
-
-import akka.actor.{Props, Actor, ActorSystem}
-
-object PingPong extends App {
-
- val as = ActorSystem("ping-pong")
-
- val pinger = as.actorOf(Props[Pinger])
- val ponger = as.actorOf(Props[Ponger])
-
- pinger.tell(Pong, ponger)
-
-
- Thread.sleep(30000)
- as.shutdown()
-
-
-}
-
-case object Ping
-case object Pong
-
-class Pinger extends Actor {
- def receive = {
- case Pong => sender ! Ping
- }
-}
-
-class Ponger extends Actor {
- def receive = {
- case Ping => sender ! Pong
- }
-}
diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
new file mode 100644
index 00000000..b1727d2b
--- /dev/null
+++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
@@ -0,0 +1,97 @@
+package test
+
+import akka.actor._
+import kamon.Tracer
+import spray.routing.SimpleRoutingApp
+import akka.util.Timeout
+import spray.httpx.RequestBuilding
+import scala.concurrent.{Await, Future}
+import kamon.trace.UowDirectives
+
+object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives {
+ import scala.concurrent.duration._
+ import spray.client.pipelining._
+ import akka.pattern.ask
+
+ implicit val system = ActorSystem("test")
+ import system.dispatcher
+
+ implicit val timeout = Timeout(30 seconds)
+
+ val pipeline = sendReceive
+ val replier = system.actorOf(Props[Replier])
+
+ startServer(interface = "localhost", port = 9090) {
+ get {
+ path("test"){
+ uow {
+ complete {
+ val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil
+
+ Future.sequence(futures).map(l => "Ok")
+ }
+ }
+ } ~
+ path("reply" / Segment) { reqID =>
+ uow {
+ complete {
+ if (Tracer.context().isEmpty)
+ println("ROUTE NO CONTEXT")
+
+ (replier ? reqID).mapTo[String]
+ }
+ }
+ } ~
+ path("ok") {
+ complete("ok")
+ } ~
+ path("future") {
+ dynamic {
+ complete(Future { "OK" })
+ }
+ } ~
+ path("error") {
+ complete {
+ throw new NullPointerException
+ "okk"
+ }
+ }
+ }
+ }
+
+}
+
+object Verifier extends App {
+
+ def go: Unit = {
+ import scala.concurrent.duration._
+ import spray.client.pipelining._
+
+ implicit val system = ActorSystem("test")
+ import system.dispatcher
+
+ implicit val timeout = Timeout(30 seconds)
+
+ val pipeline = sendReceive
+
+ val futures = Future.sequence(for(i <- 1 to 500) yield {
+ pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString)
+ })
+ println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true))
+ }
+
+
+
+
+}
+
+class Replier extends Actor with ActorLogging {
+ def receive = {
+ case anything =>
+ if(Tracer.context.isEmpty)
+ log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
+
+ log.info("Processing at the Replier")
+ sender ! anything
+ }
+}