aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2017-09-29 13:41:52 -0700
committerJakob Odersky <jakob@driver.xyz>2017-09-29 13:44:10 -0700
commit03503f6be54cbde22261c56919f5ef45ebdcb21d (patch)
treefd85cb40860674bd404476346b9ed8ea84a1cfe1
downloadtracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.tar.gz
tracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.tar.bz2
tracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.zip
Initial commit
-rw-r--r--.gitignore1
-rw-r--r--.scalafmt.conf1
-rw-r--r--build.sbt6
-rw-r--r--project/build.properties1
-rw-r--r--project/plugins.sbt1
-rw-r--r--src/main/scala/xyz/driver/tracing/Span.scala24
-rw-r--r--src/main/scala/xyz/driver/tracing/TraceContext.scala1
-rw-r--r--src/main/scala/xyz/driver/tracing/Tracer.scala5
-rw-r--r--src/main/scala/xyz/driver/tracing/TracingDirectives.scala96
-rw-r--r--src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala64
-rw-r--r--src/main/scala/xyz/driver/tracing/google/api.scala111
11 files changed, 311 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9f97022
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+target/ \ No newline at end of file
diff --git a/.scalafmt.conf b/.scalafmt.conf
new file mode 100644
index 0000000..9908ac7
--- /dev/null
+++ b/.scalafmt.conf
@@ -0,0 +1 @@
+rewrite.rules = [SortImports] \ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..a3faa13
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,6 @@
+scalaVersion := "2.12.3"
+
+libraryDependencies ++= Seq(
+ "com.typesafe.akka" %% "akka-http" % "10.0.10",
+ "io.spray" %% "spray-json" % "1.3.3"
+)
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..369929b
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.0.2 \ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..ab0cfab
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.2.0")
diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/xyz/driver/tracing/Span.scala
new file mode 100644
index 0000000..2194e8d
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/Span.scala
@@ -0,0 +1,24 @@
+package xyz.driver.tracing
+
+import java.util.UUID
+import java.time._
+
+case class Span(
+ traceId: UUID,
+ spanId: UUID,
+ name: String,
+ parentSpanId: Option[UUID] = None,
+ labels: Map[String, String] = Map.empty,
+ startTime: Instant = Instant.now,
+ endTime: Instant = Instant.now
+) {
+
+ def started(clock: Clock = Clock.systemUTC): Span =
+ this.copy(startTime = clock.instant())
+ def ended(clock: Clock = Clock.systemUTC): Span =
+ this.copy(endTime = clock.instant())
+
+ def withLabels(extraLabels: (String, String)*) =
+ this.copy(labels = this.labels ++ extraLabels)
+
+}
diff --git a/src/main/scala/xyz/driver/tracing/TraceContext.scala b/src/main/scala/xyz/driver/tracing/TraceContext.scala
new file mode 100644
index 0000000..bc77c2f
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/TraceContext.scala
@@ -0,0 +1 @@
+package xyz.driver.tracing
diff --git a/src/main/scala/xyz/driver/tracing/Tracer.scala b/src/main/scala/xyz/driver/tracing/Tracer.scala
new file mode 100644
index 0000000..00a1fbf
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/Tracer.scala
@@ -0,0 +1,5 @@
+package xyz.driver.tracing
+
+trait Tracer {
+ def submit(span: Span): Unit
+}
diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
new file mode 100644
index 0000000..0c333f6
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
@@ -0,0 +1,96 @@
+package xyz.driver.tracing
+
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._ //{Directive, Directive0, Directive1}
+import java.util.UUID
+import scala.util.Random
+import java.time._
+import scala.concurrent._
+import scala.collection.immutable.Seq
+
+trait TracingDirectives {
+ import TracingDirectives._
+
+ def optionalTraceContext: Directive1[Option[TraceContext]] =
+ extractRequest.map { req =>
+ TraceContext.fromHeaders(req.headers)
+ }
+
+ def withTraceContext(ctx: TraceContext): Directive0 =
+ mapRequest(req => req.withHeaders(ctx.headers))
+
+ def trace(name: String, labels: Map[String, String] = Map.empty)(
+ implicit tracer: Tracer): Directive0 =
+ optionalTraceContext.flatMap {
+ case parent =>
+ val span: Span = parent match {
+ case None => // no parent span, create new trace
+ Span(
+ traceId = UUID.randomUUID,
+ spanId = UUID.randomUUID,
+ name = name,
+ labels = labels
+ )
+ case Some(TraceContext(traceId, parentSpanId)) =>
+ Span(
+ traceId = traceId,
+ spanId = UUID.randomUUID,
+ parentSpanId = parentSpanId,
+ name = name,
+ labels = labels
+ )
+ }
+
+ withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res =>
+ tracer.submit(span.ended())
+ res
+ }
+ }
+
+ /*
+ def span2(name2: String, tracer: Tracer): Directive0 = {
+ val f: RouteResult ⇒ RouteResult = ???
+ Directive { inner ⇒ ctx ⇒
+ inner(())(ctx).map(f)(ctx.executionContext)
+ }
+ }*/
+
+}
+/*
+ def span2(name2: String, tracer: Tracer): Directive0 = {
+ val f: RouteResult ⇒ RouteResult = ???
+ Directive { inner ⇒ ctx ⇒
+ inner(())(ctx).map(f)(ctx.executionContext)
+ }
+ }*/
+
+object TracingDirectives {
+
+ case class TraceContext(traceId: UUID, parentSpanId: Option[UUID]) {
+ import TraceContext._
+
+ def headers: Seq[HttpHeader] =
+ Seq(RawHeader(TraceHeaderName, traceId.toString)) ++
+ parentSpanId.toSeq.map(id => RawHeader(SpanHeaderName, id.toString))
+ }
+ object TraceContext {
+ val TraceHeaderName = "Tracing-Trace-Id"
+ val SpanHeaderName = "Tracing-Span-Id"
+
+ def fromHeaders(headers: Seq[HttpHeader]): Option[TraceContext] = {
+ val traceId = headers
+ .find(_.name == TraceHeaderName)
+ .map(_.value)
+ .map(UUID.fromString)
+ val parentSpanId =
+ headers.find(_.name == SpanHeaderName).map(_.value).map(UUID.fromString)
+ traceId.map { tid =>
+ TraceContext(tid, parentSpanId)
+ }
+ }
+ def fromSpan(span: Span) = TraceContext(span.traceId, Some(span.spanId))
+ }
+
+}
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
new file mode 100644
index 0000000..fb8d9e6
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
@@ -0,0 +1,64 @@
+package xyz.driver.tracing
+package google
+
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.actor.ActorSystem
+import akka.http.scaladsl._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server._
+import scala.util.control._
+import scala.concurrent.duration._
+import java.util.UUID
+
+class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)(
+ implicit system: ActorSystem,
+ materializer: Materializer)
+ extends Tracer {
+
+ lazy val connectionPool = Http().superPool[Unit]()
+
+ private val batchingPipeline: Flow[Span, Traces, _] =
+ Flow[Span]
+ .groupedWithin(bufferSize, 1.second)
+ .map { spans =>
+ val traces: Seq[Trace] = spans
+ .groupBy(_.traceId)
+ .map {
+ case (traceId, spans) =>
+ Trace(
+ traceId,
+ projectId,
+ spans.map(span => TraceSpan.fromSpan(span))
+ )
+ }
+ .toSeq
+ Traces(traces)
+ }
+
+ lazy val queue: SourceQueueWithComplete[Span] = {
+ Source
+ .queue[Span](bufferSize, OverflowStrategy.dropNew)
+ .viaMat(batchingPipeline)(Keep.left)
+ .map { traces =>
+ val entity = HttpEntity
+ val req = HttpRequest(
+ HttpMethods.PATCH,
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces"
+ )
+ (req, ())
+ }
+ .viaMat(connectionPool)(Keep.left)
+ .mapError {
+ case NonFatal(e) =>
+ system.log.warning(
+ s"Exception encountered while submitting trace: $e")
+ e
+ }
+ .to(Sink.ignore)
+ .run()
+ }
+
+ override def submit(span: Span): Unit = queue.offer(span)
+
+}
diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala
new file mode 100644
index 0000000..2dcac92
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/google/api.scala
@@ -0,0 +1,111 @@
+package xyz.driver.tracing
+package google
+
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import java.util.UUID
+import java.nio.ByteBuffer
+import java.time._
+import java.time.format._
+
+case class TraceSpan(
+ spanId: Long,
+ kind: TraceSpan.SpanKind,
+ name: String,
+ startTime: Instant,
+ endTime: Instant,
+ parentSpanId: Option[Long],
+ labels: Map[String, String]
+)
+
+object TraceSpan {
+
+ sealed trait SpanKind
+ // Unspecified
+ case object Unspecified extends SpanKind
+ // Indicates that the span covers server-side handling of an RPC or other remote network request.
+ case object RpcServer extends SpanKind
+ // Indicates that the span covers the client-side wrapper around an RPC or other remote request.
+ case object RpcClient extends SpanKind
+
+ object SpanKind {
+ implicit val format: JsonFormat[SpanKind] = new JsonFormat[SpanKind] {
+ override def write(x: SpanKind): JsValue = x match {
+ case Unspecified => JsString("SPAN_KIND_UNSPECIFIED")
+ case RpcServer => JsString("RPC_SERVER")
+ case RpcClient => JsString("RPC_CLIENT")
+ }
+ override def read(x: JsValue): SpanKind = x match {
+ case JsString("SPAN_KIND_UNSPECIFIED") => Unspecified
+ case JsString("RPC_SERVER") => RpcServer
+ case JsString("RPC_CLIENT") => RpcClient
+ case other =>
+ spray.json.deserializationError(s"`$other` is not a valid span kind")
+ }
+ }
+ }
+
+ implicit val instantFormat = new JsonFormat[Instant] {
+ val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ")
+ override def write(x: Instant): JsValue = JsString(formatter.format(x))
+ override def read(x: JsValue): Instant = x match {
+ case JsString(x) => Instant.parse(x)
+ case other =>
+ spray.json.deserializationError(s"`$other` is not a valid instant")
+ }
+ }
+
+ implicit val format: JsonFormat[TraceSpan] = jsonFormat7(TraceSpan.apply)
+
+ def fromSpan(span: Span) = TraceSpan(
+ span.spanId.getLeastSignificantBits,
+ Unspecified,
+ span.name,
+ span.startTime,
+ span.endTime,
+ span.parentSpanId.map(_.getLeastSignificantBits),
+ span.labels
+ )
+
+}
+
+case class Trace(
+ traceId: UUID,
+ projectId: String = "",
+ spans: Seq[TraceSpan] = Seq.empty
+)
+
+object Trace {
+
+ implicit val uuidFormat = new JsonFormat[UUID] {
+ override def write(x: UUID) = {
+ val buffer = ByteBuffer.allocate(16)
+ buffer.putLong(x.getMostSignificantBits)
+ buffer.putLong(x.getLeastSignificantBits)
+ val array = buffer.array()
+ val string = new StringBuilder
+ for (i <- 0 until 16) {
+ string ++= f"${array(i) & 0xff}%02x"
+ }
+ JsString(string.result)
+ }
+ override def read(x: JsValue): UUID = x match {
+ case JsString(str) if str.length == 32 =>
+ val (msb, lsb) = str.splitAt(16)
+ new UUID(java.lang.Long.decode(msb), java.lang.Long.decode(lsb))
+ case JsString(str) =>
+ spray.json.deserializationError(
+ "128-bit id string must be exactly 32 characters long")
+ case other =>
+ spray.json.deserializationError("expected 32 character hex string")
+ }
+ }
+
+ implicit val format: JsonFormat[Trace] = jsonFormat3(Trace.apply)
+
+}
+
+case class Traces(traces: Seq[Trace])
+object Traces {
+ implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply)
+}