diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-11-03 23:27:57 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-11-03 23:27:57 +0100 |
commit | 59faf588080b137817444a6877170e2bd687427f (patch) | |
tree | a5f1cadf6c21915938d435e1a9d01583475d0c6a /kamon-newrelic/src | |
parent | 4b999c39b6bd09d891de718fad10b795264755c6 (diff) | |
parent | 6e3d9ae88ecce10420eeac82294c54c1b43dedf4 (diff) | |
download | Kamon-59faf588080b137817444a6877170e2bd687427f.tar.gz Kamon-59faf588080b137817444a6877170e2bd687427f.tar.bz2 Kamon-59faf588080b137817444a6877170e2bd687427f.zip |
Merge branch 'master' into release-0.2, kamon-play still need fixes.
Conflicts:
kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala
kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
project/Dependencies.scala
Diffstat (limited to 'kamon-newrelic/src')
7 files changed, 241 insertions, 117 deletions
diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index 13aaca2f..059420f9 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -8,6 +8,13 @@ kamon { app-name = "Kamon[Development]" license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 + + # delay between connection attempts to NewRelic collector + retry-delay = 30 seconds + + # attempts to send pending metrics in the next tick, + # combining the current metrics plus the pending, after max-retry, deletes all pending metrics + max-retry = 3 } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 299773e4..bca02582 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -1,37 +1,46 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic +import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } + import akka.actor.{ ActorLogging, Actor } +import akka.event.LoggingAdapter +import org.slf4j.LoggerFactory import spray.json._ -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } import spray.httpx.encoding.Deflate import spray.http._ import spray.json.lenses.JsonLenses._ -import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ -import scala.util.control.NonFatal +import scala.util.{ Failure, Success } import spray.http.Uri.Query import kamon.newrelic.MetricTranslator.TimeSliceMetrics +import scala.concurrent.duration._ class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { + import context.dispatcher import Agent._ + import Retry._ + + self ! Initialize val agentInfo = { val config = context.system.settings.config.getConfig("kamon.newrelic") @@ -40,8 +49,10 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with // Name has the format of pid@host val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds) + val maxRetry = config.getInt("max-retry") - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) } val baseQuery = Query( @@ -49,33 +60,36 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with "marshal_format" -> "json", "protocol_version" -> "12") - def receive = { - case Initialize(runId, collector) ⇒ - log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector) - context become reporting(runId, collector) + def receive: Receive = uninitialized + + def uninitialized: Receive = { + case Initialize ⇒ { + connectToCollector onComplete { + case Success(agent) ⇒ { + log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) + context become reporting(agent.runId, agent.collector) + } + case Failure(reason) ⇒ self ! InitializationFailed(reason) + } + } + case InitializationFailed(reason) ⇒ { + log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) + } + case everythingElse ⇒ //ignore } def reporting(runId: Long, collector: String): Receive = { case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) } - override def preStart(): Unit = { - super.preStart() - initialize - } - - def initialize: Unit = { - pipe({ - for ( - collector ← selectCollector; - runId ← connect(collector, agentInfo) - ) yield Initialize(runId, collector) - } recover { - case NonFatal(ex) ⇒ InitializationFailed(ex) - }) to self - } + def connectToCollector: Future[Initialized] = for { + collector ← selectCollector + runId ← connect(collector, agentInfo) + } yield Initialized(runId, collector) import AgentJsonProtocol._ + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson @@ -111,19 +125,49 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - compressedPipeline { - Post(sendMetricDataUri, MetricData(runId, metrics)) + withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ + compressedPipeline { + log.info("Sending metrics to NewRelic collector") + Post(sendMetricDataUri, MetricData(runId, currentMetrics)) + } } } - } object Agent { - - case class Initialize(runId: Long, collector: String) + case class Initialize() + case class Initialized(runId: Long, collector: String) case class InitializationFailed(reason: Throwable) case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) - + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) +} + +object Retry { + + @volatile private var attempts: Int = 0 + @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None + + def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { + + val currentMetrics = metrics.merge(pendingMetrics) + + if (currentMetrics.metrics.nonEmpty) { + block(currentMetrics) onComplete { + case Success(_) ⇒ + pendingMetrics = None + attempts = 0 + case Failure(_) ⇒ + attempts += 1 + if (maxRetry > attempts) { + log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) + pendingMetrics = Some(currentMetrics) + } else { + log.info("Max attempts achieved, proceeding to remove all pending metrics") + pendingMetrics = None + attempts = 0 + } + } + } + } }
\ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala index ef556e11..9b3e6dea 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala @@ -33,7 +33,7 @@ object AgentJsonProtocol extends DefaultJsonProtocol { } implicit def seqWriter[T: JsonWriter] = new JsonWriter[Seq[T]] { - def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toList) + def write(seq: Seq[T]) = JsArray(seq.map(_.toJson).toVector) } implicit object MetricDetailWriter extends JsonWriter[NewRelic.Metric] { @@ -58,6 +58,6 @@ object AgentJsonProtocol extends DefaultJsonProtocol { JsNumber(obj.runId), JsNumber(obj.timeSliceMetrics.from), JsNumber(obj.timeSliceMetrics.to), - obj.timeSliceMetrics.metrics.toJson) + obj.timeSliceMetrics.metrics.values.toSeq.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala index a3bb6311..5fa571e1 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala @@ -28,14 +28,21 @@ class MetricTranslator(receiver: ActorRef) extends Actor val fromInSeconds = (from / 1E3).toInt val toInSeconds = (to / 1E3).toInt val allMetrics = collectWebTransactionMetrics(metrics) ++ collectCustomMetrics(metrics) + val groupedMetrics: Map[String, NewRelic.Metric] = allMetrics.map(metric ⇒ metric.name -> metric)(collection.breakOut) // avoid intermediate tuple - receiver ! TimeSliceMetrics(fromInSeconds, toInSeconds, allMetrics) + receiver ! TimeSliceMetrics(fromInSeconds, toInSeconds, groupedMetrics) } } object MetricTranslator { - case class TimeSliceMetrics(from: Long, to: Long, metrics: Seq[NewRelic.Metric]) + case class TimeSliceMetrics(from: Long, to: Long, metrics: Map[String, NewRelic.Metric]) { + import kamon.metric._ + + def merge(thatMetrics: Option[TimeSliceMetrics]): TimeSliceMetrics = { + thatMetrics.map(that ⇒ TimeSliceMetrics(from + that.from, to + that.to, combineMaps(metrics, that.metrics)((l, r) ⇒ l.merge(r)))).getOrElse(this) + } + } def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver)) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index bdac5298..92e673ee 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -1,27 +1,28 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic import akka.actor._ -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } -import scala.concurrent.duration._ import kamon.Kamon -import kamon.metric.{ UserMetrics, TickMetricSnapshotBuffer, TraceMetrics, Metrics } import kamon.metric.Subscriptions.TickMetricSnapshot -import akka.actor +import kamon.metric.UserMetrics.{ UserCounters, UserGauges, UserHistograms, UserMinMaxCounters } +import kamon.metric.{ Metrics, TickMetricSnapshotBuffer, TraceMetrics } + +import scala.concurrent.duration._ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.newrelic") diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 4203f81f..08b5df99 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -1,23 +1,23 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic -import akka.actor.{ ActorLogging, Actor } -import akka.event.Logging.Error -import akka.event.Logging.{ LoggerInitialized, InitializeLogger } +import akka.actor.{ Actor, ActorLogging } +import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } import kamon.trace.TraceContextAware @@ -33,17 +33,8 @@ class NewRelicErrorLogger extends Actor with ActorLogging { def notifyError(error: Error): Unit = { val params = new java.util.HashMap[String, String]() - if (error.isInstanceOf[TraceContextAware]) { - val ctx = error.asInstanceOf[TraceContextAware].traceContext - - for (c ← ctx) { - params.put("TraceToken", c.token) - } - } else if (!aspectJMissingAlreadyReported) { - log.warning("ASPECTJ WEAVER MISSING. You might have missed to include the javaagent JVM startup parameter in" + - " your application. Please refer to http://kamon.io/get-started/ for instructions on how to do it.") - aspectJMissingAlreadyReported = true - } + val ctx = error.asInstanceOf[TraceContextAware].traceContext + params.put("TraceToken", ctx.token) if (error.cause == Error.NoCause) { NR.noticeError(error.message.toString, params) diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index 28dcde79..8b61c241 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -1,57 +1,111 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic -import akka.testkit.{ TestActor, TestProbe, TestKit } -import akka.actor.{ Props, ActorRef, ActorSystem } -import org.scalatest.WordSpecLike +import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.io.IO +import akka.testkit.TestActor.{ AutoPilot, KeepRunning } +import akka.testkit._ +import com.typesafe.config.ConfigFactory import kamon.AkkaExtensionSwap +import kamon.newrelic.MetricTranslator.TimeSliceMetrics +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import spray.can.Http -import akka.io.IO -import akka.testkit.TestActor.{ KeepRunning, AutoPilot } -import spray.http._ -import spray.http.HttpRequest -import spray.http.HttpResponse +import spray.http.{ HttpRequest, HttpResponse, _ } + +class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll { -class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike { + import kamon.newrelic.AgentSpec._ + + implicit lazy val system: ActorSystem = ActorSystem("Agent-Spec", ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.testkit.TestEventListener"] + | loglevel = "INFO" + |} + |kamon { + | newrelic { + | retry-delay = 1 second + | max-retry = 3 + | } + |} + | + """.stripMargin)) + + var agent: ActorRef = _ setupFakeHttpManager "the Newrelic Agent" should { - "try to connect upon creation" in { - val agent = system.actorOf(Props[Agent]) + "try to connect upon creation, retry to connect if an error occurs" in { + EventFilter.info(message = "Initialization failed: Unexpected response from HTTP transport: None, retrying in 1 seconds", occurrences = 3).intercept { + system.actorOf(Props[Agent]) + Thread.sleep(1000) + } + } + + "when everything is fine should select a NewRelic collector" in { + EventFilter.info(message = "Agent initialized with runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + system.actorOf(Props[Agent]) + } + } + + "merge the metrics if not possible send them and do it in the next post" in { + EventFilter.info(pattern = "Trying to send metrics to NewRelic collector, attempt.*", occurrences = 2).intercept { + agent = system.actorOf(Props[Agent].withDispatcher(CallingThreadDispatcher.Id)) + + for (_ ← 1 to 3) { + sendDelayedMetric(agent) + } + } + } - Thread.sleep(5000) + "when the connection is re-established, the metrics should be send" in { + EventFilter.info(message = "Sending metrics to NewRelic collector", occurrences = 2).intercept { + sendDelayedMetric(agent) + } } } def setupFakeHttpManager: Unit = { + val ConnectionAttempts = 3 // an arbitrary value only for testing purposes + val PostAttempts = 3 // if the number is achieved, the metrics should be discarded val fakeHttpManager = TestProbe() + var attemptsToConnect: Int = 0 // should retry grab an NewRelic collector after retry-delay + var attemptsToSendMetrics: Int = 0 + fakeHttpManager.setAutoPilot(new TestActor.AutoPilot { def run(sender: ActorRef, msg: Any): AutoPilot = { msg match { case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) ⇒ - sender ! jsonResponse( - """ + if (attemptsToConnect == ConnectionAttempts) { + sender ! jsonResponse( + """ | { | "return_value": "collector-8.newrelic.com" | } | """.stripMargin) + system.log.info("Selecting Collector") - println("Selecting Collector") + } else { + sender ! None + attemptsToConnect += 1 + system.log.info("Network Error or Connection Refuse") + } case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) ⇒ sender ! jsonResponse( @@ -62,9 +116,17 @@ class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike { | } | } | """.stripMargin) - println("Connecting") - } + system.log.info("Connecting") + case HttpRequest(_, uri, _, _, _) if rawMethodIs("metric_data", uri) ⇒ + if (attemptsToSendMetrics < PostAttempts) { + sender ! None + attemptsToSendMetrics += 1 + system.log.info("Error when trying to send metrics to NewRelic collector, the metrics will be merged") + } else { + system.log.info("Sending metrics to NewRelic collector") + } + } KeepRunning } @@ -81,4 +143,16 @@ class AgentSpec extends TestKit(ActorSystem("agent-spec")) with WordSpecLike { def manager: ActorRef = fakeHttpManager.ref }) } + + override def afterAll() { + super.afterAll() + system.shutdown() + } } + +object AgentSpec { + def sendDelayedMetric(agent: ActorRef, delay: Int = 1000): Unit = { + agent ! TimeSliceMetrics(100000L, 200000L, Map("Latency" -> NewRelic.Metric("Latency", None, 1000L, 2000D, 3000D, 1D, 100000D, 300D))) + Thread.sleep(delay) + } +}
\ No newline at end of file |