From 03503f6be54cbde22261c56919f5ef45ebdcb21d Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Fri, 29 Sep 2017 13:41:52 -0700 Subject: Initial commit --- .gitignore | 1 + .scalafmt.conf | 1 + build.sbt | 6 ++ project/build.properties | 1 + project/plugins.sbt | 1 + src/main/scala/xyz/driver/tracing/Span.scala | 24 +++++ .../scala/xyz/driver/tracing/TraceContext.scala | 1 + src/main/scala/xyz/driver/tracing/Tracer.scala | 5 + .../xyz/driver/tracing/TracingDirectives.scala | 96 ++++++++++++++++++ .../xyz/driver/tracing/google/GoogleTracer.scala | 64 ++++++++++++ src/main/scala/xyz/driver/tracing/google/api.scala | 111 +++++++++++++++++++++ 11 files changed, 311 insertions(+) create mode 100644 .gitignore create mode 100644 .scalafmt.conf create mode 100644 build.sbt create mode 100644 project/build.properties create mode 100644 project/plugins.sbt create mode 100644 src/main/scala/xyz/driver/tracing/Span.scala create mode 100644 src/main/scala/xyz/driver/tracing/TraceContext.scala create mode 100644 src/main/scala/xyz/driver/tracing/Tracer.scala create mode 100644 src/main/scala/xyz/driver/tracing/TracingDirectives.scala create mode 100644 src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala create mode 100644 src/main/scala/xyz/driver/tracing/google/api.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..9908ac7 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1 @@ +rewrite.rules = [SortImports] \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..a3faa13 --- /dev/null +++ b/build.sbt @@ -0,0 +1,6 @@ +scalaVersion := "2.12.3" + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-http" % "10.0.10", + "io.spray" %% "spray-json" % "1.3.3" +) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..369929b --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.0.2 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..ab0cfab --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.2.0") diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/xyz/driver/tracing/Span.scala new file mode 100644 index 0000000..2194e8d --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/Span.scala @@ -0,0 +1,24 @@ +package xyz.driver.tracing + +import java.util.UUID +import java.time._ + +case class Span( + traceId: UUID, + spanId: UUID, + name: String, + parentSpanId: Option[UUID] = None, + labels: Map[String, String] = Map.empty, + startTime: Instant = Instant.now, + endTime: Instant = Instant.now +) { + + def started(clock: Clock = Clock.systemUTC): Span = + this.copy(startTime = clock.instant()) + def ended(clock: Clock = Clock.systemUTC): Span = + this.copy(endTime = clock.instant()) + + def withLabels(extraLabels: (String, String)*) = + this.copy(labels = this.labels ++ extraLabels) + +} diff --git a/src/main/scala/xyz/driver/tracing/TraceContext.scala b/src/main/scala/xyz/driver/tracing/TraceContext.scala new file mode 100644 index 0000000..bc77c2f --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/TraceContext.scala @@ -0,0 +1 @@ +package xyz.driver.tracing diff --git a/src/main/scala/xyz/driver/tracing/Tracer.scala b/src/main/scala/xyz/driver/tracing/Tracer.scala new file mode 100644 index 0000000..00a1fbf --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/Tracer.scala @@ -0,0 +1,5 @@ +package xyz.driver.tracing + +trait Tracer { + def submit(span: Span): Unit +} diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala new file mode 100644 index 0000000..0c333f6 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala @@ -0,0 +1,96 @@ +package xyz.driver.tracing + +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ //{Directive, Directive0, Directive1} +import java.util.UUID +import scala.util.Random +import java.time._ +import scala.concurrent._ +import scala.collection.immutable.Seq + +trait TracingDirectives { + import TracingDirectives._ + + def optionalTraceContext: Directive1[Option[TraceContext]] = + extractRequest.map { req => + TraceContext.fromHeaders(req.headers) + } + + def withTraceContext(ctx: TraceContext): Directive0 = + mapRequest(req => req.withHeaders(ctx.headers)) + + def trace(name: String, labels: Map[String, String] = Map.empty)( + implicit tracer: Tracer): Directive0 = + optionalTraceContext.flatMap { + case parent => + val span: Span = parent match { + case None => // no parent span, create new trace + Span( + traceId = UUID.randomUUID, + spanId = UUID.randomUUID, + name = name, + labels = labels + ) + case Some(TraceContext(traceId, parentSpanId)) => + Span( + traceId = traceId, + spanId = UUID.randomUUID, + parentSpanId = parentSpanId, + name = name, + labels = labels + ) + } + + withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res => + tracer.submit(span.ended()) + res + } + } + + /* + def span2(name2: String, tracer: Tracer): Directive0 = { + val f: RouteResult ⇒ RouteResult = ??? + Directive { inner ⇒ ctx ⇒ + inner(())(ctx).map(f)(ctx.executionContext) + } + }*/ + +} +/* + def span2(name2: String, tracer: Tracer): Directive0 = { + val f: RouteResult ⇒ RouteResult = ??? + Directive { inner ⇒ ctx ⇒ + inner(())(ctx).map(f)(ctx.executionContext) + } + }*/ + +object TracingDirectives { + + case class TraceContext(traceId: UUID, parentSpanId: Option[UUID]) { + import TraceContext._ + + def headers: Seq[HttpHeader] = + Seq(RawHeader(TraceHeaderName, traceId.toString)) ++ + parentSpanId.toSeq.map(id => RawHeader(SpanHeaderName, id.toString)) + } + object TraceContext { + val TraceHeaderName = "Tracing-Trace-Id" + val SpanHeaderName = "Tracing-Span-Id" + + def fromHeaders(headers: Seq[HttpHeader]): Option[TraceContext] = { + val traceId = headers + .find(_.name == TraceHeaderName) + .map(_.value) + .map(UUID.fromString) + val parentSpanId = + headers.find(_.name == SpanHeaderName).map(_.value).map(UUID.fromString) + traceId.map { tid => + TraceContext(tid, parentSpanId) + } + } + def fromSpan(span: Span) = TraceContext(span.traceId, Some(span.spanId)) + } + +} diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala new file mode 100644 index 0000000..fb8d9e6 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -0,0 +1,64 @@ +package xyz.driver.tracing +package google + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import java.util.UUID + +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( + implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .viaMat(batchingPipeline)(Keep.left) + .map { traces => + val entity = HttpEntity + val req = HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" + ) + (req, ()) + } + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.warning( + s"Exception encountered while submitting trace: $e") + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala new file mode 100644 index 0000000..2dcac92 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/google/api.scala @@ -0,0 +1,111 @@ +package xyz.driver.tracing +package google + +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID +import java.nio.ByteBuffer +import java.time._ +import java.time.format._ + +case class TraceSpan( + spanId: Long, + kind: TraceSpan.SpanKind, + name: String, + startTime: Instant, + endTime: Instant, + parentSpanId: Option[Long], + labels: Map[String, String] +) + +object TraceSpan { + + sealed trait SpanKind + // Unspecified + case object Unspecified extends SpanKind + // Indicates that the span covers server-side handling of an RPC or other remote network request. + case object RpcServer extends SpanKind + // Indicates that the span covers the client-side wrapper around an RPC or other remote request. + case object RpcClient extends SpanKind + + object SpanKind { + implicit val format: JsonFormat[SpanKind] = new JsonFormat[SpanKind] { + override def write(x: SpanKind): JsValue = x match { + case Unspecified => JsString("SPAN_KIND_UNSPECIFIED") + case RpcServer => JsString("RPC_SERVER") + case RpcClient => JsString("RPC_CLIENT") + } + override def read(x: JsValue): SpanKind = x match { + case JsString("SPAN_KIND_UNSPECIFIED") => Unspecified + case JsString("RPC_SERVER") => RpcServer + case JsString("RPC_CLIENT") => RpcClient + case other => + spray.json.deserializationError(s"`$other` is not a valid span kind") + } + } + } + + implicit val instantFormat = new JsonFormat[Instant] { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") + override def write(x: Instant): JsValue = JsString(formatter.format(x)) + override def read(x: JsValue): Instant = x match { + case JsString(x) => Instant.parse(x) + case other => + spray.json.deserializationError(s"`$other` is not a valid instant") + } + } + + implicit val format: JsonFormat[TraceSpan] = jsonFormat7(TraceSpan.apply) + + def fromSpan(span: Span) = TraceSpan( + span.spanId.getLeastSignificantBits, + Unspecified, + span.name, + span.startTime, + span.endTime, + span.parentSpanId.map(_.getLeastSignificantBits), + span.labels + ) + +} + +case class Trace( + traceId: UUID, + projectId: String = "", + spans: Seq[TraceSpan] = Seq.empty +) + +object Trace { + + implicit val uuidFormat = new JsonFormat[UUID] { + override def write(x: UUID) = { + val buffer = ByteBuffer.allocate(16) + buffer.putLong(x.getMostSignificantBits) + buffer.putLong(x.getLeastSignificantBits) + val array = buffer.array() + val string = new StringBuilder + for (i <- 0 until 16) { + string ++= f"${array(i) & 0xff}%02x" + } + JsString(string.result) + } + override def read(x: JsValue): UUID = x match { + case JsString(str) if str.length == 32 => + val (msb, lsb) = str.splitAt(16) + new UUID(java.lang.Long.decode(msb), java.lang.Long.decode(lsb)) + case JsString(str) => + spray.json.deserializationError( + "128-bit id string must be exactly 32 characters long") + case other => + spray.json.deserializationError("expected 32 character hex string") + } + } + + implicit val format: JsonFormat[Trace] = jsonFormat3(Trace.apply) + +} + +case class Traces(traces: Seq[Trace]) +object Traces { + implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply) +} -- cgit v1.2.3