From 05bc848cc504b6825c7dcc49dd9aac0cd02e895c Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 29 Aug 2018 22:03:43 -0700 Subject: Add tracing to client HTTP transport and improve tracing tags --- src/main/scala/xyz/driver/core/app/init.scala | 9 +-- .../scala/xyz/driver/core/init/AkkaBootable.scala | 2 +- src/main/scala/xyz/driver/core/init/HttpApi.scala | 32 +++++---- .../xyz/driver/core/reporting/GoogleReporter.scala | 37 ++++++++-- .../scala/xyz/driver/core/reporting/Reporter.scala | 34 ++++++--- .../driver/core/reporting/ScalaLoggerLike.scala | 13 ++-- .../core/rest/HttpRestServiceTransport.scala | 80 +++++++++++++--------- .../xyz/driver/core/rest/headers/Traceparent.scala | 10 +-- src/main/scala/xyz/driver/core/rest/package.scala | 4 +- .../driver/core/rest/serviceRequestContext.scala | 13 ++++ 10 files changed, 157 insertions(+), 77 deletions(-) diff --git a/src/main/scala/xyz/driver/core/app/init.scala b/src/main/scala/xyz/driver/core/app/init.scala index b638fd3..767fd0b 100644 --- a/src/main/scala/xyz/driver/core/app/init.scala +++ b/src/main/scala/xyz/driver/core/app/init.scala @@ -10,6 +10,7 @@ import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.Logger import org.slf4j.LoggerFactory import xyz.driver.core.logging.MdcExecutionContext +import xyz.driver.core.reporting.{NoTraceReporter, ScalaLoggerLike} import xyz.driver.core.time.provider.TimeProvider import xyz.driver.tracing.{GoogleTracer, NoTracer, Tracer} @@ -25,7 +26,7 @@ object init { val gitHeadCommit: scala.Option[String] } - case class ApplicationContext(config: Config, clock: Clock, log: Logger) { + case class ApplicationContext(config: Config, clock: Clock, reporter: ScalaLoggerLike) { val time: TimeProvider = clock } @@ -63,7 +64,7 @@ object init { serviceAccountFile = serviceAccountKeyFile )(actorSystem, materializer) } else { - applicationContext.log.warn(s"Tracing file $serviceAccountKeyFile was not found, using NoTracer!") + applicationContext.reporter.logger.warn(s"Tracing file $serviceAccountKeyFile was not found, using NoTracer!") NoTracer } } @@ -89,7 +90,7 @@ object init { ApplicationContext( config = getEnvironmentSpecificConfig(), clock = Clock.systemUTC(), - log = Logger(LoggerFactory.getLogger(classOf[DriverApp]))) + new NoTraceReporter(Logger(LoggerFactory.getLogger(classOf[DriverApp])))) def createDefaultApplication( modules: Seq[Module], @@ -107,7 +108,7 @@ object init { buildInfo.gitHeadCommit.getOrElse("None"), modules = modules, context.time, - context.log, + context.reporter, context.config, interface = "0.0.0.0", baseUrl, diff --git a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala index 6a28fe8..8aa8de9 100644 --- a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala +++ b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala @@ -107,7 +107,7 @@ trait AkkaBootable { applicationVersion = version.getOrElse(""), actorSystem = system, executionContext = executionContext, - log = reporter + reporter = reporter ) // utilities diff --git a/src/main/scala/xyz/driver/core/init/HttpApi.scala b/src/main/scala/xyz/driver/core/init/HttpApi.scala index a2312ce..e7d975a 100644 --- a/src/main/scala/xyz/driver/core/init/HttpApi.scala +++ b/src/main/scala/xyz/driver/core/init/HttpApi.scala @@ -9,7 +9,6 @@ import xyz.driver.core.rest.Swagger import xyz.driver.core.rest.directives.Directives import akka.http.scaladsl.model.headers._ import xyz.driver.core.reporting.Reporter.CausalRelation -import xyz.driver.core.reporting.SpanContext import xyz.driver.core.rest.headers.Traceparent import scala.collection.JavaConverters._ @@ -58,22 +57,27 @@ trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self private def traced(inner: Route): Route = (ctx: RequestContext) => { val tags = Map( - "service_name" -> name, - "service_version" -> version.getOrElse(""), - "http_path" -> ctx.request.uri.path.toString, - "http_method" -> ctx.request.method.value.toString, - "http_uri" -> ctx.request.uri.toString, - "http_user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("") + "service.version" -> version.getOrElse(""), + // open tracing semantic tags + "span.kind" -> "server", + "service" -> name, + "http.url" -> ctx.request.uri.toString, + "http.method" -> ctx.request.method.value, + "peer.hostname" -> ctx.request.uri.authority.host.toString, + // google's tracing console provides extra search features if we define these tags + "/http/path" -> ctx.request.uri.path.toString, + "/http/method" -> ctx.request.method.value.toString, + "/http/url" -> ctx.request.uri.toString, + "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("") ) - val parent = ctx.request.header[Traceparent].map { p => - SpanContext(p.traceId, p.spanId) -> CausalRelation.Child + val parent = ctx.request.header[Traceparent].map { header => + header.spanContext -> CausalRelation.Child } reporter - .traceWithOptionalParentAsync(s"${ctx.request.method.value.toLowerCase}_${ctx.request.uri.path}", tags, parent) { - sctx => - val header = Traceparent(sctx.traceId, sctx.spanId) - val withHeader = ctx.withRequest(ctx.request.withHeaders(header)) - inner(withHeader) + .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext => + val header = Traceparent(spanContext) + val withHeader = ctx.withRequest(ctx.request.withHeaders(header)) + inner(withHeader) } } diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala index 2f889f5..d4d20a4 100644 --- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala +++ b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala @@ -10,6 +10,7 @@ import akka.stream.{Materializer, OverflowStrategy} import com.google.auth.oauth2.ServiceAccountCredentials import com.softwaremill.sttp._ import com.typesafe.scalalogging.Logger +import org.slf4j.MDC import spray.json.DerivedJsonProtocol._ import spray.json._ import xyz.driver.core.reporting.Reporter.CausalRelation @@ -17,7 +18,7 @@ import xyz.driver.core.reporting.Reporter.CausalRelation import scala.async.Async._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} import scala.util.control.NonFatal /** A reporter that collects traces and submits them to @@ -117,7 +118,8 @@ class GoogleReporter( TruncatableString(displayName), Instant.now(), Instant.now(), - Attributes(attributes ++ Map("namespace" -> namespace)) + Attributes(attributes ++ Map("service.namespace" -> namespace)), + Failure(new IllegalStateException("span status not set")) ) def traceWithOptionalParent[A]( @@ -145,12 +147,20 @@ class GoogleReporter( } val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) val result = operation(child) - result.onComplete { _ => + result.onComplete { result => span.endTime = Instant.now() + span.status = result submit(span) } result } + + override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = { + MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") + super.log(severity, message, reason) + } + } object GoogleReporter { @@ -167,7 +177,8 @@ object GoogleReporter { displayName: TruncatableString, startTime: Instant, var endTime: Instant, - attributes: Attributes + var attributes: Attributes, + var status: Try[_] ) private case class Spans(spans: Seq[Span]) @@ -185,9 +196,25 @@ object GoogleReporter { } } + private implicit val statusFormat = new RootJsonFormat[Try[_]] { + override def read(json: JsValue): Try[_] = sys.error("unimplemented") + override def write(obj: Try[_]) = { + // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto + val (code, message) = obj match { + case Success(_) => (0, "success") + case Failure(_) => (2, "failure") + } + JsObject( + "code" -> code.toJson, + "message" -> message.toJson, + "details" -> JsArray() + ) + } + } + private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes) private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString) - private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat7(Span) + private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat8(Span) private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans) } diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala index 9649ada..57e2310 100644 --- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala +++ b/src/main/scala/xyz/driver/core/reporting/Reporter.scala @@ -2,7 +2,7 @@ package xyz.driver.core.reporting import com.typesafe.scalalogging.Logger import org.slf4j.helpers.NOPLogger -import xyz.driver.core.reporting.Reporter.CausalRelation +import xyz.driver.core.reporting.Reporter.{CausalRelation, Severity} import scala.concurrent.Future @@ -128,20 +128,28 @@ trait Reporter { Some(ctx -> relation) )(op) + /** Log a message. */ + def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit + /** Log a debug message. */ - def debug(message: String)(implicit ctx: SpanContext): Unit + def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) + def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Debug, message, Some(reason)) /** Log an informational message. */ - def info(message: String)(implicit ctx: SpanContext): Unit + def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) + def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Informational, message, Some(reason)) /** Log a warning message. */ - def warn(message: String)(implicit ctx: SpanContext): Unit - - /** Log a error message. */ - def error(message: String)(implicit ctx: SpanContext): Unit + def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) + def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Warning, message, Some(reason)) - /** Log a error message with an associated throwable that caused the error condition. */ - def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit + /** Log an error message. */ + def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) + def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Error, message, Some(reason)) } @@ -164,4 +172,12 @@ object Reporter { case object Follows extends CausalRelation } + sealed trait Severity + object Severity { + case object Debug extends Severity + case object Informational extends Severity + case object Warning extends Severity + case object Error extends Severity + } + } diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala index c1131fb..eda81fb 100644 --- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala +++ b/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala @@ -5,12 +5,13 @@ trait ScalaLoggerLike extends Reporter { def logger: Logger - override def debug(message: String)(implicit ctx: SpanContext): Unit = logger.debug(message) - override def info(message: String)(implicit ctx: SpanContext): Unit = logger.info(message) - override def warn(message: String)(implicit ctx: SpanContext): Unit = logger.warn(message) - override def error(message: String)(implicit ctx: SpanContext): Unit = logger.error(message) - override def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - logger.error(message, reason) + override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = severity match { + case Reporter.Severity.Debug => logger.debug(message, reason.orNull) + case Reporter.Severity.Informational => logger.info(message, reason.orNull) + case Reporter.Severity.Warning => logger.warn(message, reason.orNull) + case Reporter.Severity.Error => logger.error(message, reason.orNull) + } } diff --git a/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala b/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala index c3b6bff..e60998f 100644 --- a/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala +++ b/src/main/scala/xyz/driver/core/rest/HttpRestServiceTransport.scala @@ -6,9 +6,9 @@ import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.Materializer import akka.stream.scaladsl.TcpIdleTimeoutException -import com.typesafe.scalalogging.Logger import org.slf4j.MDC import xyz.driver.core.Name +import xyz.driver.core.reporting.Reporter import xyz.driver.core.rest.errors.{ExternalServiceException, ExternalServiceTimeoutException} import scala.concurrent.{ExecutionContext, Future} @@ -19,7 +19,7 @@ class HttpRestServiceTransport( applicationVersion: String, actorSystem: ActorSystem, executionContext: ExecutionContext, - log: Logger) + reporter: Reporter) extends ServiceTransport { protected implicit val execution: ExecutionContext = executionContext @@ -27,42 +27,58 @@ class HttpRestServiceTransport( protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem) def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = { + val tags = Map( + // open tracing semantic tags + "span.kind" -> "client", + "service" -> applicationName.value, + "http.url" -> requestStub.uri.toString, + "http.method" -> requestStub.method.value, + "peer.hostname" -> requestStub.uri.authority.host.toString, + // google's tracing console provides extra search features if we define these tags + "/http/path" -> requestStub.uri.path.toString, + "/http/method" -> requestStub.method.value.toString, + "/http/url" -> requestStub.uri.toString + ) + reporter.traceAsync(s"http_call_rpc", tags) { implicit span => + val requestTime = System.currentTimeMillis() - val requestTime = System.currentTimeMillis() + val request = requestStub + .withHeaders(context.contextHeaders.toSeq.map { + case (ContextHeaders.TrackingIdHeader, _) => + RawHeader(ContextHeaders.TrackingIdHeader, context.trackingId) + case (ContextHeaders.StacktraceHeader, _) => + RawHeader( + ContextHeaders.StacktraceHeader, + Option(MDC.get("stack")) + .orElse(context.contextHeaders.get(ContextHeaders.StacktraceHeader)) + .getOrElse("")) + case (header, headerValue) => RawHeader(header, headerValue) + }: _*) - val request = requestStub - .withHeaders(context.contextHeaders.toSeq.map { - case (ContextHeaders.TrackingIdHeader, _) => - RawHeader(ContextHeaders.TrackingIdHeader, context.trackingId) - case (ContextHeaders.StacktraceHeader, _) => - RawHeader( - ContextHeaders.StacktraceHeader, - Option(MDC.get("stack")) - .orElse(context.contextHeaders.get(ContextHeaders.StacktraceHeader)) - .getOrElse("")) - case (header, headerValue) => RawHeader(header, headerValue) - }: _*) + reporter.debug(s"Sending request to ${request.method} ${request.uri}") - log.debug(s"Sending request to ${request.method} ${request.uri}") + val response = httpClient.makeRequest(request) - val response = httpClient.makeRequest(request) + response.onComplete { + case Success(r) => + val responseLatency = System.currentTimeMillis() - requestTime + reporter.debug( + s"Response from ${request.uri} to request $requestStub is successful in $responseLatency ms: $r") - response.onComplete { - case Success(r) => - val responseLatency = System.currentTimeMillis() - requestTime - log.debug(s"Response from ${request.uri} to request $requestStub is successful in $responseLatency ms: $r") + case Failure(t: Throwable) => + val responseLatency = System.currentTimeMillis() - requestTime + reporter.warn( + s"Failed to receive response from ${request.method.value} ${request.uri} in $responseLatency ms", + t) + }(executionContext) - case Failure(t: Throwable) => - val responseLatency = System.currentTimeMillis() - requestTime - log.warn(s"Failed to receive response from ${request.method} ${request.uri} in $responseLatency ms", t) - }(executionContext) - - response.recoverWith { - case _: TcpIdleTimeoutException => - val serviceCalled = s"${requestStub.method} ${requestStub.uri}" - Future.failed(ExternalServiceTimeoutException(serviceCalled)) - case t: Throwable => Future.failed(t) - } + response.recoverWith { + case _: TcpIdleTimeoutException => + val serviceCalled = s"${requestStub.method.value} ${requestStub.uri}" + Future.failed(ExternalServiceTimeoutException(serviceCalled)) + case t: Throwable => Future.failed(t) + } + }(context.spanContext) } def sendRequest(context: ServiceRequestContext)(requestStub: HttpRequest)( diff --git a/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala b/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala index 9d470ad..866476d 100644 --- a/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala +++ b/src/main/scala/xyz/driver/core/rest/headers/Traceparent.scala @@ -3,20 +3,21 @@ package rest package headers import akka.http.scaladsl.model.headers.{ModeledCustomHeader, ModeledCustomHeaderCompanion} +import xyz.driver.core.reporting.SpanContext import scala.util.Try -/** Encapsulates trace context in an HTTP header for propagation across services. +/** Encapsulates a trace context in an HTTP header for propagation across services. * * This implementation corresponds to the W3C editor's draft specification (as of 2018-08-28) * https://w3c.github.io/distributed-tracing/report-trace-context.html. The 'flags' field is * ignored. */ -final case class Traceparent(traceId: String, spanId: String) extends ModeledCustomHeader[Traceparent] { +final case class Traceparent(spanContext: SpanContext) extends ModeledCustomHeader[Traceparent] { override def renderInRequests = true override def renderInResponses = true override val companion: Traceparent.type = Traceparent - override def value: String = f"01-$traceId-$spanId-00" + override def value: String = f"01-${spanContext.traceId}-${spanContext.spanId}-00" } object Traceparent extends ModeledCustomHeaderCompanion[Traceparent] { override val name = "traceparent" @@ -26,8 +27,7 @@ object Traceparent extends ModeledCustomHeaderCompanion[Traceparent] { version == "01", s"Found unsupported version '$version' in traceparent header. Only version '01' is supported.") new Traceparent( - traceId, - spanId + new SpanContext(traceId, spanId) ) } } diff --git a/src/main/scala/xyz/driver/core/rest/package.scala b/src/main/scala/xyz/driver/core/rest/package.scala index 104261a..3be8f02 100644 --- a/src/main/scala/xyz/driver/core/rest/package.scala +++ b/src/main/scala/xyz/driver/core/rest/package.scala @@ -15,6 +15,7 @@ import scalaz.Scalaz.{intInstance, stringInstance} import scalaz.syntax.equal._ import scalaz.{Functor, OptionT} import xyz.driver.core.rest.auth.AuthProvider +import xyz.driver.core.rest.headers.Traceparent import xyz.driver.tracing.TracingDirectives import scala.concurrent.Future @@ -199,7 +200,8 @@ object `package` { h.name === ContextHeaders.AuthenticationTokenHeader || h.name === ContextHeaders.TrackingIdHeader || h.name === ContextHeaders.PermissionsTokenHeader || h.name === ContextHeaders.StacktraceHeader || h.name === ContextHeaders.TraceHeaderName || h.name === ContextHeaders.SpanHeaderName || - h.name === ContextHeaders.OriginatingIpHeader || h.name === ContextHeaders.ClientFingerprintHeader + h.name === ContextHeaders.OriginatingIpHeader || h.name === ContextHeaders.ClientFingerprintHeader || + h.name === Traceparent.name } .map { header => if (header.name === ContextHeaders.AuthenticationTokenHeader) { diff --git a/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala b/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala index 76f5a0d..d2e4bc3 100644 --- a/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala +++ b/src/main/scala/xyz/driver/core/rest/serviceRequestContext.scala @@ -6,7 +6,11 @@ import xyz.driver.core.auth.{AuthToken, PermissionsToken, User} import xyz.driver.core.generators import scalaz.Scalaz.{mapEqual, stringInstance} import scalaz.syntax.equal._ +import xyz.driver.core.reporting.SpanContext import xyz.driver.core.rest.auth.AuthProvider +import xyz.driver.core.rest.headers.Traceparent + +import scala.util.Try class ServiceRequestContext( val trackingId: String = generators.nextUuid().toString, @@ -45,6 +49,15 @@ class ServiceRequestContext( case _ => false } + def spanContext: SpanContext = { + val validHeader = Try { + contextHeaders(Traceparent.name) + }.flatMap { value => + Traceparent.parse(value) + } + validHeader.map(_.spanContext).getOrElse(SpanContext.fresh()) + } + override def toString: String = s"ServiceRequestContext($trackingId, $contextHeaders)" } -- cgit v1.2.3