aboutsummaryrefslogtreecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala')
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala128
1 files changed, 86 insertions, 42 deletions
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
index 9c4075eb..25fbc9db 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
@@ -1,37 +1,46 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.newrelic
+import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
+
import akka.actor.{ ActorLogging, Actor }
+import akka.event.LoggingAdapter
+import org.slf4j.LoggerFactory
import spray.json._
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation }
import spray.httpx.encoding.Deflate
import spray.http._
import spray.json.lenses.JsonLenses._
-import akka.pattern.pipe
import java.lang.management.ManagementFactory
import spray.client.pipelining._
-import scala.util.control.NonFatal
+import scala.util.{ Failure, Success }
import spray.http.Uri.Query
import kamon.newrelic.MetricTranslator.TimeSliceMetrics
+import scala.concurrent.duration._
class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {
+
import context.dispatcher
import Agent._
+ import Retry._
+
+ self ! Initialize
val agentInfo = {
val config = context.system.settings.config.getConfig("kamon.newrelic")
@@ -40,8 +49,10 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
// Name has the format of pid@host
val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
+ val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds)
+ val maxRetry = config.getInt("max-retry")
- AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt)
+ AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay)
}
val baseQuery = Query(
@@ -49,33 +60,36 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
"marshal_format" -> "json",
"protocol_version" -> "12")
- def receive = {
- case Initialize(runId, collector) ⇒
- log.info("Agent initialized with runID: [{}] and collector: [{}]", runId, collector)
- context become reporting(runId, collector)
+ def receive: Receive = uninitialized
+
+ def uninitialized: Receive = {
+ case Initialize ⇒ {
+ connectToCollector onComplete {
+ case Success(agent) ⇒ {
+ log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector)
+ context become reporting(agent.runId, agent.collector)
+ }
+ case Failure(reason) ⇒ self ! InitializationFailed(reason)
+ }
+ }
+ case InitializationFailed(reason) ⇒ {
+ log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds)
+ context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize)
+ }
+ case everythingElse ⇒ //ignore
}
def reporting(runId: Long, collector: String): Receive = {
case metrics: TimeSliceMetrics ⇒ sendMetricData(runId, collector, metrics)
}
- override def preStart(): Unit = {
- super.preStart()
- initialize
- }
-
- def initialize: Unit = {
- pipe({
- for (
- collector ← selectCollector;
- runId ← connect(collector, agentInfo)
- ) yield Initialize(runId, collector)
- } recover {
- case NonFatal(ex) ⇒ InitializationFailed(ex)
- }) to self
- }
+ def connectToCollector: Future[Initialized] = for {
+ collector ← selectCollector
+ runId ← connect(collector, agentInfo)
+ } yield Initialized(runId, collector)
import AgentJsonProtocol._
+
val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive
val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson
@@ -111,19 +125,49 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with
val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)
- compressedPipeline {
- Post(sendMetricDataUri, MetricData(runId, metrics))
+ withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics ⇒
+ compressedPipeline {
+ log.info("Sending metrics to NewRelic collector")
+ Post(sendMetricDataUri, MetricData(runId, currentMetrics))
+ }
}
}
-
}
object Agent {
-
- case class Initialize(runId: Long, collector: String)
+ case class Initialize()
+ case class Initialized(runId: Long, collector: String)
case class InitializationFailed(reason: Throwable)
case class CollectorSelection(return_value: String)
- case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int)
-
+ case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration)
case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
+}
+
+object Retry {
+
+ @volatile private var attempts: Int = 0
+ @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None
+
+ def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics ⇒ Future[T])(implicit executor: ExecutionContext): Unit = {
+
+ val currentMetrics = metrics.merge(pendingMetrics)
+
+ if (currentMetrics.metrics.nonEmpty) {
+ block(currentMetrics) onComplete {
+ case Success(_) ⇒
+ pendingMetrics = None
+ attempts = 0
+ case Failure(_) ⇒
+ attempts += 1
+ if (maxRetry > attempts) {
+ log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry)
+ pendingMetrics = Some(currentMetrics)
+ } else {
+ log.info("Max attempts achieved, proceeding to remove all pending metrics")
+ pendingMetrics = None
+ attempts = 0
+ }
+ }
+ }
+ }
} \ No newline at end of file