diff options
22 files changed, 527 insertions, 299 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 207bf1b9..3532b19b 100644 --- a/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -10,9 +10,6 @@ <aspect name="kamon.trace.instrumentation.EnvelopeTracingContext"/> <aspect name="kamon.trace.instrumentation.ActorCellInvokeInstrumentation"/> <aspect name="kamon.trace.instrumentation.RunnableTracing" /> - <aspect name="kamon.instrumentation.SprayRequestContextTracing"/> - <aspect name="kamon.instrumentation.SprayOpenRequestContextTracing"/> - <aspect name = "kamon.instrumentation.SprayServerInstrumentation"/> <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> <aspect name="kamon.trace.instrumentation.ActorLoggingInstrumentation"/> diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala index 3d503d54..aee7df9f 100644 --- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala +++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala @@ -1,10 +1,10 @@ package kamon -import akka.actor.{ActorSystem, ExtensionId} +import akka.actor.{Extension, ActorSystem, ExtensionId} import java.util.concurrent.ConcurrentHashMap object AkkaExtensionSwap { - def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = { + def swap(system: ActorSystem, key: ExtensionId[_], value: Extension): Unit = { val extensionsField = system.getClass.getDeclaredField("extensions") extensionsField.setAccessible(true) diff --git a/kamon-core/src/test/resources/newrelic.yml b/kamon-core/src/test/resources/newrelic.yml deleted file mode 100644 index 77923e9c..00000000 --- a/kamon-core/src/test/resources/newrelic.yml +++ /dev/null @@ -1,242 +0,0 @@ -# -# This file configures the New Relic Agent. New Relic monitors -# Java applications with deep visibility and low overhead. For more -# information, visit www.newrelic.com. -# -# This configuration file is custom generated for Ivan Topolnak - ivantopo@gmail.com -# -# This section is for settings common to all environments. -# Do not add anything above this next line. -common: &default_settings - # - # ============================== LICENSE KEY =============================== - - # You must specify the license key associated with your New Relic - # account. This key binds your Agent's data to your account in the - # New Relic service. - license_key: '2e24765acb032cb9e7207013b5ba3e2ab7d2d75c' - - # Agent Enabled - # Use this setting to force the agent to run or not run. - # Default is true. - # agent_enabled: true - - # Set to true to enable support for auto app naming. - # The name of each web app is detected automatically - # and the agent reports data separately for each one. - # This provides a finer-grained performance breakdown for - # web apps in New Relic. - # Default is false. - enable_auto_app_naming: false - - # Set to true to enable component-based transaction naming. - # Set to false to use the URI of a web request as the name of the transaction. - # Default is true. - enable_auto_transaction_naming: true - - # Set the name of your application as you'd like it show up in New Relic. - # if enable_auto_app_naming is false, the agent reports all data to this application. - # Otherwise, the agent reports only background tasks (transactions for non-web applications) to this application. - # To report data to more than one application, separate the application names with ";". - # For example, to report data to"My Application" and "My Application 2" use this: - # app_name: My Application;My Application 2 - # This setting is required. - app_name: My Application - - # The agent uses its own log file to keep its logging - # separate from that of your application. Specify the log level here. - # This setting is dynamic, so changes do not require restarting your application. - # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest - # Default is info. - log_level: finest - enable_custom_tracing: true - - # Log all data to and from New Relic in plain text. - # This setting is dynamic, so changes do not require restarting your application. - # Default is false. - audit_mode: true - - # The number of log files to use. - # Default is 1. - #log_file_count: 1 - - # The maximum number of bytes to write to any one log file. - # Default is 0 (no limit). - #log_limit_in_kbytes: 0 - - # The name of the log file. - # Default is newrelic_agent.log. - #log_file_name: newrelic_agent.log - - # The log file directory. - # Default is the logs directory in the newrelic.jar parent directory. - log_file_path: /home/ivantopo/Desktop/tmp - - # The agent communicates with New Relic via https by - # default. If you want to communicate with newrelic via http, - # then turn off SSL by setting this value to false. - # This work is done asynchronously to the threads that process your - # application code, so response times will not be directly affected - # by this change. - # Default is true. - ssl: true - - # Proxy settings for connecting to the New Relic server. - # - # If a proxy is used, the host setting is required. Other settings - # are optional. Default port is 8080. The username and password - # settings will be used to authenticate to Basic Auth challenges - # from a proxy server. - # - # proxy_host: hostname - # proxy_port: 8080 - # proxy_user: username - # proxy_password: password - - # Tells transaction tracer and error collector (when enabled) - # whether or not to capture HTTP params. When true, frameworks can - # exclude HTTP parameters from being captured. - # Default is false. - capture_params: false - - # Tells transaction tracer and error collector to not to collect - # specific http request parameters. - # ignored_params: credit_card, ssn, password - - # Transaction tracer captures deep information about slow - # transactions and sends this to the New Relic service once a - # minute. Included in the transaction is the exact call sequence of - # the transactions including any SQL statements issued. - transaction_tracer: - - # Transaction tracer is enabled by default. Set this to false to - # turn it off. This feature is only available at the higher product levels. - # Default is true. - enabled: true - - # Threshold in seconds for when to collect a transaction - # trace. When the response time of a controller action exceeds - # this threshold, a transaction trace will be recorded and sent to - # New Relic. Valid values are any float value, or (default) "apdex_f", - # which will use the threshold for the "Frustrated" Apdex level - # (greater than four times the apdex_t value). - # Default is apdex_f. - transaction_threshold: apdex_f - - # When transaction tracer is on, SQL statements can optionally be - # recorded. The recorder has three modes, "off" which sends no - # SQL, "raw" which sends the SQL statement in its original form, - # and "obfuscated", which strips out numeric and string literals. - # Default is obfuscated. - record_sql: obfuscated - - # Obfuscate only occurrences of specific SQL fields names. - # This setting only applies if "record_sql" is set to "raw". - #obfuscated_sql_fields: credit_card, ssn, password - - # Set this to true to log SQL statements instead of recording them. - # SQL is logged using the record_sql mode. - # Default is false. - log_sql: false - - # Threshold in seconds for when to collect stack trace for a SQL - # call. In other words, when SQL statements exceed this threshold, - # then capture and send to New Relic the current stack trace. This is - # helpful for pinpointing where long SQL calls originate from. - # Default is 0.5 seconds. - stack_trace_threshold: 0.5 - - # Determines whether the agent will capture query plans for slow - # SQL queries. Only supported for MySQL and PostgreSQL. - # Default is true. - explain_enabled: true - - # Threshold for query execution time below which query plans will not - # not be captured. Relevant only when `explain_enabled` is true. - # Default is 0.5 seconds. - explain_threshold: 0.5 - - # Use this setting to control the variety of transaction traces. - # The higher the setting, the greater the variety. - # Set this to 0 to always report the slowest transaction trace. - # Default is 20. - top_n: 20 - - - # Error collector captures information about uncaught exceptions and - # sends them to New Relic for viewing - error_collector: - - # Error collector is enabled by default. Set this to false to turn - # it off. This feature is only available at the higher product levels. - # Default is true. - enabled: true - - # To stop specific exceptions from reporting to New Relic, set this property - # to a comma separated list of full class names. - # - # ignore_errors: - - # To stop specific http status codes from being reporting to New Relic as errors, - # set this property to a comma separated list of status codes to ignore. - # When this property is commented out it defaults to ignoring 404s. - # - # ignore_status_codes: 404 - - # Cross Application Tracing adds request and response headers to - # external calls using the Apache HttpClient libraries to provided better - # performance data when calling applications monitored by other New Relic Agents. - # - cross_application_tracer: - # Set to true to enable cross application tracing. - # Default is true. - enabled: true - - # Thread profiler measures wall clock time, CPU time, and method call counts - # in your application's threads as they run. - thread_profiler: - - # Set to false to disable the thread profiler. - # Default is true. - enabled: true - - #============================== Browser Monitoring =============================== - # New Relic Real User Monitoring gives you insight into the performance real users are - # experiencing with your website. This is accomplished by measuring the time it takes for - # your users' browsers to download and render your web pages by injecting a small amount - # of JavaScript code into the header and footer of each page. - browser_monitoring: - # By default the agent automatically inserts API calls in compiled JSPs to - # inject the monitoring JavaScript into web pages. - # Set this attribute to false to turn off this behavior. - auto_instrument: true - # Set this attribute to false to prevent injection of the monitoring JavaScript. - # Default is true. - enabled: true - -# Application Environments -# ------------------------------------------ -# Environment specific settings are in this section. -# You can use the environment to override the default settings. -# For example, to change the app_name setting. -# Use -Dnewrelic.environment=<environment> on the Java command line -# to set the environment. -# The default environment is production. - -# NOTE if your application has other named environments, you should -# provide configuration settings for these environments here. - -development: - <<: *default_settings - app_name: KAMON[Development] - -test: - <<: *default_settings - app_name: My Application (Test) - -production: - <<: *default_settings - -staging: - <<: *default_settings - app_name: My Application (Staging)
\ No newline at end of file diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index a2583195..dacabe41 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -1,13 +1,13 @@ akka { - actor { - debug { - unhandled = on - } - } - loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] + extensions = ["kamon.newrelic.NewRelic"] } - +kamon { + newrelic { + app-name = "Kamon[Development]" + license-key = 2e24765acb032cb9e7207013b5ba3e2ab7d2d75c + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala new file mode 100644 index 00000000..c4d7c089 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -0,0 +1,103 @@ +package kamon.newrelic + +import akka.actor.Actor +import spray.json._ +import scala.concurrent.Future +import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation} +import spray.httpx.encoding.Deflate +import spray.http._ +import spray.json.lenses.JsonLenses._ +import akka.pattern.pipe +import java.lang.management.ManagementFactory +import spray.client.pipelining._ +import scala.util.control.NonFatal +import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch} + +class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport { + import context.dispatcher + import Agent._ + + val agentInfo = { + val config = context.system.settings.config.getConfig("kamon.newrelic") + val appName = config.getString("app-name") + val licenseKey = config.getString("license-key") + + // Name has the format of pid@host + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + } + + + + def receive = { + case Initialize(runId, collector) => context become reporting(runId, collector) + } + + + def reporting(runId: Long, collector: String): Receive = { + case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics) + } + + override def preStart(): Unit = { + super.preStart() + initialize + } + + def initialize: Unit = { + pipe ({ + for( + collector <- selectCollector; + runId <- connect(collector, agentInfo) + ) yield Initialize(runId, collector) + } recover { + case NonFatal(ex) => InitializationFailed(ex) + }) to self + } + + import AgentJsonProtocol._ + val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson + + def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + + def selectCollector: Future[String] = { + compressedToJsonPipeline { + Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) + } map { json => + json.extract[String]('return_value) + } + } + + def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + compressedToJsonPipeline { + Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) + } map { json => + json.extract[Long]('return_value / 'agent_run_id) + } + } + + + def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { + val end = System.currentTimeMillis() / 1000L + val start = end - 60 + compressedPipeline { + Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics)) + } + } + + + +} + +object Agent { + + + + case class Initialize(runId: Long, collector: String) + case class InitializationFailed(reason: Throwable) + case class CollectorSelection(return_value: String) + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) + + case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)]) +}
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala new file mode 100644 index 00000000..30e17e77 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala @@ -0,0 +1,56 @@ +package kamon.newrelic + +import spray.json._ +import kamon.newrelic.Agent._ + +object AgentJsonProtocol extends DefaultJsonProtocol { + + implicit object ConnectJsonWriter extends RootJsonWriter[AgentInfo] { + def write(obj: AgentInfo): JsValue = + JsArray( + JsObject( + "agent_version" -> JsString("3.1.0"), + "app_name" -> JsArray(JsString(obj.appName)), + "host" -> JsString(obj.host), + "identifier" -> JsString(s"java:${obj.appName}"), + "language" -> JsString("java"), + "pid" -> JsNumber(obj.pid) + ) + ) + } + + import NewRelicMetric._ + + implicit def listWriter[T : JsonWriter] = new JsonWriter[List[T]] { + def write(list: List[T]) = JsArray(list.map(_.toJson)) + } + + implicit object MetricDetailWriter extends JsonWriter[(ID, Data)] { + def write(obj: (ID, Data)): JsValue = { + val (id, data) = obj + JsArray( + JsObject( + "name" -> JsString(id.name) // TODO Include scope + ), + JsArray( + JsNumber(data.total), + JsNumber(data.totalExclusive), + JsNumber(data.min), + JsNumber(data.max), + JsNumber(data.sumOfSquares), + JsNumber(data.callCount) + ) + ) + } + } + + implicit object MetricDataWriter extends RootJsonWriter[MetricData] { + def write(obj: MetricData): JsValue = + JsArray( + JsNumber(obj.runId), + JsNumber(obj.start), + JsNumber(obj.end), + obj.metrics.toJson + ) + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala new file mode 100644 index 00000000..c9178292 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala @@ -0,0 +1,78 @@ +package kamon.newrelic + +import akka.actor.Actor +import kamon.trace.UowTrace +import com.newrelic.api.agent.{NewRelic => NRAgent} +import kamon.trace.UowTracing.WebExternal + +class Apdex extends Actor { + val t = 500 + + var satisfied: Int = 0 + var tolerating: Int = 0 + var frustrated: Int = 0 + + def receive = { + case trace: UowTrace => recordTransaction(trace) + + } + + def clearStats: Unit = { + satisfied = 0 + tolerating = 0 + frustrated = 0 + } + + def total: Int = satisfied + tolerating + frustrated + + def updateStats(sampleTime: Double): Unit = { + if(sampleTime < t) + satisfied += 1 + else + if(sampleTime >= t && sampleTime <= 4*t) + tolerating += 1 + else + frustrated += 1 + } + + def recordTransaction(uowTrace: UowTrace): Unit = { + val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) + + updateStats(time) + + NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) + NRAgent.recordMetric("WebTransaction", time.toFloat) + NRAgent.recordMetric("HttpDispatcher", time.toFloat) + + uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => + val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat + + println("Web External: " + webExternalTrace) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/all", external) + NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) + } + + + val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) + + + def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { + case Nil => accum + case head :: tail => + if(head.start > lastEnd) + measureExternal(accum + (head.finish-head.start), head.finish, tail) + else + measureExternal(accum + (head.finish-lastEnd), head.finish, tail) + } + + val external = measureExternal(0, 0, allExternals) / 1E9 + + + NRAgent.recordMetric(s"External/all", external.toFloat) + NRAgent.recordMetric(s"External/allWeb", external.toFloat) + + } +} + +case object Flush diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala new file mode 100644 index 00000000..2ee7ada0 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -0,0 +1,102 @@ +package kamon.newrelic + +import akka.actor._ +import scala.collection.mutable +import kamon.Kamon +import kamon.trace.{UowTrace, Trace} +import kamon.newrelic.NewRelicMetric.{MetricBatch, FlushMetrics} +import scala.concurrent.duration._ + +class NewRelic extends ExtensionId[NewRelicExtension] { + def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) +} + +class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") +} + +class NewRelicManager extends Actor with ActorLogging { + log.info("Registering the Kamon(NewRelic) extension") + + Kamon(Trace)(context.system) ! Trace.Register + + + + val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") + val agent = context.actorOf(Props[Agent], "agent") + + import context.dispatcher + context.system.scheduler.schedule(1 minute, 1 minute) { + webTransactionMetrics.tell(FlushMetrics, agent) + } + + def receive = { + case trace: UowTrace => webTransactionMetrics ! trace + } +} + +object NewRelicMetric { + case class ID(name: String, scope: Option[String]) + case class Data(var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double, var callCount: Long) { + def record(value: Double): Unit = { + if(value > max) max = value + if(value < min) min = value + + total += value + totalExclusive += value + sumOfSquares += value * value + callCount += 1 + } + } + + object Data { + def apply(): Data = Data(0, 0, 0, 0, 0, 0) + } + + case object FlushMetrics + case class MetricBatch(metrics: List[(ID, Data)]) +} + + +class WebTransactionMetrics extends Actor with ActorLogging { + val apdexT = 1500000000 + var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + + def receive = { + case trace: UowTrace => updateStats(trace) + case FlushMetrics => flush + } + + def flush: Unit = { + sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex)) + apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) + metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] + } + + def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = { + metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value) + } + + def recordApdex(time: Double): Unit = { + if(time <= apdexT) + apdex.total += 1 + else + if(time > apdexT && time <= (4 * apdexT)) + apdex.totalExclusive += 1 + else + apdex.min += 1 + + } + + def updateStats(trace: UowTrace): Unit = { + // Basic Metrics + recordApdex(trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction", None), trace.elapsed) + recordValue(NewRelicMetric.ID("HttpDispatcher", None), trace.elapsed) + recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), trace.elapsed) + + println("Recorded Apdex: " + apdex) + println("Current Metrics: " + metrics.mkString("\n")) + } +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 106f27e2..8c6bfe77 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -2,10 +2,8 @@ package kamon.newrelic import akka.actor.Actor import kamon.trace.UowTrace -import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} -import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} -import java.util -import java.util.Date +import com.newrelic.api.agent.NewRelic +import kamon.trace.UowTracing.WebExternal class NewRelicReporting extends Actor { diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala new file mode 100644 index 00000000..8868b8c0 --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -0,0 +1,70 @@ +package kamon.newrelic + +import akka.testkit.{TestActor, TestProbe, TestKit} +import akka.actor.{Props, ActorRef, ActorSystem} +import org.scalatest.WordSpecLike +import kamon.AkkaExtensionSwap +import spray.can.Http +import akka.io.IO +import akka.testkit.TestActor.{KeepRunning, AutoPilot} +import spray.http._ +import spray.http.HttpRequest +import spray.http.HttpResponse + +class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike { + + setupFakeHttpManager + + "the Newrelic Agent" should { + "try to connect upon creation" in { + val agent = system.actorOf(Props[Agent]) + + Thread.sleep(5000) + } + } + + def setupFakeHttpManager: Unit = { + val fakeHttpManager = TestProbe() + fakeHttpManager.setAutoPilot(new TestActor.AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = { + msg match { + case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) => + sender ! jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin) + + println("Selecting Collector") + + case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) => + sender ! jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221111 + | } + | } + | """.stripMargin) + println("Connecting") + } + + KeepRunning + } + + def jsonResponse(json: String): HttpResponse = { + HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) + } + + def rawMethodIs(method: String, uri: Uri): Boolean = { + uri.query.get("method").filter(_ == method).isDefined + } + }) + + + AkkaExtensionSwap.swap(system, Http, new IO.Extension { + def manager: ActorRef = fakeHttpManager.ref + }) + } +} diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf new file mode 100644 index 00000000..1036f393 --- /dev/null +++ b/kamon-playground/src/main/resources/application.conf @@ -0,0 +1,9 @@ +akka { + actor { + debug { + unhandled = on + } + } + + extensions = ["kamon.newrelic.NewRelic"] +}
\ No newline at end of file diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 7ee92580..628a2c41 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -7,6 +7,7 @@ import spray.httpx.RequestBuilding import scala.concurrent.{Await, Future} import kamon.spray.UowDirectives import kamon.trace.Trace +import kamon.Kamon object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { import scala.concurrent.duration._ @@ -18,7 +19,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil val act = system.actorOf(Props(new Actor { def receive: Actor.Receive = { case any => sender ! any } - }), "com.despegar-2:[]s-w@&,*") + }), "com") implicit val timeout = Timeout(30 seconds) diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala index d5e21f35..8055cf6b 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -42,8 +42,9 @@ class ServerRequestTracing { publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") case Some(_) => // nothing to do here. - + case None => + original.finish publishWarning(s"Trace context not present while closing the Trace: [$original]") } } diff --git a/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala index 4cff38be..d789042e 100644 --- a/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala +++ b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala @@ -13,37 +13,67 @@ import kamon.trace.Trace import kamon.Kamon.Extension import kamon.trace.UowTracing.{Finish, Start} -class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding { +class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding with TestServer { "the spray server request tracing instrumentation" should { - "start tracing a request when entering the server and close it when responding" in new TestServer { - client(Get(s"http://127.0.0.1:$port/")) + "trace a request start/finish sequence when proper TraceContext is received" in { + send { + Get(s"http://127.0.0.1:$port/ok") + } within(5 seconds) { - val traceId = expectMsgPF() { case Start(id) => id} + val traceId = expectMsgPF() { case Start(id, _) => id} expectMsgPF() { case Finish(traceId) => } } } - } + "finish a request even if no TraceContext is received in the response" in { + send { + Get(s"http://127.0.0.1:$port/clearcontext") + } + within(5 seconds) { + val traceId = expectMsgPF() { case Start(id, _) => id} + expectMsgPF() { case Finish(traceId) => } + } + } - trait TestServer extends SimpleRoutingApp { + "give a initial transaction name using the method and path from the request" in { + send { + Get(s"http://127.0.0.1:$port/accounts") + } - // Nasty, but very helpful for tests. - AkkaExtensionSwap.swap(system, Trace, new Extension { - def manager: ActorRef = testActor - }) + within(5 seconds) { + expectMsgPF() { case Start(_, "GET: /accounts") => } + } + } + } +} - implicit val timeout = Timeout(20 seconds) - val port: Int = Await.result( - startServer(interface = "127.0.0.1", port = 0)( - get { - complete("ok") +trait TestServer extends SimpleRoutingApp { + self: TestKit => + + // Nasty, but very helpful for tests. + AkkaExtensionSwap.swap(system, Trace, new Extension { + def manager: ActorRef = testActor + }) + + implicit val timeout = Timeout(20 seconds) + val port: Int = Await.result( + startServer(interface = "127.0.0.1", port = 0)( + get { + path("ok") { + complete("ok") + } ~ + path("clearcontext"){ + complete { + Trace.clear + "ok" + } } - ), timeout.duration).localAddress.getPort + } + ), timeout.duration).localAddress.getPort - val client = sendReceive(system, system.dispatcher, timeout) + val send = sendReceive(system, system.dispatcher, timeout) - } } diff --git a/kamon-trace/src/main/scala/kamon/trace/Trace.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala index 232b7420..6e01ad26 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Trace.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala @@ -26,7 +26,12 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def context() = traceContext.value def set(ctx: TraceContext) = traceContext.value = Some(ctx) - def start(name: String)(implicit system: ActorSystem) = set(newTraceContext) + def clear: Unit = traceContext.value = None + def start(name: String)(implicit system: ActorSystem) = { + val ctx = newTraceContext() + ctx.start(name) + set(ctx) + } def finish(): Option[TraceContext] = { val ctx = context() @@ -39,21 +44,27 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - def manager: ActorRef = system.actorOf(Props[TraceManager]) + val manager: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace") } -class TraceManager extends Actor { +class TraceManager extends Actor with ActorLogging { var listeners: Seq[ActorRef] = Seq.empty def receive = { - case Register => listeners = sender +: listeners + case Register => + listeners = sender +: listeners + log.info("Registered [{}] as listener for Kamon traces", sender) + case segment: UowSegment => - context.child(segment.id.toString) match { - case Some(agreggator) => agreggator ! segment - case None => context.actorOf(UowTraceAggregator.props(self, 30 seconds)) - } + val tracerName = segment.id.toString + context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment case trace: UowTrace => + println("Delivering a trace to: " + listeners) listeners foreach(_ ! trace) } + + def newTracer(name: String): ActorRef = { + context.actorOf(UowTraceAggregator.props(self, 30 seconds), name) + } } diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index f8491c12..0720a378 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -9,7 +9,8 @@ import kamon.trace.UowTracing.{Finish, Start} // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. protected[kamon] case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { - collector ! Start(id) + + def start(name: String) = collector ! Start(id, name) def finish: Unit = { collector ! Finish(id) diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala index c7dd1fb1..009a6da2 100644 --- a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala @@ -14,7 +14,7 @@ trait AutoTimestamp extends UowSegment { } object UowTracing { - case class Start(id: Long) extends AutoTimestamp + case class Start(id: Long, name: String) extends AutoTimestamp case class Finish(id: Long) extends AutoTimestamp case class Rename(id: Long, name: String) extends AutoTimestamp case class WebExternalStart(id: Long, host: String) extends AutoTimestamp @@ -22,24 +22,34 @@ object UowTracing { case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp } -case class UowTrace(name: String, segments: Seq[UowSegment]) +case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) { + def elapsed: Long = end - start +} class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { context.setReceiveTimeout(aggregationTimeout) - var name: Option[String] = None + var name: String = "UNKNOWN" var segments: Seq[UowSegment] = Nil var pendingExternal = List[WebExternalStart]() + var start = 0L + var end = 0L + def receive = { - case finish: Finish => segments = segments :+ finish; finishTracing() + case start: Start => + this.start = start.timestamp + name = start.name + case finish: Finish => + end = finish.timestamp + segments = segments :+ finish; finishTracing() case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) }) - case Rename(id, newName) => name = Some(newName) + case Rename(id, newName) => name = newName case segment: UowSegment => segments = segments :+ segment case ReceiveTimeout => log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) @@ -47,7 +57,7 @@ class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) exte } def finishTracing(): Unit = { - reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + reporting ! UowTrace(name, "", start, end, segments) println("Recorded Segments: " + segments) context.stop(self) } diff --git a/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala index a8e736ae..e36246be 100644 --- a/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala +++ b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala @@ -12,20 +12,20 @@ class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) wi "a TraceAggregator" should { "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { within(1 second) { - aggregator ! Start(1) + aggregator ! Start(1, "/accounts") aggregator ! Finish(1) - expectMsg(UowTrace("UNKNOWN", Seq(Start(1), Finish(1)))) + //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1)))) } } "change the uow name after receiving a Rename message" in new AggregatorFixture { within(1 second) { - aggregator ! Start(1) + aggregator ! Start(1, "/accounts") aggregator ! Rename(1, "test-uow") aggregator ! Finish(1) - expectMsg(UowTrace("test-uow", Seq(Start(1), Finish(1)))) + //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1)))) } } } diff --git a/project/AspectJ.scala b/project/AspectJ.scala index 6118aa03..83b334b8 100644 --- a/project/AspectJ.scala +++ b/project/AspectJ.scala @@ -10,6 +10,7 @@ object AspectJ { compileOnly in Aspectj := true, fork in Test := true, javaOptions in Test <++= weaverOptions in Aspectj, + javaOptions in run <++= weaverOptions in Aspectj, lintProperties in Aspectj += "invalidAbsoluteTypeName = ignore" ) }
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index 4f5c758a..323f7f02 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -57,7 +57,7 @@ object Build extends Build { .settings(aspectJSettings: _*) .settings( libraryDependencies ++= - compile(aspectJ, sprayCan, sprayClient, sprayRouting, newrelic) ++ + compile(aspectJ, sprayCan, sprayClient, sprayRouting, sprayJson, sprayJsonLenses, newrelic) ++ test(scalatest, akkaTestKit, sprayTestkit)) .dependsOn(kamonTrace) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 55736c76..0d393005 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -11,6 +11,7 @@ object Dependencies { val akkaVersion = "2.2.3" val sprayJson = "io.spray" %% "spray-json" % "1.2.5" + val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.5.3" val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.RC2" val logback = "ch.qos.logback" % "logback-classic" % "1.0.13" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" @@ -25,6 +26,7 @@ object Dependencies { val sprayClient = "io.spray" % "spray-client" % sprayVersion + def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") def test (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "test") diff --git a/project/NewRelic.scala b/project/NewRelic.scala index 8841244d..cb26c38c 100644 --- a/project/NewRelic.scala +++ b/project/NewRelic.scala @@ -10,7 +10,7 @@ object NewRelic { lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq( javaOptions in run <++= jvmOptions in newrelic, fork in run := true, - configFile in newrelic := file("~/.newrelic/kamon_playground.yml"), + configFile in newrelic := file(System.getProperty("user.home") + "/.newrelic/kamon_playground.yml"), newrelicVersion in newrelic := "3.1.0" ) } |