author     Ivan Topolnak <ivantopo@gmail.com>   2013-08-07 19:06:33 -0300
committer  Ivan Topolnak <ivantopo@gmail.com>   2013-08-07 19:06:33 -0300
commit     923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch)
tree       d555199f0c63b690ec51805b496ee2d54eb014da /kamon-core
parent     1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff)
upgrading to akka 2.2
Diffstat (limited to 'kamon-core')
-rw-r--r--  kamon-core/src/main/resources/META-INF/aop.xml | 34
-rw-r--r--  kamon-core/src/main/resources/application.conf | 47
-rw-r--r--  kamon-core/src/main/resources/newrelic.yml | 242
-rw-r--r--  kamon-core/src/main/scala/kamon/Kamon.scala | 132
-rw-r--r--  kamon-core/src/main/scala/kamon/TraceContext.scala | 67
-rw-r--r--  kamon-core/src/main/scala/kamon/TraceContextSwap.scala | 26
-rw-r--r--  kamon-core/src/main/scala/kamon/TransactionPublisher.scala | 15
-rw-r--r--  kamon-core/src/main/scala/kamon/executor/eventbus.scala | 103
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala | 89
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala | 23
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala | 245
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala | 73
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala | 61
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala | 49
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala | 67
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala | 12
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/MetricFilter.scala | 6
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/Metrics.scala | 146
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala | 51
-rw-r--r--  kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala | 51
-rw-r--r--  kamon-core/src/main/scala/spraytest/ClientTest.scala | 55
-rw-r--r--  kamon-core/src/main/scala/spraytest/FutureTesting.scala | 81
-rw-r--r--  kamon-core/src/main/scala/test/PingPong.scala | 34
-rw-r--r--  kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala | 45
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala | 22
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala | 34
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala | 53
-rw-r--r--  kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala | 82
28 files changed, 1945 insertions, 0 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..e6d61fa1
--- /dev/null
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,34 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <weaver options="-verbose -showWeaveInfo">
+ <!--<dump within="*" beforeandafter="true"/>-->
+ </weaver>
+
+ <aspects>
+
+ <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/>
+ <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/>
+ <aspect name="kamon.instrumentation.RunnableInstrumentation" />
+ <aspect name="kamon.instrumentation.MessageQueueInstrumentation" />
+
+ <aspect name="kamon.instrumentation.InceptionAspect"/>
+
+ <!-- ExecutorService Instrumentation for Akka. -->
+<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
+ <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
+ <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
+ <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>
+
+
+
+ <include within="*"/>
+ <exclude within="javax..*"/>
+ <exclude within="org.aspectj..*"/>
+ <exclude within="scala..*"/>
+ <exclude within="scalaz..*"/>
+ <exclude within="scalad..*"/>
+ <exclude within="play..*"/>
+ </aspects>
+
+</aspectj>
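
The aop.xml above registers each aspect by fully qualified class name and tells the AspectJ load-time weaver which packages to weave (everything except the excluded ones). As a rough, hypothetical sketch of what a registered entry resolves to, an aspect declared as <aspect name="kamon.instrumentation.MyAspect"/> would correspond to a class like the one below; the class name and pointcut are illustrative and not part of this commit.

    package kamon.instrumentation

    import org.aspectj.lang.annotation.{Aspect, Before, Pointcut}

    // Hypothetical aspect; it would be registered in aop.xml as
    // <aspect name="kamon.instrumentation.MyAspect"/> and woven at class load time.
    @Aspect
    class MyAspect {

      // Illustrative pointcut, not taken from this commit.
      @Pointcut("execution(* some.pkg.SomeClass.someMethod(..))")
      def someMethodExecution(): Unit = {}

      @Before("someMethodExecution()")
      def beforeSomeMethod(): Unit =
        println("someMethod is about to run")
    }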
diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf
new file mode 100644
index 00000000..370acae9
--- /dev/null
+++ b/kamon-core/src/main/resources/application.conf
@@ -0,0 +1,47 @@
+akka {
+ actor {
+ default-dispatcher {
+ fork-join-executor {
+ # Min number of threads to cap factor-based parallelism number to
+ parallelism-min = 2
+
+ # The parallelism factor is used to determine thread pool size using the
+ # following formula: ceil(available processors * factor). Resulting size
+ # is then bounded by the parallelism-min and parallelism-max values.
+ parallelism-factor = 3.0
+
+ # Max number of threads to cap factor-based parallelism number to
+ parallelism-max = 8
+ }
+
+ throughput = 100
+ }
+ }
+}
+
+# Dispatcher is the name of the event-based dispatcher
+#type = Dispatcher
+
+# What kind of ExecutorService to use
+#executor = "kamon.executor.InstrumentedExecutorServiceConfigurator"
+
+# Min number of threads to cap factor-based parallelism number to
+#parallelism-min = 2
+
+# Parallelism (threads) ... ceil(available processors * factor)
+#parallelism-factor = 2.0
+
+# Max number of threads to cap factor-based parallelism number to
+#parallelism-max = 10
+
+# Throughput defines the maximum number of messages to be
+# processed per actor before the thread jumps to the next actor.
+# Set to 1 for as fair as possible.
+#throughput = 100
+
+
+
+
+
+
+
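
For reference, the pool size produced by the fork-join settings above can be computed by hand. A minimal sketch, assuming only the rule stated in the comments (ceil(available processors * factor), clamped between parallelism-min and parallelism-max):

    object PoolSizeSketch extends App {
      // Sizing rule described in the comments above.
      val processors = Runtime.getRuntime.availableProcessors
      val factor = 3.0   // parallelism-factor
      val min = 2        // parallelism-min
      val max = 8        // parallelism-max

      val poolSize = math.min(math.max(math.ceil(processors * factor).toInt, min), max)
      println(s"default-dispatcher would get $poolSize threads on this machine")
    }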
diff --git a/kamon-core/src/main/resources/newrelic.yml b/kamon-core/src/main/resources/newrelic.yml
new file mode 100644
index 00000000..1b1ad53b
--- /dev/null
+++ b/kamon-core/src/main/resources/newrelic.yml
@@ -0,0 +1,242 @@
+#
+# This file configures the New Relic Agent. New Relic monitors
+# Java applications with deep visibility and low overhead. For more
+# information, visit www.newrelic.com.
+#
+# This configuration file is custom generated for Ivan Topolnak - ivantopo@gmail.com
+#
+# This section is for settings common to all environments.
+# Do not add anything above this next line.
+common: &default_settings
+ #
+ # ============================== LICENSE KEY ===============================
+
+ # You must specify the license key associated with your New Relic
+ # account. This key binds your Agent's data to your account in the
+ # New Relic service.
+ license_key: '2e24765acb032cb9e7207013b5ba3e2ab7d2d75c'
+
+ # Agent Enabled
+ # Use this setting to force the agent to run or not run.
+ # Default is true.
+ # agent_enabled: true
+
+ # Set to true to enable support for auto app naming.
+ # The name of each web app is detected automatically
+ # and the agent reports data separately for each one.
+ # This provides a finer-grained performance breakdown for
+ # web apps in New Relic.
+ # Default is false.
+ enable_auto_app_naming: false
+
+ # Set to true to enable component-based transaction naming.
+ # Set to false to use the URI of a web request as the name of the transaction.
+ # Default is true.
+ enable_auto_transaction_naming: true
+
+  # Set the name of your application as you'd like it to show up in New Relic.
+ # if enable_auto_app_naming is false, the agent reports all data to this application.
+ # Otherwise, the agent reports only background tasks (transactions for non-web applications) to this application.
+ # To report data to more than one application, separate the application names with ";".
+  # For example, to report data to "My Application" and "My Application 2" use this:
+ # app_name: My Application;My Application 2
+ # This setting is required.
+ app_name: My Application
+
+ # The agent uses its own log file to keep its logging
+ # separate from that of your application. Specify the log level here.
+ # This setting is dynamic, so changes do not require restarting your application.
+ # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest
+ # Default is info.
+ log_level: finest
+ enable_custom_tracing: true
+
+ # Log all data to and from New Relic in plain text.
+ # This setting is dynamic, so changes do not require restarting your application.
+ # Default is false.
+ #audit_mode: true
+
+ # The number of log files to use.
+ # Default is 1.
+ #log_file_count: 1
+
+ # The maximum number of bytes to write to any one log file.
+ # Default is 0 (no limit).
+ #log_limit_in_kbytes: 0
+
+ # The name of the log file.
+ # Default is newrelic_agent.log.
+ #log_file_name: newrelic_agent.log
+
+ # The log file directory.
+ # Default is the logs directory in the newrelic.jar parent directory.
+ log_file_path: /home/ivantopo/Desktop/tmp
+
+ # The agent communicates with New Relic via https by
+ # default. If you want to communicate with newrelic via http,
+ # then turn off SSL by setting this value to false.
+ # This work is done asynchronously to the threads that process your
+ # application code, so response times will not be directly affected
+ # by this change.
+ # Default is true.
+ ssl: true
+
+ # Proxy settings for connecting to the New Relic server.
+ #
+ # If a proxy is used, the host setting is required. Other settings
+ # are optional. Default port is 8080. The username and password
+ # settings will be used to authenticate to Basic Auth challenges
+ # from a proxy server.
+ #
+ # proxy_host: hostname
+ # proxy_port: 8080
+ # proxy_user: username
+ # proxy_password: password
+
+ # Tells transaction tracer and error collector (when enabled)
+ # whether or not to capture HTTP params. When true, frameworks can
+ # exclude HTTP parameters from being captured.
+ # Default is false.
+ capture_params: false
+
+  # Tells transaction tracer and error collector not to collect
+ # specific http request parameters.
+ # ignored_params: credit_card, ssn, password
+
+ # Transaction tracer captures deep information about slow
+ # transactions and sends this to the New Relic service once a
+ # minute. Included in the transaction is the exact call sequence of
+ # the transactions including any SQL statements issued.
+ transaction_tracer:
+
+ # Transaction tracer is enabled by default. Set this to false to
+ # turn it off. This feature is only available at the higher product levels.
+ # Default is true.
+ enabled: true
+
+ # Threshold in seconds for when to collect a transaction
+ # trace. When the response time of a controller action exceeds
+ # this threshold, a transaction trace will be recorded and sent to
+ # New Relic. Valid values are any float value, or (default) "apdex_f",
+ # which will use the threshold for the "Frustrated" Apdex level
+ # (greater than four times the apdex_t value).
+ # Default is apdex_f.
+ transaction_threshold: apdex_f
+
+ # When transaction tracer is on, SQL statements can optionally be
+ # recorded. The recorder has three modes, "off" which sends no
+ # SQL, "raw" which sends the SQL statement in its original form,
+ # and "obfuscated", which strips out numeric and string literals.
+ # Default is obfuscated.
+ record_sql: obfuscated
+
+ # Obfuscate only occurrences of specific SQL fields names.
+ # This setting only applies if "record_sql" is set to "raw".
+ #obfuscated_sql_fields: credit_card, ssn, password
+
+ # Set this to true to log SQL statements instead of recording them.
+ # SQL is logged using the record_sql mode.
+ # Default is false.
+ log_sql: false
+
+ # Threshold in seconds for when to collect stack trace for a SQL
+ # call. In other words, when SQL statements exceed this threshold,
+ # then capture and send to New Relic the current stack trace. This is
+ # helpful for pinpointing where long SQL calls originate from.
+ # Default is 0.5 seconds.
+ stack_trace_threshold: 0.5
+
+ # Determines whether the agent will capture query plans for slow
+ # SQL queries. Only supported for MySQL and PostgreSQL.
+ # Default is true.
+ explain_enabled: true
+
+ # Threshold for query execution time below which query plans will not
+  # be captured. Relevant only when `explain_enabled` is true.
+ # Default is 0.5 seconds.
+ explain_threshold: 0.5
+
+ # Use this setting to control the variety of transaction traces.
+ # The higher the setting, the greater the variety.
+ # Set this to 0 to always report the slowest transaction trace.
+ # Default is 20.
+ top_n: 20
+
+
+ # Error collector captures information about uncaught exceptions and
+ # sends them to New Relic for viewing
+ error_collector:
+
+ # Error collector is enabled by default. Set this to false to turn
+ # it off. This feature is only available at the higher product levels.
+ # Default is true.
+ enabled: true
+
+ # To stop specific exceptions from reporting to New Relic, set this property
+ # to a comma separated list of full class names.
+ #
+ # ignore_errors:
+
+  # To stop specific http status codes from being reported to New Relic as errors,
+ # set this property to a comma separated list of status codes to ignore.
+ # When this property is commented out it defaults to ignoring 404s.
+ #
+ # ignore_status_codes: 404
+
+ # Cross Application Tracing adds request and response headers to
+  # external calls using the Apache HttpClient libraries to provide better
+ # performance data when calling applications monitored by other New Relic Agents.
+ #
+ cross_application_tracer:
+ # Set to true to enable cross application tracing.
+ # Default is true.
+ enabled: true
+
+ # Thread profiler measures wall clock time, CPU time, and method call counts
+ # in your application's threads as they run.
+ thread_profiler:
+
+ # Set to false to disable the thread profiler.
+ # Default is true.
+ enabled: true
+
+ #============================== Browser Monitoring ===============================
+ # New Relic Real User Monitoring gives you insight into the performance real users are
+ # experiencing with your website. This is accomplished by measuring the time it takes for
+ # your users' browsers to download and render your web pages by injecting a small amount
+ # of JavaScript code into the header and footer of each page.
+ browser_monitoring:
+ # By default the agent automatically inserts API calls in compiled JSPs to
+ # inject the monitoring JavaScript into web pages.
+ # Set this attribute to false to turn off this behavior.
+ auto_instrument: true
+ # Set this attribute to false to prevent injection of the monitoring JavaScript.
+ # Default is true.
+ enabled: true
+
+# Application Environments
+# ------------------------------------------
+# Environment specific settings are in this section.
+# You can use the environment to override the default settings.
+# For example, to change the app_name setting.
+# Use -Dnewrelic.environment=<environment> on the Java command line
+# to set the environment.
+# The default environment is production.
+
+# NOTE if your application has other named environments, you should
+# provide configuration settings for these environments here.
+
+development:
+ <<: *default_settings
+ app_name: KAMON[Development]
+
+test:
+ <<: *default_settings
+ app_name: My Application (Test)
+
+production:
+ <<: *default_settings
+
+staging:
+ <<: *default_settings
+ app_name: My Application (Staging) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..c3080909
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,132 @@
+package kamon
+
+import akka.actor.{Actor, Props, ActorSystem}
+import scala.collection.JavaConverters._
+import java.util.concurrent.ConcurrentHashMap
+import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import com.newrelic.api.agent.NewRelic
+
+object Kamon {
+
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ implicit lazy val actorSystem = ActorSystem("kamon")
+
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+
+
+ object Metric {
+ val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala
+
+ def actorSystemNames: List[String] = actorSystems.keys.toList
+ def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name))
+
+ def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
+ }
+
+
+
+ val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+
+}
+
+
+
+
+
+
+
+
+
+object Tracer {
+ val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ def context() = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start = ??? //set(newTraceContext)
+ def stop = ctx.get match {
+ case Some(context) => context.close
+ case None =>
+ }
+
+ //def newTraceContext(): TraceContext = TraceContext()
+}
+
+
+class MetricManager extends Actor {
+ implicit val ec = context.system.dispatcher
+
+ def receive = {
+ case RegisterForAllDispatchers(frequency) => {
+ val subscriber = sender
+ context.system.scheduler.schedule(frequency, frequency) {
+ Kamon.Metric.actorSystems.foreach {
+ case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach {
+ case (dispatcherName, dispatcherMetrics) => {
+ val activeThreads = dispatcherMetrics.activeThreadCount.snapshot
+ val poolSize = dispatcherMetrics.poolSize.snapshot
+ val queueSize = dispatcherMetrics.queueSize.snapshot
+
+ subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize)
+
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+case class RegisterForAllDispatchers(frequency: FiniteDuration)
+case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot)
+
+
+
+
+
+
+class NewrelicReporterActor extends Actor {
+ import scala.concurrent.duration._
+
+ Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+
+ def receive = {
+ case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
+ /*println("PUBLISHED DISPATCHER STATS")
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat)
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat))
+ println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/
+
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat)
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat))
+
+ NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat)
+ }
+ }
+} \ No newline at end of file
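
A minimal usage sketch of the thread-local trace context API defined in Kamon.scala above (assuming kamon-core is on the classpath so the internal "kamon" actor system can start):

    import kamon.Kamon

    object TraceContextSketch extends App {
      Kamon.start                 // installs a fresh TraceContext for the current thread
      println(Kamon.context())    // Some(TraceContext(...)) while the context is set
      Kamon.stop                  // closes the context, which publishes a FullTransaction
    }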
diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala
new file mode 100644
index 00000000..6b32550f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TraceContext.scala
@@ -0,0 +1,67 @@
+package kamon
+
+import java.util.UUID
+import akka.actor.{ActorSystem, ActorPath}
+import akka.agent.Agent
+import java.util.concurrent.TimeUnit
+import scala.util.{Failure, Success}
+import akka.util.Timeout
+
+
+case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+ implicit val as = Kamon.actorSystem.dispatcher
+
+ def append(entry: TraceEntry) = entries send (entry :: _)
+ def close = entries.future.onComplete({
+ case Success(list) => Kamon.publish(FullTransaction(id, list))
+ case Failure(t) => println("WTF!")
+ })
+}
+
+object TraceContext {
+ implicit val as2 = Kamon.actorSystem.dispatcher
+ def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+}
+
+
+
+trait TraceEntry
+
+case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
+
+
+
+case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry])
+
+
+
+
+
+object Collector {
+
+}
+
+trait TraceEntryStorage {
+ def store(entry: TraceEntry): Boolean
+}
+
+class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) {
+ def store(entry: TraceEntry) = storage.store(entry)
+}
+
+object ThreadLocalTraceEntryStorage extends TraceEntryStorage {
+
+ private val storage = new ThreadLocal[List[TraceEntry]] {
+ override def initialValue(): List[TraceEntry] = Nil
+ }
+
+ def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get)
+
+ def store(entry: TraceEntry): Boolean = {
+ update(entry :: _)
+ true
+ }
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
new file mode 100644
index 00000000..68ee808b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala
@@ -0,0 +1,26 @@
+package kamon
+
+/**
+ * Provides support for making a TraceContext available as a ThreadLocal and cleaning up afterwards.
+ */
+trait TraceContextSwap {
+
+ def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body)
+
+ def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
+ ctx match {
+ case Some(context) => {
+ Kamon.set(context)
+ val bodyResult = primary
+ Kamon.clear
+
+ bodyResult
+ }
+ case None => fallback
+ }
+
+ }
+
+}
+
+object TraceContextSwap extends TraceContextSwap
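
A small sketch of how withContext is meant to be used: when a context is present it is installed as the ThreadLocal for the duration of the block and cleared afterwards; when it is absent the fallback (here the same block) runs untouched. The value 42 is just a placeholder body.

    import kamon.{Kamon, TraceContextSwap}

    object WithContextSketch extends App {
      val result = TraceContextSwap.withContext(Kamon.context(), {
        // runs with the TraceContext installed as ThreadLocal when one was supplied;
        // with None, this same block runs as the fallback without touching the ThreadLocal
        42
      })
      println(result)
    }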
diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
new file mode 100644
index 00000000..0626b91d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala
@@ -0,0 +1,15 @@
+package kamon
+
+import akka.actor.Actor
+import java.util.UUID
+
+class TransactionPublisher extends Actor {
+
+ def receive = {
+ case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries")
+ }
+
+}
+
+
+case class FullTransaction(id: UUID, entries: List[TraceEntry])
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
new file mode 100644
index 00000000..599f2a7a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -0,0 +1,103 @@
+package kamon.executor
+
+import akka.event.ActorEventBus
+import akka.event.LookupClassification
+import akka.actor._
+import java.util.concurrent.TimeUnit
+
+import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import akka.util.Timeout
+import scala.util.{Random, Success, Failure}
+import scala.concurrent.Future
+
+trait Message
+
+case class PostMessage(text:String) extends Message
+
+case class MessageEvent(val channel:String, val message:Message)
+
+class AppActorEventBus extends ActorEventBus with LookupClassification{
+ type Event = MessageEvent
+ type Classifier=String
+ protected def mapSize(): Int={
+ 10
+ }
+
+ protected def classify(event: Event): Classifier={
+ event.channel
+ }
+
+ protected def publish(event: Event, subscriber: Subscriber): Unit={
+ subscriber ! event
+ }
+}
+case class Ping()
+case class Pong()
+
+class PingActor extends Actor with ActorLogging {
+
+ val pong = context.actorOf(Props[PongActor])
+ val random = new Random()
+ def receive = {
+ case Pong() => {
+ //Thread.sleep(random.nextInt(2000))
+ //log.info("Message from Ping")
+ pong ! Ping()
+ }
+ }
+}
+
+class PongActor extends Actor with ActorLogging {
+ def receive = {
+ case Ping() => {
+ sender ! Pong()
+ }
+ }
+}
+
+
+object TryAkka extends App{
+ val system = ActorSystem("MySystem")
+ val appActorEventBus=new AppActorEventBus
+ val NEW_POST_CHANNEL="/posts/new"
+ val subscriber = system.actorOf(Props(new Actor {
+ def receive = {
+ case d: MessageEvent => println(d)
+ }
+ }))
+
+ Kamon.start
+ for(i <- 1 to 4) {
+ val ping = system.actorOf(Props[PingActor])
+ ping ! Pong()
+ }
+
+
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+
+ /*
+ val newRelicReporter = new NewRelicReporter(registry)
+ newRelicReporter.start(1, TimeUnit.SECONDS)
+
+*/
+ import akka.pattern.ask
+ implicit val timeout = Timeout(10, TimeUnit.SECONDS)
+ implicit def execContext = system.dispatcher
+
+
+
+ Kamon.start
+
+ Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ threadPrintln("Before doing it")
+ val f = Future { threadPrintln("This is happening inside the future body") }
+
+ Kamon.stop
+
+
+ //Thread.sleep(3000)
+ //system.shutdown()
+
+/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
+ appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..82915ce9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,89 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{Props, ActorSystem, ActorRef}
+import kamon.{Kamon, TraceContext}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
+import kamon.metric.{MetricDirectory, Metrics}
+import com.codahale.metrics
+import kamon.instrumentation.TraceableMessage
+import scala.Some
+
+case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
+
+
+@Aspect
+class ActorRefTellInstrumentation {
+ import ProceedingJoinPointPimp._
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)")
+ def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(actor, message, sender)")
+ def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+
+ val actorName = MetricDirectory.nameForActor(actor)
+ val t = Metrics.registry.timer(actorName + "LATENCY")
+ //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
+ pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
+ }
+}
+
+
+@Aspect("perthis(actorCellCreation(..))")
+class ActorCellInvokeInstrumentation {
+
+ var processingTimeTimer: Timer = _
+ var shouldTrack = false
+
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val actorName = MetricDirectory.nameForActor(ref)
+ val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
+
+ /** TODO: Find a better way to filter the things we don't want to measure. */
+ //if(system.name != "kamon" && actorName.startsWith("/user")) {
+ processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
+ shouldTrack = true
+ //}
+ }
+
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ import ProceedingJoinPointPimp._
+ println("ENVELOPE --------------------->"+envelope)
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
+ timer.stop()
+
+ val originalEnvelope = envelope.copy(message = msg)
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Kamon.set(c)
+ println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Kamon.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
new file mode 100644
index 00000000..84c20c52
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+trait ProceedingJoinPointPimp {
+ import language.implicitConversions
+
+ implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
+}
+
+object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
+
+case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
+ def proceedWith(newUniqueArg: AnyRef) = {
+ val args = pjp.getArgs
+ args.update(0, newUniqueArg)
+ pjp.proceed(args)
+ }
+
+ def proceedWithTarget(args: AnyRef*) = {
+ pjp.proceed(args.toArray)
+ }
+}
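
The pimped join point above is what ActorRefTellInstrumentation uses to swap an advised argument before proceeding. A hedged sketch of the call pattern inside an @Around advice body (the object and method names are illustrative, not part of this commit):

    import org.aspectj.lang.ProceedingJoinPoint
    import kamon.instrumentation.ProceedingJoinPointPimp._

    object ProceedWithSketch {
      // Inside an @Around advice body: replace the first advised argument with `wrapped`,
      // then continue with the original join point.
      def proceedReplacingFirstArg(pjp: ProceedingJoinPoint, wrapped: AnyRef): AnyRef =
        pjp.proceedWith(wrapped)
    }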
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
new file mode 100644
index 00000000..b4f8a475
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -0,0 +1,245 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent._
+import org.aspectj.lang.ProceedingJoinPoint
+import java.util
+import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
+import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+import com.typesafe.config.Config
+import kamon.Kamon
+import scala.concurrent.forkjoin.ForkJoinPool
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+
+
+@Aspect
+class ActorSystemInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
+ def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
+
+ @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
+ def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
+
+ Kamon.Metric.registerActorSystem(name)
+ }
+}
+
+@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
+class ForkJoinPoolInstrumentation {
+ var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
+
+ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
+ def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
+
+ @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
+ def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
+ val (actorSystemName, dispatcherName) = threadFactory match {
+ case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
+ case _ => ("Unknown", "Unknown")
+ }
+
+ val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
+ for(m <- metrics) {
+ activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
+ println(s"Registered $dispatcherName for actor system $actorSystemName")
+ }
+ }
+
+ def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
+    knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unknown", "Unknown"))
+ }
+
+
+
+
+ @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
+ def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
+
+ @After("forkJoinScan(fjp)")
+ def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
+ activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)
+ }
+
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/**
+ * ExecutorService monitoring base:
+ */
+trait ExecutorServiceCollector {
+ def updateActiveThreadCount(diff: Int): Unit
+ def updateTotalThreadCount(diff: Int): Unit
+ def updateQueueSize(diff: Int): Unit
+}
+
+trait WatchedExecutorService {
+ def collector: ExecutorServiceCollector
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+trait ExecutorServiceMonitoring {
+ def dispatcherMetrics: DispatcherMetricCollector
+}
+
+class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
+ @volatile var dispatcherMetrics: DispatcherMetricCollector = _
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = delegate.createExecutorService
+}
+
+@Aspect
+class ExecutorServiceFactoryProviderInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
+ def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
+ true
+ }
+
+ @Around("factoryMethodCall(dispatcherName, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
+
+ val actorSystemName = threadFactory match {
+ case m: MonitorableThreadFactory => m.name
+ case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
+ }
+
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
+ }
+
+}
+
+
+@Aspect
+class NamedExecutorServiceFactoryDelegateInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
+ def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
+
+ @Around("factoryMethodCall(namedFactory)")
+ def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorService]
+ val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+
+ ExecutorServiceMetricCollector.register(executorFullName, delegate)
+
+ new NamedExecutorServiceDelegate(executorFullName, delegate)
+ }
+}
+
+case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
+ def shutdown() = {
+ ExecutorServiceMetricCollector.deregister(fullName)
+ delegate.shutdown()
+ }
+ def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+ def isShutdown: Boolean = delegate.isShutdown
+ def isTerminated: Boolean = delegate.isTerminated
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
+ def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
+ def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
+ def submit(task: Runnable): Future[_] = delegate.submit(task)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
+ def execute(command: Runnable) = delegate.execute(command)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
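
As an illustration of splitName in ForkJoinPoolInstrumentation above, assuming Akka's usual thread-factory naming of <actor-system>-<dispatcher> (the values below are made up, not taken from this commit):

    import kamon.instrumentation.ForkJoinPoolInstrumentation

    object SplitNameSketch extends App {
      val fjpInstrumentation = new ForkJoinPoolInstrumentation

      // Expected to yield ("kamon", "akka.actor.default-dispatcher")
      // when "kamon" is a known actor system name.
      val (systemName, dispatcherName) =
        fjpInstrumentation.splitName("kamon-akka.actor.default-dispatcher", List("kamon"))

      println(s"$systemName / $dispatcherName")
    }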
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
new file mode 100644
index 00000000..c21502ac
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -0,0 +1,73 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import akka.actor.{ActorSystem, ActorRef}
+import kamon.metric.{Metrics, MetricDirectory}
+import org.aspectj.lang.ProceedingJoinPoint
+
+
+/**
+ * For Mailboxes we would like to track the queue size and message latency. Currently the latency
+ * will be gathered from the ActorCellMetrics.
+ */
+
+
+@Aspect
+class MessageQueueInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
+ def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
+
+ @Around("messageQueueCreation(owner, system)")
+ def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
+ val delegate = pjp.proceed.asInstanceOf[MessageQueue]
+
+    // We are not interested in monitoring mailboxes if we don't know where they belong.
+ val monitoredMailbox = for(own <- owner; sys <- system) yield {
+ val systemName = sys.name
+ val ownerName = MetricDirectory.nameForActor(own)
+ val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
+
+ val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
+ Metrics.include(mailBoxName, queueSizeHistogram)
+
+ new MonitoredMessageQueue(delegate, queueSizeHistogram)
+ }
+
+ monitoredMailbox match {
+ case None => delegate
+ case Some(mmb) => mmb
+ }
+ }
+}
+
+
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
+
+ def enqueue(receiver: ActorRef, handle: Envelope) = {
+ delegate.enqueue(receiver, handle)
+ queueSizeHistogram.update(numberOfMessages)
+ }
+
+ def dequeue(): Envelope = {
+ val envelope = delegate.dequeue()
+ queueSizeHistogram.update(numberOfMessages)
+
+ envelope
+ }
+
+ def numberOfMessages: Int = delegate.numberOfMessages
+ def hasMessages: Boolean = delegate.hasMessages
+ def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
+}
+
+
+
+
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
new file mode 100644
index 00000000..e75a638f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -0,0 +1,61 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.{Kamon, TraceContext}
+import org.aspectj.lang.ProceedingJoinPoint
+import scala.Some
+
+/**
+ * Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
+ */
+trait TraceContextAwareRunnable extends Runnable {}
+
+
+@Aspect("perthis(instrumentedRunnableCreation())")
+class RunnableInstrumentation {
+
+ /**
+ * These are the Runnables that need to be instrumented and make the TraceContext available
+ * while their run method is executed.
+ */
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null
+
+
+ /**
+ * Pointcuts
+ */
+
+ @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))")
+ def instrumentedRunnableCreation(): Unit = {}
+
+ @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())")
+ def runnableExecution() = {}
+
+
+ /**
+ * Aspect members
+ */
+
+ private val traceContext = Kamon.context
+
+
+ /**
+ * Advices
+ */
+ import kamon.TraceContextSwap.withContext
+
+ @Before("instrumentedRunnableCreation()")
+ def beforeCreation = {
+ //println((new Throwable).getStackTraceString)
+ }
+
+
+ @Around("runnableExecution()")
+ def around(pjp: ProceedingJoinPoint) = {
+ import pjp._
+
+ withContext(traceContext, proceed())
+ }
+
+}
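
With this aspect woven in (see aop.xml), the TraceContext captured when a CallbackRunnable or PromiseCompletingRunnable is created becomes visible inside Future bodies and callbacks, which is what the TryAkka playground in eventbus.scala exercises. A minimal sketch, assuming load-time weaving is active:

    import scala.concurrent.Future
    import kamon.Kamon

    object FuturePropagationSketch extends App {
      implicit val ec = Kamon.actorSystem.dispatcher

      Kamon.start
      Future {
        // the TraceContext captured at Runnable creation is installed here by the aspect
        println(s"context inside the future: ${Kamon.context}")
      }
      Kamon.stop
    }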
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
new file mode 100644
index 00000000..74261403
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
@@ -0,0 +1,49 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
+
+class ActorCage(val name: String, val size: Int) {
+
+  def doIt: Unit = println(name)
+}
+
+trait CageMonitoring {
+ def histogram: Histogram
+ def count(value: Int): Unit
+}
+
+class CageMonitoringImp extends CageMonitoring{
+ final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+
+ def count(value: Int) = histogram.update(value)
+
+}
+
+
+@Aspect
+class InceptionAspect {
+
+ @DeclareMixin("kamon.instrumentation.ActorCage")
+ def mixin: CageMonitoring = new CageMonitoringImp
+
+
+ @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
+ def theActorCageDidIt(actorCage: CageMonitoring) = {}
+
+ @After("theActorCageDidIt(actorCage)")
+ def afterDoingIt(actorCage: CageMonitoring) = {
+ actorCage.count(1)
+ actorCage.histogram.getSnapshot.dump(System.out)
+ }
+
+
+
+}
+
+
+object Runner extends App {
+ val cage = new ActorCage("ivan", 10)
+
+ cage.doIt
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
new file mode 100644
index 00000000..54a13f39
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
@@ -0,0 +1,67 @@
+package kamon.metric
+
+import java.util.concurrent.{ThreadPoolExecutor, ExecutorService}
+import scala.concurrent.forkjoin.ForkJoinPool
+import com.codahale.metrics.{Metric, MetricFilter}
+
+object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
+
+ def register(fullName: String, executorService: ExecutorService) = executorService match {
+ case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp)
+ case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe)
+    case _ => // If it is an unknown Executor then just do nothing.
+ }
+
+ def deregister(fullName: String) = {
+ Metrics.registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+
+trait ForkJoinPoolMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+
+ def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
+ val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
+ fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
+ fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+trait ThreadPoolExecutorMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+ def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
+ val tpeGauge = newNumericGaugeFor(tpe) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> tpeGauge(_.getQueue.size()),
+ fullName + poolSize -> tpeGauge(_.getPoolSize),
+ fullName + activeThreads -> tpeGauge(_.getActiveCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+
+object BasicExecutorMetricNames {
+ val queueSize = "queueSize"
+ val poolSize = "threads/poolSize"
+ val activeThreads = "threads/activeThreads"
+}
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
new file mode 100644
index 00000000..30635432
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
@@ -0,0 +1,12 @@
+package kamon.metric
+
+import com.codahale.metrics.Gauge
+
+trait GaugeGenerator {
+
+ def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] {
+ def getValue: V = generator(target)
+ }
+}
+
+object GaugeGenerator extends GaugeGenerator
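
The same helper is what ExecutorServiceMetricCollector uses for its pool gauges. A hedged usage sketch with a made-up queue and metric name:

    import java.util.concurrent.LinkedBlockingQueue
    import com.codahale.metrics.MetricRegistry
    import kamon.metric.GaugeGenerator._

    object GaugeSketch extends App {
      val registry = new MetricRegistry
      val queue = new LinkedBlockingQueue[String]()

      // Gauge that reports the queue's current size whenever the registry is polled.
      registry.register("sample/queueSize", newNumericGaugeFor(queue)(_.size))
    }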
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
new file mode 100644
index 00000000..fb117968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
@@ -0,0 +1,6 @@
+package kamon.metric
+
+object MetricFilter {
+ def actorSystem(system: String): Boolean = !system.startsWith("kamon")
+ def actor(path: String, system: String): Boolean = true
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..cdc0a334
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,146 @@
+package kamon.metric
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
+import akka.actor.ActorRef
+import com.codahale.metrics
+import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
+
+
+object Metrics {
+ val registry: MetricRegistry = new MetricRegistry
+
+ val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
+ //consoleReporter.build().start(45, TimeUnit.SECONDS)
+
+ //val newrelicReporter = NewRelicReporter(registry)
+ //newrelicReporter.start(5, TimeUnit.SECONDS)
+
+ def include(name: String, metric: Metric) = {
+ //registry.register(name, metric)
+ }
+
+ def exclude(name: String) = {
+ registry.removeMatching(new MetricFilter {
+      def matches(metricName: String, metric: Metric): Boolean = metricName.startsWith(name) // match against the `name` given to exclude, not a shadowed parameter
+ })
+ }
+
+
+
+ def deregister(fullName: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+object Watched {
+ case object Actor
+ case object Dispatcher
+}
+
+object MetricDirectory {
+ def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
+
+ def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
+
+ def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
+
+ def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
+
+
+ def shouldInstrumentActor(actorPath: String): Boolean = {
+ !(actorPath.isEmpty || actorPath.startsWith("system"))
+ }
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
+
+
+
+
+trait Histogram {
+ def update(value: Long): Unit
+ def snapshot: HistogramSnapshot
+}
+
+trait HistogramSnapshot {
+ def median: Double
+ def max: Double
+ def min: Double
+}
+
+
+case class ActorSystemMetrics(actorSystemName: String) {
+ import scala.collection.JavaConverters._
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
+
+ private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
+
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ val stats = createDispatcherCollector
+ dispatchers.put(dispatcherName, stats)
+ Some(stats)
+ }
+
+}
+
+
+case class CodahaleHistogram() extends Histogram {
+ private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
+
+ def update(value: Long) = histogram.update(value)
+ def snapshot: HistogramSnapshot = {
+ val snapshot = histogram.getSnapshot
+
+ CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
+ }
+}
+
+case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
+
+
+
+
+
+
+
+/**
+ * Dispatcher Metrics that we care about currently with a histogram-like nature:
+ * - Work Queue Size
+ * - Total/Active Thread Count
+ */
+
+
+
+import annotation.tailrec
+import java.util.concurrent.atomic.AtomicReference
+
+object Atomic {
+ def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
+ implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
+}
+
+class Atomic[T](val atomic : AtomicReference[T]) {
+ @tailrec
+ final def update(f: T => T) : T = {
+ val oldValue = atomic.get()
+ val newValue = f(oldValue)
+ if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
+ }
+
+ def get() = atomic.get()
+} \ No newline at end of file
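
A small usage sketch of the Atomic helper defined at the end of Metrics.scala above (the counter is illustrative):

    import kamon.metric.Atomic

    object AtomicSketch extends App {
      val counter = Atomic(0)
      counter.update(_ + 1)    // retried with compareAndSet until the update wins
      println(counter.get())   // 1
    }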
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
new file mode 100644
index 00000000..5b4ceaf4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics._
+
+object MetricsUtils {
+
+ def markMeter[T](meter:Meter)(f: => T): T = {
+ meter.mark()
+ f
+ }
+//
+// def incrementCounter(key: String) {
+// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count
+// }
+//
+// def markMeter(key: String) {
+// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
+// }
+//
+// def trace[T](key: String)(f: => T): T = {
+// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) )
+// timer.time(f)
+// }
+
+// def markAndCountMeter[T](key: String)(f: => T): T = {
+// markMeter(key)
+// f
+// }
+//
+// def traceAndCount[T](key: String)(f: => T): T = {
+// incrementCounter(key)
+// trace(key) {
+// f
+// }
+ //}
+
+// implicit def runnable(f: () => Unit): Runnable =
+// new Runnable() { def run() = f() }
+//
+//
+// import java.util.concurrent.Callable
+//
+// implicit def callable[T](f: () => T): Callable[T] =
+// new Callable[T]() { def call() = f() }
+
+// private val actorCounter:Counter = new Counter
+// private val actorTimer:Timer = new Timer
+//
+// metricsRegistry.register(s"counter-for-${actorName}", actorCounter)
+// metricsRegistry.register(s"timer-for-${actorName}", actorTimer)
+} \ No newline at end of file
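
Only markMeter survived the clean-up above; a minimal usage sketch with a made-up registry and meter name:

    import com.codahale.metrics.MetricRegistry
    import kamon.metric.MetricsUtils.markMeter

    object MarkMeterSketch extends App {
      val registry = new MetricRegistry
      val requests = registry.meter("sample/requests")

      // Marks the meter once, then runs the block and returns its result.
      val answer = markMeter(requests) { 40 + 2 }
      println(answer)
    }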
diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
new file mode 100644
index 00000000..70f3e54a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics
+import metrics._
+import java.util.concurrent.TimeUnit
+import java.util
+import com.newrelic.api.agent.NewRelic
+import scala.collection.JavaConverters._
+
+
+class NewRelicReporter(registry: MetricRegistry, name: String, filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
+
+
+
+ private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
+ NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
+ }
+
+ private[NewRelicReporter] def processCounter(name:String, counter:Counter) {
+ println(s"Logging to NewRelic: ${counter.getCount}")
+
+ }
+
+
+/* def processGauge(name: String, gauge: Gauge[_]) = {
+ println(s"the value is: "+gauge.getValue)
+ NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
+ }*/
+
+
+ def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
+ //Process Meters
+ meters.asScala.map{case(name, meter) => processMeter(name, meter)}
+
+    //Process Counters
+ counters.asScala.map{case(name, counter) => processCounter(name, counter)}
+
+ // Gauges
+ gauges.asScala.foreach{ case (name, gauge) => {
+ val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
+ val fullMetricName = "Custom" + name
+ NewRelic.recordMetric(fullMetricName, measure)
+ }}
+ }
+
+
+}
+
+object NewRelicReporter {
+ def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
+} \ No newline at end of file
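
Wiring the reporter mirrors the commented-out lines in Metrics.scala; a sketch, assuming a New Relic agent is attached so the recorded metrics actually go somewhere:

    import java.util.concurrent.TimeUnit
    import kamon.metric.{Metrics, NewRelicReporter}

    object NewRelicReporterSketch extends App {
      val reporter = NewRelicReporter(Metrics.registry)
      reporter.start(5, TimeUnit.SECONDS)   // report meters, counters and gauges every 5 seconds
    }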
diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala
new file mode 100644
index 00000000..07532d0a
--- /dev/null
+++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala
@@ -0,0 +1,55 @@
+package spraytest
+
+import akka.actor.ActorSystem
+import spray.client.pipelining._
+import spray.httpx.SprayJsonSupport
+import spray.json._
+import scala.concurrent.Future
+import spray.can.Http
+import akka.io.IO
+
+/**
+ * BEGIN JSON Infrastructure
+ */
+case class Container(data: List[PointOfInterest])
+case class Geolocation(latitude: Float, longitude: Float)
+case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation)
+
+object GeoJsonProtocol extends DefaultJsonProtocol {
+ implicit val geolocationFormat = jsonFormat2(Geolocation)
+ implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest)
+ implicit val containerFormat = jsonFormat1(Container)
+}
+/** END-OF JSON Infrastructure */
+
+
+
+
+
+
+object ClientTest extends App {
+ implicit val actorSystem = ActorSystem("spray-client-test")
+ import actorSystem.dispatcher
+
+
+ import GeoJsonProtocol._
+ import SprayJsonSupport._
+
+
+ val actor = IO(Http)
+
+ val pipeline = sendReceive ~> unmarshal[Container]
+
+ val response = pipeline {
+ Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco")
+ } onSuccess {
+ case a => {
+ println(a)
+ }
+ }
+}
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
new file mode 100644
index 00000000..b864d6d6
--- /dev/null
+++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala
@@ -0,0 +1,81 @@
+package spraytest
+/*
+import akka.actor.ActorSystem
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Try, Success}
+import kamon.actor.TransactionContext
+
+object FutureTesting extends App {
+
+ val actorSystem = ActorSystem("future-testing")
+ implicit val ec = actorSystem.dispatcher
+ implicit val tctx = TransactionContext(11, Nil)
+
+ threadPrintln("In the initial Thread")
+
+
+ val f = TraceableFuture {
+ threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}")
+ }
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+ f.onComplete({
+ case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}")
+ })
+
+
+
+
+
+
+
+
+ def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]")
+
+}
+
+
+
+
+trait TransactionContextWrapper {
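+ // Intended to re-install the captured TransactionContext around callback execution. Note that wrap
+ // evaluates the by-name function value inside the set/remove window, but the function itself is
+ // invoked later by the Future, so the context may already have been removed by then (this whole
+ // file is disabled and kept only as an experiment).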
+ def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = {
+ TransactionContext.current.set(tranContext.fork)
+ println(s"SetContext to: ${tranContext}")
+ val result = f
+
+ TransactionContext.current.remove()
+ result
+ }
+
+}
+
+class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper {
+ def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = {
+ future.onComplete(wrap(func, transactionContext))
+ }
+}
+
+object TraceableFuture {
+
+ implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future
+
+ def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = {
+ val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil))
+
+ new TraceableFuture(Future { wrappedBody })
+ }
+
+ def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = {
+ TransactionContext.current.set(transactionContext)
+ val result = body
+ TransactionContext.current.remove()
+ result
+ }
+}*/
+
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
new file mode 100644
index 00000000..f9d6869c
--- /dev/null
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -0,0 +1,34 @@
+package test
+
+import akka.actor.{Props, Actor, ActorSystem}
+
+object PingPong extends App {
+
+ val as = ActorSystem("ping-pong")
+
+ val pinger = as.actorOf(Props[Pinger])
+ val ponger = as.actorOf(Props[Ponger])
+
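+ // Kick off the exchange: send Pong to the pinger with the ponger as the sender, so replies bounce between them.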
+ pinger.tell(Pong, ponger)
+
+ Thread.sleep(30000)
+ as.shutdown()
+}
+
+case object Ping
+case object Pong
+
+class Pinger extends Actor {
+ def receive = {
+ case Pong => sender ! Ping
+ }
+}
+
+class Ponger extends Actor {
+ def receive = {
+ case Ping => sender ! Pong
+ }
+}
diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
new file mode 100644
index 00000000..0026d953
--- /dev/null
+++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
@@ -0,0 +1,45 @@
+package akka.instrumentation
+
+import org.scalatest.{WordSpecLike, Matchers}
+import akka.actor.{Actor, Props, ActorSystem}
+
+import akka.testkit.{ImplicitSender, TestKit}
+import kamon.{TraceContext, Kamon}
+
+
+class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender {
+
+ "an instrumented actor ref" when {
+ "used inside the context of a transaction" should {
+ "propagate the trace context using bang" in new TraceContextEchoFixture {
+ echo ! "test"
+
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context using tell" in {
+
+ }
+
+ "propagate the trace context using ask" in {
+
+ }
+ }
+ }
+
+ trait TraceContextEchoFixture {
+ val testTraceContext = Kamon.newTraceContext()
+ val echo = system.actorOf(Props[TraceContextEcho])
+
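+ // Install the context on the calling thread; the instrumented tell should capture and propagate it to the echo actor.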
+ Kamon.set(testTraceContext)
+ }
+
+}
+
+class TraceContextEcho extends Actor {
+ def receive = {
+ case msg ⇒ sender ! Kamon.context()
+ }
+}
+
+
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
new file mode 100644
index 00000000..1eab6355
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
@@ -0,0 +1,22 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.ActorSystem
+import kamon.Kamon
+
+class ActorSystemInstrumentationSpec extends WordSpec with Matchers {
+
+ // TODO: Selection filters to exclude unwanted actor systems. Read from configuration.
+
+ "the actor system instrumentation" should {
+ "register all actor systems created" in {
+ val as1 = ActorSystem("as1")
+ val as2 = ActorSystem("as2")
+
+ Kamon.Metric.actorSystem("as1") should not be (None)
+ Kamon.Metric.actorSystem("as2") should not be (None)
+ Kamon.Metric.actorSystem("unknown") should be (None)
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
new file mode 100644
index 00000000..89ef61f3
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
@@ -0,0 +1,34 @@
+package kamon.instrumentation
+
+import org.scalatest.{Matchers, WordSpec}
+import akka.actor.{Actor, Props, ActorSystem}
+import kamon.metric.MetricDirectory
+import kamon.Kamon
+
+class DispatcherInstrumentationSpec extends WordSpec with Matchers {
+
+ "the dispatcher instrumentation" should {
+ "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem {
+ val dispatchers = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers
+ (1 to 10).foreach(actor ! _)
+
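+ // Snapshot of the active-thread-count metric for the default dispatcher, taken after the burst of messages.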
+ val active = dispatchers.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot
+ println("Active max: " + active.max)
+ println("Active min: " + active.min)
+
+ }
+ }
+
+
+ trait SingleDispatcherActorSystem {
+ val actorSystem = ActorSystem("single-dispatcher")
+ val actor = actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case a => sender ! a
+ }
+ }))
+
+ }
+}
+
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
new file mode 100644
index 00000000..cc55ec92
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
@@ -0,0 +1,53 @@
+package kamon.instrumentation
+
+import org.scalatest.WordSpec
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import java.util.concurrent.ConcurrentLinkedQueue
+import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope}
+import java.util.Queue
+import akka.actor.{ActorSystem, Actor}
+
+class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec {
+ def this() = this(ActorSystem("MessageQueueInstrumentationSpec"))
+
+
+ /*"A MonitoredMessageQueue" should {
+ "update the related histogram when a message is enqueued" in {
+ new PopulatedMessageQueueFixture {
+
+ assert(histogram.getSnapshot.getMax === 0)
+
+ for(i <- 1 to 3) { enqueueDummyMessage }
+
+ assert(histogram.getCount === 3)
+ assert(histogram.getSnapshot.getMax === 3)
+ assert(histogram.getSnapshot.getMin === 1)
+ }
+ }
+
+ "update the related histogram when a message is dequeued" in {
+ new PopulatedMessageQueueFixture {
+ for(i <- 1 to 3) { enqueueDummyMessage }
+ assert(histogram.getSnapshot.getMax === 3)
+
+ messageQueue.dequeue()
+ messageQueue.dequeue()
+
+ assert(histogram.getCount === 5)
+ assert(histogram.getSnapshot.getMax === 3)
+ assert(histogram.getSnapshot.getMin === 1)
+ }
+ }
+ }
+
+ trait PopulatedMessageQueueFixture {
+
+ val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
+ final def queue: Queue[Envelope] = this
+ }*/
+ val messageQueue = new MonitoredMessageQueue(delegate, histogram)
+
+ def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem))
+ }*/
+}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
new file mode 100644
index 00000000..de65aaca
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
@@ -0,0 +1,82 @@
+package kamon.instrumentation
+
+import scala.concurrent.{Await, Promise, Future}
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration}
+import kamon.{Kamon, TraceContext}
+import java.util.UUID
+import scala.util.Success
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import akka.actor.ActorSystem
+
+
+class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
+
+ "a instrumented runnable" when {
+ "created in a thread that does have a TraceContext" must {
+ "preserve the TraceContext" which {
+ "should be available during the run method execution" in { new FutureWithContextFixture {
+
+ whenReady(futureWithContext) { result =>
+ result.value should equal(testContext)
+ }
+ }}
+
+ "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture {
+ val onCompleteContext = Promise[TraceContext]()
+
+ futureWithContext.onComplete({
+ case _ => onCompleteContext.complete(Success(Kamon.context.get))
+ })
+
+ whenReady(onCompleteContext.future) { result =>
+ result should equal(testContext)
+ }
+ }}
+ }
+ }
+
+ "created in a thread that doest have a TraceContext" must {
+ "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{
+
+ whenReady(futureWithoutContext) { result =>
+ result should equal(None)
+ }
+ }}
+
+ "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture {
+ val onCompleteContext = Promise[Option[TraceContext]]()
+
+ futureWithoutContext.onComplete({
+ case _ => onCompleteContext.complete(Success(Kamon.context))
+ })
+
+ whenReady(onCompleteContext.future) { result =>
+ result should equal(None)
+ }
+ }}
+ }
+ }
+
+
+ /**
+ * We are using Futures for the test since they exercise Runnables under the hood and also resemble our real use case.
+ */
+ implicit val testActorSystem = ActorSystem("test-actorsystem")
+ implicit val execContext = testActorSystem.dispatcher
+
+ class FutureWithContextFixture {
+ val testContext = TraceContext()
+ Kamon.set(testContext)
+
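+ // The Future body runs as an instrumented Runnable, so it should capture the context set above.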
+ val futureWithContext = Future { Kamon.context }
+ }
+
+ trait FutureWithoutContextFixture {
+ Kamon.clear // Make sure no TraceContext is available
+ val futureWithoutContext = Future { Kamon.context }
+ }
+}
+
+