From 2caece9ef7574406c548b4a1f333de4c9579b3a2 Mon Sep 17 00:00:00 2001
From: Diego Parra
Date: Fri, 3 May 2013 17:06:10 -0300
Subject: Initial Commit Kamon

---
 src/main/resources/META-INF/aop.xml                |  19 ++
 src/main/resources/application.conf                |  47 ++++
 src/main/resources/newrelic.yml                    | 242 +++++++++++++++++++++
 src/main/scala/akka/actor/ActorAspect.scala        |  25 +++
 .../InstrumentedExecutorServiceConfigurator.scala  |  63 ++++++
 src/main/scala/kamon/executor/eventbus.scala       |  82 +++++++
 src/main/scala/kamon/metric/AkkaMetrics.scala      |   1 +
 src/main/scala/kamon/metric/Metrics.scala          |  49 +++++
 src/main/scala/kamon/metric/NewRelicReporter.scala |  41 ++++
 9 files changed, 569 insertions(+)
 create mode 100644 src/main/resources/META-INF/aop.xml
 create mode 100644 src/main/resources/application.conf
 create mode 100644 src/main/resources/newrelic.yml
 create mode 100644 src/main/scala/akka/actor/ActorAspect.scala
 create mode 100644 src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
 create mode 100644 src/main/scala/kamon/executor/eventbus.scala
 create mode 100644 src/main/scala/kamon/metric/AkkaMetrics.scala
 create mode 100644 src/main/scala/kamon/metric/Metrics.scala
 create mode 100644 src/main/scala/kamon/metric/NewRelicReporter.scala

diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..03ccb0e8
--- /dev/null
+++ b/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
new file mode 100644
index 00000000..370acae9
--- /dev/null
+++ b/src/main/resources/application.conf
@@ -0,0 +1,47 @@
+akka {
+  actor {
+    default-dispatcher {
+      fork-join-executor {
+        # Min number of threads to cap factor-based parallelism number to
+        parallelism-min = 2
+
+        # The parallelism factor is used to determine thread pool size using the
+        # following formula: ceil(available processors * factor). Resulting size
+        # is then bounded by the parallelism-min and parallelism-max values.
+        parallelism-factor = 3.0
+
+        # Max number of threads to cap factor-based parallelism number to
+        parallelism-max = 8
+      }
+
+      throughput = 100
+    }
+  }
+}
+
+# Dispatcher is the name of the event-based dispatcher
+#type = Dispatcher
+
+# What kind of ExecutorService to use
+#executor = "kamon.executor.InstrumentedExecutorServiceConfigurator"
+
+# Min number of threads to cap factor-based parallelism number to
+#parallelism-min = 2
+
+# Parallelism (threads) ... ceil(available processors * factor)
+#parallelism-factor = 2.0
+
+# Max number of threads to cap factor-based parallelism number to
+#parallelism-max = 10
+
+# Throughput defines the maximum number of messages to be
+# processed per actor before the thread jumps to the next actor.
+# Set to 1 for as fair as possible.
+#throughput = 100
+
+
+
+
+
+
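The sizing rule documented in the fork-join-executor comments above determines the dispatcher's thread count; a quick Scala sketch of the same arithmetic (values mirror the settings above, names are illustrative, not part of this commit):

    object ForkJoinSizing extends App {
      val parallelismMin    = 2
      val parallelismFactor = 3.0
      val parallelismMax    = 8

      // ceil(available processors * factor), then clamp to [min, max]
      val raw  = math.ceil(Runtime.getRuntime.availableProcessors * parallelismFactor).toInt
      val size = math.max(parallelismMin, math.min(parallelismMax, raw))

      println(s"fork-join parallelism: $size")  // 8 on a 4-core machine (12 clamped to max)
    }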
diff --git a/src/main/resources/newrelic.yml b/src/main/resources/newrelic.yml
new file mode 100644
index 00000000..e347635e
--- /dev/null
+++ b/src/main/resources/newrelic.yml
@@ -0,0 +1,242 @@
+#
+# This file configures the New Relic Agent. New Relic monitors
+# Java applications with deep visibility and low overhead. For more
+# information, visit www.newrelic.com.
+#
+# This configuration file is custom generated for Ivan Topolnak - ivantopo@gmail.com
+#
+# This section is for settings common to all environments.
+# Do not add anything above this next line.
+common: &default_settings
+  #
+  # ============================== LICENSE KEY ===============================
+
+  # You must specify the license key associated with your New Relic
+  # account. This key binds your Agent's data to your account in the
+  # New Relic service.
+  license_key: '2e24765acb032cb9e7207013b5ba3e2ab7d2d75c'
+
+  # Agent Enabled
+  # Use this setting to force the agent to run or not run.
+  # Default is true.
+  # agent_enabled: true
+
+  # Set to true to enable support for auto app naming.
+  # The name of each web app is detected automatically
+  # and the agent reports data separately for each one.
+  # This provides a finer-grained performance breakdown for
+  # web apps in New Relic.
+  # Default is false.
+  enable_auto_app_naming: false
+
+  # Set to true to enable component-based transaction naming.
+  # Set to false to use the URI of a web request as the name of the transaction.
+  # Default is true.
+  enable_auto_transaction_naming: true
+
+  # Set the name of your application as you'd like it to show up in New Relic.
+  # If enable_auto_app_naming is false, the agent reports all data to this application.
+  # Otherwise, the agent reports only background tasks (transactions for non-web
+  # applications) to this application. To report data to more than one application,
+  # separate the application names with ";". For example, to report data to
+  # "My Application" and "My Application 2" use this:
+  # app_name: My Application;My Application 2
+  # This setting is required.
+  app_name: My Application
+
+  # The agent uses its own log file to keep its logging
+  # separate from that of your application. Specify the log level here.
+  # This setting is dynamic, so changes do not require restarting your application.
+  # The levels in increasing order of verboseness are:
+  #   off, severe, warning, info, fine, finer, finest
+  # Default is info.
+  log_level: info
+
+  enable_custom_tracing: true
+
+  # Log all data to and from New Relic in plain text.
+  # This setting is dynamic, so changes do not require restarting your application.
+  # Default is false.
+  #audit_mode: true
+
+  # The number of log files to use.
+  # Default is 1.
+  #log_file_count: 1
+
+  # The maximum number of bytes to write to any one log file.
+  # Default is 0 (no limit).
+  #log_limit_in_kbytes: 0
+
+  # The name of the log file.
+  # Default is newrelic_agent.log.
+  #log_file_name: newrelic_agent.log
+
+  # The log file directory.
+  # Default is the logs directory in the newrelic.jar parent directory.
+  #log_file_path:
+
+  # The agent communicates with New Relic via https by
+  # default. If you want to communicate with New Relic via http,
+  # then turn off SSL by setting this value to false.
+  # This work is done asynchronously to the threads that process your
+  # application code, so response times will not be directly affected
+  # by this change.
+  # Default is true.
+  ssl: true
+
+  # Proxy settings for connecting to the New Relic server.
+  #
+  # If a proxy is used, the host setting is required. Other settings
+  # are optional. Default port is 8080. The username and password
+  # settings will be used to authenticate to Basic Auth challenges
+  # from a proxy server.
+  #
+  # proxy_host: hostname
+  # proxy_port: 8080
+  # proxy_user: username
+  # proxy_password: password
+
+  # Tells transaction tracer and error collector (when enabled)
+  # whether or not to capture HTTP params. When true, frameworks can
+  # exclude HTTP parameters from being captured.
+  # Default is false.
+  capture_params: false
+
+  # Tells transaction tracer and error collector not to collect
+  # specific http request parameters.
+  # ignored_params: credit_card, ssn, password
+
+  # Transaction tracer captures deep information about slow
+  # transactions and sends this to the New Relic service once a
+  # minute. Included in the transaction is the exact call sequence of
+  # the transactions including any SQL statements issued.
+  transaction_tracer:
+
+    # Transaction tracer is enabled by default. Set this to false to
+    # turn it off. This feature is only available at the higher product levels.
+    # Default is true.
+    enabled: true
+
+    # Threshold in seconds for when to collect a transaction
+    # trace. When the response time of a controller action exceeds
+    # this threshold, a transaction trace will be recorded and sent to
+    # New Relic. Valid values are any float value, or (default) "apdex_f",
+    # which will use the threshold for the "Frustrated" Apdex level
+    # (greater than four times the apdex_t value).
+    # Default is apdex_f.
+    transaction_threshold: apdex_f
+
+    # When transaction tracer is on, SQL statements can optionally be
+    # recorded. The recorder has three modes: "off", which sends no
+    # SQL; "raw", which sends the SQL statement in its original form;
+    # and "obfuscated", which strips out numeric and string literals.
+    # Default is obfuscated.
+    record_sql: obfuscated
+
+    # Obfuscate only occurrences of specific SQL field names.
+    # This setting only applies if "record_sql" is set to "raw".
+    #obfuscated_sql_fields: credit_card, ssn, password
+
+    # Set this to true to log SQL statements instead of recording them.
+    # SQL is logged using the record_sql mode.
+    # Default is false.
+    log_sql: false
+
+    # Threshold in seconds for when to collect a stack trace for a SQL
+    # call. In other words, when SQL statements exceed this threshold,
+    # capture and send to New Relic the current stack trace. This is
+    # helpful for pinpointing where long SQL calls originate from.
+    # Default is 0.5 seconds.
+    stack_trace_threshold: 0.5
+
+    # Determines whether the agent will capture query plans for slow
+    # SQL queries. Only supported for MySQL and PostgreSQL.
+    # Default is true.
+    explain_enabled: true
+
+    # Threshold for query execution time below which query plans will
+    # not be captured. Relevant only when `explain_enabled` is true.
+    # Default is 0.5 seconds.
+    explain_threshold: 0.5
+
+    # Use this setting to control the variety of transaction traces.
+    # The higher the setting, the greater the variety.
+    # Set this to 0 to always report the slowest transaction trace.
+    # Default is 20.
+    top_n: 20
+
+  # Error collector captures information about uncaught exceptions and
+  # sends them to New Relic for viewing.
+  error_collector:
+
+    # Error collector is enabled by default. Set this to false to turn
+    # it off. This feature is only available at the higher product levels.
+    # Default is true.
+    enabled: true
+
+    # To stop specific exceptions from reporting to New Relic, set this property
+    # to a comma separated list of full class names.
+    #
+    # ignore_errors:
+
+    # To stop specific http status codes from being reported to New Relic as errors,
+    # set this property to a comma separated list of status codes to ignore.
+    # When this property is commented out it defaults to ignoring 404s.
+    #
+    # ignore_status_codes: 404
+
+  # Cross Application Tracing adds request and response headers to
+  # external calls using the Apache HttpClient libraries to provide better
+  # performance data when calling applications monitored by other New Relic Agents.
+  #
+  cross_application_tracer:
+
+    # Set to true to enable cross application tracing.
+    # Default is true.
+    enabled: true
+
+  # Thread profiler measures wall clock time, CPU time, and method call counts
+  # in your application's threads as they run.
+  thread_profiler:
+
+    # Set to false to disable the thread profiler.
+    # Default is true.
+    enabled: true
+
+  #============================== Browser Monitoring ===============================
+  # New Relic Real User Monitoring gives you insight into the performance real users are
+  # experiencing with your website. This is accomplished by measuring the time it takes for
+  # your users' browsers to download and render your web pages by injecting a small amount
+  # of JavaScript code into the header and footer of each page.
+  browser_monitoring:
+
+    # By default the agent automatically inserts API calls in compiled JSPs to
+    # inject the monitoring JavaScript into web pages.
+    # Set this attribute to false to turn off this behavior.
+    auto_instrument: true
+
+    # Set this attribute to false to prevent injection of the monitoring JavaScript.
+    # Default is true.
+    enabled: true
+
+# Application Environments
+# ------------------------------------------
+# Environment specific settings are in this section.
+# You can use the environment to override the default settings.
+# For example, to change the app_name setting.
+# Use -Dnewrelic.environment=<environment> on the Java command line
+# to set the environment.
+# The default environment is production.
+
+# NOTE: if your application has other named environments, you should
+# provide configuration settings for these environments here.
+
+development:
+  <<: *default_settings
+  app_name: KAMON[Development]
+
+test:
+  <<: *default_settings
+  app_name: My Application (Test)
+
+production:
+  <<: *default_settings
+
+staging:
+  <<: *default_settings
+  app_name: My Application (Staging)
\ No newline at end of file
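Once the agent picks up this file, custom metrics recorded through its API are reported under the application named by app_name above; a minimal sketch using the same agent call the reporter at the end of this patch relies on (the metric name here is illustrative):

    import com.newrelic.api.agent.NewRelic

    object RecordCustomMetric extends App {
      // Custom metric names conventionally start with "Custom/".
      NewRelic.recordMetric("Custom/Example/Value", 42.0f)
    }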
diff --git a/src/main/scala/akka/actor/ActorAspect.scala b/src/main/scala/akka/actor/ActorAspect.scala
new file mode 100644
index 00000000..b028d8c6
--- /dev/null
+++ b/src/main/scala/akka/actor/ActorAspect.scala
@@ -0,0 +1,25 @@
+package akka.actor
+
+import org.aspectj.lang.annotation.{Around, Pointcut, Before, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.metric.Metrics
+
+@Aspect
+class ActorAspect extends Metrics {
+
+  // Matches every message an ActorCell (or subclass) processes.
+  @Pointcut("execution(* ActorCell+.receiveMessage(..))")
+  private def actorReceive: Unit = {}
+
+  @Around("actorReceive() && target(actor)")
+  def around(pjp: ProceedingJoinPoint, actor: ActorCell): AnyRef = {
+    val actorName: String = actor.self.path.toString
+
+    // Mark the per-actor meter, then let the original receive run.
+    markAndCountMeter(actorName) {
+      pjp.proceed()
+    }
+  }
+}
\ No newline at end of file
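The @Aspect above only takes effect if the AspectJ load-time weaver registers it through the META-INF/aop.xml added earlier; a minimal descriptor for this aspect might look roughly like the following (a sketch against the standard AspectJ schema — the include pattern is an assumption, not the file's actual contents):

    <aspectj>
      <aspects>
        <aspect name="akka.actor.ActorAspect"/>
      </aspects>
      <weaver>
        <include within="akka.actor..*"/>
      </weaver>
    </aspectj>

Running the JVM with -javaagent:aspectjweaver.jar enables the weaving.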
diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
new file mode 100644
index 00000000..62f90da8
--- /dev/null
+++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
@@ -0,0 +1,63 @@
+package kamon.executor
+
+import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
+import com.typesafe.config.Config
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+import java.util
+
+class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
+  extends ForkJoinExecutorConfigurator(config, prerequisites) {
+
+  println("Created the instrumented executor")
+
+  class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
+    extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {
+
+    override def createExecutorService: ExecutorService = {
+      super.createExecutorService match {
+        case fjp: AkkaForkJoinPool => new WrappedPool(fjp)
+        case other                 => other
+      }
+    }
+  }
+}
+
+case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long)
+
+// Delegates every ExecutorService operation to the underlying pool,
+// while exposing its activity counters through `metrics`.
+class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {
+
+  def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)
+
+  def shutdown = fjp.shutdown()
+
+  def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()
+
+  def isShutdown: Boolean = fjp.isShutdown
+
+  def isTerminated: Boolean = fjp.isTerminated
+
+  def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)
+
+  def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)
+
+  def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)
+
+  def submit(task: Runnable): Future[_] = fjp.submit(task)
+
+  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks)
+
+  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit)
+
+  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks)
+
+  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks, timeout, unit)
+
+  def execute(command: Runnable) = fjp.execute(command)
+}
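The commented-out executor line in application.conf above hints at how this configurator is meant to be plugged in: Akka accepts a fully qualified configurator class name in the dispatcher's executor setting. A minimal wiring sketch (system name and inline config are illustrative):

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object InstrumentedDispatcherExample extends App {
      // Point the default dispatcher's executor at the configurator by FQCN.
      val config = ConfigFactory.parseString(
        """
          |akka.actor.default-dispatcher {
          |  executor = "kamon.executor.InstrumentedExecutorServiceConfigurator"
          |}
        """.stripMargin).withFallback(ConfigFactory.load())

      val system = ActorSystem("instrumented", config)
    }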
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
new file mode 100644
index 00000000..d64ff444
--- /dev/null
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -0,0 +1,82 @@
+package kamon.executor
+
+import akka.event.ActorEventBus
+import akka.event.LookupClassification
+import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import java.util.concurrent.TimeUnit
+import kamon.metric.NewRelicReporter
+
+import com.yammer.metrics.core.{MetricName, MetricsRegistry}
+import com.yammer.metrics.reporting.ConsoleReporter
+
+trait Message
+
+case class PostMessage(text: String) extends Message
+
+case class MessageEvent(channel: String, message: Message)
+
+class AppActorEventBus extends ActorEventBus with LookupClassification {
+  type Event = MessageEvent
+  type Classifier = String
+
+  // Initial size of the classifier-to-subscribers lookup map.
+  protected def mapSize(): Int = 10
+
+  // Events are routed by their channel name.
+  protected def classify(event: Event): Classifier = event.channel
+
+  protected def publish(event: Event, subscriber: Subscriber): Unit = {
+    subscriber ! event
+  }
+}
+
+object TryAkka extends App {
+  val system = ActorSystem("MySystem")
+  val appActorEventBus = new AppActorEventBus
+  val NEW_POST_CHANNEL = "/posts/new"
+  val subscriber = system.actorOf(Props(new Actor {
+    def receive = {
+      case d: MessageEvent => println(d)
+    }
+  }))
+
+  case class Ping()
+  case class Pong()
+
+  class PingActor(val target: ActorRef) extends Actor {
+    def receive = {
+      case Pong() => target ! Ping()
+    }
+  }
+
+  class PongActor extends Actor {
+    var i = 0
+    def receive = {
+      case Ping() => {
+        i += 1
+        sender ! Pong()
+      }
+    }
+  }
+
+  /*
+  val newRelicReporter = new NewRelicReporter(registry)
+  newRelicReporter.start(1, TimeUnit.SECONDS)
+  */
+
+  for (i <- 1 to 8) {
+    val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"pong-actor-${i}"))), s"ping-actor-${i}")
+    ping ! Pong()
+  }
+
+  /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
+     appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL, PostMessage(text = "hello world"))) */
+}
\ No newline at end of file
diff --git a/src/main/scala/kamon/metric/AkkaMetrics.scala b/src/main/scala/kamon/metric/AkkaMetrics.scala
new file mode 100644
index 00000000..de647c07
--- /dev/null
+++ b/src/main/scala/kamon/metric/AkkaMetrics.scala
@@ -0,0 +1 @@
+package kamon.metric
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..ecfa0ec6
--- /dev/null
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,49 @@
+package kamon.metric
+
+import com.yammer.metrics.core.{MetricName, MetricsRegistry}
+import scala.collection.mutable.{HashMap, SynchronizedMap}
+import com.yammer.metrics.scala.{Meter, Counter, MetricsGroup, Timer}
+import com.yammer.metrics.reporting.{ConsoleReporter, JmxReporter}
+import scala.collection.mutable
+import java.util.concurrent.TimeUnit
+
+class Metrics {
+  private lazy val metricsRegistry: MetricsRegistry = new MetricsRegistry()
+  private lazy val metricsGroup = new MetricsGroup(this.getClass, metricsRegistry)
+
+  // All three caches are synchronized: they are hit from many dispatcher threads.
+  private lazy val meters = new HashMap[String, Meter] with SynchronizedMap[String, Meter]
+  private lazy val timers = new HashMap[String, Timer] with SynchronizedMap[String, Timer]
+  private lazy val counters = new HashMap[String, Counter] with SynchronizedMap[String, Counter]
+
+  val consoleReporter = ConsoleReporter.enable(metricsRegistry, 1, TimeUnit.SECONDS)
+  val newrelicReport = new NewRelicReporter(metricsRegistry, "newrelic-reporter")
+  newrelicReport.run()
+  newrelicReport.start(1, TimeUnit.SECONDS)
+
+  def incrementCounter(key: String) {
+    counters.getOrElseUpdate(key, metricsGroup.counter(s"${key}-counter")) += 1
+  }
+
+  def markMeter(key: String) {
+    meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
+  }
+
+  def trace[T](key: String)(f: => T): T = {
+    val timer = timers.getOrElseUpdate(key, metricsGroup.timer(s"${key}-timer"))
+    timer.time(f)
+  }
+
+  def markAndCountMeter[T](key: String)(f: => T): T = {
+    markMeter(key)
+    f
+  }
+
+  def traceAndCount[T](key: String)(f: => T): T = {
+    incrementCounter(key)
+    trace(key) {
+      f
+    }
+  }
+}
\ No newline at end of file
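The helpers above compose a timer and a counter around any block of code; a usage sketch (class and key names are illustrative):

    class OrderProcessor extends Metrics {
      def process(order: String): String =
        traceAndCount("order-processing") {
          // timed by "order-processing-timer", counted by "order-processing-counter"
          order.toUpperCase
        }
    }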
diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala
new file mode 100644
index 00000000..9aa374aa
--- /dev/null
+++ b/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -0,0 +1,41 @@
+package kamon.metric
+
+import com.newrelic.api.agent.NewRelic
+import com.yammer.metrics.reporting.AbstractPollingReporter
+import com.yammer.metrics.core._
+
+class NewRelicReporter(registry: MetricsRegistry, name: String) extends AbstractPollingReporter(registry, name) with MetricProcessor[String] {
+
+  def processMeter(name: MetricName, meter: Metered, context: String) {
+    println(s"Logging to NewRelic: ${meter.count()}")
+    NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.count())
+  }
+
+  def processCounter(name: MetricName, counter: Counter, context: String) {}
+
+  def processHistogram(name: MetricName, histogram: Histogram, context: String) {}
+
+  def processTimer(name: MetricName, timer: Timer, context: String) {}
+
+  def processGauge(name: MetricName, gauge: Gauge[_], context: String) {}
+
+  // Match all metrics; a null predicate would NPE inside groupedMetrics.
+  private final val predicate: MetricPredicate = MetricPredicate.ALL
+
+  def run() {
+    import scala.collection.JavaConversions._
+
+    for (entry <- getMetricsRegistry.groupedMetrics(predicate).entrySet) {
+      for (subEntry <- entry.getValue.entrySet) {
+        subEntry.getValue.processWith(this, subEntry.getKey, "")
+      }
+    }
+  }
+}
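Wiring the reporter to a registry mirrors what Metrics.scala above already does; a standalone sketch:

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.core.MetricsRegistry

    object ReporterExample extends App {
      val registry = new MetricsRegistry()
      val reporter = new NewRelicReporter(registry, "newrelic-reporter")
      reporter.start(1, TimeUnit.SECONDS)  // poll the registry and push once per second
    }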