diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
commit | 923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch) | |
tree | d555199f0c63b690ec51805b496ee2d54eb014da /src | |
parent | 1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff) | |
download | Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.gz Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.bz2 Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.zip |
upgrading to akka 2.2
Diffstat (limited to 'src')
28 files changed, 0 insertions, 1941 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml deleted file mode 100644 index e6d61fa1..00000000 --- a/src/main/resources/META-INF/aop.xml +++ /dev/null @@ -1,34 +0,0 @@ -<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> - -<aspectj> - <weaver options="-verbose -showWeaveInfo"> - <!--<dump within="*" beforeandafter="true"/>--> - </weaver> - - <aspects> - - <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/> - <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/> - <aspect name="kamon.instrumentation.RunnableInstrumentation" /> - <aspect name="kamon.instrumentation.MessageQueueInstrumentation" /> - - <aspect name="kamon.instrumentation.InceptionAspect"/> - - <!-- ExecutorService Instrumentation for Akka. --> -<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> - <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>--> - <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> - <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/> - - - - <include within="*"/> - <exclude within="javax..*"/> - <exclude within="org.aspectj..*"/> - <exclude within="scala..*"/> - <exclude within="scalaz..*"/> - <exclude within="scalad..*"/> - <exclude within="play..*"/> - </aspects> - -</aspectj> diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf deleted file mode 100644 index 370acae9..00000000 --- a/src/main/resources/application.conf +++ /dev/null @@ -1,47 +0,0 @@ -akka { - actor { - default-dispatcher { - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 3.0 - - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 8 - } - - throughput = 100 - } - } -} - -# Dispatcher is the name of the event-based dispatcher -#type = Dispatcher - -# What kind of ExecutionService to use -#executor = "kamon.executor.InstrumentedExecutorServiceConfigurator" - -# Min number of threads to cap factor-based parallelism number to -#parallelism-min = 2 - -# Parallelism (threads) ... ceil(available processors * factor) -#parallelism-factor = 2.0 - -# Max number of threads to cap factor-based parallelism number to -#parallelism-max = 10 - -# Throughput defines the maximum number of messages to be -# processed per actor before the thread jumps to the next actor. -# Set to 1 for as fair as possible. -#throughput = 100 - - - - - - - diff --git a/src/main/resources/newrelic.yml b/src/main/resources/newrelic.yml deleted file mode 100644 index 1b1ad53b..00000000 --- a/src/main/resources/newrelic.yml +++ /dev/null @@ -1,242 +0,0 @@ -# -# This file configures the New Relic Agent. New Relic monitors -# Java applications with deep visibility and low overhead. For more -# information, visit www.newrelic.com. -# -# This configuration file is custom generated for Ivan Topolnak - ivantopo@gmail.com -# -# This section is for settings common to all environments. -# Do not add anything above this next line. -common: &default_settings - # - # ============================== LICENSE KEY =============================== - - # You must specify the license key associated with your New Relic - # account. This key binds your Agent's data to your account in the - # New Relic service. - license_key: '2e24765acb032cb9e7207013b5ba3e2ab7d2d75c' - - # Agent Enabled - # Use this setting to force the agent to run or not run. - # Default is true. - # agent_enabled: true - - # Set to true to enable support for auto app naming. - # The name of each web app is detected automatically - # and the agent reports data separately for each one. - # This provides a finer-grained performance breakdown for - # web apps in New Relic. - # Default is false. - enable_auto_app_naming: false - - # Set to true to enable component-based transaction naming. - # Set to false to use the URI of a web request as the name of the transaction. - # Default is true. - enable_auto_transaction_naming: true - - # Set the name of your application as you'd like it show up in New Relic. - # if enable_auto_app_naming is false, the agent reports all data to this application. - # Otherwise, the agent reports only background tasks (transactions for non-web applications) to this application. - # To report data to more than one application, separate the application names with ";". - # For example, to report data to"My Application" and "My Application 2" use this: - # app_name: My Application;My Application 2 - # This setting is required. - app_name: My Application - - # The agent uses its own log file to keep its logging - # separate from that of your application. Specify the log level here. - # This setting is dynamic, so changes do not require restarting your application. - # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest - # Default is info. - log_level: finest - enable_custom_tracing: true - - # Log all data to and from New Relic in plain text. - # This setting is dynamic, so changes do not require restarting your application. - # Default is false. - #audit_mode: true - - # The number of log files to use. - # Default is 1. - #log_file_count: 1 - - # The maximum number of bytes to write to any one log file. - # Default is 0 (no limit). - #log_limit_in_kbytes: 0 - - # The name of the log file. - # Default is newrelic_agent.log. - #log_file_name: newrelic_agent.log - - # The log file directory. - # Default is the logs directory in the newrelic.jar parent directory. - log_file_path: /home/ivantopo/Desktop/tmp - - # The agent communicates with New Relic via https by - # default. If you want to communicate with newrelic via http, - # then turn off SSL by setting this value to false. - # This work is done asynchronously to the threads that process your - # application code, so response times will not be directly affected - # by this change. - # Default is true. - ssl: true - - # Proxy settings for connecting to the New Relic server. - # - # If a proxy is used, the host setting is required. Other settings - # are optional. Default port is 8080. The username and password - # settings will be used to authenticate to Basic Auth challenges - # from a proxy server. - # - # proxy_host: hostname - # proxy_port: 8080 - # proxy_user: username - # proxy_password: password - - # Tells transaction tracer and error collector (when enabled) - # whether or not to capture HTTP params. When true, frameworks can - # exclude HTTP parameters from being captured. - # Default is false. - capture_params: false - - # Tells transaction tracer and error collector to not to collect - # specific http request parameters. - # ignored_params: credit_card, ssn, password - - # Transaction tracer captures deep information about slow - # transactions and sends this to the New Relic service once a - # minute. Included in the transaction is the exact call sequence of - # the transactions including any SQL statements issued. - transaction_tracer: - - # Transaction tracer is enabled by default. Set this to false to - # turn it off. This feature is only available at the higher product levels. - # Default is true. - enabled: true - - # Threshold in seconds for when to collect a transaction - # trace. When the response time of a controller action exceeds - # this threshold, a transaction trace will be recorded and sent to - # New Relic. Valid values are any float value, or (default) "apdex_f", - # which will use the threshold for the "Frustrated" Apdex level - # (greater than four times the apdex_t value). - # Default is apdex_f. - transaction_threshold: apdex_f - - # When transaction tracer is on, SQL statements can optionally be - # recorded. The recorder has three modes, "off" which sends no - # SQL, "raw" which sends the SQL statement in its original form, - # and "obfuscated", which strips out numeric and string literals. - # Default is obfuscated. - record_sql: obfuscated - - # Obfuscate only occurrences of specific SQL fields names. - # This setting only applies if "record_sql" is set to "raw". - #obfuscated_sql_fields: credit_card, ssn, password - - # Set this to true to log SQL statements instead of recording them. - # SQL is logged using the record_sql mode. - # Default is false. - log_sql: false - - # Threshold in seconds for when to collect stack trace for a SQL - # call. In other words, when SQL statements exceed this threshold, - # then capture and send to New Relic the current stack trace. This is - # helpful for pinpointing where long SQL calls originate from. - # Default is 0.5 seconds. - stack_trace_threshold: 0.5 - - # Determines whether the agent will capture query plans for slow - # SQL queries. Only supported for MySQL and PostgreSQL. - # Default is true. - explain_enabled: true - - # Threshold for query execution time below which query plans will not - # not be captured. Relevant only when `explain_enabled` is true. - # Default is 0.5 seconds. - explain_threshold: 0.5 - - # Use this setting to control the variety of transaction traces. - # The higher the setting, the greater the variety. - # Set this to 0 to always report the slowest transaction trace. - # Default is 20. - top_n: 20 - - - # Error collector captures information about uncaught exceptions and - # sends them to New Relic for viewing - error_collector: - - # Error collector is enabled by default. Set this to false to turn - # it off. This feature is only available at the higher product levels. - # Default is true. - enabled: true - - # To stop specific exceptions from reporting to New Relic, set this property - # to a comma separated list of full class names. - # - # ignore_errors: - - # To stop specific http status codes from being reporting to New Relic as errors, - # set this property to a comma separated list of status codes to ignore. - # When this property is commented out it defaults to ignoring 404s. - # - # ignore_status_codes: 404 - - # Cross Application Tracing adds request and response headers to - # external calls using the Apache HttpClient libraries to provided better - # performance data when calling applications monitored by other New Relic Agents. - # - cross_application_tracer: - # Set to true to enable cross application tracing. - # Default is true. - enabled: true - - # Thread profiler measures wall clock time, CPU time, and method call counts - # in your application's threads as they run. - thread_profiler: - - # Set to false to disable the thread profiler. - # Default is true. - enabled: true - - #============================== Browser Monitoring =============================== - # New Relic Real User Monitoring gives you insight into the performance real users are - # experiencing with your website. This is accomplished by measuring the time it takes for - # your users' browsers to download and render your web pages by injecting a small amount - # of JavaScript code into the header and footer of each page. - browser_monitoring: - # By default the agent automatically inserts API calls in compiled JSPs to - # inject the monitoring JavaScript into web pages. - # Set this attribute to false to turn off this behavior. - auto_instrument: true - # Set this attribute to false to prevent injection of the monitoring JavaScript. - # Default is true. - enabled: true - -# Application Environments -# ------------------------------------------ -# Environment specific settings are in this section. -# You can use the environment to override the default settings. -# For example, to change the app_name setting. -# Use -Dnewrelic.environment=<environment> on the Java command line -# to set the environment. -# The default environment is production. - -# NOTE if your application has other named environments, you should -# provide configuration settings for these environments here. - -development: - <<: *default_settings - app_name: KAMON[Development] - -test: - <<: *default_settings - app_name: My Application (Test) - -production: - <<: *default_settings - -staging: - <<: *default_settings - app_name: My Application (Staging)
\ No newline at end of file diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala deleted file mode 100644 index 5a1382a4..00000000 --- a/src/main/scala/kamon/Kamon.scala +++ /dev/null @@ -1,132 +0,0 @@ -package kamon - -import akka.actor.{Actor, Props, ActorSystem} -import scala.collection.JavaConverters._ -import java.util.concurrent.ConcurrentHashMap -import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} -import scala.concurrent.duration.{FiniteDuration, Duration} -import com.newrelic.api.agent.NewRelic - -object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - - implicit lazy val actorSystem = ActorSystem("kamon") - - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - - - - object Metric { - val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala - - def actorSystemNames: List[String] = actorSystems.keys.toList - def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) - - def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) - } - - - - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") - val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") - -} - - - - - - - - - -object Tracer { - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = ??? //set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - //def newTraceContext(): TraceContext = TraceContext() -} - - -class MetricManager extends Actor { - implicit val ec = context.system.dispatcher - - def receive = { - case RegisterForAllDispatchers(frequency) => { - val subscriber = sender - context.system.scheduler.schedule(frequency, frequency) { - Kamon.Metric.actorSystems.foreach { - case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { - case (dispatcherName, dispatcherMetrics) => { - val activeThreads = dispatcherMetrics.activeThreadCount.snapshot - val poolSize = dispatcherMetrics.poolSize.snapshot - val queueSize = dispatcherMetrics.queueSize.snapshot - - subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) - - } - } - } - } - } - } -} - -case class RegisterForAllDispatchers(frequency: FiniteDuration) -case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) - - - - - - -class NewrelicReporterActor extends Actor { - import scala.concurrent.duration._ - - Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) - - def receive = { - case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { - println("PUBLISHED DISPATCHER STATS") - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat) - - - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) - - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) - } - } -}
\ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala deleted file mode 100644 index 6b32550f..00000000 --- a/src/main/scala/kamon/TraceContext.scala +++ /dev/null @@ -1,67 +0,0 @@ -package kamon - -import java.util.UUID -import akka.actor.{ActorSystem, ActorPath} -import akka.agent.Agent -import java.util.concurrent.TimeUnit -import scala.util.{Failure, Success} -import akka.util.Timeout - - -case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { - implicit val timeout = Timeout(30, TimeUnit.SECONDS) - implicit val as = Kamon.actorSystem.dispatcher - - def append(entry: TraceEntry) = entries send (entry :: _) - def close = entries.future.onComplete({ - case Success(list) => Kamon.publish(FullTransaction(id, list)) - case Failure(t) => println("WTF!") - }) -} - -object TraceContext { - implicit val as2 = Kamon.actorSystem.dispatcher - def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) -} - - - -trait TraceEntry - -case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry - - - -case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - - - - - -object Collector { - -} - -trait TraceEntryStorage { - def store(entry: TraceEntry): Boolean -} - -class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { - def store(entry: TraceEntry) = storage.store(entry) -} - -object ThreadLocalTraceEntryStorage extends TraceEntryStorage { - - private val storage = new ThreadLocal[List[TraceEntry]] { - override def initialValue(): List[TraceEntry] = Nil - } - - def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) - - def store(entry: TraceEntry): Boolean = { - update(entry :: _) - true - } -} - - diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/src/main/scala/kamon/TraceContextSwap.scala deleted file mode 100644 index 68ee808b..00000000 --- a/src/main/scala/kamon/TraceContextSwap.scala +++ /dev/null @@ -1,26 +0,0 @@ -package kamon - -/** - * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. - */ -trait TraceContextSwap { - - def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) - - def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { - ctx match { - case Some(context) => { - Kamon.set(context) - val bodyResult = primary - Kamon.clear - - bodyResult - } - case None => fallback - } - - } - -} - -object TraceContextSwap extends TraceContextSwap diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/src/main/scala/kamon/TransactionPublisher.scala deleted file mode 100644 index 0626b91d..00000000 --- a/src/main/scala/kamon/TransactionPublisher.scala +++ /dev/null @@ -1,15 +0,0 @@ -package kamon - -import akka.actor.Actor -import java.util.UUID - -class TransactionPublisher extends Actor { - - def receive = { - case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") - } - -} - - -case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala deleted file mode 100644 index 599f2a7a..00000000 --- a/src/main/scala/kamon/executor/eventbus.scala +++ /dev/null @@ -1,103 +0,0 @@ -package kamon.executor - -import akka.event.ActorEventBus -import akka.event.LookupClassification -import akka.actor._ -import java.util.concurrent.TimeUnit - -import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} -import akka.util.Timeout -import scala.util.{Random, Success, Failure} -import scala.concurrent.Future - -trait Message - -case class PostMessage(text:String) extends Message - -case class MessageEvent(val channel:String, val message:Message) - -class AppActorEventBus extends ActorEventBus with LookupClassification{ - type Event = MessageEvent - type Classifier=String - protected def mapSize(): Int={ - 10 - } - - protected def classify(event: Event): Classifier={ - event.channel - } - - protected def publish(event: Event, subscriber: Subscriber): Unit={ - subscriber ! event - } -} -case class Ping() -case class Pong() - -class PingActor extends Actor with ActorLogging { - - val pong = context.actorOf(Props[PongActor]) - val random = new Random() - def receive = { - case Pong() => { - //Thread.sleep(random.nextInt(2000)) - //log.info("Message from Ping") - pong ! Ping() - } - } -} - -class PongActor extends Actor with ActorLogging { - def receive = { - case Ping() => { - sender ! Pong() - } - } -} - - -object TryAkka extends App{ - val system = ActorSystem("MySystem") - val appActorEventBus=new AppActorEventBus - val NEW_POST_CHANNEL="/posts/new" - val subscriber = system.actorOf(Props(new Actor { - def receive = { - case d: MessageEvent => println(d) - } - })) - - Kamon.start - for(i <- 1 to 4) { - val ping = system.actorOf(Props[PingActor]) - ping ! Pong() - } - - - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") - - /* - val newRelicReporter = new NewRelicReporter(registry) - newRelicReporter.start(1, TimeUnit.SECONDS) - -*/ - import akka.pattern.ask - implicit val timeout = Timeout(10, TimeUnit.SECONDS) - implicit def execContext = system.dispatcher - - - - Kamon.start - - Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) - threadPrintln("Before doing it") - val f = Future { threadPrintln("This is happening inside the future body") } - - Kamon.stop - - - //Thread.sleep(3000) - //system.shutdown() - -/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) - appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ -}
\ No newline at end of file diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index 7398a2bd..00000000 --- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,89 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Kamon, TraceContext} -import akka.dispatch.{MessageDispatcher, Envelope} -import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} -import kamon.metric.{MetricDirectory, Metrics} -import com.codahale.metrics -import kamon.instrumentation.TraceableMessage -import scala.Some - -case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) - - -@Aspect -class ActorRefTellInstrumentation { - import ProceedingJoinPointPimp._ - - @Pointcut("execution(* akka.actor.LocalActorRef+.$bang(..)) && target(actor) && args(message, sender)") - def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(actor, message, sender)") - def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - - val actorName = MetricDirectory.nameForActor(actor) - val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender") - pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) - } -} - - -@Aspect("perthis(actorCellCreation(..))") -class ActorCellInvokeInstrumentation { - - var processingTimeTimer: Timer = _ - var shouldTrack = false - - // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(system, ref, props, dispatcher, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - /** TODO: Find a better way to filter the things we don't want to measure. */ - //if(system.name != "kamon" && actorName.startsWith("/user")) { - processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") - shouldTrack = true - //} - } - - - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import ProceedingJoinPointPimp._ - //println("ENVELOPE --------------------->"+envelope) - envelope match { - case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - timer.stop() - - val originalEnvelope = envelope.copy(message = msg) - - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() - ctx match { - case Some(c) => { - Kamon.set(c) - //println("ENVELOPE ORIGINAL:---------------->"+originalEnvelope) - pjp.proceedWith(originalEnvelope) - Kamon.clear - } - case None => pjp.proceedWith(originalEnvelope) - } - pt.stop() - } - case _ => pjp.proceed - } - } -} diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala deleted file mode 100644 index 84c20c52..00000000 --- a/src/main/scala/kamon/instrumentation/AspectJPimps.scala +++ /dev/null @@ -1,23 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.ProceedingJoinPoint - -trait ProceedingJoinPointPimp { - import language.implicitConversions - - implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) -} - -object ProceedingJoinPointPimp extends ProceedingJoinPointPimp - -case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { - def proceedWith(newUniqueArg: AnyRef) = { - val args = pjp.getArgs - args.update(0, newUniqueArg) - pjp.proceed(args) - } - - def proceedWithTarget(args: AnyRef*) = { - pjp.proceed(args.toArray) - } -} diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala deleted file mode 100644 index b4f8a475..00000000 --- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ /dev/null @@ -1,245 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import java.util.concurrent._ -import org.aspectj.lang.ProceedingJoinPoint -import java.util -import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} -import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} -import com.typesafe.config.Config -import kamon.Kamon -import scala.concurrent.forkjoin.ForkJoinPool -import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool - - -@Aspect -class ActorSystemInstrumentation { - - @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") - def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} - - @After("actorSystemInstantiation(name, applicationConfig, classLoader)") - def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { - - Kamon.Metric.registerActorSystem(name) - } -} - -@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") -class ForkJoinPoolInstrumentation { - var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _ - - @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") - def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} - - @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") - def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { - val (actorSystemName, dispatcherName) = threadFactory match { - case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) - case _ => ("Unknown", "Unknown") - } - - val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) - for(m <- metrics) { - activeThreadsHistogram = m.activeThreadCount - poolSizeHistogram = m.poolSize - println(s"Registered $dispatcherName for actor system $actorSystemName") - } - } - - def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { - knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) - } - - - - - @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") - def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} - - @After("forkJoinScan(fjp)") - def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize) - } - - - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -/** - * ExecutorService monitoring base: - */ -trait ExecutorServiceCollector { - def updateActiveThreadCount(diff: Int): Unit - def updateTotalThreadCount(diff: Int): Unit - def updateQueueSize(diff: Int): Unit -} - -trait WatchedExecutorService { - def collector: ExecutorServiceCollector -} - - - - - - - - - - - - - - -trait ExecutorServiceMonitoring { - def dispatcherMetrics: DispatcherMetricCollector -} - -class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { - @volatile var dispatcherMetrics: DispatcherMetricCollector = _ -} - - - - - - - - - - - - - - - - -case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = delegate.createExecutorService -} - -@Aspect -class ExecutorServiceFactoryProviderInstrumentation { - - @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") - def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { - true - } - - @Around("factoryMethodCall(dispatcherName, threadFactory)") - def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast - - val actorSystemName = threadFactory match { - case m: MonitorableThreadFactory => m.name - case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. - } - - new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) - } - -} - - -@Aspect -class NamedExecutorServiceFactoryDelegateInstrumentation { - - @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") - def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} - - @Around("factoryMethodCall(namedFactory)") - def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - - ExecutorServiceMetricCollector.register(executorFullName, delegate) - - new NamedExecutorServiceDelegate(executorFullName, delegate) - } -} - -case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { - def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) - delegate.shutdown() - } - def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() - def isShutdown: Boolean = delegate.isShutdown - def isTerminated: Boolean = delegate.isTerminated - def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) - def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) - def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) - def submit(task: Runnable): Future[_] = delegate.submit(task) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) - def execute(command: Runnable) = delegate.execute(command) -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala deleted file mode 100644 index c21502ac..00000000 --- a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ /dev/null @@ -1,73 +0,0 @@ -package kamon.instrumentation - -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import akka.actor.{ActorSystem, ActorRef} -import kamon.metric.{Metrics, MetricDirectory} -import org.aspectj.lang.ProceedingJoinPoint - - -/** - * For Mailboxes we would like to track the queue size and message latency. Currently the latency - * will be gathered from the ActorCellMetrics. - */ - - -@Aspect -class MessageQueueInstrumentation { - - @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") - def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} - - @Around("messageQueueCreation(owner, system)") - def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { - val delegate = pjp.proceed.asInstanceOf[MessageQueue] - - // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for(own <- owner; sys <- system) yield { - val systemName = sys.name - val ownerName = MetricDirectory.nameForActor(own) - val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) - - val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) - Metrics.include(mailBoxName, queueSizeHistogram) - - new MonitoredMessageQueue(delegate, queueSizeHistogram) - } - - monitoredMailbox match { - case None => delegate - case Some(mmb) => mmb - } - } -} - - -class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ - - def enqueue(receiver: ActorRef, handle: Envelope) = { - delegate.enqueue(receiver, handle) - queueSizeHistogram.update(numberOfMessages) - } - - def dequeue(): Envelope = { - val envelope = delegate.dequeue() - queueSizeHistogram.update(numberOfMessages) - - envelope - } - - def numberOfMessages: Int = delegate.numberOfMessages - def hasMessages: Boolean = delegate.hasMessages - def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) -} - - - - - - - - - diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala deleted file mode 100644 index e75a638f..00000000 --- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ /dev/null @@ -1,61 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import kamon.{Kamon, TraceContext} -import org.aspectj.lang.ProceedingJoinPoint -import scala.Some - -/** - * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. - */ -trait TraceContextAwareRunnable extends Runnable {} - - -@Aspect("perthis(instrumentedRunnableCreation())") -class RunnableInstrumentation { - - /** - * These are the Runnables that need to be instrumented and make the TraceContext available - * while their run method is executed. - */ - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null - - - /** - * Pointcuts - */ - - @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))") - def instrumentedRunnableCreation(): Unit = {} - - @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())") - def runnableExecution() = {} - - - /** - * Aspect members - */ - - private val traceContext = Kamon.context - - - /** - * Advices - */ - import kamon.TraceContextSwap.withContext - - @Before("instrumentedRunnableCreation()") - def beforeCreation = { - //println((new Throwable).getStackTraceString) - } - - - @Around("runnableExecution()") - def around(pjp: ProceedingJoinPoint) = { - import pjp._ - - withContext(traceContext, proceed()) - } - -} diff --git a/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala deleted file mode 100644 index 74261403..00000000 --- a/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala +++ /dev/null @@ -1,49 +0,0 @@ -package kamon.instrumentation - -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} - -class ActorCage(val name: String, val size: Int) { - - def doIt: Unit = println("name") -} - -trait CageMonitoring { - def histogram: Histogram - def count(value: Int): Unit -} - -class CageMonitoringImp extends CageMonitoring{ - final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) - - def count(value: Int) = histogram.update(value) - -} - - -@Aspect -class InceptionAspect { - - @DeclareMixin("kamon.instrumentation.ActorCage") - def mixin: CageMonitoring = new CageMonitoringImp - - - @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") - def theActorCageDidIt(actorCage: CageMonitoring) = {} - - @After("theActorCageDidIt(actorCage)") - def afterDoingIt(actorCage: CageMonitoring) = { - actorCage.count(1) - actorCage.histogram.getSnapshot.dump(System.out) - } - - - -} - - -object Runner extends App { - val cage = new ActorCage("ivan", 10) - - cage.doIt -} diff --git a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala deleted file mode 100644 index 54a13f39..00000000 --- a/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ /dev/null @@ -1,67 +0,0 @@ -package kamon.metric - -import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.{Metric, MetricFilter} - -object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { - - def register(fullName: String, executorService: ExecutorService) = executorService match { - case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) - case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) - case _ => // If it is a unknown Executor then just do nothing. - } - - def deregister(fullName: String) = { - Metrics.registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - - -trait ForkJoinPoolMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - - def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { - val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ - - val allMetrics = Map( - fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), - fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), - fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) - ) - - allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } - } -} - -trait ThreadPoolExecutorMetricCollector { - import GaugeGenerator._ - import BasicExecutorMetricNames._ - - def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { - val tpeGauge = newNumericGaugeFor(tpe) _ - - val allMetrics = Map( - fullName + queueSize -> tpeGauge(_.getQueue.size()), - fullName + poolSize -> tpeGauge(_.getPoolSize), - fullName + activeThreads -> tpeGauge(_.getActiveCount) - ) - - allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } - } -} - - -object BasicExecutorMetricNames { - val queueSize = "queueSize" - val poolSize = "threads/poolSize" - val activeThreads = "threads/activeThreads" -} - - - - diff --git a/src/main/scala/kamon/metric/GaugeGenerator.scala b/src/main/scala/kamon/metric/GaugeGenerator.scala deleted file mode 100644 index 30635432..00000000 --- a/src/main/scala/kamon/metric/GaugeGenerator.scala +++ /dev/null @@ -1,12 +0,0 @@ -package kamon.metric - -import com.codahale.metrics.Gauge - -trait GaugeGenerator { - - def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { - def getValue: V = generator(target) - } -} - -object GaugeGenerator extends GaugeGenerator diff --git a/src/main/scala/kamon/metric/MetricFilter.scala b/src/main/scala/kamon/metric/MetricFilter.scala deleted file mode 100644 index fb117968..00000000 --- a/src/main/scala/kamon/metric/MetricFilter.scala +++ /dev/null @@ -1,6 +0,0 @@ -package kamon.metric - -object MetricFilter { - def actorSystem(system: String): Boolean = !system.startsWith("kamon") - def actor(path: String, system: String): Boolean = true -} diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala deleted file mode 100644 index 3992ab43..00000000 --- a/src/main/scala/kamon/metric/Metrics.scala +++ /dev/null @@ -1,144 +0,0 @@ -package kamon.metric - -import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} -import akka.actor.ActorRef -import com.codahale.metrics -import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} - - -object Metrics { - val registry: MetricRegistry = new MetricRegistry - - val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) - //consoleReporter.build().start(45, TimeUnit.SECONDS) - - //val newrelicReporter = NewRelicReporter(registry) - //newrelicReporter.start(5, TimeUnit.SECONDS) - - def include(name: String, metric: Metric) = registry.register(name, metric) - - def exclude(name: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(name) - }) - } - - - - def deregister(fullName: String) = { - registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) - }) - } -} - -object Watched { - case object Actor - case object Dispatcher -} - -object MetricDirectory { - def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" - - def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" - - def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") - - def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") - - - def shouldInstrumentActor(actorPath: String): Boolean = { - !(actorPath.isEmpty || actorPath.startsWith("system")) - } - - -} - - - - - - - - - - - - -case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) - - - - -trait Histogram { - def update(value: Long): Unit - def snapshot: HistogramSnapshot -} - -trait HistogramSnapshot { - def median: Double - def max: Double - def min: Double -} - - -case class ActorSystemMetrics(actorSystemName: String) { - import scala.collection.JavaConverters._ - val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala - - private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) - - def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { - val stats = createDispatcherCollector - dispatchers.put(dispatcherName, stats) - Some(stats) - } - -} - - -case class CodahaleHistogram() extends Histogram { - private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) - - def update(value: Long) = histogram.update(value) - def snapshot: HistogramSnapshot = { - val snapshot = histogram.getSnapshot - - CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) - } -} - -case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot - - - - - - - -/** - * Dispatcher Metrics that we care about currently with a histogram-like nature: - * - Work Queue Size - * - Total/Active Thread Count - */ - - - -import annotation.tailrec -import java.util.concurrent.atomic.AtomicReference - -object Atomic { - def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) - implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) -} - -class Atomic[T](val atomic : AtomicReference[T]) { - @tailrec - final def update(f: T => T) : T = { - val oldValue = atomic.get() - val newValue = f(oldValue) - if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) - } - - def get() = atomic.get() -}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/MetricsUtils.scala b/src/main/scala/kamon/metric/MetricsUtils.scala deleted file mode 100644 index 5b4ceaf4..00000000 --- a/src/main/scala/kamon/metric/MetricsUtils.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics._ - -object MetricsUtils { - - def markMeter[T](meter:Meter)(f: => T): T = { - meter.mark() - f - } -// -// def incrementCounter(key: String) { -// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count -// } -// -// def markMeter(key: String) { -// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() -// } -// -// def trace[T](key: String)(f: => T): T = { -// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) -// timer.time(f) -// } - -// def markAndCountMeter[T](key: String)(f: => T): T = { -// markMeter(key) -// f -// } -// -// def traceAndCount[T](key: String)(f: => T): T = { -// incrementCounter(key) -// trace(key) { -// f -// } - //} - -// implicit def runnable(f: () => Unit): Runnable = -// new Runnable() { def run() = f() } -// -// -// import java.util.concurrent.Callable -// -// implicit def callable[T](f: () => T): Callable[T] = -// new Callable[T]() { def call() = f() } - -// private val actorCounter:Counter = new Counter -// private val actorTimer:Timer = new Timer -// -// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) -// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) -}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala deleted file mode 100644 index 70f3e54a..00000000 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics -import metrics._ -import java.util.concurrent.TimeUnit -import java.util -import com.newrelic.api.agent.NewRelic -import scala.collection.JavaConverters._ - - -class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { - - - - private[NewRelicReporter] def processMeter(name: String, meter: Meter) { - NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) - } - - private[NewRelicReporter] def processCounter(name:String, counter:Counter) { - println(s"Logging to NewRelic: ${counter.getCount}") - - } - - -/* def processGauge(name: String, gauge: Gauge[_]) = { - println(s"the value is: "+gauge.getValue) - NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) - }*/ - - - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { - //Process Meters - meters.asScala.map{case(name, meter) => processMeter(name, meter)} - - //Process Meters - counters.asScala.map{case(name, counter) => processCounter(name, counter)} - - // Gauges - gauges.asScala.foreach{ case (name, gauge) => { - val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() - val fullMetricName = "Custom" + name - NewRelic.recordMetric(fullMetricName, measure) - }} - } - - -} - -object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) -}
\ No newline at end of file diff --git a/src/main/scala/spraytest/ClientTest.scala b/src/main/scala/spraytest/ClientTest.scala deleted file mode 100644 index 07532d0a..00000000 --- a/src/main/scala/spraytest/ClientTest.scala +++ /dev/null @@ -1,55 +0,0 @@ -package spraytest - -import akka.actor.ActorSystem -import spray.client.pipelining._ -import spray.httpx.SprayJsonSupport -import spray.json._ -import scala.concurrent.Future -import spray.can.Http -import akka.io.IO - -/** - * BEGIN JSON Infrastructure - */ -case class Container(data: List[PointOfInterest]) -case class Geolocation(latitude: Float, longitude: Float) -case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) - -object GeoJsonProtocol extends DefaultJsonProtocol { - implicit val geolocationFormat = jsonFormat2(Geolocation) - implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) - implicit val containerFormat = jsonFormat1(Container) -} -/** END-OF JSON Infrastructure */ - - - - - - -class ClientTest extends App { - implicit val actorSystem = ActorSystem("spray-client-test") - import actorSystem.dispatcher - - - import GeoJsonProtocol._ - import SprayJsonSupport._ - - - val actor = IO(Http) - - val pipeline = sendReceive ~> unmarshal[Container] - - val response = pipeline { - Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") - } onSuccess { - case a => { - println(a) - } - } -} - - - - - diff --git a/src/main/scala/spraytest/FutureTesting.scala b/src/main/scala/spraytest/FutureTesting.scala deleted file mode 100644 index b864d6d6..00000000 --- a/src/main/scala/spraytest/FutureTesting.scala +++ /dev/null @@ -1,81 +0,0 @@ -package spraytest -/* -import akka.actor.ActorSystem -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Try, Success} -import kamon.actor.TransactionContext - -object FutureTesting extends App { - - val actorSystem = ActorSystem("future-testing") - implicit val ec = actorSystem.dispatcher - implicit val tctx = TransactionContext(11, Nil) - - threadPrintln("In the initial Thread") - - - val f = TraceableFuture { - threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") - } - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") - }) - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") - }) - - - - - - - - - def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") - -} - - - - -trait TransactionContextWrapper { - def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { - TransactionContext.current.set(tranContext.fork) - println(s"SetContext to: ${tranContext}") - val result = f - - TransactionContext.current.remove() - result - } - -} - -class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { - def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { - future.onComplete(wrap(func, transactionContext)) - } -} - -object TraceableFuture { - - implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future - - def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { - val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) - - new TraceableFuture(Future { wrappedBody }) - } - - - - - def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { - TransactionContext.current.set(transactionContext) - val result = body - TransactionContext.current.remove() - result - } -}*/ - diff --git a/src/main/scala/test/PingPong.scala b/src/main/scala/test/PingPong.scala deleted file mode 100644 index f9d6869c..00000000 --- a/src/main/scala/test/PingPong.scala +++ /dev/null @@ -1,34 +0,0 @@ -package test - -import akka.actor.{Props, Actor, ActorSystem} - -object PingPong extends App { - - val as = ActorSystem("ping-pong") - - val pinger = as.actorOf(Props[Pinger]) - val ponger = as.actorOf(Props[Ponger]) - - pinger.tell(Pong, ponger) - - - Thread.sleep(30000) - as.shutdown() - - -} - -case object Ping -case object Pong - -class Pinger extends Actor { - def receive = { - case Pong => sender ! Ping - } -} - -class Ponger extends Actor { - def receive = { - case Ping => sender ! Pong - } -} diff --git a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala deleted file mode 100644 index 1a0e509f..00000000 --- a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ /dev/null @@ -1,43 +0,0 @@ -package akka.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.{Actor, Props, ActorSystem} -import kamon.metric.Metrics._ -import scala.collection.JavaConverters._ -import akka.testkit.TestActorRef - - -class ActorInstrumentationSpec extends WordSpec with Matchers { - implicit val system = ActorSystem() - import system._ - - val echoRef = actorOf(Props(new EchoActor), "Echo-Actor") - val meterForEchoActor = "meter-for-akka://default/user/Echo-Actor" - val totalMessages = 1000 - - "an instrumented Actor" should { - "send a message and record a metric on the Metrics Registry with the number of sent messages" in { - - val echoActor = TestActorRef[EchoActor] - - - - (1 to totalMessages).foreach {x:Int => - echoActor ! s"Message ${x}" - } - - //val messages = registry.getMeters.asScala.get(meterForEchoActor).get.getCount - - //messages should equal(totalMessages) - } - } - -} - -class EchoActor extends Actor { - def receive = { - case msg ⇒ sender ! msg - } -} - - diff --git a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala deleted file mode 100644 index 1eab6355..00000000 --- a/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.ActorSystem -import kamon.Kamon - -class ActorSystemInstrumentationSpec extends WordSpec with Matchers { - - // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. - - "the actor system instrumentation" should { - "register all actor systems created" in { - val as1 = ActorSystem("as1") - val as2 = ActorSystem("as2") - - - Kamon.Metric.actorSystem("as1") should not be (None) - Kamon.Metric.actorSystem("as2") should not be (None) - Kamon.Metric.actorSystem("unknown") should be (None) - } - } -} diff --git a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala deleted file mode 100644 index 89ef61f3..00000000 --- a/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.{Actor, Props, ActorSystem} -import kamon.metric.MetricDirectory -import kamon.Kamon - -class DispatcherInstrumentationSpec extends WordSpec with Matchers{ - - - "the dispatcher instrumentation" should { - "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem { - val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers - (1 to 10).foreach(actor ! _) - - val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot - println("Active max: "+active.max) - println("Active min: "+active.min) - - } - } - - - trait SingleDispatcherActorSystem { - val actorSystem = ActorSystem("single-dispatcher") - val actor = actorSystem.actorOf(Props(new Actor { - def receive = { - case a => sender ! a; - } - })) - - } -} - diff --git a/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala deleted file mode 100644 index cc55ec92..00000000 --- a/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.WordSpec -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import java.util.concurrent.ConcurrentLinkedQueue -import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} -import java.util.Queue -import akka.actor.{ActorSystem, Actor} - -class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { - def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) - - - /*"A MonitoredMessageQueue" should { - "update the related histogram when a message is enqueued" in { - new PopulatedMessageQueueFixture { - - assert(histogram.getSnapshot.getMax === 0) - - for(i <- 1 to 3) { enqueueDummyMessage } - - assert(histogram.getCount === 3) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - - "update the related histogram when a message is dequeued" in { - new PopulatedMessageQueueFixture { - for(i <- 1 to 3) { enqueueDummyMessage } - assert(histogram.getSnapshot.getMax === 3) - - messageQueue.dequeue() - messageQueue.dequeue() - - assert(histogram.getCount === 5) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - } - - trait PopulatedMessageQueueFixture { - - val histogram = new Histogram(new ExponentiallyDecayingReservoir()) -/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final def queue: Queue[Envelope] = this - }*/ - val messageQueue = new MonitoredMessageQueue(delegate, histogram) - - def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) - }*/ -} diff --git a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala deleted file mode 100644 index de65aaca..00000000 --- a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ /dev/null @@ -1,82 +0,0 @@ -package kamon.instrumentation - -import scala.concurrent.{Await, Promise, Future} -import org.scalatest.{Matchers, OptionValues, WordSpec} -import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Kamon, TraceContext} -import java.util.UUID -import scala.util.Success -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit -import akka.actor.ActorSystem - - -class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { - - "a instrumented runnable" when { - "created in a thread that does have a TraceContext" must { - "preserve the TraceContext" which { - "should be available during the run method execution" in { new FutureWithContextFixture { - - whenReady(futureWithContext) { result => - result.value should equal(testContext) - } - }} - - "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture { - val onCompleteContext = Promise[TraceContext]() - - futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context.get)) - }) - - whenReady(onCompleteContext.future) { result => - result should equal(testContext) - } - }} - } - } - - "created in a thread that doest have a TraceContext" must { - "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ - - whenReady(futureWithoutContext) { result => - result should equal(None) - } - }} - - "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture { - val onCompleteContext = Promise[Option[TraceContext]]() - - futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context)) - }) - - whenReady(onCompleteContext.future) { result => - result should equal(None) - } - }} - } - } - - - /** - * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. - */ - implicit val testActorSystem = ActorSystem("test-actorsystem") - implicit val execContext = testActorSystem.dispatcher - - class FutureWithContextFixture { - val testContext = TraceContext() - Kamon.set(testContext) - - val futureWithContext = Future { Kamon.context} - } - - trait FutureWithoutContextFixture { - Kamon.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { Kamon.context } - } -} - - |