aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-02-04 18:16:07 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-02-04 18:16:07 -0300
commit57e433c07a271b4e5e4159500cdc828cd7bb6a83 (patch)
tree6b2928dcf3c1dc68c4131aa864a0a0f53ccf2160
parent7307e1cc97e0363d1fb4cc116fc69a5272ca3730 (diff)
downloadKamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.tar.gz
Kamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.tar.bz2
Kamon-57e433c07a271b4e5e4159500cdc828cd7bb6a83.zip
partial rewrite of kamon-newrelic
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala46
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala64
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowTracing.scala82
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala51
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala43
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala31
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala88
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala (renamed from kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala)28
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala103
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala6
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala60
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala17
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/package.scala4
-rw-r--r--kamon-playground/src/main/resources/application.conf2
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala1
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala18
17 files changed, 152 insertions, 494 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index a8b90b57..359540fc 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -25,7 +25,7 @@ import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe
-class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension {
+class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
val filters = loadFilters(config)
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
index 654c37b0..2111563b 100644
--- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
@@ -16,12 +16,13 @@
package kamon.metrics
-import akka.actor.{ ActorRef, Actor }
+import akka.actor.{Props, ActorRef, Actor}
import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
import kamon.util.GlobPathFilter
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{FiniteDuration, Duration}
import java.util.concurrent.TimeUnit
import kamon.Kamon
+import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer
class Subscriptions extends Actor {
import context.system
@@ -85,5 +86,46 @@ object Subscriptions {
category.equals(identity.category) && globFilter.accept(identity.name)
}
}
+}
+
+
+class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
+ val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+
+ def receive = empty
+
+ def empty: Actor.Receive = {
+ case tick : TickMetricSnapshot => context become(buffering(tick))
+ case FlushBuffer => // Nothing to flush.
+ }
+
+ def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
+ case TickMetricSnapshot(_, to, tickMetrics) =>
+ val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup)
+ val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
+
+ context become(buffering(combinedSnapshot))
+
+ case FlushBuffer =>
+ receiver ! buffered
+ context become(empty)
+
+ }
+
+
+ override def postStop(): Unit = {
+ flushSchedule.cancel()
+ super.postStop()
+ }
+
+ def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot {
+ val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r))
+ }
+}
+
+object TickMetricSnapshotBuffer {
+ case object FlushBuffer
+ def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
+ Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala
deleted file mode 100644
index 11c58cae..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/TickMetricSnapshotBuffer.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import akka.actor.{Props, ActorRef, Actor}
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer
-import scala.concurrent.duration.FiniteDuration
-
-
-class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
- val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
-
- def receive = empty
-
- def empty: Actor.Receive = {
- case tick : TickMetricSnapshot => context become(buffering(tick))
- case FlushBuffer => // Nothing to flush.
- }
-
- def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
- case TickMetricSnapshot(_, to, tickMetrics) =>
- val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup)
- val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
-
- context become(buffering(combinedSnapshot))
-
- case FlushBuffer =>
- receiver ! buffered
- context become(empty)
-
- }
-
-
- override def postStop(): Unit = {
- flushSchedule.cancel()
- super.postStop()
- }
-
- def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot {
- val metrics = combineMaps(left.metrics, right.metrics)((l, r) => l.merge(r))
- }
-}
-
-object TickMetricSnapshotBuffer {
- case object FlushBuffer
-
- def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
- Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
deleted file mode 100644
index 20cce830..00000000
--- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.trace
-
-import akka.actor._
-import scala.concurrent.duration.Duration
-import kamon.trace.UowTracing._
-
-sealed trait UowSegment {
- def id: Long
- def timestamp: Long
-}
-
-trait AutoTimestamp extends UowSegment {
- val timestamp = System.nanoTime
-}
-
-object UowTracing {
- case class Start(id: Long, name: String) extends AutoTimestamp
- case class Finish(id: Long) extends AutoTimestamp
- case class Rename(id: Long, name: String) extends AutoTimestamp
- case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
- case class WebExternalFinish(id: Long) extends AutoTimestamp
- case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp
-}
-
-case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: Seq[UowSegment]) {
- def elapsed: Long = end - start
-}
-
-class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging {
- context.setReceiveTimeout(aggregationTimeout)
-
- var name: String = "UNKNOWN"
- var segments: Seq[UowSegment] = Nil
-
- var pendingExternal = List[WebExternalStart]()
-
- var start = 0L
- var end = 0L
-
- def receive = {
- case start: Start ⇒
- this.start = start.timestamp
- segments = segments :+ start
- name = start.name
- case finish: Finish ⇒
- end = finish.timestamp
- segments = segments :+ finish; finishTracing()
- case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes
- case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ {
- segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host)
- })
- case Rename(id, newName) ⇒ name = newName
- case segment: UowSegment ⇒ segments = segments :+ segment
- case ReceiveTimeout ⇒
- log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments)
- context.stop(self)
- }
-
- def finishTracing(): Unit = {
- reporting ! UowTrace(name, "", start, end, segments)
- context.stop(self)
- }
-}
-
-object UowTraceAggregator {
- def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
-}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala
deleted file mode 100644
index 3b32f3ac..00000000
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceAggregatorSpec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.trace.instrumentation
-
-import org.scalatest.{ WordSpecLike, WordSpec }
-import akka.testkit.{ TestKitBase, TestKit }
-import akka.actor.ActorSystem
-import scala.concurrent.duration._
-import kamon.trace.UowTracing.{ Finish, Rename, Start }
-import kamon.trace.{ UowTrace, UowTraceAggregator }
-
-class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike {
-
- "a TraceAggregator" should {
- "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture {
- within(1 second) {
- aggregator ! Start(1, "/accounts")
- aggregator ! Finish(1)
-
- //expectMsg(UowTrace("UNKNOWN", Seq(Start(1, "/accounts"), Finish(1))))
- }
- }
-
- "change the uow name after receiving a Rename message" in new AggregatorFixture {
- within(1 second) {
- aggregator ! Start(1, "/accounts")
- aggregator ! Rename(1, "test-uow")
- aggregator ! Finish(1)
-
- //expectMsg(UowTrace("test-uow", Seq(Start(1, "/accounts"), Finish(1))))
- }
- }
- }
-
- trait AggregatorFixture {
- val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds))
- }
-}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 4082458c..a73f390a 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -26,7 +26,8 @@ import akka.pattern.pipe
import java.lang.management.ManagementFactory
import spray.client.pipelining._
import scala.util.control.NonFatal
-import kamon.newrelic.NewRelicMetric.{ Data, ID, MetricBatch }
+import spray.http.Uri.Query
+import kamon.newrelic.MetricTranslator.TimeSliceMetrics
class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
import context.dispatcher
@@ -43,12 +44,20 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt)
}
+ val baseQuery = Query(
+ "license_key" -> agentInfo.licenseKey,
+ "marshal_format" -> "json",
+ "protocol_version" -> "12")
+
+
def receive = {
- case Initialize(runId, collector) ⇒ context become reporting(runId, collector)
+ case Initialize(runId, collector) ⇒
+ log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector)
+ context become reporting(runId, collector)
}
def reporting(runId: Long, collector: String): Receive = {
- case batch: MetricBatch ⇒ sendMetricData(runId, collector, batch.metrics)
+ case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics)
}
override def preStart(): Unit = {
@@ -68,34 +77,44 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
}
import AgentJsonProtocol._
- val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive
+ val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = logRequest(println(_)) ~>encode(Deflate) ~> sendReceive ~> logResponse(println(_))
val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson
def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson
+
def selectCollector: Future[String] = {
+ val query = ("method" -> "get_redirect_host") +: baseQuery
+ val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)
+
compressedToJsonPipeline {
- Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray())
+ Post(getRedirectHostUri, JsArray())
+
} map { json ⇒
json.extract[String]('return_value)
}
}
def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
+ log.debug("Connecting to NewRelic Collector [{}]", collectorHost)
+
+ val query = ("method" -> "connect") +: baseQuery
+ val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)
+
compressedToJsonPipeline {
- Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect)
+ Post(connectUri, connect)
+
} map { json ⇒
json.extract[Long]('return_value / 'agent_run_id)
}
}
- def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = {
- log.info("Reporting this to NewRelic: " + metrics.mkString("\n"))
+ def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = {
+ val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
+ val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)
- val end = System.currentTimeMillis() / 1000L
- val start = end - 60
compressedPipeline {
- Post(s"http://$collector/agent_listener/invoke_raw_method?method=metric_data&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12&run_id=$runId", MetricData(runId, start, end, metrics))
+ Post(sendMetricDataUri, MetricData(runId, metrics))
}
}
@@ -108,5 +127,5 @@ object Agent {
case class CollectorSelection(return_value: String)
case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int)
- case class MetricData(runId: Long, start: Long, end: Long, metrics: List[(ID, Data)])
+ case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
index da8199ab..ef556e11 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala
@@ -32,26 +32,23 @@ object AgentJsonProtocol extends DefaultJsonProtocol {
"pid" -> JsNumber(obj.pid)))
}
- import NewRelicMetric._
-
- implicit def listWriter[T: JsonWriter] = new JsonWriter[List[T]] {
- def write(list: List[T]) = JsArray(list.map(_.toJson))
+ implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] {
+ def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toList)
}
- implicit object MetricDetailWriter extends JsonWriter[(ID, Data)] {
- def write(obj: (ID, Data)): JsValue = {
- val (id, data) = obj
+ implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] {
+ def write(obj: NewRelic.Metric): JsValue = {
JsArray(
JsObject(
- "name" -> JsString(id.name) // TODO Include scope
+ "name" -> JsString(obj.name) // TODO Include scope
),
JsArray(
- JsNumber(data.callCount),
- JsNumber(data.total),
- JsNumber(data.totalExclusive),
- JsNumber(data.min),
- JsNumber(data.max),
- JsNumber(data.sumOfSquares)))
+ JsNumber(obj.callCount),
+ JsNumber(obj.total),
+ JsNumber(obj.totalExclusive),
+ JsNumber(obj.min),
+ JsNumber(obj.max),
+ JsNumber(obj.sumOfSquares)))
}
}
@@ -59,8 +56,8 @@ object AgentJsonProtocol extends DefaultJsonProtocol {
def write(obj: MetricData): JsValue =
JsArray(
JsNumber(obj.runId),
- JsNumber(obj.start),
- JsNumber(obj.end),
- obj.metrics.toJson)
+ JsNumber(obj.timeSliceMetrics.from),
+ JsNumber(obj.timeSliceMetrics.to),
+ obj.timeSliceMetrics.metrics.toJson)
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala
deleted file mode 100644
index 34bb4f46..00000000
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.newrelic
-
-import akka.actor.Actor
-import kamon.trace.UowTrace
-import com.newrelic.api.agent.{ NewRelic ⇒ NRAgent }
-import kamon.trace.UowTracing.WebExternal
-
-class Apdex extends Actor {
- val t = 500
-
- var satisfied: Int = 0
- var tolerating: Int = 0
- var frustrated: Int = 0
-
- def receive = {
- case trace: UowTrace ⇒ recordTransaction(trace)
-
- }
-
- def clearStats: Unit = {
- satisfied = 0
- tolerating = 0
- frustrated = 0
- }
-
- def total: Int = satisfied + tolerating + frustrated
-
- def updateStats(sampleTime: Double): Unit = {
- if (sampleTime < t)
- satisfied += 1
- else if (sampleTime >= t && sampleTime <= 4 * t)
- tolerating += 1
- else
- frustrated += 1
- }
-
- def recordTransaction(uowTrace: UowTrace): Unit = {
- val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9)
-
- updateStats(time)
-
- NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat)
- NRAgent.recordMetric("WebTransaction", time.toFloat)
- NRAgent.recordMetric("HttpDispatcher", time.toFloat)
-
- uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒
- val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat
-
- NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external)
- NRAgent.recordMetric(s"External/${webExternalTrace.host}/all", external)
- NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
- }
-
- val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp)
-
- def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
- case Nil ⇒ accum
- case head :: tail ⇒
- if (head.start > lastEnd)
- measureExternal(accum + (head.finish - head.start), head.finish, tail)
- else
- measureExternal(accum + (head.finish - lastEnd), head.finish, tail)
- }
-
- val external = measureExternal(0, 0, allExternals) / 1E9
-
- NRAgent.recordMetric(s"External/all", external.toFloat)
- NRAgent.recordMetric(s"External/allWeb", external.toFloat)
-
- }
-}
-
-case object Flush
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
index dc6f0868..39177e30 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Metric.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala
@@ -16,17 +16,23 @@
package kamon.newrelic
-case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double,
- min: Double, max: Double, sumOfSquares: Double) {
-
- def merge(that: Metric): Metric = {
- Metric(name, scope,
- callCount + that.callCount,
- total + that.total,
- totalExclusive + that.totalExclusive,
- math.min(min, that.min),
- math.max(max, that.max),
- sumOfSquares + that.sumOfSquares)
+import akka.actor.{Props, ActorRef, Actor}
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.newrelic.MetricTranslator.TimeSliceMetrics
+
+class MetricTranslator(receiver: ActorRef) extends Actor with WebTransactionMetrics {
+
+ def receive = {
+ case TickMetricSnapshot(from, to, metrics) =>
+ val allMetrics = collectWebTransactionMetrics(metrics)
+
+ receiver ! TimeSliceMetrics(from, to, allMetrics)
}
}
+
+object MetricTranslator {
+ case class TimeSliceMetrics(from: Long, to: Long, metrics: Seq[NewRelic.Metric])
+
+ def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver))
+}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
index 57be566c..ef2de343 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
@@ -16,97 +16,48 @@
package kamon.newrelic
import akka.actor._
-import scala.collection.mutable
-import kamon.Kamon
-import kamon.trace.{ UowTrace }
-import kamon.newrelic.NewRelicMetric.{ MetricBatch, FlushMetrics }
import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.metrics.{TickMetricSnapshotBuffer, TraceMetrics, Metrics}
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import akka.actor
-class NewRelic extends ExtensionId[NewRelicExtension] {
- def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system)
-}
class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val api: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic")
+ val manager: ActorRef = system.actorOf(Props[NewRelicManager], "kamon-newrelic")
+
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", manager, permanently = true)
}
class NewRelicManager extends Actor with ActorLogging {
- log.info("Registering the Kamon(NewRelic) extension")
-
- //Kamon(Trace)(context.system).api ! Trace.Register
+ log.info("Starting the Kamon(NewRelic) extension")
- val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics2], "web-transaction-metrics")
val agent = context.actorOf(Props[Agent], "agent")
-
- import context.dispatcher
- context.system.scheduler.schedule(1 minute, 1 minute) {
- webTransactionMetrics.tell(FlushMetrics, agent)
- }
+ val translator = context.actorOf(MetricTranslator.props(agent), "translator")
+ val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metric-buffer")
def receive = {
- case trace: UowTrace ⇒ webTransactionMetrics ! trace
- }
-}
-
-object NewRelicMetric {
- case class ID(name: String, scope: Option[String])
- case class Data(var callCount: Long, var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double) {
- def record(value: Double): Unit = {
- if (value > max) max = value
- if (value < min) min = value
-
- total += value
- totalExclusive += value
- sumOfSquares += value * value
- callCount += 1
- }
- }
-
- object Data {
- def apply(): Data = Data(0, 0, 0, Double.MaxValue, 0, 0)
+ case tick: TickMetricSnapshot => buffer.forward(tick)
}
-
- case object FlushMetrics
- case class MetricBatch(metrics: List[(ID, Data)])
}
-class WebTransactionMetrics2 extends Actor with ActorLogging {
- val apdexT = 0.5D
- var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data]
- var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0)
-
- def receive = {
- case trace: UowTrace ⇒ updateStats(trace)
- case FlushMetrics ⇒ flush
- }
-
- def flush: Unit = {
- sender ! MetricBatch(metrics.toList :+ (NewRelicMetric.ID("Apdex", None), apdex))
- apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0)
- metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data]
- }
-
- def recordValue(metricID: NewRelicMetric.ID, value: Double): Unit = {
- metrics.getOrElseUpdate(metricID, NewRelicMetric.Data()).record(value)
- }
-
- def recordApdex(time: Double): Unit = {
- if (time <= apdexT)
- apdex.callCount += 1
- else if (time > apdexT && time <= (4 * apdexT))
- apdex.total += 1
- else
- apdex.totalExclusive += 1
- }
+object NewRelic extends ExtensionId[NewRelicExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = NewRelic
+ def createExtension(system: ExtendedActorSystem): NewRelicExtension = new NewRelicExtension(system)
- def updateStats(trace: UowTrace): Unit = {
- // Basic Metrics
- val elapsedSeconds = trace.elapsed / 1E9D
+ case class Metric(name: String, scope: Option[String], callCount: Long, total: Double, totalExclusive: Double,
+ min: Double, max: Double, sumOfSquares: Double) {
+
+ def merge(that: Metric): Metric = {
+ Metric(name, scope,
+ callCount + that.callCount,
+ total + that.total,
+ totalExclusive + that.totalExclusive,
+ math.min(min, that.min),
+ math.max(max, that.max),
+ sumOfSquares + that.sumOfSquares)
+ }
- recordApdex(elapsedSeconds)
- recordValue(NewRelicMetric.ID("WebTransaction", None), elapsedSeconds)
- recordValue(NewRelicMetric.ID("HttpDispatcher", None), elapsedSeconds)
- recordValue(NewRelicMetric.ID("WebTransaction/Custom/" + trace.name, None), elapsedSeconds)
}
-}
+} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
index 9f458bb5..65a98cb1 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
@@ -18,7 +18,7 @@ package kamon.newrelic
import akka.actor.Actor
import akka.event.Logging.Error
import akka.event.Logging.{ LoggerInitialized, InitializeLogger }
-import com.newrelic.api.agent.NewRelic
+import com.newrelic.api.agent.{NewRelic => NR}
import kamon.trace.TraceContextAware
class NewRelicErrorLogger extends Actor {
@@ -37,9 +37,9 @@ class NewRelicErrorLogger extends Actor {
}
if (error.cause == Error.NoCause) {
- NewRelic.noticeError(error.message.toString, params)
+ NR.noticeError(error.message.toString, params)
} else {
- NewRelic.noticeError(error.cause, params)
+ NR.noticeError(error.cause, params)
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala
deleted file mode 100644
index 4e3d0d8d..00000000
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.newrelic
-
-import akka.actor.Actor
-import kamon.trace.UowTrace
-import com.newrelic.api.agent.{ NewRelic ⇒ NR }
-import kamon.trace.UowTracing.WebExternal
-
-class NewRelicReporting extends Actor {
- def receive = {
- case trace: UowTrace ⇒ recordTransaction(trace)
- }
-
- def recordTransaction(uowTrace: UowTrace): Unit = {
- val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9)
-
- NR.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat)
- NR.recordMetric("WebTransaction", time.toFloat)
- NR.recordMetric("HttpDispatcher", time.toFloat)
-
- uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒
- val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat
-
- NR.recordMetric(s"External/${webExternalTrace.host}/http", external)
- NR.recordMetric(s"External/${webExternalTrace.host}/all", external)
- NR.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
- }
-
- val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp)
-
- def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
- case Nil ⇒ accum
- case head :: tail ⇒
- if (head.start > lastEnd)
- measureExternal(accum + (head.finish - head.start), head.finish, tail)
- else
- measureExternal(accum + (head.finish - lastEnd), head.finish, tail)
- }
-
- val external = measureExternal(0, 0, allExternals) / 1E9
-
- NR.recordMetric(s"External/all", external.toFloat)
- NR.recordMetric(s"External/allWeb", external.toFloat)
-
- }
-}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
index e10e9271..7fe5793c 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/WebTransactionMetrics.scala
@@ -17,18 +17,17 @@
package kamon.newrelic
import kamon.metrics.{ TraceMetrics, MetricGroupSnapshot, MetricGroupIdentity }
+import kamon.metrics.TraceMetrics.ElapsedTime
-object WebTransactionMetrics {
- def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[Metric] = {
- metrics.collect {
+trait WebTransactionMetrics {
+ def collectWebTransactionMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]): List[NewRelic.Metric] = {
+ val webTransactionMetrics = metrics.collect {
case (TraceMetrics(name), groupSnapshot) ⇒
-
-
- groupSnapshot.metrics foreach {
- case (metricIdentity, snapshot) => println(s"[$name] - ${toNewRelicMetric(name, None, snapshot)}")
- }
+ groupSnapshot.metrics collect {
+ case (ElapsedTime, snapshot) => toNewRelicMetric("HttpDispatcher", None, snapshot)
+ }
}
- Nil
+ webTransactionMetrics.flatten.toList
}
}
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
index f0b28d95..bf83b049 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/package.scala
@@ -22,7 +22,7 @@ import kamon.metrics.MetricSnapshot
package object newrelic {
- def toNewRelicMetric(name: String, scope: Option[String], snapshot: MetricSnapshot): Metric = {
+ def toNewRelicMetric(name: String, scope: Option[String], snapshot: MetricSnapshot): NewRelic.Metric = {
var total: Double = 0D
var sumOfSquares: Double = 0D
@@ -40,6 +40,6 @@ package object newrelic {
val scaledMin = snapshot.min / 1E9D
val scaledMax = snapshot.max / 1E9D
- Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)
+ NewRelic.Metric(name, scope, snapshot.numberOfMeasurements, total, total, scaledMin, scaledMax, sumOfSquares)
}
}
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf
index f0698592..c490c6c2 100644
--- a/kamon-playground/src/main/resources/application.conf
+++ b/kamon-playground/src/main/resources/application.conf
@@ -1,6 +1,6 @@
akka {
loggers = [ "akka.event.slf4j.Slf4jLogger" ]
- loglevel = INFO
+ loglevel = DEBUG
extensions = ["kamon.newrelic.NewRelic"]
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index fb3d05d2..06d8795a 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -111,7 +111,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
class PrintWhatever extends Actor {
def receive = {
- case tick: TickMetricSnapshot => WebTransactionMetrics.collectWebTransactionMetrics(tick.metrics)
case anything ⇒ println(anything)
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
index 8fd84bfb..d3d18a1b 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestInstrumentationSpec.scala
@@ -16,28 +16,18 @@
package kamon.spray
import _root_.spray.httpx.RequestBuilding
-import _root_.spray.routing.SimpleRoutingApp
-import akka.testkit.{ TestKitBase, TestProbe, TestKit }
-import akka.actor.{ ActorRef, ActorSystem }
+import akka.testkit.{ TestProbe, TestKit }
+import akka.actor.ActorSystem
import org.scalatest.{ Matchers, WordSpecLike }
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import _root_.spray.client.pipelining._
-import akka.util.Timeout
-import kamon.trace.{ UowTrace }
import kamon.Kamon
import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
import spray.http.HttpHeaders.RawHeader
import spray.http.{ HttpResponse, HttpRequest }
-import spray.http.HttpHeaders.Host
-import akka.io.{ Tcp, IO }
-import spray.can.Http
-import akka.io.Tcp.Bound
import kamon.metrics.{ TraceMetrics, Metrics }
-import kamon.metrics.TraceMetrics.TraceMetricSnapshot
import kamon.metrics.Subscriptions.TickMetricSnapshot
-class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding with ScalaFutures with PatienceConfiguration with TestServer {
+class ServerRequestInstrumentationSpec extends TestKit(ActorSystem("spec")) with WordSpecLike with Matchers with RequestBuilding
+ with ScalaFutures with PatienceConfiguration with TestServer {
"the spray server request tracing instrumentation" should {
"reply back with the same trace token header provided in the request" in {