From 57e433c07a271b4e5e4159500cdc828cd7bb6a83 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 4 Feb 2014 18:16:07 -0300 Subject: partial rewrite of kamon-newrelic --- .../scala/kamon/metrics/MetricsExtension.scala | 2 +- .../main/scala/kamon/metrics/Subscriptions.scala | 46 ++++++++- .../kamon/metrics/TickMetricSnapshotBuffer.scala | 64 ------------- .../src/main/scala/kamon/trace/UowTracing.scala | 82 ---------------- .../instrumentation/TraceAggregatorSpec.scala | 51 ---------- .../src/main/scala/kamon/newrelic/Agent.scala | 43 ++++++--- .../scala/kamon/newrelic/AgentJsonProtocol.scala | 31 +++---- .../src/main/scala/kamon/newrelic/Apdex.scala | 88 ------------------ .../src/main/scala/kamon/newrelic/Metric.scala | 32 ------- .../scala/kamon/newrelic/MetricTranslator.scala | 38 ++++++++ .../src/main/scala/kamon/newrelic/NewRelic.scala | 103 ++++++--------------- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 6 +- .../scala/kamon/newrelic/NewRelicReporting.scala | 60 ------------ .../kamon/newrelic/WebTransactionMetrics.scala | 17 ++-- .../src/main/scala/kamon/newrelic/package.scala | 4 +- .../src/main/resources/application.conf | 2 +- .../main/scala/test/SimpleRequestProcessor.scala | 1 - .../spray/ServerRequestInstrumentationSpec.scala | 18 +--- 18 files changed, 173 insertions(+), 515 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala delete mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala delete mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala create mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala delete mode 100644 kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index a8b90b57..359540fc 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -25,7 +25,7 @@ import akka.actor import kamon.metrics.Metrics.MetricGroupFilter import kamon.metrics.Subscriptions.Subscribe -class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension { +class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() val filters = loadFilters(config) diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 654c37b0..2111563b 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -16,12 +16,13 @@ package kamon.metrics -import akka.actor.{ ActorRef, Actor } +import akka.actor.{Props, ActorRef, Actor} import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } import kamon.util.GlobPathFilter -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{FiniteDuration, Duration} import java.util.concurrent.TimeUnit import kamon.Kamon +import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer class Subscriptions extends Actor { import context.system @@ -85,5 +86,46 @@ object Subscriptions { category.equals(identity.category) && globFilter.accept(identity.name) } } +} + + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + + def receive = empty + + def empty: Actor.Receive = { + case tick : TickMetricSnapshot => context become(buffering(tick)) + case FlushBuffer => // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) => + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become(buffering(combinedSnapshot)) + + case FlushBuffer => + receiver ! buffered + context become(empty) + + } + + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { + val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r)) + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } diff --git a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala deleted file mode 100644 index 11c58cae..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import akka.actor.{Props, ActorRef, Actor} -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer -import scala.concurrent.duration.FiniteDuration - - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - - def receive = empty - - def empty: Actor.Receive = { - case tick : TickMetricSnapshot => context become(buffering(tick)) - case FlushBuffer => // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) => - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become(buffering(combinedSnapshot)) - - case FlushBuffer => - receiver ! buffered - context become(empty) - - } - - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { - val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r)) - } -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala deleted file mode 100644 index 20cce830..00000000 --- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace - -import akka.actor._ -import scala.concurrent.duration.Duration -import kamon.trace.UowTracing._ - -sealed trait UowSegment { - def id: Long - def timestamp: Long -} - -trait AutoTimestamp extends UowSegment { - val timestamp = System.nanoTime -} - -object UowTracing { - case class Start(id: Long, name: String) extends AutoTimestamp - case class Finish(id: Long) extends AutoTimestamp - case class Rename(id: Long, name: String) extends AutoTimestamp - case class WebExternalStart(id: Long, host: String) extends AutoTimestamp - case class WebExternalFinish(id: Long) extends AutoTimestamp - case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp -} - -case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) { - def elapsed: Long = end - start -} - -class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { - context.setReceiveTimeout(aggregationTimeout) - - var name: String = "UNKNOWN" - var segments: Seq[UowSegment] = Nil - - var pendingExternal = List[WebExternalStart]() - - var start = 0L - var end = 0L - - def receive = { - case start: Start ⇒ - this.start = start.timestamp - segments = segments :+ start - name = start.name - case finish: Finish ⇒ - end = finish.timestamp - segments = segments :+ finish; finishTracing() - case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes - case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ { - segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) - }) - case Rename(id, newName) ⇒ name = newName - case segment: UowSegment ⇒ segments = segments :+ segment - case ReceiveTimeout ⇒ - log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) - context.stop(self) - } - - def finishTracing(): Unit = { - reporting ! UowTrace(name, "", start, end, segments) - context.stop(self) - } -} - -object UowTraceAggregator { - def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) -} diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala deleted file mode 100644 index 3b32f3ac..00000000 --- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.instrumentation - -import org.scalatest.{ WordSpecLike, WordSpec } -import akka.testkit.{ TestKitBase, TestKit } -import akka.actor.ActorSystem -import scala.concurrent.duration._ -import kamon.trace.UowTracing.{ Finish, Rename, Start } -import kamon.trace.{ UowTrace, UowTraceAggregator } - -class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { - - "a TraceAggregator" should { - "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { - within(1 second) { - aggregator ! Start(1, "/accounts") - aggregator ! Finish(1) - - //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1)))) - } - } - - "change the uow name after receiving a Rename message" in new AggregatorFixture { - within(1 second) { - aggregator ! Start(1, "/accounts") - aggregator ! Rename(1, "test-uow") - aggregator ! Finish(1) - - //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1)))) - } - } - } - - trait AggregatorFixture { - val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) - } -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 4082458c..a73f390a 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -26,7 +26,8 @@ import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ import scala.util.control.NonFatal -import kamon.newrelic.NewRelicMetric.{ Data, ID, MetricBatch } +import spray.http.Uri.Query +import kamon.newrelic.MetricTranslator.TimeSliceMetrics class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { import context.dispatcher @@ -43,12 +44,20 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) } + val baseQuery = Query( + "license_key" -> agentInfo.licenseKey, + "marshal_format" -> "json", + "protocol_version" -> "12") + + def receive = { - case Initialize(runId, collector) ⇒ context become reporting(runId, collector) + case Initialize(runId, collector) ⇒ + log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector) + context become reporting(runId, collector) } def reporting(runId: Long, collector: String): Receive = { - case batch: MetricBatch ⇒ sendMetricData(runId, collector, batch.metrics) + case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) } override def preStart(): Unit = { @@ -68,34 +77,44 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } import AgentJsonProtocol._ - val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = logRequest(println(_)) ~>encode(Deflate) ~> sendReceive ~> logResponse(println(_)) val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson + def selectCollector: Future[String] = { + val query = ("method" -> "get_redirect_host") +: baseQuery + val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query) + compressedToJsonPipeline { - Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) + Post(getRedirectHostUri, JsArray()) + } map { json ⇒ json.extract[String]('return_value) } } def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { + log.debug("Connecting to NewRelic Collector [{}]", collectorHost) + + val query = ("method" -> "connect") +: baseQuery + val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query) + compressedToJsonPipeline { - Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) + Post(connectUri, connect) + } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { - log.info("Reporting this to NewRelic: " + metrics.mkString("\n")) + def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = { + val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery + val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - val end = System.currentTimeMillis() / 1000L - val start = end - 60 compressedPipeline { - Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics)) + Post(sendMetricDataUri, MetricData(runId, metrics)) } } @@ -108,5 +127,5 @@ object Agent { case class CollectorSelection(return_value: String) case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) - case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)]) + case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) } \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala index da8199ab..ef556e11 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala @@ -32,26 +32,23 @@ object AgentJsonProtocol extends DefaultJsonProtocol { "pid" -> JsNumber(obj.pid))) } - import NewRelicMetric._ - - implicit def listWriter[T: JsonWriter] = new JsonWriter[List[T]] { - def write(list: List[T]) = JsArray(list.map(_.toJson)) + implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toList) } - implicit object MetricDetailWriter extends JsonWriter[(ID, Data)] { - def write(obj: (ID, Data)): JsValue = { - val (id, data) = obj + implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] { + def write(obj: NewRelic.Metric): JsValue = { JsArray( JsObject( - "name" -> JsString(id.name) // TODO Include scope + "name" -> JsString(obj.name) // TODO Include scope ), JsArray( - JsNumber(data.callCount), - JsNumber(data.total), - JsNumber(data.totalExclusive), - JsNumber(data.min), - JsNumber(data.max), - JsNumber(data.sumOfSquares))) + JsNumber(obj.callCount), + JsNumber(obj.total), + JsNumber(obj.totalExclusive), + JsNumber(obj.min), + JsNumber(obj.max), + JsNumber(obj.sumOfSquares))) } } @@ -59,8 +56,8 @@ object AgentJsonProtocol extends DefaultJsonProtocol { def write(obj: MetricData): JsValue = JsArray( JsNumber(obj.runId), - JsNumber(obj.start), - JsNumber(obj.end), - obj.metrics.toJson) + JsNumber(obj.timeSliceMetrics.from), + JsNumber(obj.timeSliceMetrics.to), + obj.timeSliceMetrics.metrics.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala deleted file mode 100644 index 34bb4f46..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.newrelic - -import akka.actor.Actor -import kamon.trace.UowTrace -import com.newrelic.api.agent.{ NewRelic ⇒ NRAgent } -import kamon.trace.UowTracing.WebExternal - -class Apdex extends Actor { - val t = 500 - - var satisfied: Int = 0 - var tolerating: Int = 0 - var frustrated: Int = 0 - - def receive = { - case trace: UowTrace ⇒ recordTransaction(trace) - - } - - def clearStats: Unit = { - satisfied = 0 - tolerating = 0 - frustrated = 0 - } - - def total: Int = satisfied + tolerating + frustrated - - def updateStats(sampleTime: Double): Unit = { - if (sampleTime < t) - satisfied += 1 - else if (sampleTime >= t && sampleTime <= 4 * t) - tolerating += 1 - else - frustrated += 1 - } - - def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9) - - updateStats(time) - - NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) - NRAgent.recordMetric("WebTransaction", time.toFloat) - NRAgent.recordMetric("HttpDispatcher", time.toFloat) - - uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒ - val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat - - NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external) - NRAgent.recordMetric(s"External/${webExternalTrace.host}/all", external) - NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) - } - - val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp) - - def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil ⇒ accum - case head :: tail ⇒ - if (head.start > lastEnd) - measureExternal(accum + (head.finish - head.start), head.finish, tail) - else - measureExternal(accum + (head.finish - lastEnd), head.finish, tail) - } - - val external = measureExternal(0, 0, allExternals) / 1E9 - - NRAgent.recordMetric(s"External/all", external.toFloat) - NRAgent.recordMetric(s"External/allWeb", external.toFloat) - - } -} - -case object Flush diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala deleted file mode 100644 index dc6f0868..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.newrelic - -case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, - min: Double, max: Double, sumOfSquares: Double) { - - def merge(that: Metric): Metric = { - Metric(name, scope, - callCount + that.callCount, - total + that.total, - totalExclusive + that.totalExclusive, - math.min(min, that.min), - math.max(max, that.max), - sumOfSquares + that.sumOfSquares) - } - -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala new file mode 100644 index 00000000..39177e30 --- /dev/null +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala @@ -0,0 +1,38 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import akka.actor.{Props, ActorRef, Actor} +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.newrelic.MetricTranslator.TimeSliceMetrics + +class MetricTranslator(receiver: ActorRef) extends Actor with WebTransactionMetrics { + + def receive = { + case TickMetricSnapshot(from, to, metrics) => + val allMetrics = collectWebTransactionMetrics(metrics) + + receiver ! TimeSliceMetrics(from, to, allMetrics) + } + +} + +object MetricTranslator { + case class TimeSliceMetrics(from: Long, to: Long, metrics: Seq[NewRelic.Metric]) + + def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver)) +} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index 57be566c..ef2de343 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -16,97 +16,48 @@ package kamon.newrelic import akka.actor._ -import scala.collection.mutable -import kamon.Kamon -import kamon.trace.{ UowTrace } -import kamon.newrelic.NewRelicMetric.{ MetricBatch, FlushMetrics } import scala.concurrent.duration._ +import kamon.Kamon +import kamon.metrics.{TickMetricSnapshotBuffer, TraceMetrics, Metrics} +import kamon.metrics.Subscriptions.TickMetricSnapshot +import akka.actor -class NewRelic extends ExtensionId[NewRelicExtension] { - def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) -} class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val api: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") + val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic") + + Kamon(Metrics)(system).subscribe(TraceMetrics, "*", manager, permanently = true) } class NewRelicManager extends Actor with ActorLogging { - log.info("Registering the Kamon(NewRelic) extension") - - //Kamon(Trace)(context.system).api ! Trace.Register + log.info("Starting the Kamon(NewRelic) extension") - val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics2], "web-transaction-metrics") val agent = context.actorOf(Props[Agent], "agent") - - import context.dispatcher - context.system.scheduler.schedule(1 minute, 1 minute) { - webTransactionMetrics.tell(FlushMetrics, agent) - } + val translator = context.actorOf(MetricTranslator.props(agent), "translator") + val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metric-buffer") def receive = { - case trace: UowTrace ⇒ webTransactionMetrics ! trace - } -} - -object NewRelicMetric { - case class ID(name: String, scope: Option[String]) - case class Data(var callCount: Long, var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double) { - def record(value: Double): Unit = { - if (value > max) max = value - if (value < min) min = value - - total += value - totalExclusive += value - sumOfSquares += value * value - callCount += 1 - } - } - - object Data { - def apply(): Data = Data(0, 0, 0, Double.MaxValue, 0, 0) + case tick: TickMetricSnapshot => buffer.forward(tick) } - - case object FlushMetrics - case class MetricBatch(metrics: List[(ID, Data)]) } -class WebTransactionMetrics2 extends Actor with ActorLogging { - val apdexT = 0.5D - var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] - var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) - - def receive = { - case trace: UowTrace ⇒ updateStats(trace) - case FlushMetrics ⇒ flush - } - - def flush: Unit = { - sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex)) - apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) - metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] - } - - def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = { - metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value) - } - - def recordApdex(time: Double): Unit = { - if (time <= apdexT) - apdex.callCount += 1 - else if (time > apdexT && time <= (4 * apdexT)) - apdex.total += 1 - else - apdex.totalExclusive += 1 - } +object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = NewRelic + def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system) - def updateStats(trace: UowTrace): Unit = { - // Basic Metrics - val elapsedSeconds = trace.elapsed / 1E9D + case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double, + min: Double, max: Double, sumOfSquares: Double) { + + def merge(that: Metric): Metric = { + Metric(name, scope, + callCount + that.callCount, + total + that.total, + totalExclusive + that.totalExclusive, + math.min(min, that.min), + math.max(max, that.max), + sumOfSquares + that.sumOfSquares) + } - recordApdex(elapsedSeconds) - recordValue(NewRelicMetric.ID("WebTransaction", None), elapsedSeconds) - recordValue(NewRelicMetric.ID("HttpDispatcher", None), elapsedSeconds) - recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), elapsedSeconds) } -} +} \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 9f458bb5..65a98cb1 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -18,7 +18,7 @@ package kamon.newrelic import akka.actor.Actor import akka.event.Logging.Error import akka.event.Logging.{ LoggerInitialized, InitializeLogger } -import com.newrelic.api.agent.NewRelic +import com.newrelic.api.agent.{NewRelic => NR} import kamon.trace.TraceContextAware class NewRelicErrorLogger extends Actor { @@ -37,9 +37,9 @@ class NewRelicErrorLogger extends Actor { } if (error.cause == Error.NoCause) { - NewRelic.noticeError(error.message.toString, params) + NR.noticeError(error.message.toString, params) } else { - NewRelic.noticeError(error.cause, params) + NR.noticeError(error.cause, params) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala deleted file mode 100644 index 4e3d0d8d..00000000 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.newrelic - -import akka.actor.Actor -import kamon.trace.UowTrace -import com.newrelic.api.agent.{ NewRelic ⇒ NR } -import kamon.trace.UowTracing.WebExternal - -class NewRelicReporting extends Actor { - def receive = { - case trace: UowTrace ⇒ recordTransaction(trace) - } - - def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9) - - NR.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) - NR.recordMetric("WebTransaction", time.toFloat) - NR.recordMetric("HttpDispatcher", time.toFloat) - - uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒ - val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat - - NR.recordMetric(s"External/${webExternalTrace.host}/http", external) - NR.recordMetric(s"External/${webExternalTrace.host}/all", external) - NR.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) - } - - val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp) - - def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil ⇒ accum - case head :: tail ⇒ - if (head.start > lastEnd) - measureExternal(accum + (head.finish - head.start), head.finish, tail) - else - measureExternal(accum + (head.finish - lastEnd), head.finish, tail) - } - - val external = measureExternal(0, 0, allExternals) / 1E9 - - NR.recordMetric(s"External/all", external.toFloat) - NR.recordMetric(s"External/allWeb", external.toFloat) - - } -} diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala index e10e9271..7fe5793c 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala @@ -17,18 +17,17 @@ package kamon.newrelic import kamon.metrics.{ TraceMetrics, MetricGroupSnapshot, MetricGroupIdentity } +import kamon.metrics.TraceMetrics.ElapsedTime -object WebTransactionMetrics { - def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[Metric] = { - metrics.collect { +trait WebTransactionMetrics { + def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[NewRelic.Metric] = { + val webTransactionMetrics = metrics.collect { case (TraceMetrics(name), groupSnapshot) ⇒ - - - groupSnapshot.metrics foreach { - case (metricIdentity, snapshot) => println(s"[$name] - ${toNewRelicMetric(name, None, snapshot)}") - } + groupSnapshot.metrics collect { + case (ElapsedTime, snapshot) => toNewRelicMetric("HttpDispatcher", None, snapshot) + } } - Nil + webTransactionMetrics.flatten.toList } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala index f0b28d95..bf83b049 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala @@ -22,7 +22,7 @@ import kamon.metrics.MetricSnapshot package object newrelic { - def toNewRelicMetric(name: String, scope: Option[String], snapshot: MetricSnapshot): Metric = { + def toNewRelicMetric(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = { var total: Double = 0D var sumOfSquares: Double = 0D @@ -40,6 +40,6 @@ package object newrelic { val scaledMin = snapshot.min / 1E9D val scaledMax = snapshot.max / 1E9D - Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) + NewRelic.Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares) } } diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index f0698592..c490c6c2 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,6 +1,6 @@ akka { loggers = [ "akka.event.slf4j.Slf4jLogger" ] - loglevel = INFO + loglevel = DEBUG extensions = ["kamon.newrelic.NewRelic"] diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index fb3d05d2..06d8795a 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -111,7 +111,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil class PrintWhatever extends Actor { def receive = { - case tick: TickMetricSnapshot => WebTransactionMetrics.collectWebTransactionMetrics(tick.metrics) case anything ⇒ println(anything) } } diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala index 8fd84bfb..d3d18a1b 100644 --- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala +++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala @@ -16,28 +16,18 @@ package kamon.spray import _root_.spray.httpx.RequestBuilding -import _root_.spray.routing.SimpleRoutingApp -import akka.testkit.{ TestKitBase, TestProbe, TestKit } -import akka.actor.{ ActorRef, ActorSystem } +import akka.testkit.{ TestProbe, TestKit } +import akka.actor.ActorSystem import org.scalatest.{ Matchers, WordSpecLike } -import scala.concurrent.Await -import scala.concurrent.duration._ -import _root_.spray.client.pipelining._ -import akka.util.Timeout -import kamon.trace.{ UowTrace } import kamon.Kamon import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures } import spray.http.HttpHeaders.RawHeader import spray.http.{ HttpResponse, HttpRequest } -import spray.http.HttpHeaders.Host -import akka.io.{ Tcp, IO } -import spray.can.Http -import akka.io.Tcp.Bound import kamon.metrics.{ TraceMetrics, Metrics } -import kamon.metrics.TraceMetrics.TraceMetricSnapshot import kamon.metrics.Subscriptions.TickMetricSnapshot -class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer { +class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding + with ScalaFutures with PatienceConfiguration with TestServer { "the spray server request tracing instrumentation" should { "reply back with the same trace token header provided in the request" in { -- cgit v1.2.3