diff options
Diffstat (limited to 'kamon-newrelic/src/test/scala/kamon')
-rw-r--r-- | kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala | 253 | ||||
-rw-r--r-- | kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala | 156 |
2 files changed, 314 insertions, 95 deletions
diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala index 8b61c241..7db9f2d0 100644 --- a/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/AgentSpec.scala @@ -16,20 +16,23 @@ package kamon.newrelic +import java.lang.management.ManagementFactory + import akka.actor.{ ActorRef, ActorSystem, Props } import akka.io.IO -import akka.testkit.TestActor.{ AutoPilot, KeepRunning } import akka.testkit._ import com.typesafe.config.ConfigFactory import kamon.AkkaExtensionSwap -import kamon.newrelic.MetricTranslator.TimeSliceMetrics import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } import spray.can.Http -import spray.http.{ HttpRequest, HttpResponse, _ } - -class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll { +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.{ SprayJsonSupport, RequestBuilding } +import spray.json.JsArray +import spray.json._ - import kamon.newrelic.AgentSpec._ +class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll with RequestBuilding with SprayJsonSupport { + import JsonProtocol._ implicit lazy val system: ActorSystem = ActorSystem("Agent-Spec", ConfigFactory.parseString( """ @@ -39,120 +42,180 @@ class AgentSpec extends TestKitBase with WordSpecLike with BeforeAndAfterAll { |} |kamon { | newrelic { - | retry-delay = 1 second - | max-retry = 3 + | app-name = kamon + | license-key = 1111111111 + | initialize-retry-delay = 1 second + | max-initialize-retries = 3 | } |} | """.stripMargin)) - var agent: ActorRef = _ - - setupFakeHttpManager - - "the Newrelic Agent" should { - "try to connect upon creation, retry to connect if an error occurs" in { - EventFilter.info(message = "Initialization failed: Unexpected response from HTTP transport: None, retrying in 1 seconds", occurrences = 3).intercept { - system.actorOf(Props[Agent]) - Thread.sleep(1000) - } - } - - "when everything is fine should select a NewRelic collector" in { + "the New Relic Agent" should { + "try to establish a connection to the collector upon creation" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) + + // Request NR for a collector + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Receive the assigned collector + httpManager.reply(jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin)) + + // Connect to the collector + val (host, pid) = getHostAndPid() + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector-8.newrelic.com", "connect"), + s""" + | [ + | { + | "agent_version": "3.1.0", + | "app_name": [ "kamon" ], + | "host": "$host", + | "identifier": "java:kamon", + | "language": "java", + | "pid": $pid + | } + | ] + """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) + }) + + // Receive the runID EventFilter.info(message = "Agent initialized with runID: [161221111] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { - system.actorOf(Props[Agent]) + httpManager.reply(jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221111 + | } + | } + | """.stripMargin)) } } - "merge the metrics if not possible send them and do it in the next post" in { - EventFilter.info(pattern = "Trying to send metrics to NewRelic collector, attempt.*", occurrences = 2).intercept { - agent = system.actorOf(Props[Agent].withDispatcher(CallingThreadDispatcher.Id)) + "retry the connection in case it fails" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) - for (_ ← 1 to 3) { - sendDelayedMetric(agent) - } - } - } + // Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) - "when the connection is re-established, the metrics should be send" in { - EventFilter.info(message = "Sending metrics to NewRelic collector", occurrences = 2).intercept { - sendDelayedMetric(agent) + // Fail the request. + EventFilter[RuntimeException](start = "Initialization failed, retrying in 1 seconds", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } + + // Request NR for a collector, second attempt + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Receive the assigned collector + httpManager.reply(jsonResponse( + """ + | { + | "return_value": "collector-8.newrelic.com" + | } + | """.stripMargin)) + + // Connect to the collector + val (host, pid) = getHostAndPid() + httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector-8.newrelic.com", "connect"), + s""" + | [ + | { + | "agent_version": "3.1.0", + | "app_name": [ "kamon" ], + | "host": "$host", + | "identifier": "java:kamon", + | "language": "java", + | "pid": $pid + | } + | ] + """.stripMargin.parseJson)(sprayJsonMarshaller(JsValueFormat)) + }) + + // Receive the runID + EventFilter.info( + message = "Agent initialized with runID: [161221112] and collector: [collector-8.newrelic.com]", occurrences = 1).intercept { + + httpManager.reply(jsonResponse( + """ + | { + | "return_value": { + | "agent_run_id": 161221112 + | } + | } + | """.stripMargin)) + } } - } - def setupFakeHttpManager: Unit = { - val ConnectionAttempts = 3 // an arbitrary value only for testing purposes - val PostAttempts = 3 // if the number is achieved, the metrics should be discarded - val fakeHttpManager = TestProbe() - var attemptsToConnect: Int = 0 // should retry grab an NewRelic collector after retry-delay - var attemptsToSendMetrics: Int = 0 - - fakeHttpManager.setAutoPilot(new TestActor.AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = { - msg match { - case HttpRequest(_, uri, _, _, _) if rawMethodIs("get_redirect_host", uri) ⇒ - if (attemptsToConnect == ConnectionAttempts) { - sender ! jsonResponse( - """ - | { - | "return_value": "collector-8.newrelic.com" - | } - | """.stripMargin) - system.log.info("Selecting Collector") - - } else { - sender ! None - attemptsToConnect += 1 - system.log.info("Network Error or Connection Refuse") - } - - case HttpRequest(_, uri, _, _, _) if rawMethodIs("connect", uri) ⇒ - sender ! jsonResponse( - """ - | { - | "return_value": { - | "agent_run_id": 161221111 - | } - | } - | """.stripMargin) - system.log.info("Connecting") - - case HttpRequest(_, uri, _, _, _) if rawMethodIs("metric_data", uri) ⇒ - if (attemptsToSendMetrics < PostAttempts) { - sender ! None - attemptsToSendMetrics += 1 - system.log.info("Error when trying to send metrics to NewRelic collector, the metrics will be merged") - } else { - system.log.info("Sending metrics to NewRelic collector") - } + "give up the connection after max-initialize-retries" in { + val httpManager = setHttpManager(TestProbe()) + val agent = system.actorOf(Props[Agent]) + + // First attempt and two retries + for (_ ← 1 to 3) { + + // Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) + + // Fail the request. + EventFilter[RuntimeException](start = "Initialization failed, retrying in 1 seconds", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } - KeepRunning } - def jsonResponse(json: String): HttpResponse = { - HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) - } + // Final retry. Request NR for a collector + val request = httpManager.expectMsg(Deflate.encode { + Post(rawMethodUri("collector.newrelic.com", "get_redirect_host"), JsArray()) + }) - def rawMethodIs(method: String, uri: Uri): Boolean = { - uri.query.get("method").filter(_ == method).isDefined + // Give up on connecting. + EventFilter[RuntimeException](message = "Giving up while trying to set up a connection with the New Relic collector.", occurrences = 1).intercept { + httpManager.reply(Timedout(request)) } - }) + } + } + def setHttpManager(probe: TestProbe): TestProbe = { AkkaExtensionSwap.swap(system, Http, new IO.Extension { - def manager: ActorRef = fakeHttpManager.ref + def manager: ActorRef = probe.ref }) + probe + } + + def rawMethodUri(host: String, methodName: String): Uri = { + Uri(s"http://$host/agent_listener/invoke_raw_method").withQuery( + "method" -> methodName, + "license_key" -> "1111111111", + "marshal_format" -> "json", + "protocol_version" -> "12") + } + + def jsonResponse(json: String): HttpResponse = { + HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json)) } - override def afterAll() { - super.afterAll() - system.shutdown() + def getHostAndPid(): (String, String) = { + val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@') + (runtimeName(1), runtimeName(0)) } -} -object AgentSpec { - def sendDelayedMetric(agent: ActorRef, delay: Int = 1000): Unit = { - agent ! TimeSliceMetrics(100000L, 200000L, Map("Latency" -> NewRelic.Metric("Latency", None, 1000L, 2000D, 3000D, 1D, 100000D, 300D))) - Thread.sleep(delay) + implicit def JsValueFormat = new RootJsonFormat[JsValue] { + def write(value: JsValue) = value + def read(value: JsValue) = value } }
\ No newline at end of file diff --git a/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala new file mode 100644 index 00000000..3cf4bbd0 --- /dev/null +++ b/kamon-newrelic/src/test/scala/kamon/newrelic/MetricReporterSpec.scala @@ -0,0 +1,156 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.newrelic + +import akka.actor.{ ActorRef, ActorSystem } +import akka.io.IO +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import kamon.metric.{ TraceMetrics, Metrics } +import kamon.{ Kamon, AkkaExtensionSwap } +import kamon.metric.Subscriptions.TickMetricSnapshot +import org.scalatest.WordSpecLike +import spray.can.Http +import spray.http.Uri.Query +import spray.http._ +import spray.httpx.encoding.Deflate +import spray.httpx.{ RequestBuilding, SprayJsonSupport } +import scala.concurrent.duration._ + +class MetricReporterSpec extends TestKitBase with WordSpecLike with RequestBuilding with SprayJsonSupport { + import kamon.newrelic.JsonProtocol._ + + implicit lazy val system: ActorSystem = ActorSystem("metric-reporter-spec", ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.testkit.TestEventListener"] + | loglevel = "INFO" + |} + |kamon { + | metric { + | tick-interval = 1 hour + | } + |} + | + """.stripMargin)) + + val agentSettings = Agent.Settings("1111111111", "kamon", "test-host", 1, 1, 30 seconds, 1D) + val baseQuery = Query( + "license_key" -> agentSettings.licenseKey, + "marshal_format" -> "json", + "protocol_version" -> "12") + val baseCollectorUri = Uri("http://collector-1.newrelic.com/agent_listener/invoke_raw_method").withQuery(baseQuery) + + "the MetricReporter" should { + "report metrics to New Relic upon arrival" in new FakeTickSnapshotsFixture { + val httpManager = setHttpManager(TestProbe()) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + + metricReporter ! firstSnapshot + httpManager.expectMsg(Deflate.encode { + HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( + s""" + |[9999,0,0, + |[ + | [{"name":"Apdex"},[3,0.0,0.0,1.0,1.0,0.0]], + | [{"name":"WebTransaction"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], + | [{"name":"External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], + | [{"name":"WebTransaction/Custom/example-trace"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], + | [{"name":"HttpDispatcher"},[3,0.005996544,0.005996544,0.000999424,0.002998272,0.000013983876644864]], + | [{"name":"External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] + |] + |] + """.stripMargin)) + }) + } + + "accumulate metrics if posting fails" in new FakeTickSnapshotsFixture { + val httpManager = setHttpManager(TestProbe()) + val metricReporter = system.actorOf(MetricReporter.props(agentSettings, 9999, baseCollectorUri)) + + metricReporter ! firstSnapshot + val request = httpManager.expectMsgType[HttpRequest] + httpManager.reply(Timedout(request)) + + metricReporter ! secondSnapshot + httpManager.expectMsg(Deflate.encode { + HttpRequest(method = HttpMethods.POST, uri = rawMethodUri("collector-1.newrelic.com", "metric_data"), entity = compactJsonEntity( + s""" + |[9999,0,0, + |[ + | [{"name":"Apdex"},[6,0.0,0.0,1.0,1.0,0.0]], + | [{"name":"WebTransaction"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], + | [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], + | [{"name":"WebTransaction/Custom/example-trace"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], + | [{"name":"HttpDispatcher"},[6,0.02097152,0.02097152,0.000999424,0.005996544,0.000090731720998912]], + | [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]] + |] + |] + """.stripMargin)) + }) + } + } + /* + [9999, 0, 0, [ + [{"name": "Apdex"}, [6, 0.0, 0.0, 1.0, 1.0, 0.0]], + [{"name": "WebTransaction"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], + [{"name": "External"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]], + [{"name": "WebTransaction/Custom/example-trace"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], + [{"name": "HttpDispatcher"}, [6, 0.02097152, 0.02097152, 0.000999424, 0.005996544, 0.000090731720998912]], + [{"name": "External/allWeb"}, [0, 0.0, 0.0, 0.0, 0.0, 0.0]]]]*/ + + def setHttpManager(probe: TestProbe): TestProbe = { + AkkaExtensionSwap.swap(system, Http, new IO.Extension { + def manager: ActorRef = probe.ref + }) + probe + } + + def rawMethodUri(host: String, methodName: String): Uri = { + Uri(s"http://$host/agent_listener/invoke_raw_method").withQuery( + "method" -> methodName, + "run_id" -> "9999", + "license_key" -> "1111111111", + "marshal_format" -> "json", + "protocol_version" -> "12") + } + + def compactJsonEntity(jsonString: String): HttpEntity = { + import spray.json._ + + val compactJson = jsonString.parseJson.compactPrint + HttpEntity(ContentTypes.`application/json`, compactJson) + } + + trait FakeTickSnapshotsFixture { + val testTraceID = TraceMetrics("example-trace") + val recorder = Kamon(Metrics).register(testTraceID, TraceMetrics.Factory).get + val collectionContext = Kamon(Metrics).buildDefaultCollectionContext + + def collectRecorder = recorder.collect(collectionContext) + + recorder.elapsedTime.record(1000000) + recorder.elapsedTime.record(2000000) + recorder.elapsedTime.record(3000000) + val firstSnapshot = TickMetricSnapshot(1, 100, Map(testTraceID -> collectRecorder)) + + recorder.elapsedTime.record(6000000) + recorder.elapsedTime.record(5000000) + recorder.elapsedTime.record(4000000) + val secondSnapshot = TickMetricSnapshot(100, 200, Map(testTraceID -> collectRecorder)) + } +}
\ No newline at end of file |