From fc22c55f1c5caac4a4921855c30b966722ec8157 Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 18 Oct 2014 23:57:26 -0300 Subject: ! kamon-newrelic: * Avoid reporting data to Newrelic if no metrics have been collected * Implement error handling with NewRelic Agent * Minor refactor * close issue #7 and issue #17 --- kamon-newrelic/src/main/resources/reference.conf | 7 ++ .../src/main/scala/kamon/newrelic/Agent.scala | 128 ++++++++++++++------- .../scala/kamon/newrelic/MetricTranslator.scala | 6 +- .../src/main/scala/kamon/newrelic/NewRelic.scala | 37 +++--- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 42 +++---- 5 files changed, 136 insertions(+), 84 deletions(-) (limited to 'kamon-newrelic/src/main') diff --git a/kamon-newrelic/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index 13aaca2f..059420f9 100644 --- a/kamon-newrelic/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -8,6 +8,13 @@ kamon { app-name = "Kamon[Development]" license-key = e7d350b14228f3d28f35bc3140df2c3e565ea5d5 + + # delay between connection attempts to NewRelic collector + retry-delay = 30 seconds + + # attempts to send pending metrics in the next tick, + # combining the current metrics plus the pending, after max-retry, deletes all pending metrics + max-retry = 3 } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 9c4075eb..25fbc9db 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -1,37 +1,46 @@ -/* =================================================== - * Copyright © 2013 the kamon project +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic +import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } + import akka.actor.{ ActorLogging, Actor } +import akka.event.LoggingAdapter +import org.slf4j.LoggerFactory import spray.json._ -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } import spray.httpx.encoding.Deflate import spray.http._ import spray.json.lenses.JsonLenses._ -import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ -import scala.util.control.NonFatal +import scala.util.{ Failure, Success } import spray.http.Uri.Query import kamon.newrelic.MetricTranslator.TimeSliceMetrics +import scala.concurrent.duration._ class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { + import context.dispatcher import Agent._ + import Retry._ + + self ! Initialize val agentInfo = { val config = context.system.settings.config.getConfig("kamon.newrelic") @@ -40,8 +49,10 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with // Name has the format of pid@host val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds) + val maxRetry = config.getInt("max-retry") - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) } val baseQuery = Query( @@ -49,33 +60,36 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with "marshal_format" -> "json", "protocol_version" -> "12") - def receive = { - case Initialize(runId, collector) ⇒ - log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector) - context become reporting(runId, collector) + def receive: Receive = uninitialized + + def uninitialized: Receive = { + case Initialize ⇒ { + connectToCollector onComplete { + case Success(agent) ⇒ { + log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) + context become reporting(agent.runId, agent.collector) + } + case Failure(reason) ⇒ self ! InitializationFailed(reason) + } + } + case InitializationFailed(reason) ⇒ { + log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) + } + case everythingElse ⇒ //ignore } def reporting(runId: Long, collector: String): Receive = { case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) } - override def preStart(): Unit = { - super.preStart() - initialize - } - - def initialize: Unit = { - pipe({ - for ( - collector ← selectCollector; - runId ← connect(collector, agentInfo) - ) yield Initialize(runId, collector) - } recover { - case NonFatal(ex) ⇒ InitializationFailed(ex) - }) to self - } + def connectToCollector: Future[Initialized] = for { + collector ← selectCollector + runId ← connect(collector, agentInfo) + } yield Initialized(runId, collector) import AgentJsonProtocol._ + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson @@ -111,19 +125,49 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - compressedPipeline { - Post(sendMetricDataUri, MetricData(runId, metrics)) + withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ + compressedPipeline { + log.info("Sending metrics to NewRelic collector") + Post(sendMetricDataUri, MetricData(runId, currentMetrics)) + } } } - } object Agent { - - case class Initialize(runId: Long, collector: String) + case class Initialize() + case class Initialized(runId: Long, collector: String) case class InitializationFailed(reason: Throwable) case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) - + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) +} + +object Retry { + + @volatile private var attempts: Int = 0 + @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None + + def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { + + val currentMetrics = metrics.merge(pendingMetrics) + + if (currentMetrics.metrics.nonEmpty) { + block(currentMetrics) onComplete { + case Success(_) ⇒ + pendingMetrics = None + attempts = 0 + case Failure(_) ⇒ + attempts += 1 + if (maxRetry > attempts) { + log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) + pendingMetrics = Some(currentMetrics) + } else { + log.info("Max attempts achieved, proceeding to remove all pending metrics") + pendingMetrics = None + attempts = 0 + } + } + } + } } \ No newline at end of file diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala index a3bb6311..6313a2aa 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/MetricTranslator.scala @@ -35,7 +35,11 @@ class MetricTranslator(receiver: ActorRef) extends Actor } object MetricTranslator { - case class TimeSliceMetrics(from: Long, to: Long, metrics: Seq[NewRelic.Metric]) + case class TimeSliceMetrics(from: Long, to: Long, metrics: Seq[NewRelic.Metric]) { + def merge(thatMetrics: Option[TimeSliceMetrics]): TimeSliceMetrics = { + thatMetrics.map(that ⇒ TimeSliceMetrics(from + that.from, to + that.to, metrics ++ that.metrics)).getOrElse(this) + } + } def props(receiver: ActorRef): Props = Props(new MetricTranslator(receiver)) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index fd97b2c0..b270d228 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -1,28 +1,31 @@ -/* =================================================== - * Copyright © 2013 the kamon project +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic +import java.util.concurrent.TimeUnit.MILLISECONDS + +import akka.actor import akka.actor._ -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } -import scala.concurrent.duration._ import kamon.Kamon -import kamon.metric.{ UserMetrics, TickMetricSnapshotBuffer, TraceMetrics, Metrics } import kamon.metric.Subscriptions.TickMetricSnapshot -import akka.actor -import java.util.concurrent.TimeUnit.MILLISECONDS +import kamon.metric.UserMetrics.{ UserCounters, UserGauges, UserHistograms, UserMinMaxCounters } +import kamon.metric.{ Metrics, TickMetricSnapshotBuffer, TraceMetrics } + +import scala.concurrent.duration._ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.newrelic") diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 4203f81f..4bb0ad3a 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -1,23 +1,23 @@ -/* =================================================== - * Copyright © 2013 the kamon project +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic -import akka.actor.{ ActorLogging, Actor } -import akka.event.Logging.Error -import akka.event.Logging.{ LoggerInitialized, InitializeLogger } +import akka.actor.{ Actor, ActorLogging } +import akka.event.Logging.{ Error, InitializeLogger, LoggerInitialized } import com.newrelic.api.agent.{ NewRelic ⇒ NR } import kamon.trace.TraceContextAware @@ -33,16 +33,10 @@ class NewRelicErrorLogger extends Actor with ActorLogging { def notifyError(error: Error): Unit = { val params = new java.util.HashMap[String, String]() - if (error.isInstanceOf[TraceContextAware]) { - val ctx = error.asInstanceOf[TraceContextAware].traceContext + val ctx = error.asInstanceOf[TraceContextAware].traceContext - for (c ← ctx) { - params.put("TraceToken", c.token) - } - } else if (!aspectJMissingAlreadyReported) { - log.warning("ASPECTJ WEAVER MISSING. You might have missed to include the javaagent JVM startup parameter in" + - " your application. Please refer to http://kamon.io/get-started/ for instructions on how to do it.") - aspectJMissingAlreadyReported = true + for (c ← ctx) { + params.put("TraceToken", c.token) } if (error.cause == Error.NoCause) { -- cgit v1.2.3