aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego Parra <diegolparra@gmail.com>2015-05-28 13:25:55 -0300
committerDiego Parra <diegolparra@gmail.com>2015-05-28 13:25:55 -0300
commit57dd25e3afbfc2682b81c07161850104f32fd841 (patch)
tree85a081b82832948a2c05a95cd1355f900e813fc6
parent2a8e23ff1bda3c7fcd48eb21668de870122c9aa0 (diff)
parentb927d1440f793bce669c8dc5b796f333cbd8abe6 (diff)
downloadKamon-57dd25e3afbfc2682b81c07161850104f32fd841.tar.gz
Kamon-57dd25e3afbfc2682b81c07161850104f32fd841.tar.bz2
Kamon-57dd25e3afbfc2682b81c07161850104f32fd841.zip
Merge pull request #205 from jtjeferreira/fixSprayClientIntrumentationError
= spray: Fix segment finishing on errors
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala8
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala36
2 files changed, 40 insertions, 4 deletions
diff --git a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
index 4c408567..1950787d 100644
--- a/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala
@@ -24,7 +24,7 @@ import spray.http._
import spray.http.HttpHeaders.RawHeader
import kamon.trace._
import kamon.spray.{ ClientInstrumentationLevel, Spray }
-import akka.actor.ActorRef
+import akka.actor.{ ActorRef, Status }
import scala.concurrent.{ Future, ExecutionContext }
import akka.util.Timeout
@@ -87,7 +87,7 @@ class ClientRequestInstrumentation {
def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = {
if (requestContext.traceContext.nonEmpty) {
Tracer.withContext(requestContext.traceContext) {
- if (message.isInstanceOf[HttpMessageEnd])
+ if (message.isInstanceOf[HttpMessageEnd] || message.isInstanceOf[Status.Failure])
requestContext.asInstanceOf[SegmentAware].segment.finish()
pjp.proceed()
@@ -124,7 +124,7 @@ class ClientRequestInstrumentation {
request.asInstanceOf[SegmentAware].segment = segment
val responseFuture = originalSendReceive.apply(request)
- responseFuture.map(result ⇒ segment.finish())(SameThreadExecutionContext)
+ responseFuture.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext)
responseFuture
} getOrElse originalSendReceive.apply(request)
@@ -148,4 +148,4 @@ class ClientRequestInstrumentation {
pjp.proceed(Array[AnyRef](request, modifiedHeaders))
}
-} \ No newline at end of file
+}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
index b66d54c3..2279469c 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestInstrumentationSpec.scala
@@ -17,6 +17,7 @@
package kamon.spray
import akka.testkit.TestProbe
+import akka.actor.Status
import kamon.testkit.BaseKamonSpec
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
@@ -138,6 +139,41 @@ class ClientRequestInstrumentationSpec extends BaseKamonSpec("client-request-ins
segmentMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
}
+ "start and finish a segment in case of error" in {
+ enableAutomaticTraceTokenPropagation()
+ enablePipeliningSegmentCollectionStrategy()
+
+ val transport = TestProbe()
+ val (_, _, bound) = buildSHostConnectorAndServer
+
+ // Initiate a request within the context of a trace
+ val (testContext, responseFuture) = Tracer.withContext(newContext("assign-name-to-segment-with-request-level-api")) {
+ val rF = sendReceive(transport.ref)(ec, 10.seconds) {
+ Get(s"http://${bound.localAddress.getHostName}:${bound.localAddress.getPort}/request-level-api-segment")
+ }
+
+ (Tracer.currentContext, rF)
+ }
+
+ // Receive the request and reply back
+ transport.expectMsgType[HttpRequest]
+ transport.reply(Status.Failure(new Exception("An Error Ocurred")))
+ responseFuture.failed.futureValue.getMessage() should be("An Error Ocurred")
+
+ testContext.finish()
+
+ val traceMetricsSnapshot = takeSnapshotOf("assign-name-to-segment-with-request-level-api", "trace")
+ traceMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentMetricsSnapshot = takeSnapshotOf("request-level /request-level-api-segment", "trace-segment",
+ tags = Map(
+ "trace" -> "assign-name-to-segment-with-request-level-api",
+ "category" -> SegmentCategory.HttpClient,
+ "library" -> Spray.SegmentLibraryName))
+
+ segmentMetricsSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ }
+
"rename a request level api segment once it reaches the relevant host connector" in {
enableAutomaticTraceTokenPropagation()
enablePipeliningSegmentCollectionStrategy()