diff options
Diffstat (limited to 'kamon-spray/src/main/scala')
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala | 4 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala (renamed from kamon-spray/src/main/scala/kamon/spray/Spray.scala) | 61 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala | 35 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala) | 29 | ||||
-rw-r--r-- | kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala (renamed from kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala) | 38 |
5 files changed, 104 insertions, 63 deletions
diff --git a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala index e98b63d9..4eefee95 100644 --- a/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/KamonTraceDirectives.scala @@ -17,11 +17,11 @@ package kamon.spray import spray.routing.directives.BasicDirectives import spray.routing._ -import kamon.trace.TraceRecorder +import kamon.trace.TraceContext trait KamonTraceDirectives extends BasicDirectives { def traceName(name: String): Directive0 = mapRequest { req ⇒ - TraceRecorder.rename(name) + TraceContext.currentContext.rename(name) req } } diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala index ab8d6a7d..3df8d972 100644 --- a/kamon-spray/src/main/scala/kamon/spray/Spray.scala +++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtension.scala @@ -18,47 +18,49 @@ package kamon.spray import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import akka.actor +import akka.event.{ Logging, LoggingAdapter } import kamon.Kamon import kamon.http.HttpServerMetrics -import kamon.metric.Metrics +import kamon.metric.{ Entity, Metrics } import spray.http.HttpHeaders.Host import spray.http.HttpRequest object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Spray - def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system) + def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtensionImpl(system) val SegmentLibraryName = "spray-client" } -object ClientSegmentCollectionStrategy { - sealed trait Strategy - case object Pipelining extends Strategy - case object Internal extends Strategy +trait SprayExtension extends Kamon.Extension { + def settings: SprayExtensionSettings + def log: LoggingAdapter + def httpServerMetrics: HttpServerMetrics + def generateTraceName(request: HttpRequest): String + def generateRequestLevelApiSegmentName(request: HttpRequest): String + def generateHostLevelApiSegmentName(request: HttpRequest): String } -class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - private val config = system.settings.config.getConfig("kamon.spray") +class SprayExtensionImpl(system: ExtendedActorSystem) extends SprayExtension { + val settings = SprayExtensionSettings(system) + val log = Logging(system, "SprayExtension") - val includeTraceToken: Boolean = config.getBoolean("automatic-trace-token-propagation") - val traceTokenHeaderName: String = config.getString("trace-token-header-name") - val httpServerMetrics = Kamon(Metrics)(system).register(HttpServerMetrics, HttpServerMetrics.Factory).get - // It's safe to assume that HttpServerMetrics will always exist because there is no particular filter for it. + val httpServerMetrics = { + val metricsExtension = Metrics.get(system) + val factory = metricsExtension.instrumentFactory(HttpServerMetrics.category) + val entity = Entity("spray-server", HttpServerMetrics.category) - private val nameGeneratorFQN = config.getString("name-generator") - private val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. + Metrics.get(system).register(entity, new HttpServerMetrics(factory)).recorder + } - val clientSegmentCollectionStrategy: ClientSegmentCollectionStrategy.Strategy = - config.getString("client.segment-collection-strategy") match { - case "pipelining" ⇒ ClientSegmentCollectionStrategy.Pipelining - case "internal" ⇒ ClientSegmentCollectionStrategy.Internal - case other ⇒ throw new IllegalArgumentException(s"Configured segment-collection-strategy [$other] is invalid, " + - s"only pipelining and internal are valid options.") - } + def generateTraceName(request: HttpRequest): String = + settings.nameGenerator.generateTraceName(request) - def generateTraceName(request: HttpRequest): String = nameGenerator.generateTraceName(request) - def generateRequestLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateRequestLevelApiSegmentName(request) - def generateHostLevelApiSegmentName(request: HttpRequest): String = nameGenerator.generateHostLevelApiSegmentName(request) + def generateRequestLevelApiSegmentName(request: HttpRequest): String = + settings.nameGenerator.generateRequestLevelApiSegmentName(request) + + def generateHostLevelApiSegmentName(request: HttpRequest): String = + settings.nameGenerator.generateHostLevelApiSegmentName(request) } trait SprayNameGenerator { @@ -68,14 +70,19 @@ trait SprayNameGenerator { } class DefaultSprayNameGenerator extends SprayNameGenerator { - def hostFromHeaders(request: HttpRequest): Option[String] = request.header[Host].map(_.host) def generateRequestLevelApiSegmentName(request: HttpRequest): String = { val uriAddress = request.uri.authority.host.address if (uriAddress.equals("")) hostFromHeaders(request).getOrElse("unknown-host") else uriAddress } - def generateHostLevelApiSegmentName(request: HttpRequest): String = hostFromHeaders(request).getOrElse("unknown-host") + def generateHostLevelApiSegmentName(request: HttpRequest): String = + hostFromHeaders(request).getOrElse("unknown-host") + + def generateTraceName(request: HttpRequest): String = + request.method.value + ": " + request.uri.path + + private def hostFromHeaders(request: HttpRequest): Option[String] = + request.header[Host].map(_.host) - def generateTraceName(request: HttpRequest): String = request.method.value + ": " + request.uri.path } diff --git a/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala new file mode 100644 index 00000000..44c71eaf --- /dev/null +++ b/kamon-spray/src/main/scala/kamon/spray/SprayExtensionSettings.scala @@ -0,0 +1,35 @@ +package kamon.spray + +import akka.actor.ExtendedActorSystem + +case class SprayExtensionSettings( + includeTraceTokenHeader: Boolean, + traceTokenHeaderName: String, + nameGenerator: SprayNameGenerator, + clientInstrumentationLevel: ClientInstrumentationLevel.Level) + +object SprayExtensionSettings { + def apply(system: ExtendedActorSystem): SprayExtensionSettings = { + val config = system.settings.config.getConfig("kamon.spray") + + val includeTraceTokenHeader: Boolean = config.getBoolean("automatic-trace-token-propagation") + val traceTokenHeaderName: String = config.getString("trace-token-header-name") + + val nameGeneratorFQN = config.getString("name-generator") + val nameGenerator: SprayNameGenerator = system.dynamicAccess.createInstanceFor[SprayNameGenerator](nameGeneratorFQN, Nil).get // let's bubble up any problems. + + val clientInstrumentationLevel: ClientInstrumentationLevel.Level = config.getString("client.instrumentation-level") match { + case "request-level" ⇒ ClientInstrumentationLevel.RequestLevelAPI + case "host-level" ⇒ ClientInstrumentationLevel.HostLevelAPI + case other ⇒ sys.error(s"Invalid client instrumentation level [$other] found in configuration.") + } + + SprayExtensionSettings(includeTraceTokenHeader, traceTokenHeaderName, nameGenerator, clientInstrumentationLevel) + } +} + +object ClientInstrumentationLevel { + sealed trait Level + case object RequestLevelAPI extends Level + case object HostLevelAPI extends Level +} diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala index 813915c4..fa9063ad 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ClientRequestInstrumentation.scala @@ -21,8 +21,7 @@ import org.aspectj.lang.ProceedingJoinPoint import spray.http._ import spray.http.HttpHeaders.RawHeader import kamon.trace._ -import kamon.Kamon -import kamon.spray.{ ClientSegmentCollectionStrategy, Spray } +import kamon.spray.{ ClientInstrumentationLevel, Spray } import akka.actor.ActorRef import scala.concurrent.{ Future, ExecutionContext } import akka.util.Timeout @@ -47,10 +46,10 @@ class ClientRequestInstrumentation { // This read to requestContext.traceContext takes care of initializing the aspect timely. requestContext.traceContext - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) + TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Internal) { + if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.HostLevelAPI) { if (requestContext.segment.isEmpty) { val clientRequestName = sprayExtension.generateHostLevelApiSegmentName(request) val segment = ctx.startSegment(clientRequestName, SegmentCategory.HttpClient, Spray.SegmentLibraryName) @@ -74,7 +73,7 @@ class ClientRequestInstrumentation { @Around("copyingRequestContext(old)") def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { + TraceContext.withContext(old.traceContext) { pjp.proceed() } } @@ -85,7 +84,7 @@ class ClientRequestInstrumentation { @Around("dispatchToCommander(requestContext, message)") def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TraceContextAware, message: Any): Any = { if (requestContext.traceContext.nonEmpty) { - TraceRecorder.withInlineTraceContextReplacement(requestContext.traceContext) { + TraceContext.withContext(requestContext.traceContext) { if (message.isInstanceOf[HttpMessageEnd]) requestContext.asInstanceOf[SegmentAware].segment.finish() @@ -112,10 +111,10 @@ class ClientRequestInstrumentation { val originalSendReceive = pjp.proceed().asInstanceOf[HttpRequest ⇒ Future[HttpResponse]] (request: HttpRequest) ⇒ { - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) + TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) val segment = - if (sprayExtension.clientSegmentCollectionStrategy == ClientSegmentCollectionStrategy.Pipelining) + if (sprayExtension.settings.clientInstrumentationLevel == ClientInstrumentationLevel.RequestLevelAPI) ctx.startSegment(sprayExtension.generateRequestLevelApiSegmentName(request), SegmentCategory.HttpClient, Spray.SegmentLibraryName) else EmptyTraceContext.EmptySegment @@ -139,10 +138,10 @@ class ClientRequestInstrumentation { @Around("includingDefaultHeadersAtHttpHostConnector(request, defaultHeaders)") def aroundIncludingDefaultHeadersAtHttpHostConnector(pjp: ProceedingJoinPoint, request: HttpMessage, defaultHeaders: List[HttpHeader]): Any = { - val modifiedHeaders = TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - val sprayExtension = Kamon(Spray)(system) - if (sprayExtension.includeTraceToken) - RawHeader(sprayExtension.traceTokenHeaderName, ctx.token) :: defaultHeaders + val modifiedHeaders = TraceContext.map { ctx ⇒ + val sprayExtension = ctx.lookupExtension(Spray) + if (sprayExtension.settings.includeTraceTokenHeader) + RawHeader(sprayExtension.settings.traceTokenHeaderName, ctx.token) :: defaultHeaders else defaultHeaders @@ -150,4 +149,4 @@ class ClientRequestInstrumentation { pjp.proceed(Array[AnyRef](request, modifiedHeaders)) } -} +}
\ No newline at end of file diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala index 1ae4ad80..73287132 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/spray/instrumentation/ServerRequestInstrumentation.scala @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. * ========================================================== */ -package spray.can.server +package spray.can.server.instrumentation import kamon.trace.TraceLocal.{ HttpContext, HttpContextKey } import org.aspectj.lang.annotation._ import kamon.trace._ import akka.actor.ActorSystem +import spray.can.server.OpenRequest import spray.http.{ HttpResponse, HttpMessagePartWrapper, HttpRequest } -import akka.event.Logging.Warning import kamon.Kamon import kamon.spray.{ SprayExtension, Spray } import org.aspectj.lang.ProceedingJoinPoint @@ -40,14 +40,16 @@ class ServerRequestInstrumentation { @After("openRequestInit(openRequest, request)") def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = { val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val tracer = Tracer.get(system) val sprayExtension = Kamon(Spray)(system) val defaultTraceName = sprayExtension.generateTraceName(request) - val token = if (sprayExtension.includeTraceToken) { - request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value) + val token = if (sprayExtension.settings.includeTraceTokenHeader) { + request.headers.find(_.name == sprayExtension.settings.traceTokenHeaderName).map(_.value) } else None - TraceRecorder.start(defaultTraceName, token)(system) + val newContext = token.map(customToken ⇒ tracer.newContext(defaultTraceName, customToken)) getOrElse (tracer.newContext(defaultTraceName)) + TraceContext.setCurrentContext(newContext) // Necessary to force initialization of traceContext when initiating the request. openRequest.traceContext @@ -58,7 +60,7 @@ class ServerRequestInstrumentation { @After("openNewRequest()") def afterOpenNewRequest(): Unit = { - TraceRecorder.clearContext + TraceContext.clearCurrentContext } @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest) && args(response)") @@ -66,26 +68,24 @@ class ServerRequestInstrumentation { @Around("openRequestCreation(openRequest, response)") def afterFinishingRequest(pjp: ProceedingJoinPoint, openRequest: TraceContextAware, response: HttpMessagePartWrapper): Any = { - val incomingContext = TraceRecorder.currentContext + val incomingContext = TraceContext.currentContext val storedContext = openRequest.traceContext // The stored context is always a DefaultTraceContext if the instrumentation is running - val system = storedContext.system - - verifyTraceContextConsistency(incomingContext, storedContext, system) + verifyTraceContextConsistency(incomingContext, storedContext) if (incomingContext.isEmpty) pjp.proceed() else { - val sprayExtension = Kamon(Spray)(system) + val sprayExtension = incomingContext.lookupExtension(Spray) - val proceedResult = if (sprayExtension.includeTraceToken) { - val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.traceTokenHeaderName, incomingContext.token) + val proceedResult = if (sprayExtension.settings.includeTraceTokenHeader) { + val responseWithHeader = includeTraceTokenIfPossible(response, sprayExtension.settings.traceTokenHeaderName, incomingContext.token) pjp.proceed(Array(openRequest, responseWithHeader)) } else pjp.proceed - TraceRecorder.finish() + TraceContext.currentContext.finish() recordHttpServerMetrics(response, incomingContext.name, sprayExtension) @@ -96,15 +96,15 @@ class ServerRequestInstrumentation { } } - def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext, system: ActorSystem): Unit = { - def publishWarning(text: String, system: ActorSystem): Unit = - system.eventStream.publish(Warning("ServerRequestInstrumentation", classOf[ServerRequestInstrumentation], text)) + def verifyTraceContextConsistency(incomingTraceContext: TraceContext, storedTraceContext: TraceContext): Unit = { + def publishWarning(text: String): Unit = + storedTraceContext.lookupExtension(Spray).log.warning(text) if (incomingTraceContext.nonEmpty) { if (incomingTraceContext.token != storedTraceContext.token) - publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]", system) + publishWarning(s"Different trace token found when trying to close a trace, original: [${storedTraceContext.token}] - incoming: [${incomingTraceContext.token}]") } else - publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]", system) + publishWarning(s"EmptyTraceContext present while closing the trace with token [${storedTraceContext.token}]") } def recordHttpServerMetrics(response: HttpMessagePartWrapper, traceName: String, sprayExtension: SprayExtension): Unit = |