From 2c08b51411be5b0cce57f876377fcd52bee99990 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 1 Oct 2017 20:24:02 -0700 Subject: Flatten file hierarchy and implement OAUTH2 authentication --- build.sbt | 6 +- src/main/scala/GoogleTracer.scala | 84 +++++++++++++++ src/main/scala/Span.scala | 24 +++++ src/main/scala/Tracer.scala | 5 + src/main/scala/TracingDirectives.scala | 87 ++++++++++++++++ src/main/scala/google/OAuth2.scala | 108 ++++++++++++++++++++ src/main/scala/google/api.scala | 113 +++++++++++++++++++++ src/main/scala/xyz/driver/tracing/Span.scala | 24 ----- src/main/scala/xyz/driver/tracing/Tracer.scala | 5 - .../xyz/driver/tracing/TracingDirectives.scala | 90 ---------------- .../xyz/driver/tracing/google/GoogleTracer.scala | 70 ------------- src/main/scala/xyz/driver/tracing/google/api.scala | 111 -------------------- 12 files changed, 426 insertions(+), 301 deletions(-) create mode 100644 src/main/scala/GoogleTracer.scala create mode 100644 src/main/scala/Span.scala create mode 100644 src/main/scala/Tracer.scala create mode 100644 src/main/scala/TracingDirectives.scala create mode 100644 src/main/scala/google/OAuth2.scala create mode 100644 src/main/scala/google/api.scala delete mode 100644 src/main/scala/xyz/driver/tracing/Span.scala delete mode 100644 src/main/scala/xyz/driver/tracing/Tracer.scala delete mode 100644 src/main/scala/xyz/driver/tracing/TracingDirectives.scala delete mode 100644 src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala delete mode 100644 src/main/scala/xyz/driver/tracing/google/api.scala diff --git a/build.sbt b/build.sbt index 733739e..2aa0701 100644 --- a/build.sbt +++ b/build.sbt @@ -3,5 +3,9 @@ scalaVersion := "2.12.3" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.0.10", "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10", - "io.spray" %% "spray-json" % "1.3.3" + "com.pauldijou" %% "jwt-core" % "0.14.0", + "io.spray" %% "spray-json" % "1.3.3", + "org.scalatest" %% "scalatest" % "3.0.2" % "test", ) + +fork in test := true diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala new file mode 100644 index 0000000..5b3c9df --- /dev/null +++ b/src/main/scala/GoogleTracer.scala @@ -0,0 +1,84 @@ +package xyz.driver.tracing + +import google._ +import java.nio.file.Path +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID + +class GoogleTracer(projectId: String, + serviceAccountFile: Path, + bufferSize: Int = 1000, + concurrentConnections: Int = 1)(implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + import system.dispatcher + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .log("debug") + .viaMat(batchingPipeline)(Keep.left) + .mapAsync(concurrentConnections) { (traces: Traces) => + println(traces) + Marshal(traces).to[RequestEntity].map { entity => + HttpRequest( + method = HttpMethods.PATCH, + uri = + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } + } + .viaMat( + OAuth2.authenticatedFlow( + Http(), + serviceAccountFile, + Seq( + "https://www.googleapis.com/auth/trace.append" + )))(Keep.left) + .map(req => (req, ())) + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.error(s"Exception encountered while submitting trace", e) + e.printStackTrace + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} diff --git a/src/main/scala/Span.scala b/src/main/scala/Span.scala new file mode 100644 index 0000000..fcd52b6 --- /dev/null +++ b/src/main/scala/Span.scala @@ -0,0 +1,24 @@ +package xyz.driver.tracing + +import java.util.UUID +import java.time._ + +case class Span( + name: String, + traceId: UUID = UUID.randomUUID(), + spanId: UUID = UUID.randomUUID(), + parentSpanId: Option[UUID] = None, + labels: Map[String, String] = Map.empty, + startTime: Instant = Instant.now, + endTime: Instant = Instant.now +) { + + def start(clock: Clock = Clock.systemUTC): Span = + this.copy(startTime = clock.instant()) + def end(clock: Clock = Clock.systemUTC): Span = + this.copy(endTime = clock.instant()) + + def withLabels(extraLabels: (String, String)*) = + this.copy(labels = this.labels ++ extraLabels) + +} diff --git a/src/main/scala/Tracer.scala b/src/main/scala/Tracer.scala new file mode 100644 index 0000000..00a1fbf --- /dev/null +++ b/src/main/scala/Tracer.scala @@ -0,0 +1,5 @@ +package xyz.driver.tracing + +trait Tracer { + def submit(span: Span): Unit +} diff --git a/src/main/scala/TracingDirectives.scala b/src/main/scala/TracingDirectives.scala new file mode 100644 index 0000000..c55ac40 --- /dev/null +++ b/src/main/scala/TracingDirectives.scala @@ -0,0 +1,87 @@ +package xyz.driver.tracing + +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server._ +import akka.http.scaladsl.server.Directives._ +import java.util.UUID +import scala.util.Random +import java.time._ +import scala.concurrent._ +import scala.collection.immutable.Seq + +trait TracingDirectives { + import TracingDirectives._ + + def optionalTraceContext: Directive1[Option[TraceContext]] = + extractRequest.map { req => + TraceContext.fromHeaders(req.headers) + } + + def withTraceContext(ctx: TraceContext): Directive0 = + mapRequest(req => req.withHeaders(ctx.headers)) + + def trace(name: String, labels: Map[String, String] = Map.empty)( + implicit tracer: Tracer): Directive0 = + optionalTraceContext.flatMap { + case parent => + val span: Span = parent match { + case None => // no parent span, create new trace + Span( + name = name, + labels = labels + ) + case Some(TraceContext(traceId, parentSpanId)) => + Span( + name = name, + traceId = traceId, + parentSpanId = parentSpanId, + labels = labels + ) + } + + withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res => + tracer.submit(span.end()) + res + } + } + + /* + def span2(name2: String, tracer: Tracer): Directive0 = { + val f: RouteResult ⇒ RouteResult = ??? + Directive { inner ⇒ ctx ⇒ + inner(())(ctx).map(f)(ctx.executionContext) + } + } + */ + +} + +object TracingDirectives { + + case class TraceContext(traceId: UUID, parentSpanId: Option[UUID]) { + import TraceContext._ + + def headers: Seq[HttpHeader] = + Seq(RawHeader(TraceHeaderName, traceId.toString)) ++ + parentSpanId.toSeq.map(id => RawHeader(SpanHeaderName, id.toString)) + } + object TraceContext { + val TraceHeaderName = "Tracing-Trace-Id" + val SpanHeaderName = "Tracing-Span-Id" + + def fromHeaders(headers: Seq[HttpHeader]): Option[TraceContext] = { + val traceId = headers + .find(_.name == TraceHeaderName) + .map(_.value) + .map(UUID.fromString) + val parentSpanId = + headers.find(_.name == SpanHeaderName).map(_.value).map(UUID.fromString) + traceId.map { tid => + TraceContext(tid, parentSpanId) + } + } + def fromSpan(span: Span) = TraceContext(span.traceId, Some(span.spanId)) + } + +} diff --git a/src/main/scala/google/OAuth2.scala b/src/main/scala/google/OAuth2.scala new file mode 100644 index 0000000..43811c4 --- /dev/null +++ b/src/main/scala/google/OAuth2.scala @@ -0,0 +1,108 @@ +package xyz.driver.tracing +package google + +import akka.stream.scaladsl._ +import akka.stream._ +import akka.stream.stage._ +import akka.http.scaladsl._ +import akka.http.scaladsl.unmarshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.util.ByteString +import java.time._ +import java.nio.file._ +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import pdi.jwt._ +import scala.concurrent._ +import scala.concurrent.duration._ + +object OAuth2 { + + private case class ServiceAccount(project_id: String, + private_key: String, + client_email: String) + private implicit val serviceAccountFormat = jsonFormat3(ServiceAccount) + + private case class GrantResponse(access_token: String, expires_in: Int) + private implicit val grantResponseFormat = jsonFormat2(GrantResponse) + + /** Request a new access token for the given scopes. + * + * Implements the OAUTH2 workflow as descried here + * https://developers.google.com/identity/protocols/OAuth2ServiceAccount + */ + def requestAccessToken( + http: HttpExt, + serviceAccountFile: Path, + scopes: Seq[String] + )(implicit ec: ExecutionContext, + mat: Materializer): Future[(Instant, String)] = + Future { + val now = Instant.now.toEpochMilli / 1000 + val credentials = + (new String(Files.readAllBytes(serviceAccountFile), "utf-8")).parseJson + .convertTo[ServiceAccount] + + val claim = JwtClaim( + issuer = Some(credentials.client_email), + expiration = Some(now + 60 * 60), + issuedAt = Some(now) + ) + + ("aud", "https://www.googleapis.com/oauth2/v4/token") + + ("scope", scopes.mkString(" ")) + + Jwt.encode(claim, credentials.private_key, JwtAlgorithm.RS256) + } flatMap { assertion => + http.singleRequest( + HttpRequest( + method = HttpMethods.POST, + uri = "https://www.googleapis.com/oauth2/v4/token" + ).withEntity( + FormData( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> assertion + ).toEntity)) + } flatMap { response => + Unmarshal(response).to[GrantResponse] + } map { grant => + (Instant.now.plusSeconds(grant.expires_in), grant.access_token) + } + + /** Flow that injects access tokens into a stream of HTTP requests. + * + * Re-authentication happens transparently when access tokens expire. Note: + * in case an access token gets revoked, this flow needs to be restarted in + * order to re-authenticate + */ + def authenticatedFlow(http: HttpExt, + serviceAccountFile: Path, + scopes: Seq[String], + graceSeconds: Int = 300)( + implicit ec: ExecutionContext, + mat: Materializer): Flow[HttpRequest, HttpRequest, _] = + Flow[HttpRequest] + .scanAsync[(HttpRequest, Instant, String)]( + (HttpRequest(), Instant.now, "")) { + case ((_, expiration, accessToken), request) => + if (Instant.now isAfter expiration.minusSeconds(graceSeconds)) { + http.system.log.info("tracing access token expired, refreshing") + requestAccessToken(http, serviceAccountFile, scopes).map { + case (newExpiration, newToken) => + http.system.log.debug("new tracing access token otained") + (request, newExpiration, newToken) + } + } else { + Future.successful((request, expiration, accessToken)) + } + } + .drop(1) // drop initial element + .map { + case (request, _, accessToken) => + request.withHeaders( + RawHeader("Authorization", "Bearer " + accessToken) + ) + } + +} diff --git a/src/main/scala/google/api.scala b/src/main/scala/google/api.scala new file mode 100644 index 0000000..122b695 --- /dev/null +++ b/src/main/scala/google/api.scala @@ -0,0 +1,113 @@ +package xyz.driver.tracing +package google + +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID +import java.nio.ByteBuffer +import java.time._ +import java.time.format._ + +case class TraceSpan( + spanId: Long, + kind: TraceSpan.SpanKind, + name: String, + startTime: Instant, + endTime: Instant, + parentSpanId: Option[Long], + labels: Map[String, String] +) + +object TraceSpan { + + sealed trait SpanKind + // Unspecified + case object Unspecified extends SpanKind + // Indicates that the span covers server-side handling of an RPC or other remote network request. + case object RpcServer extends SpanKind + // Indicates that the span covers the client-side wrapper around an RPC or other remote request. + case object RpcClient extends SpanKind + + object SpanKind { + implicit val format: JsonFormat[SpanKind] = new JsonFormat[SpanKind] { + override def write(x: SpanKind): JsValue = x match { + case Unspecified => JsString("SPAN_KIND_UNSPECIFIED") + case RpcServer => JsString("RPC_SERVER") + case RpcClient => JsString("RPC_CLIENT") + } + override def read(x: JsValue): SpanKind = x match { + case JsString("SPAN_KIND_UNSPECIFIED") => Unspecified + case JsString("RPC_SERVER") => RpcServer + case JsString("RPC_CLIENT") => RpcClient + case other => + spray.json.deserializationError(s"`$other` is not a valid span kind") + } + } + } + + implicit val instantFormat = new JsonFormat[Instant] { + val formatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") + .withZone(ZoneId.of("UTC")) + override def write(x: Instant): JsValue = JsString(formatter.format(x)) + override def read(x: JsValue): Instant = x match { + case JsString(x) => Instant.parse(x) + case other => + spray.json.deserializationError(s"`$other` is not a valid instant") + } + } + + implicit val format: JsonFormat[TraceSpan] = jsonFormat7(TraceSpan.apply) + + def fromSpan(span: Span) = TraceSpan( + span.spanId.getLeastSignificantBits, + Unspecified, + span.name, + span.startTime, + span.endTime, + span.parentSpanId.map(_.getLeastSignificantBits), + span.labels + ) + +} + +case class Trace( + traceId: UUID, + projectId: String = "", + spans: Seq[TraceSpan] = Seq.empty +) + +object Trace { + + implicit val uuidFormat = new JsonFormat[UUID] { + override def write(x: UUID) = { + val buffer = ByteBuffer.allocate(16) + buffer.putLong(x.getMostSignificantBits) + buffer.putLong(x.getLeastSignificantBits) + val array = buffer.array() + val string = new StringBuilder + for (i <- 0 until 16) { + string ++= f"${array(i) & 0xff}%02x" + } + JsString(string.result) + } + override def read(x: JsValue): UUID = x match { + case JsString(str) if str.length == 32 => + val (msb, lsb) = str.splitAt(16) + new UUID(java.lang.Long.decode(msb), java.lang.Long.decode(lsb)) + case JsString(str) => + spray.json.deserializationError( + "128-bit id string must be exactly 32 characters long") + case other => + spray.json.deserializationError("expected 32 character hex string") + } + } + + implicit val format: JsonFormat[Trace] = jsonFormat3(Trace.apply) + +} + +case class Traces(traces: Seq[Trace]) +object Traces { + implicit val format: RootJsonFormat[Traces] = jsonFormat1(Traces.apply) +} diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/xyz/driver/tracing/Span.scala deleted file mode 100644 index 2ae640e..0000000 --- a/src/main/scala/xyz/driver/tracing/Span.scala +++ /dev/null @@ -1,24 +0,0 @@ -package xyz.driver.tracing - -import java.util.UUID -import java.time._ - -case class Span( - traceId: UUID, - spanId: UUID, - name: String, - parentSpanId: Option[UUID] = None, - labels: Map[String, String] = Map.empty, - startTime: Instant = Instant.now, - endTime: Instant = Instant.now -) { - - def start(clock: Clock = Clock.systemUTC): Span = - this.copy(startTime = clock.instant()) - def end(clock: Clock = Clock.systemUTC): Span = - this.copy(endTime = clock.instant()) - - def withLabels(extraLabels: (String, String)*) = - this.copy(labels = this.labels ++ extraLabels) - -} diff --git a/src/main/scala/xyz/driver/tracing/Tracer.scala b/src/main/scala/xyz/driver/tracing/Tracer.scala deleted file mode 100644 index 00a1fbf..0000000 --- a/src/main/scala/xyz/driver/tracing/Tracer.scala +++ /dev/null @@ -1,5 +0,0 @@ -package xyz.driver.tracing - -trait Tracer { - def submit(span: Span): Unit -} diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala deleted file mode 100644 index 9e0237c..0000000 --- a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala +++ /dev/null @@ -1,90 +0,0 @@ -package xyz.driver.tracing - -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.server._ -import akka.http.scaladsl.server.Directives._ -import java.util.UUID -import scala.util.Random -import java.time._ -import scala.concurrent._ -import scala.collection.immutable.Seq - -trait TracingDirectives { - import TracingDirectives._ - - def optionalTraceContext: Directive1[Option[TraceContext]] = - extractRequest.map { req => - TraceContext.fromHeaders(req.headers) - } - - def withTraceContext(ctx: TraceContext): Directive0 = - mapRequest(req => req.withHeaders(ctx.headers)) - - def trace(name: String, labels: Map[String, String] = Map.empty)( - implicit tracer: Tracer): Directive0 = - optionalTraceContext.flatMap { - case parent => - val span: Span = parent match { - case None => // no parent span, create new trace - Span( - traceId = UUID.randomUUID, - spanId = UUID.randomUUID, - name = name, - labels = labels - ) - case Some(TraceContext(traceId, parentSpanId)) => - Span( - traceId = traceId, - spanId = UUID.randomUUID, - parentSpanId = parentSpanId, - name = name, - labels = labels - ) - } - - withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res => - tracer.submit(span.end()) - res - } - } - - /* - def span2(name2: String, tracer: Tracer): Directive0 = { - val f: RouteResult ⇒ RouteResult = ??? - Directive { inner ⇒ ctx ⇒ - inner(())(ctx).map(f)(ctx.executionContext) - } - } - */ - -} - -object TracingDirectives { - - case class TraceContext(traceId: UUID, parentSpanId: Option[UUID]) { - import TraceContext._ - - def headers: Seq[HttpHeader] = - Seq(RawHeader(TraceHeaderName, traceId.toString)) ++ - parentSpanId.toSeq.map(id => RawHeader(SpanHeaderName, id.toString)) - } - object TraceContext { - val TraceHeaderName = "Tracing-Trace-Id" - val SpanHeaderName = "Tracing-Span-Id" - - def fromHeaders(headers: Seq[HttpHeader]): Option[TraceContext] = { - val traceId = headers - .find(_.name == TraceHeaderName) - .map(_.value) - .map(UUID.fromString) - val parentSpanId = - headers.find(_.name == SpanHeaderName).map(_.value).map(UUID.fromString) - traceId.map { tid => - TraceContext(tid, parentSpanId) - } - } - def fromSpan(span: Span) = TraceContext(span.traceId, Some(span.spanId)) - } - -} diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala deleted file mode 100644 index 6a4d4fa..0000000 --- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala +++ /dev/null @@ -1,70 +0,0 @@ -package xyz.driver.tracing -package google - -import akka.stream._ -import akka.stream.scaladsl._ -import akka.actor.ActorSystem -import akka.http.scaladsl._ -import akka.http.scaladsl.marshalling._ -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model._ -import akka.http.scaladsl.server._ -import scala.util.control._ -import scala.concurrent.duration._ -import spray.json.DefaultJsonProtocol._ -import java.util.UUID - -class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)( - implicit system: ActorSystem, - materializer: Materializer) - extends Tracer { - import system.dispatcher - - lazy val connectionPool = Http().superPool[Unit]() - - private val batchingPipeline: Flow[Span, Traces, _] = - Flow[Span] - .groupedWithin(bufferSize, 1.second) - .map { spans => - val traces: Seq[Trace] = spans - .groupBy(_.traceId) - .map { - case (traceId, spans) => - Trace( - traceId, - projectId, - spans.map(span => TraceSpan.fromSpan(span)) - ) - } - .toSeq - Traces(traces) - } - - lazy val queue: SourceQueueWithComplete[Span] = { - Source - .queue[Span](bufferSize, OverflowStrategy.dropNew) - .viaMat(batchingPipeline)(Keep.left) - .mapAsync(concurrentConnections) { (traces: Traces) => - Marshal(traces).to[RequestEntity].map{ entity => - HttpRequest( - HttpMethods.PATCH, - s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", - entity = entity - ) - } - } - .map(req => (req, ())) - .viaMat(connectionPool)(Keep.left) - .mapError { - case NonFatal(e) => - system.log.warning( - s"Exception encountered while submitting trace: $e") - e - } - .to(Sink.ignore) - .run() - } - - override def submit(span: Span): Unit = queue.offer(span) - -} diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala deleted file mode 100644 index 356e3da..0000000 --- a/src/main/scala/xyz/driver/tracing/google/api.scala +++ /dev/null @@ -1,111 +0,0 @@ -package xyz.driver.tracing -package google - -import spray.json._ -import spray.json.DefaultJsonProtocol._ -import java.util.UUID -import java.nio.ByteBuffer -import java.time._ -import java.time.format._ - -case class TraceSpan( - spanId: Long, - kind: TraceSpan.SpanKind, - name: String, - startTime: Instant, - endTime: Instant, - parentSpanId: Option[Long], - labels: Map[String, String] -) - -object TraceSpan { - - sealed trait SpanKind - // Unspecified - case object Unspecified extends SpanKind - // Indicates that the span covers server-side handling of an RPC or other remote network request. - case object RpcServer extends SpanKind - // Indicates that the span covers the client-side wrapper around an RPC or other remote request. - case object RpcClient extends SpanKind - - object SpanKind { - implicit val format: JsonFormat[SpanKind] = new JsonFormat[SpanKind] { - override def write(x: SpanKind): JsValue = x match { - case Unspecified => JsString("SPAN_KIND_UNSPECIFIED") - case RpcServer => JsString("RPC_SERVER") - case RpcClient => JsString("RPC_CLIENT") - } - override def read(x: JsValue): SpanKind = x match { - case JsString("SPAN_KIND_UNSPECIFIED") => Unspecified - case JsString("RPC_SERVER") => RpcServer - case JsString("RPC_CLIENT") => RpcClient - case other => - spray.json.deserializationError(s"`$other` is not a valid span kind") - } - } - } - - implicit val instantFormat = new JsonFormat[Instant] { - val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") - override def write(x: Instant): JsValue = JsString(formatter.format(x)) - override def read(x: JsValue): Instant = x match { - case JsString(x) => Instant.parse(x) - case other => - spray.json.deserializationError(s"`$other` is not a valid instant") - } - } - - implicit val format: JsonFormat[TraceSpan] = jsonFormat7(TraceSpan.apply) - - def fromSpan(span: Span) = TraceSpan( - span.spanId.getLeastSignificantBits, - Unspecified, - span.name, - span.startTime, - span.endTime, - span.parentSpanId.map(_.getLeastSignificantBits), - span.labels - ) - -} - -case class Trace( - traceId: UUID, - projectId: String = "", - spans: Seq[TraceSpan] = Seq.empty -) - -object Trace { - - implicit val uuidFormat = new JsonFormat[UUID] { - override def write(x: UUID) = { - val buffer = ByteBuffer.allocate(16) - buffer.putLong(x.getMostSignificantBits) - buffer.putLong(x.getLeastSignificantBits) - val array = buffer.array() - val string = new StringBuilder - for (i <- 0 until 16) { - string ++= f"${array(i) & 0xff}%02x" - } - JsString(string.result) - } - override def read(x: JsValue): UUID = x match { - case JsString(str) if str.length == 32 => - val (msb, lsb) = str.splitAt(16) - new UUID(java.lang.Long.decode(msb), java.lang.Long.decode(lsb)) - case JsString(str) => - spray.json.deserializationError( - "128-bit id string must be exactly 32 characters long") - case other => - spray.json.deserializationError("expected 32 character hex string") - } - } - - implicit val format: JsonFormat[Trace] = jsonFormat3(Trace.apply) - -} - -case class Traces(traces: Seq[Trace]) -object Traces { - implicit val format: RootJsonFormat[Traces] = jsonFormat1(Traces.apply) -} -- cgit v1.2.3