diff options
author | Diego <diegolparra@gmail.com> | 2014-10-18 23:57:26 -0300 |
---|---|---|
committer | Diego <diegolparra@gmail.com> | 2014-10-18 23:57:26 -0300 |
commit | fc22c55f1c5caac4a4921855c30b966722ec8157 (patch) | |
tree | b3791628007315c645e05c2974940c2b08c5b3bd /kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala | |
parent | 0f4847445b31a2a76897f7405512a58fb4d4a1dd (diff) | |
download | Kamon-fc22c55f1c5caac4a4921855c30b966722ec8157.tar.gz Kamon-fc22c55f1c5caac4a4921855c30b966722ec8157.tar.bz2 Kamon-fc22c55f1c5caac4a4921855c30b966722ec8157.zip |
! kamon-newrelic: * Avoid reporting data to Newrelic if no metrics have been collected
* Implement error handling with NewRelic Agent * Minor refactor * close issue #7 and issue #17
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala')
-rw-r--r-- | kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala | 128 |
1 files changed, 86 insertions, 42 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 9c4075eb..25fbc9db 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -1,37 +1,46 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.newrelic +import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } + import akka.actor.{ ActorLogging, Actor } +import akka.event.LoggingAdapter +import org.slf4j.LoggerFactory import spray.json._ -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContext, Future } import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } import spray.httpx.encoding.Deflate import spray.http._ import spray.json.lenses.JsonLenses._ -import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ -import scala.util.control.NonFatal +import scala.util.{ Failure, Success } import spray.http.Uri.Query import kamon.newrelic.MetricTranslator.TimeSliceMetrics +import scala.concurrent.duration._ class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { + import context.dispatcher import Agent._ + import Retry._ + + self ! Initialize val agentInfo = { val config = context.system.settings.config.getConfig("kamon.newrelic") @@ -40,8 +49,10 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with // Name has the format of pid@host val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds) + val maxRetry = config.getInt("max-retry") - AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) + AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay) } val baseQuery = Query( @@ -49,33 +60,36 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with "marshal_format" -> "json", "protocol_version" -> "12") - def receive = { - case Initialize(runId, collector) ⇒ - log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector) - context become reporting(runId, collector) + def receive: Receive = uninitialized + + def uninitialized: Receive = { + case Initialize ⇒ { + connectToCollector onComplete { + case Success(agent) ⇒ { + log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector) + context become reporting(agent.runId, agent.collector) + } + case Failure(reason) ⇒ self ! InitializationFailed(reason) + } + } + case InitializationFailed(reason) ⇒ { + log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds) + context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize) + } + case everythingElse ⇒ //ignore } def reporting(runId: Long, collector: String): Receive = { case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics) } - override def preStart(): Unit = { - super.preStart() - initialize - } - - def initialize: Unit = { - pipe({ - for ( - collector ← selectCollector; - runId ← connect(collector, agentInfo) - ) yield Initialize(runId, collector) - } recover { - case NonFatal(ex) ⇒ InitializationFailed(ex) - }) to self - } + def connectToCollector: Future[Initialized] = for { + collector ← selectCollector + runId ← connect(collector, agentInfo) + } yield Initialized(runId, collector) import AgentJsonProtocol._ + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson @@ -111,19 +125,49 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query) - compressedPipeline { - Post(sendMetricDataUri, MetricData(runId, metrics)) + withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒ + compressedPipeline { + log.info("Sending metrics to NewRelic collector") + Post(sendMetricDataUri, MetricData(runId, currentMetrics)) + } } } - } object Agent { - - case class Initialize(runId: Long, collector: String) + case class Initialize() + case class Initialized(runId: Long, collector: String) case class InitializationFailed(reason: Throwable) case class CollectorSelection(return_value: String) - case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int) - + case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration) case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics) +} + +object Retry { + + @volatile private var attempts: Int = 0 + @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None + + def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = { + + val currentMetrics = metrics.merge(pendingMetrics) + + if (currentMetrics.metrics.nonEmpty) { + block(currentMetrics) onComplete { + case Success(_) ⇒ + pendingMetrics = None + attempts = 0 + case Failure(_) ⇒ + attempts += 1 + if (maxRetry > attempts) { + log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry) + pendingMetrics = Some(currentMetrics) + } else { + log.info("Max attempts achieved, proceeding to remove all pending metrics") + pendingMetrics = None + attempts = 0 + } + } + } + } }
\ No newline at end of file |