From 9b0bed19489dbbd59c52a77e9c0c080a880ca262 Mon Sep 17 00:00:00 2001 From: "John St. John" Date: Thu, 21 Sep 2017 16:28:23 -0700 Subject: Jstjohn/google stackdriver trace (#64) Add app-level tracing to driver-core (https://www.pivotaltracker.com/story/show/151100422) --- build.sbt | 38 ++++++----- src/main/resources/reference.conf | 26 ++++++++ src/main/scala/xyz/driver/core/app.scala | 35 ++++++---- src/main/scala/xyz/driver/core/rest.scala | 10 +-- .../driver/core/trace/GoogleServiceTracer.scala | 14 ++++ .../driver/core/trace/GoogleStackdriverTrace.scala | 47 ++++++++++++++ .../trace/GoogleStackdriverTraceWithConsumer.scala | 75 ++++++++++++++++++++++ .../scala/xyz/driver/core/trace/LoggingTrace.scala | 21 ++++++ .../driver/core/trace/LoggingTraceConsumer.scala | 11 ++++ .../xyz/driver/core/trace/ServiceTracer.scala | 17 +++++ .../core/trace/SimpleSpanContextHandler.scala | 17 +++++ src/main/scala/xyz/driver/core/trace/package.scala | 6 ++ 12 files changed, 287 insertions(+), 30 deletions(-) create mode 100644 src/main/resources/reference.conf create mode 100644 src/main/scala/xyz/driver/core/trace/GoogleServiceTracer.scala create mode 100644 src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala create mode 100644 src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala create mode 100644 src/main/scala/xyz/driver/core/trace/LoggingTrace.scala create mode 100644 src/main/scala/xyz/driver/core/trace/LoggingTraceConsumer.scala create mode 100644 src/main/scala/xyz/driver/core/trace/ServiceTracer.scala create mode 100644 src/main/scala/xyz/driver/core/trace/SimpleSpanContextHandler.scala create mode 100644 src/main/scala/xyz/driver/core/trace/package.scala diff --git a/build.sbt b/build.sbt index 87828d6..de163c4 100644 --- a/build.sbt +++ b/build.sbt @@ -2,24 +2,32 @@ import sbt._ import Keys._ lazy val akkaHttpV = "10.0.5" +lazy val googleTraceV = "0.4.0" lazy val core = (project in file(".")) .driverLibrary("core") .settings(lintingSettings ++ formatSettings) .settings(libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-http-core" % akkaHttpV, - "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV, - "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpV, - "com.pauldijou" %% "jwt-core" % "0.14.0", - "org.scalatest" %% "scalatest" % "3.0.1" % "test", - "org.scalacheck" %% "scalacheck" % "1.13.4" % "test", - "org.mockito" % "mockito-core" % "1.9.5" % "test", - "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1", - "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", - "com.google.cloud" % "google-cloud-pubsub" % "0.17.2-alpha", - "com.google.cloud" % "google-cloud-storage" % "1.6.0", - "com.typesafe.slick" %% "slick" % "3.1.1", - "com.typesafe" % "config" % "1.2.1", - "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0", - "ch.qos.logback" % "logback-classic" % "1.1.3" + "com.typesafe.akka" %% "akka-http-core" % akkaHttpV, + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV, + "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpV, + "com.pauldijou" %% "jwt-core" % "0.14.0", + "org.scalatest" %% "scalatest" % "3.0.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.13.4" % "test", + "org.mockito" % "mockito-core" % "1.9.5" % "test", + "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1", + "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", + "com.google.cloud" % "google-cloud-pubsub" % "0.17.2-alpha", + "com.google.cloud" % "google-cloud-storage" % "1.6.0", + "com.typesafe.slick" %% "slick" % "3.1.1", + "com.typesafe" % "config" % "1.2.1", + "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0", + "ch.qos.logback" % "logback-classic" % "1.1.3", + "com.google.cloud.trace" % "core" % googleTraceV, + "com.google.cloud.trace" % "logging-service" % googleTraceV, + "com.google.cloud.trace" % "trace-grpc-api-service" % googleTraceV, + // the following version of netty boringssl (or maybe greater) is/was needed to avoid Jetty ALPN/NPN + // config errors w/ SSL in google libs. Before removing test that tracing posts to google + // in a service that uses this library. + "io.netty" % "netty-tcnative-boringssl-static" % "2.0.3.Final" )) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..637f713 --- /dev/null +++ b/src/main/resources/reference.conf @@ -0,0 +1,26 @@ + +// This file takes on the lowest config priority. Any settings implemented in +// entities that use this library will take precedence. +application { + baseUrl: "localhost:8080" + environment: "local_testing" +} + +swagger { + apiVersion = "2.0" + basePath = "/" + docsPath = "api-docs" + + apiInfo { + title = "NEW SERVICE" + description = "Please implement swagger info in your new service" + termsOfServiceUrl = "TOC Url" + contact { + name = "Driver Inc." + url = "http://driver.xyz" + email = "info@driver.xyz" + } + license = "Apache V2" + licenseUrl = "http://www.apache.org/licenses/LICENSE-2.0" + } +} diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index b5ac4d0..763a363 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -23,13 +23,14 @@ import xyz.driver.core.rest._ import xyz.driver.core.stats.SystemStats import xyz.driver.core.time.Time import xyz.driver.core.time.provider.{SystemTimeProvider, TimeProvider} +import xyz.driver.core.trace.{LoggingTrace, ServiceTracer} import scala.compat.Platform.ConcurrentModificationException import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.reflect.runtime.universe._ -import scala.util.control.NonFatal import scala.util.Try +import scala.util.control.NonFatal import scalaz.Scalaz.stringInstance import scalaz.syntax.equal._ @@ -45,11 +46,14 @@ object app { interface: String = "::0", baseUrl: String = "localhost:8080", scheme: String = "http", - port: Int = 8080)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) { + port: Int = 8080, + tracer: Option[ServiceTracer] = None)(implicit actorSystem: ActorSystem, + executionContext: ExecutionContext) { implicit private lazy val materializer = ActorMaterializer()(actorSystem) private lazy val http = Http()(actorSystem) - + val appEnvironment = config.getString("application.environment") + val serviceTracer = tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log)) def run(): Unit = { activateServices(modules) scheduleServicesDeactivation(modules) @@ -89,7 +93,8 @@ object app { "X-Content-Type-Options", "Strict-Transport-Security", AuthProvider.SetAuthenticationTokenHeader, - AuthProvider.SetPermissionsTokenHeader + AuthProvider.SetPermissionsTokenHeader, + trace.TracingHeaderKey ) private def allowOrigin(originHeader: Option[Origin]) = @@ -132,7 +137,9 @@ object app { extractClientIP { ip => optionalHeaderValueByType[Origin](()) { originHeader => { ctx => - val trackingId = rest.extractTrackingId(ctx.request) + val traceSpan = serviceTracer.startSpan(ctx.request) + val tracingHeader = traceSpan.header + val trackingId = rest.extractTrackingId(ctx.request) MDC.put("trackingId", trackingId) val updatedStacktrace = (rest.extractStacktrace(ctx.request) ++ Array(appName)).mkString("->") @@ -148,24 +155,30 @@ object app { val contextWithTrackingId = ctx.withRequest( ctx.request + .addHeader(tracingHeader) .addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)) .addHeader(RawHeader(ContextHeaders.StacktraceHeader, updatedStacktrace))) handleExceptions(ExceptionHandler(exceptionHandler))({ c => requestLogging.flatMap { _ => - val tracingHeader = RawHeader(ContextHeaders.TrackingIdHeader, trackingId) + val trackingHeader = RawHeader(ContextHeaders.TrackingIdHeader, trackingId) - val responseHeaders = List[HttpHeader](tracingHeader, - allowOrigin(originHeader), - `Access-Control-Allow-Headers`(allowedHeaders: _*), - `Access-Control-Expose-Headers`(allowedHeaders: _*)) + val responseHeaders = List[HttpHeader]( + trackingHeader, + tracingHeader, + allowOrigin(originHeader), + `Access-Control-Allow-Headers`(allowedHeaders: _*), + `Access-Control-Expose-Headers`(allowedHeaders: _*) + ) respondWithHeaders(responseHeaders) { modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) }(c) } - })(contextWithTrackingId) + })(contextWithTrackingId).andThen { + case _ => serviceTracer.endSpan(traceSpan) + } } } } diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala index dde570a..95df010 100644 --- a/src/main/scala/xyz/driver/core/rest.scala +++ b/src/main/scala/xyz/driver/core/rest.scala @@ -13,8 +13,8 @@ import akka.http.scaladsl.server.AuthenticationFailedRejection.CredentialsReject import akka.http.scaladsl.server.Route import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream._ +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.util.ByteString import com.github.swagger.akka.model._ import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService} @@ -24,11 +24,11 @@ import io.swagger.models.Scheme import org.slf4j.MDC import pdi.jwt.{Jwt, JwtAlgorithm} import xyz.driver.core.auth._ -import xyz.driver.core.{Name, generators} import xyz.driver.core.time.provider.TimeProvider +import xyz.driver.core.{Name, generators, trace} -import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} import scalaz.Scalaz.{futureInstance, intInstance, listInstance, mapEqual, mapMonoid, stringInstance} import scalaz.syntax.equal._ @@ -58,7 +58,8 @@ object `package` { def extractContextHeaders(request: HttpRequest): Map[String, String] = { request.headers.filter { h => h.name === ContextHeaders.AuthenticationTokenHeader || h.name === ContextHeaders.TrackingIdHeader || - h.name === ContextHeaders.PermissionsTokenHeader || h.name === ContextHeaders.StacktraceHeader + h.name === ContextHeaders.PermissionsTokenHeader || h.name === ContextHeaders.StacktraceHeader || + h.name === ContextHeaders.TracingHeader } map { header => if (header.name === ContextHeaders.AuthenticationTokenHeader) { header.name -> header.value.stripPrefix(ContextHeaders.AuthenticationHeaderPrefix).trim @@ -176,6 +177,7 @@ object ContextHeaders { val AuthenticationHeaderPrefix = "Bearer" val TrackingIdHeader = "X-Trace" val StacktraceHeader = "X-Stacktrace" + val TracingHeader = trace.TracingHeaderKey } object AuthProvider { diff --git a/src/main/scala/xyz/driver/core/trace/GoogleServiceTracer.scala b/src/main/scala/xyz/driver/core/trace/GoogleServiceTracer.scala new file mode 100644 index 0000000..ead4c6f --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/GoogleServiceTracer.scala @@ -0,0 +1,14 @@ +package xyz.driver.core.trace + +import akka.http.scaladsl.model.headers.RawHeader +import com.google.cloud.trace.Tracer +import com.google.cloud.trace.core.{SpanContextFactory, TraceContext} + +final case class GoogleStackdriverTraceSpan(tracer: Tracer, context: TraceContext) extends CanMakeHeader { + def header: RawHeader = + RawHeader(TracingHeaderKey, SpanContextFactory.toHeader(context.getHandle.getCurrentSpanContext)) +} + +trait GoogleServiceTracer extends ServiceTracer { + type TracerSpanPayload = GoogleStackdriverTraceSpan +} diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala new file mode 100644 index 0000000..1ff8d10 --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala @@ -0,0 +1,47 @@ +package xyz.driver.core.trace + +import java.io.FileInputStream +import java.nio.file.{Files, Paths} +import java.util + +import akka.http.scaladsl.model.HttpRequest +import com.google.auth.oauth2.GoogleCredentials +import com.google.cloud.trace.grpc.v1.GrpcTraceConsumer +import com.google.cloud.trace.v1.consumer.TraceConsumer +import com.typesafe.scalalogging.Logger + +final class GoogleStackdriverTrace(projectId: String, + clientSecretsFile: String, + appName: String, + appEnvironment: String, + log: Logger) + extends GoogleServiceTracer { + + // initialize our various tracking storage systems + private val clientSecretsInputStreamOpt: Option[FileInputStream] = if (Files.exists(Paths.get(clientSecretsFile))) { + Some(new FileInputStream(clientSecretsFile)) + } else { + None + } + // if the google credentials are invalid, just log the traces + private val traceConsumer: TraceConsumer = clientSecretsInputStreamOpt.fold[TraceConsumer] { + log.warn(s"Google credentials not found in path: $clientSecretsFile") + new LoggingTraceConsumer(log) + } { clientSecretsInputStream => + GrpcTraceConsumer + .create( + "cloudtrace.googleapis.com", + GoogleCredentials + .fromStream(clientSecretsInputStream) + .createScoped(util.Arrays.asList("https://www.googleapis.com/auth/trace.append")) + ) + } + + private val googleServiceTracer = + new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer) + + override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = + googleServiceTracer.startSpan(httpRequest) + + override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) +} diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala new file mode 100644 index 0000000..7fed3c7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -0,0 +1,75 @@ +package xyz.driver.core.trace + +import akka.http.scaladsl.model.HttpRequest +import com.google.cloud.trace.core._ +import com.google.cloud.trace.sink.TraceSink +import com.google.cloud.trace.v1.TraceSinkV1 +import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer} +import com.google.cloud.trace.v1.producer.TraceProducer +import com.google.cloud.trace.v1.util.RoughTraceSizer +import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer} + +import scala.compat.java8.OptionConverters._ + +final class GoogleStackdriverTraceWithConsumer(projectId: String, + appName: String, + appEnvironment: String, + traceConsumer: TraceConsumer) + extends GoogleServiceTracer { + + private val traceProducer: TraceProducer = new TraceProducer() + private val threadSafeBufferingTraceConsumer = + new SizedBufferingTraceConsumer(traceConsumer, new RoughTraceSizer(), 100) + + private val traceSink: TraceSink = new TraceSinkV1(projectId, traceProducer, threadSafeBufferingTraceConsumer) + + private val spanContextFactory: SpanContextFactory = new SpanContextFactory( + new ConstantTraceOptionsFactory(true, true)) + private val timestampFactory: TimestampFactory = new JavaTimestampFactory() + + override def startSpan(httpRequest: HttpRequest): TracerSpanPayload = { + val parentHeaderOption: Option[akka.http.javadsl.model.HttpHeader] = + httpRequest.getHeader(TracingHeaderKey).asScala + val (spanContext: SpanContext, spanKind: SpanKind) = parentHeaderOption.fold { + (spanContextFactory.initialContext(), SpanKind.RPC_CLIENT) + } { parentHeader => + (spanContextFactory.fromHeader(parentHeader.value()), SpanKind.RPC_SERVER) + } + + val contextHandler: SpanContextHandler = new SimpleSpanContextHandler(spanContext) + val httpMethod = httpRequest.method.value + val httpHost = httpRequest.uri.authority.host.address() + val httpRelative = httpRequest.uri.toRelative.toString() + val tracer: Tracer = new SpanContextHandlerTracer(traceSink, contextHandler, spanContextFactory, timestampFactory) + // Create a span using the given timestamps. + // https://cloud.google.com/trace/docs/reference/v1/rest/v1/projects.traces#TraceSpan + val spanOptions: StartSpanOptions = (new StartSpanOptions()).setSpanKind(spanKind) + + val spanLabelBuilder = Labels + .builder() + .add("/http/method", httpMethod) + .add("/http/url", httpRelative) + .add("/http/host", httpHost) + .add("/component", appName) + .add("/environment", appEnvironment) + + parentHeaderOption.foreach { parentHeader => + spanLabelBuilder.add("/span/parent", parentHeader.value()) + } + + // The cloudTrace analysis reporting UI makes it easy to query by name prefix. + // this spanName gives us the ability to grab things that are specific to a particular UDE/env, as well as all + // endpoints in that service, as well as a particular endpoint in a particular environment/service. + val spanName: String = s"($appEnvironment->$appName)$httpRelative" + + val context: TraceContext = tracer.startSpan(spanName, spanOptions) + tracer.annotateSpan(context, spanLabelBuilder.build()) + GoogleStackdriverTraceSpan(tracer, context) + } + + override def endSpan(span: TracerSpanPayload): Unit = { + span.tracer.endSpan(span.context) + threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer + } + +} diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala new file mode 100644 index 0000000..9db85b7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala @@ -0,0 +1,21 @@ +package xyz.driver.core.trace + +import akka.http.scaladsl.model.HttpRequest +import com.google.cloud.trace.v1.consumer.TraceConsumer +import com.typesafe.scalalogging.Logger + +final class LoggingTrace(appName: String, appEnvironment: String, log: Logger) extends GoogleServiceTracer { + + private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log) + private val googleServiceTracer = new GoogleStackdriverTraceWithConsumer( + "logging-tracer", + appName, + appEnvironment, + traceConsumer + ) + + override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = + googleServiceTracer.startSpan(httpRequest) + + override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) +} diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTraceConsumer.scala b/src/main/scala/xyz/driver/core/trace/LoggingTraceConsumer.scala new file mode 100644 index 0000000..24df94a --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/LoggingTraceConsumer.scala @@ -0,0 +1,11 @@ +package xyz.driver.core.trace + +import com.google.cloud.trace.v1.consumer.TraceConsumer +import com.google.devtools.cloudtrace.v1.Traces +import com.typesafe.scalalogging.Logger + +class LoggingTraceConsumer(log: Logger) extends TraceConsumer { + def receive(traces: Traces): Unit = { + log.trace(s"Received traces: $traces") + } +} diff --git a/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala new file mode 100644 index 0000000..25562cd --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala @@ -0,0 +1,17 @@ +package xyz.driver.core.trace + +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.headers.RawHeader + +trait CanMakeHeader { + def header: RawHeader +} + +trait ServiceTracer { + + type TracerSpanPayload <: CanMakeHeader + + def startSpan(httpRequest: HttpRequest): TracerSpanPayload + + def endSpan(span: TracerSpanPayload): Unit +} diff --git a/src/main/scala/xyz/driver/core/trace/SimpleSpanContextHandler.scala b/src/main/scala/xyz/driver/core/trace/SimpleSpanContextHandler.scala new file mode 100644 index 0000000..4ea0e8c --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/SimpleSpanContextHandler.scala @@ -0,0 +1,17 @@ +package xyz.driver.core.trace + +import com.google.cloud.trace.SpanContextHandler +import com.google.cloud.trace.DetachedSpanContextHandle +import com.google.cloud.trace.core.{SpanContext, SpanContextHandle} + +@SuppressWarnings(Array("org.wartremover.warts.Var")) +class SimpleSpanContextHandler(rootSpan: SpanContext) extends SpanContextHandler { + private var currentSpanContext = rootSpan + + override def current(): SpanContext = currentSpanContext + + override def attach(context: SpanContext): SpanContextHandle = { + currentSpanContext = context + new DetachedSpanContextHandle(context) + } +} diff --git a/src/main/scala/xyz/driver/core/trace/package.scala b/src/main/scala/xyz/driver/core/trace/package.scala new file mode 100644 index 0000000..0dec833 --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/package.scala @@ -0,0 +1,6 @@ +package xyz.driver.core + +package object trace { + // this happens to be the same as the name google uses, but can be anything we want + val TracingHeaderKey: String = "X-Cloud-Trace-Context" +} -- cgit v1.2.3