From cd1a9dd25fb550a515e7a7408b88233773268c38 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 7 Aug 2013 19:06:33 -0300 Subject: upgrading to akka 2.2 --- kamon-core/src/main/resources/META-INF/aop.xml | 34 +++ kamon-core/src/main/resources/application.conf | 47 ++++ kamon-core/src/main/resources/newrelic.yml | 242 ++++++++++++++++++++ kamon-core/src/main/scala/kamon/Kamon.scala | 132 +++++++++++ kamon-core/src/main/scala/kamon/TraceContext.scala | 67 ++++++ .../src/main/scala/kamon/TraceContextSwap.scala | 26 +++ .../main/scala/kamon/TransactionPublisher.scala | 15 ++ .../src/main/scala/kamon/executor/eventbus.scala | 103 +++++++++ .../ActorRefTellInstrumentation.scala | 89 ++++++++ .../scala/kamon/instrumentation/AspectJPimps.scala | 23 ++ .../instrumentation/ExecutorServiceMetrics.scala | 245 +++++++++++++++++++++ .../instrumentation/MessageQueueMetrics.scala | 73 ++++++ .../instrumentation/RunnableInstrumentation.scala | 61 +++++ .../instrumentation/SampleInstrumentation.scala | 49 +++++ .../metric/ExecutorServiceMetricCollector.scala | 67 ++++++ .../main/scala/kamon/metric/GaugeGenerator.scala | 12 + .../src/main/scala/kamon/metric/MetricFilter.scala | 6 + .../src/main/scala/kamon/metric/Metrics.scala | 146 ++++++++++++ .../src/main/scala/kamon/metric/MetricsUtils.scala | 51 +++++ .../main/scala/kamon/metric/NewRelicReporter.scala | 51 +++++ .../src/main/scala/spraytest/ClientTest.scala | 55 +++++ .../src/main/scala/spraytest/FutureTesting.scala | 81 +++++++ kamon-core/src/main/scala/test/PingPong.scala | 34 +++ .../instrumentation/ActorInstrumentationSpec.scala | 45 ++++ .../ActorSystemInstrumentationSpec.scala | 22 ++ .../DispatcherInstrumentationSpec.scala | 34 +++ .../MessageQueueInstrumentationSpec.scala | 53 +++++ .../RunnableInstrumentationSpec.scala | 82 +++++++ 28 files changed, 1945 insertions(+) create mode 100644 kamon-core/src/main/resources/META-INF/aop.xml create mode 100644 kamon-core/src/main/resources/application.conf create mode 100644 kamon-core/src/main/resources/newrelic.yml create mode 100644 kamon-core/src/main/scala/kamon/Kamon.scala create mode 100644 kamon-core/src/main/scala/kamon/TraceContext.scala create mode 100644 kamon-core/src/main/scala/kamon/TraceContextSwap.scala create mode 100644 kamon-core/src/main/scala/kamon/TransactionPublisher.scala create mode 100644 kamon-core/src/main/scala/kamon/executor/eventbus.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricFilter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala create mode 100644 kamon-core/src/main/scala/spraytest/ClientTest.scala create mode 100644 kamon-core/src/main/scala/spraytest/FutureTesting.scala create mode 100644 kamon-core/src/main/scala/test/PingPong.scala create mode 100644 kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala create mode 100644 kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..e6d61fa1 --- /dev/null +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf new file mode 100644 index 00000000..370acae9 --- /dev/null +++ b/kamon-core/src/main/resources/application.conf @@ -0,0 +1,47 @@ +akka { + actor { + default-dispatcher { + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + + # The parallelism factor is used to determine thread pool size using the + # following formula: ceil(available processors * factor). Resulting size + # is then bounded by the parallelism-min and parallelism-max values. + parallelism-factor = 3.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 8 + } + + throughput = 100 + } + } +} + +# Dispatcher is the name of the event-based dispatcher +#type = Dispatcher + +# What kind of ExecutionService to use +#executor = "kamon.executor.InstrumentedExecutorServiceConfigurator" + +# Min number of threads to cap factor-based parallelism number to +#parallelism-min = 2 + +# Parallelism (threads) ... ceil(available processors * factor) +#parallelism-factor = 2.0 + +# Max number of threads to cap factor-based parallelism number to +#parallelism-max = 10 + +# Throughput defines the maximum number of messages to be +# processed per actor before the thread jumps to the next actor. +# Set to 1 for as fair as possible. +#throughput = 100 + + + + + + + diff --git a/kamon-core/src/main/resources/newrelic.yml b/kamon-core/src/main/resources/newrelic.yml new file mode 100644 index 00000000..1b1ad53b --- /dev/null +++ b/kamon-core/src/main/resources/newrelic.yml @@ -0,0 +1,242 @@ +# +# This file configures the New Relic Agent. New Relic monitors +# Java applications with deep visibility and low overhead. For more +# information, visit www.newrelic.com. +# +# This configuration file is custom generated for Ivan Topolnak - ivantopo@gmail.com +# +# This section is for settings common to all environments. +# Do not add anything above this next line. +common: &default_settings + # + # ============================== LICENSE KEY =============================== + + # You must specify the license key associated with your New Relic + # account. This key binds your Agent's data to your account in the + # New Relic service. + license_key: '2e24765acb032cb9e7207013b5ba3e2ab7d2d75c' + + # Agent Enabled + # Use this setting to force the agent to run or not run. + # Default is true. + # agent_enabled: true + + # Set to true to enable support for auto app naming. + # The name of each web app is detected automatically + # and the agent reports data separately for each one. + # This provides a finer-grained performance breakdown for + # web apps in New Relic. + # Default is false. + enable_auto_app_naming: false + + # Set to true to enable component-based transaction naming. + # Set to false to use the URI of a web request as the name of the transaction. + # Default is true. + enable_auto_transaction_naming: true + + # Set the name of your application as you'd like it show up in New Relic. + # if enable_auto_app_naming is false, the agent reports all data to this application. + # Otherwise, the agent reports only background tasks (transactions for non-web applications) to this application. + # To report data to more than one application, separate the application names with ";". + # For example, to report data to"My Application" and "My Application 2" use this: + # app_name: My Application;My Application 2 + # This setting is required. + app_name: My Application + + # The agent uses its own log file to keep its logging + # separate from that of your application. Specify the log level here. + # This setting is dynamic, so changes do not require restarting your application. + # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest + # Default is info. + log_level: finest + enable_custom_tracing: true + + # Log all data to and from New Relic in plain text. + # This setting is dynamic, so changes do not require restarting your application. + # Default is false. + #audit_mode: true + + # The number of log files to use. + # Default is 1. + #log_file_count: 1 + + # The maximum number of bytes to write to any one log file. + # Default is 0 (no limit). + #log_limit_in_kbytes: 0 + + # The name of the log file. + # Default is newrelic_agent.log. + #log_file_name: newrelic_agent.log + + # The log file directory. + # Default is the logs directory in the newrelic.jar parent directory. + log_file_path: /home/ivantopo/Desktop/tmp + + # The agent communicates with New Relic via https by + # default. If you want to communicate with newrelic via http, + # then turn off SSL by setting this value to false. + # This work is done asynchronously to the threads that process your + # application code, so response times will not be directly affected + # by this change. + # Default is true. + ssl: true + + # Proxy settings for connecting to the New Relic server. + # + # If a proxy is used, the host setting is required. Other settings + # are optional. Default port is 8080. The username and password + # settings will be used to authenticate to Basic Auth challenges + # from a proxy server. + # + # proxy_host: hostname + # proxy_port: 8080 + # proxy_user: username + # proxy_password: password + + # Tells transaction tracer and error collector (when enabled) + # whether or not to capture HTTP params. When true, frameworks can + # exclude HTTP parameters from being captured. + # Default is false. + capture_params: false + + # Tells transaction tracer and error collector to not to collect + # specific http request parameters. + # ignored_params: credit_card, ssn, password + + # Transaction tracer captures deep information about slow + # transactions and sends this to the New Relic service once a + # minute. Included in the transaction is the exact call sequence of + # the transactions including any SQL statements issued. + transaction_tracer: + + # Transaction tracer is enabled by default. Set this to false to + # turn it off. This feature is only available at the higher product levels. + # Default is true. + enabled: true + + # Threshold in seconds for when to collect a transaction + # trace. When the response time of a controller action exceeds + # this threshold, a transaction trace will be recorded and sent to + # New Relic. Valid values are any float value, or (default) "apdex_f", + # which will use the threshold for the "Frustrated" Apdex level + # (greater than four times the apdex_t value). + # Default is apdex_f. + transaction_threshold: apdex_f + + # When transaction tracer is on, SQL statements can optionally be + # recorded. The recorder has three modes, "off" which sends no + # SQL, "raw" which sends the SQL statement in its original form, + # and "obfuscated", which strips out numeric and string literals. + # Default is obfuscated. + record_sql: obfuscated + + # Obfuscate only occurrences of specific SQL fields names. + # This setting only applies if "record_sql" is set to "raw". + #obfuscated_sql_fields: credit_card, ssn, password + + # Set this to true to log SQL statements instead of recording them. + # SQL is logged using the record_sql mode. + # Default is false. + log_sql: false + + # Threshold in seconds for when to collect stack trace for a SQL + # call. In other words, when SQL statements exceed this threshold, + # then capture and send to New Relic the current stack trace. This is + # helpful for pinpointing where long SQL calls originate from. + # Default is 0.5 seconds. + stack_trace_threshold: 0.5 + + # Determines whether the agent will capture query plans for slow + # SQL queries. Only supported for MySQL and PostgreSQL. + # Default is true. + explain_enabled: true + + # Threshold for query execution time below which query plans will not + # not be captured. Relevant only when `explain_enabled` is true. + # Default is 0.5 seconds. + explain_threshold: 0.5 + + # Use this setting to control the variety of transaction traces. + # The higher the setting, the greater the variety. + # Set this to 0 to always report the slowest transaction trace. + # Default is 20. + top_n: 20 + + + # Error collector captures information about uncaught exceptions and + # sends them to New Relic for viewing + error_collector: + + # Error collector is enabled by default. Set this to false to turn + # it off. This feature is only available at the higher product levels. + # Default is true. + enabled: true + + # To stop specific exceptions from reporting to New Relic, set this property + # to a comma separated list of full class names. + # + # ignore_errors: + + # To stop specific http status codes from being reporting to New Relic as errors, + # set this property to a comma separated list of status codes to ignore. + # When this property is commented out it defaults to ignoring 404s. + # + # ignore_status_codes: 404 + + # Cross Application Tracing adds request and response headers to + # external calls using the Apache HttpClient libraries to provided better + # performance data when calling applications monitored by other New Relic Agents. + # + cross_application_tracer: + # Set to true to enable cross application tracing. + # Default is true. + enabled: true + + # Thread profiler measures wall clock time, CPU time, and method call counts + # in your application's threads as they run. + thread_profiler: + + # Set to false to disable the thread profiler. + # Default is true. + enabled: true + + #============================== Browser Monitoring =============================== + # New Relic Real User Monitoring gives you insight into the performance real users are + # experiencing with your website. This is accomplished by measuring the time it takes for + # your users' browsers to download and render your web pages by injecting a small amount + # of JavaScript code into the header and footer of each page. + browser_monitoring: + # By default the agent automatically inserts API calls in compiled JSPs to + # inject the monitoring JavaScript into web pages. + # Set this attribute to false to turn off this behavior. + auto_instrument: true + # Set this attribute to false to prevent injection of the monitoring JavaScript. + # Default is true. + enabled: true + +# Application Environments +# ------------------------------------------ +# Environment specific settings are in this section. +# You can use the environment to override the default settings. +# For example, to change the app_name setting. +# Use -Dnewrelic.environment= on the Java command line +# to set the environment. +# The default environment is production. + +# NOTE if your application has other named environments, you should +# provide configuration settings for these environments here. + +development: + <<: *default_settings + app_name: KAMON[Development] + +test: + <<: *default_settings + app_name: My Application (Test) + +production: + <<: *default_settings + +staging: + <<: *default_settings + app_name: My Application (Staging) \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala new file mode 100644 index 00000000..c3080909 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -0,0 +1,132 @@ +package kamon + +import akka.actor.{Actor, Props, ActorSystem} +import scala.collection.JavaConverters._ +import java.util.concurrent.ConcurrentHashMap +import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} +import scala.concurrent.duration.{FiniteDuration, Duration} +import com.newrelic.api.agent.NewRelic + +object Kamon { + + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + implicit lazy val actorSystem = ActorSystem("kamon") + + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + def newTraceContext(): TraceContext = TraceContext() + + + val publisher = actorSystem.actorOf(Props[TransactionPublisher]) + + def publish(tx: FullTransaction) = publisher ! tx + + + + object Metric { + val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + def actorSystemNames: List[String] = actorSystems.keys.toList + def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) + + def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) + } + + + + val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + +} + + + + + + + + + +object Tracer { + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = ??? //set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + //def newTraceContext(): TraceContext = TraceContext() +} + + +class MetricManager extends Actor { + implicit val ec = context.system.dispatcher + + def receive = { + case RegisterForAllDispatchers(frequency) => { + val subscriber = sender + context.system.scheduler.schedule(frequency, frequency) { + Kamon.Metric.actorSystems.foreach { + case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { + case (dispatcherName, dispatcherMetrics) => { + val activeThreads = dispatcherMetrics.activeThreadCount.snapshot + val poolSize = dispatcherMetrics.poolSize.snapshot + val queueSize = dispatcherMetrics.queueSize.snapshot + + subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) + + } + } + } + } + } + } +} + +case class RegisterForAllDispatchers(frequency: FiniteDuration) +case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) + + + + + + +class NewrelicReporterActor extends Actor { + import scala.concurrent.duration._ + + Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + + def receive = { + case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { + /*println("PUBLISHED DISPATCHER STATS") + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/ + + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) + } + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala new file mode 100644 index 00000000..6b32550f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -0,0 +1,67 @@ +package kamon + +import java.util.UUID +import akka.actor.{ActorSystem, ActorPath} +import akka.agent.Agent +import java.util.concurrent.TimeUnit +import scala.util.{Failure, Success} +import akka.util.Timeout + + +case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + implicit val as = Kamon.actorSystem.dispatcher + + def append(entry: TraceEntry) = entries send (entry :: _) + def close = entries.future.onComplete({ + case Success(list) => Kamon.publish(FullTransaction(id, list)) + case Failure(t) => println("WTF!") + }) +} + +object TraceContext { + implicit val as2 = Kamon.actorSystem.dispatcher + def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) +} + + + +trait TraceEntry + +case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry + + + +case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) + + + + + +object Collector { + +} + +trait TraceEntryStorage { + def store(entry: TraceEntry): Boolean +} + +class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { + def store(entry: TraceEntry) = storage.store(entry) +} + +object ThreadLocalTraceEntryStorage extends TraceEntryStorage { + + private val storage = new ThreadLocal[List[TraceEntry]] { + override def initialValue(): List[TraceEntry] = Nil + } + + def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) + + def store(entry: TraceEntry): Boolean = { + update(entry :: _) + true + } +} + + diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala new file mode 100644 index 00000000..68ee808b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala @@ -0,0 +1,26 @@ +package kamon + +/** + * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. + */ +trait TraceContextSwap { + + def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) + + def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { + ctx match { + case Some(context) => { + Kamon.set(context) + val bodyResult = primary + Kamon.clear + + bodyResult + } + case None => fallback + } + + } + +} + +object TraceContextSwap extends TraceContextSwap diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala new file mode 100644 index 00000000..0626b91d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala @@ -0,0 +1,15 @@ +package kamon + +import akka.actor.Actor +import java.util.UUID + +class TransactionPublisher extends Actor { + + def receive = { + case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") + } + +} + + +case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala new file mode 100644 index 00000000..599f2a7a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -0,0 +1,103 @@ +package kamon.executor + +import akka.event.ActorEventBus +import akka.event.LookupClassification +import akka.actor._ +import java.util.concurrent.TimeUnit + +import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import akka.util.Timeout +import scala.util.{Random, Success, Failure} +import scala.concurrent.Future + +trait Message + +case class PostMessage(text:String) extends Message + +case class MessageEvent(val channel:String, val message:Message) + +class AppActorEventBus extends ActorEventBus with LookupClassification{ + type Event = MessageEvent + type Classifier=String + protected def mapSize(): Int={ + 10 + } + + protected def classify(event: Event): Classifier={ + event.channel + } + + protected def publish(event: Event, subscriber: Subscriber): Unit={ + subscriber ! event + } +} +case class Ping() +case class Pong() + +class PingActor extends Actor with ActorLogging { + + val pong = context.actorOf(Props[PongActor]) + val random = new Random() + def receive = { + case Pong() => { + //Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() + } + } +} + +class PongActor extends Actor with ActorLogging { + def receive = { + case Ping() => { + sender ! Pong() + } + } +} + + +object TryAkka extends App{ + val system = ActorSystem("MySystem") + val appActorEventBus=new AppActorEventBus + val NEW_POST_CHANNEL="/posts/new" + val subscriber = system.actorOf(Props(new Actor { + def receive = { + case d: MessageEvent => println(d) + } + })) + + Kamon.start + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } + + + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + + /* + val newRelicReporter = new NewRelicReporter(registry) + newRelicReporter.start(1, TimeUnit.SECONDS) + +*/ + import akka.pattern.ask + implicit val timeout = Timeout(10, TimeUnit.SECONDS) + implicit def execContext = system.dispatcher + + + + Kamon.start + + Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + threadPrintln("Before doing it") + val f = Future { threadPrintln("This is happening inside the future body") } + + Kamon.stop + + + //Thread.sleep(3000) + //system.shutdown() + +/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) + appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..82915ce9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.{MessageDispatcher, Envelope} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + var processingTimeTimer: Timer = _ + var shouldTrack = false + + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") + shouldTrack = true + //} + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + println("ENVELOPE --------------------->"+envelope) + envelope match { + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() + + val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Kamon.set(c) + println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..84c20c52 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,23 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } + + def proceedWithTarget(args: AnyRef*) = { + pjp.proceed(args.toArray) + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..b4f8a475 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -0,0 +1,245 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import com.typesafe.config.Config +import kamon.Kamon +import scala.concurrent.forkjoin.ForkJoinPool +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool + + +@Aspect +class ActorSystemInstrumentation { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") + def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} + + @After("actorSystemInstantiation(name, applicationConfig, classLoader)") + def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { + + Kamon.Metric.registerActorSystem(name) + } +} + +@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") +class ForkJoinPoolInstrumentation { + var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _ + + @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") + def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} + + @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") + def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { + val (actorSystemName, dispatcherName) = threadFactory match { + case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) + case _ => ("Unknown", "Unknown") + } + + val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) + for(m <- metrics) { + activeThreadsHistogram = m.activeThreadCount + poolSizeHistogram = m.poolSize + println(s"Registered $dispatcherName for actor system $actorSystemName") + } + } + + def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { + knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) + } + + + + + @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") + def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} + + @After("forkJoinScan(fjp)") + def updateMetrics(fjp: AkkaForkJoinPool): Unit = { + activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize) + } + + + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/** + * ExecutorService monitoring base: + */ +trait ExecutorServiceCollector { + def updateActiveThreadCount(diff: Int): Unit + def updateTotalThreadCount(diff: Int): Unit + def updateQueueSize(diff: Int): Unit +} + +trait WatchedExecutorService { + def collector: ExecutorServiceCollector +} + + + + + + + + + + + + + + +trait ExecutorServiceMonitoring { + def dispatcherMetrics: DispatcherMetricCollector +} + +class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { + @volatile var dispatcherMetrics: DispatcherMetricCollector = _ +} + + + + + + + + + + + + + + + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") + def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { + true + } + + @Around("factoryMethodCall(dispatcherName, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed().asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..c21502ac --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala new file mode 100644 index 00000000..e75a638f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -0,0 +1,61 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.{Kamon, TraceContext} +import org.aspectj.lang.ProceedingJoinPoint +import scala.Some + +/** + * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. + */ +trait TraceContextAwareRunnable extends Runnable {} + + +@Aspect("perthis(instrumentedRunnableCreation())") +class RunnableInstrumentation { + + /** + * These are the Runnables that need to be instrumented and make the TraceContext available + * while their run method is executed. + */ + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null + + + /** + * Pointcuts + */ + + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))") + def instrumentedRunnableCreation(): Unit = {} + + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())") + def runnableExecution() = {} + + + /** + * Aspect members + */ + + private val traceContext = Kamon.context + + + /** + * Advices + */ + import kamon.TraceContextSwap.withContext + + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + + @Around("runnableExecution()") + def around(pjp: ProceedingJoinPoint) = { + import pjp._ + + withContext(traceContext, proceed()) + } + +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala new file mode 100644 index 00000000..74261403 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala @@ -0,0 +1,49 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} + +class ActorCage(val name: String, val size: Int) { + + def doIt: Unit = println("name") +} + +trait CageMonitoring { + def histogram: Histogram + def count(value: Int): Unit +} + +class CageMonitoringImp extends CageMonitoring{ + final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) + + def count(value: Int) = histogram.update(value) + +} + + +@Aspect +class InceptionAspect { + + @DeclareMixin("kamon.instrumentation.ActorCage") + def mixin: CageMonitoring = new CageMonitoringImp + + + @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") + def theActorCageDidIt(actorCage: CageMonitoring) = {} + + @After("theActorCageDidIt(actorCage)") + def afterDoingIt(actorCage: CageMonitoring) = { + actorCage.count(1) + actorCage.histogram.getSnapshot.dump(System.out) + } + + + +} + + +object Runner extends App { + val cage = new ActorCage("ivan", 10) + + cage.doIt +} diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..54a13f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "threads/poolSize" + val activeThreads = "threads/activeThreads" +} + + + + diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala new file mode 100644 index 00000000..fb117968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala @@ -0,0 +1,6 @@ +package kamon.metric + +object MetricFilter { + def actorSystem(system: String): Boolean = !system.startsWith("kamon") + def actor(path: String, system: String): Boolean = true +} diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala new file mode 100644 index 00000000..cdc0a334 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -0,0 +1,146 @@ +package kamon.metric + +import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import akka.actor.ActorRef +import com.codahale.metrics +import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} + + +object Metrics { + val registry: MetricRegistry = new MetricRegistry + + val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) + //consoleReporter.build().start(45, TimeUnit.SECONDS) + + //val newrelicReporter = NewRelicReporter(registry) + //newrelicReporter.start(5, TimeUnit.SECONDS) + + def include(name: String, metric: Metric) = { + //registry.register(name, metric) + } + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + + def deregister(fullName: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + +object Watched { + case object Actor + case object Dispatcher +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" + + def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" + + def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") + + def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") + + + def shouldInstrumentActor(actorPath: String): Boolean = { + !(actorPath.isEmpty || actorPath.startsWith("system")) + } + + +} + + + + + + + + + + + + +case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) + + + + +trait Histogram { + def update(value: Long): Unit + def snapshot: HistogramSnapshot +} + +trait HistogramSnapshot { + def median: Double + def max: Double + def min: Double +} + + +case class ActorSystemMetrics(actorSystemName: String) { + import scala.collection.JavaConverters._ + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + + private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) + + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { + val stats = createDispatcherCollector + dispatchers.put(dispatcherName, stats) + Some(stats) + } + +} + + +case class CodahaleHistogram() extends Histogram { + private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) + + def update(value: Long) = histogram.update(value) + def snapshot: HistogramSnapshot = { + val snapshot = histogram.getSnapshot + + CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) + } +} + +case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot + + + + + + + +/** + * Dispatcher Metrics that we care about currently with a histogram-like nature: + * - Work Queue Size + * - Total/Active Thread Count + */ + + + +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference + +object Atomic { + def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) +} + +class Atomic[T](val atomic : AtomicReference[T]) { + @tailrec + final def update(f: T => T) : T = { + val oldValue = atomic.get() + val newValue = f(oldValue) + if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) + } + + def get() = atomic.get() +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala new file mode 100644 index 00000000..5b4ceaf4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics._ + +object MetricsUtils { + + def markMeter[T](meter:Meter)(f: => T): T = { + meter.mark() + f + } +// +// def incrementCounter(key: String) { +// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count +// } +// +// def markMeter(key: String) { +// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() +// } +// +// def trace[T](key: String)(f: => T): T = { +// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) +// timer.time(f) +// } + +// def markAndCountMeter[T](key: String)(f: => T): T = { +// markMeter(key) +// f +// } +// +// def traceAndCount[T](key: String)(f: => T): T = { +// incrementCounter(key) +// trace(key) { +// f +// } + //} + +// implicit def runnable(f: () => Unit): Runnable = +// new Runnable() { def run() = f() } +// +// +// import java.util.concurrent.Callable +// +// implicit def callable[T](f: () => T): Callable[T] = +// new Callable[T]() { def call() = f() } + +// private val actorCounter:Counter = new Counter +// private val actorTimer:Timer = new Timer +// +// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) +// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala new file mode 100644 index 00000000..70f3e54a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics +import metrics._ +import java.util.concurrent.TimeUnit +import java.util +import com.newrelic.api.agent.NewRelic +import scala.collection.JavaConverters._ + + +class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { + + + + private[NewRelicReporter] def processMeter(name: String, meter: Meter) { + NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) + } + + private[NewRelicReporter] def processCounter(name:String, counter:Counter) { + println(s"Logging to NewRelic: ${counter.getCount}") + + } + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + //Process Meters + meters.asScala.map{case(name, meter) => processMeter(name, meter)} + + //Process Meters + counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} + } + + +} + +object NewRelicReporter { + def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala new file mode 100644 index 00000000..07532d0a --- /dev/null +++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala @@ -0,0 +1,55 @@ +package spraytest + +import akka.actor.ActorSystem +import spray.client.pipelining._ +import spray.httpx.SprayJsonSupport +import spray.json._ +import scala.concurrent.Future +import spray.can.Http +import akka.io.IO + +/** + * BEGIN JSON Infrastructure + */ +case class Container(data: List[PointOfInterest]) +case class Geolocation(latitude: Float, longitude: Float) +case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) + +object GeoJsonProtocol extends DefaultJsonProtocol { + implicit val geolocationFormat = jsonFormat2(Geolocation) + implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) + implicit val containerFormat = jsonFormat1(Container) +} +/** END-OF JSON Infrastructure */ + + + + + + +class ClientTest extends App { + implicit val actorSystem = ActorSystem("spray-client-test") + import actorSystem.dispatcher + + + import GeoJsonProtocol._ + import SprayJsonSupport._ + + + val actor = IO(Http) + + val pipeline = sendReceive ~> unmarshal[Container] + + val response = pipeline { + Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") + } onSuccess { + case a => { + println(a) + } + } +} + + + + + diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala new file mode 100644 index 00000000..b864d6d6 --- /dev/null +++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala @@ -0,0 +1,81 @@ +package spraytest +/* +import akka.actor.ActorSystem +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Try, Success} +import kamon.actor.TransactionContext + +object FutureTesting extends App { + + val actorSystem = ActorSystem("future-testing") + implicit val ec = actorSystem.dispatcher + implicit val tctx = TransactionContext(11, Nil) + + threadPrintln("In the initial Thread") + + + val f = TraceableFuture { + threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") + } + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") + }) + + f.onComplete({ + case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") + }) + + + + + + + + + def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") + +} + + + + +trait TransactionContextWrapper { + def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { + TransactionContext.current.set(tranContext.fork) + println(s"SetContext to: ${tranContext}") + val result = f + + TransactionContext.current.remove() + result + } + +} + +class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { + def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { + future.onComplete(wrap(func, transactionContext)) + } +} + +object TraceableFuture { + + implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future + + def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) + + new TraceableFuture(Future { wrappedBody }) + } + + + + + def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { + TransactionContext.current.set(transactionContext) + val result = body + TransactionContext.current.remove() + result + } +}*/ + diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala new file mode 100644 index 00000000..f9d6869c --- /dev/null +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -0,0 +1,34 @@ +package test + +import akka.actor.{Props, Actor, ActorSystem} + +object PingPong extends App { + + val as = ActorSystem("ping-pong") + + val pinger = as.actorOf(Props[Pinger]) + val ponger = as.actorOf(Props[Ponger]) + + pinger.tell(Pong, ponger) + + + Thread.sleep(30000) + as.shutdown() + + +} + +case object Ping +case object Pong + +class Pinger extends Actor { + def receive = { + case Pong => sender ! Ping + } +} + +class Ponger extends Actor { + def receive = { + case Ping => sender ! Pong + } +} diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala new file mode 100644 index 00000000..0026d953 --- /dev/null +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -0,0 +1,45 @@ +package akka.instrumentation + +import org.scalatest.{WordSpecLike, Matchers} +import akka.actor.{Actor, Props, ActorSystem} + +import akka.testkit.{ImplicitSender, TestKit} +import kamon.{TraceContext, Kamon} + + +class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + + "an instrumented actor ref" when { + "used inside the context of a transaction" should { + "propagate the trace context using bang" in new TraceContextEchoFixture { + echo ! "test" + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using tell" in { + + } + + "propagate the trace context using ask" in { + + } + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Kamon.newTraceContext() + val echo = system.actorOf(Props[TraceContextEcho]) + + Kamon.set(testTraceContext) + } + +} + +class TraceContextEcho extends Actor { + def receive = { + case msg ⇒ sender ! Kamon.context() + } +} + + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala new file mode 100644 index 00000000..1eab6355 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala @@ -0,0 +1,22 @@ +package kamon.instrumentation + +import org.scalatest.{Matchers, WordSpec} +import akka.actor.ActorSystem +import kamon.Kamon + +class ActorSystemInstrumentationSpec extends WordSpec with Matchers { + + // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. + + "the actor system instrumentation" should { + "register all actor systems created" in { + val as1 = ActorSystem("as1") + val as2 = ActorSystem("as2") + + + Kamon.Metric.actorSystem("as1") should not be (None) + Kamon.Metric.actorSystem("as2") should not be (None) + Kamon.Metric.actorSystem("unknown") should be (None) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala new file mode 100644 index 00000000..89ef61f3 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala @@ -0,0 +1,34 @@ +package kamon.instrumentation + +import org.scalatest.{Matchers, WordSpec} +import akka.actor.{Actor, Props, ActorSystem} +import kamon.metric.MetricDirectory +import kamon.Kamon + +class DispatcherInstrumentationSpec extends WordSpec with Matchers{ + + + "the dispatcher instrumentation" should { + "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem { + val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers + (1 to 10).foreach(actor ! _) + + val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot + println("Active max: "+active.max) + println("Active min: "+active.min) + + } + } + + + trait SingleDispatcherActorSystem { + val actorSystem = ActorSystem("single-dispatcher") + val actor = actorSystem.actorOf(Props(new Actor { + def receive = { + case a => sender ! a; + } + })) + + } +} + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala new file mode 100644 index 00000000..cc55ec92 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala @@ -0,0 +1,53 @@ +package kamon.instrumentation + +import org.scalatest.WordSpec +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import java.util.concurrent.ConcurrentLinkedQueue +import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} +import java.util.Queue +import akka.actor.{ActorSystem, Actor} + +class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { + def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) + + + /*"A MonitoredMessageQueue" should { + "update the related histogram when a message is enqueued" in { + new PopulatedMessageQueueFixture { + + assert(histogram.getSnapshot.getMax === 0) + + for(i <- 1 to 3) { enqueueDummyMessage } + + assert(histogram.getCount === 3) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + + "update the related histogram when a message is dequeued" in { + new PopulatedMessageQueueFixture { + for(i <- 1 to 3) { enqueueDummyMessage } + assert(histogram.getSnapshot.getMax === 3) + + messageQueue.dequeue() + messageQueue.dequeue() + + assert(histogram.getCount === 5) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + } + + trait PopulatedMessageQueueFixture { + + val histogram = new Histogram(new ExponentiallyDecayingReservoir()) +/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final def queue: Queue[Envelope] = this + }*/ + val messageQueue = new MonitoredMessageQueue(delegate, histogram) + + def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) + }*/ +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala new file mode 100644 index 00000000..de65aaca --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -0,0 +1,82 @@ +package kamon.instrumentation + +import scala.concurrent.{Await, Promise, Future} +import org.scalatest.{Matchers, OptionValues, WordSpec} +import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} +import kamon.{Kamon, TraceContext} +import java.util.UUID +import scala.util.Success +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import akka.actor.ActorSystem + + +class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { + + "a instrumented runnable" when { + "created in a thread that does have a TraceContext" must { + "preserve the TraceContext" which { + "should be available during the run method execution" in { new FutureWithContextFixture { + + whenReady(futureWithContext) { result => + result.value should equal(testContext) + } + }} + + "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture { + val onCompleteContext = Promise[TraceContext]() + + futureWithContext.onComplete({ + case _ => onCompleteContext.complete(Success(Kamon.context.get)) + }) + + whenReady(onCompleteContext.future) { result => + result should equal(testContext) + } + }} + } + } + + "created in a thread that doest have a TraceContext" must { + "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ + + whenReady(futureWithoutContext) { result => + result should equal(None) + } + }} + + "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture { + val onCompleteContext = Promise[Option[TraceContext]]() + + futureWithoutContext.onComplete({ + case _ => onCompleteContext.complete(Success(Kamon.context)) + }) + + whenReady(onCompleteContext.future) { result => + result should equal(None) + } + }} + } + } + + + /** + * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. + */ + implicit val testActorSystem = ActorSystem("test-actorsystem") + implicit val execContext = testActorSystem.dispatcher + + class FutureWithContextFixture { + val testContext = TraceContext() + Kamon.set(testContext) + + val futureWithContext = Future { Kamon.context} + } + + trait FutureWithoutContextFixture { + Kamon.clear // Make sure no TraceContext is available + val futureWithoutContext = Future { Kamon.context } + } +} + + -- cgit v1.2.3