diff options
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google')
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala | 64 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/api.scala | 111 |
2 files changed, 175 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala new file mode 100644 index 0000000..fb8d9e6 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -0,0 +1,64 @@ +package xyz.driver.tracing +package google + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import java.util.UUID + +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( + implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .viaMat(batchingPipeline)(Keep.left) + .map { traces => + val entity = HttpEntity + val req = HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" + ) + (req, ()) + } + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.warning( + s"Exception encountered while submitting trace: $e") + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala new file mode 100644 index 0000000..2dcac92 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/google/api.scala @@ -0,0 +1,111 @@ +package xyz.driver.tracing +package google + +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID +import java.nio.ByteBuffer +import java.time._ +import java.time.format._ + +case class TraceSpan( + spanId: Long, + kind: TraceSpan.SpanKind, + name: String, + startTime: Instant, + endTime: Instant, + parentSpanId: Option[Long], + labels: Map[String, String] +) + +object TraceSpan { + + sealed trait SpanKind + // Unspecified + case object Unspecified extends SpanKind + // Indicates that the span covers server-side handling of an RPC or other remote network request. + case object RpcServer extends SpanKind + // Indicates that the span covers the client-side wrapper around an RPC or other remote request. + case object RpcClient extends SpanKind + + object SpanKind { + implicit val format: JsonFormat[SpanKind] = new JsonFormat[SpanKind] { + override def write(x: SpanKind): JsValue = x match { + case Unspecified => JsString("SPAN_KIND_UNSPECIFIED") + case RpcServer => JsString("RPC_SERVER") + case RpcClient => JsString("RPC_CLIENT") + } + override def read(x: JsValue): SpanKind = x match { + case JsString("SPAN_KIND_UNSPECIFIED") => Unspecified + case JsString("RPC_SERVER") => RpcServer + case JsString("RPC_CLIENT") => RpcClient + case other => + spray.json.deserializationError(s"`$other` is not a valid span kind") + } + } + } + + implicit val instantFormat = new JsonFormat[Instant] { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") + override def write(x: Instant): JsValue = JsString(formatter.format(x)) + override def read(x: JsValue): Instant = x match { + case JsString(x) => Instant.parse(x) + case other => + spray.json.deserializationError(s"`$other` is not a valid instant") + } + } + + implicit val format: JsonFormat[TraceSpan] = jsonFormat7(TraceSpan.apply) + + def fromSpan(span: Span) = TraceSpan( + span.spanId.getLeastSignificantBits, + Unspecified, + span.name, + span.startTime, + span.endTime, + span.parentSpanId.map(_.getLeastSignificantBits), + span.labels + ) + +} + +case class Trace( + traceId: UUID, + projectId: String = "", + spans: Seq[TraceSpan] = Seq.empty +) + +object Trace { + + implicit val uuidFormat = new JsonFormat[UUID] { + override def write(x: UUID) = { + val buffer = ByteBuffer.allocate(16) + buffer.putLong(x.getMostSignificantBits) + buffer.putLong(x.getLeastSignificantBits) + val array = buffer.array() + val string = new StringBuilder + for (i <- 0 until 16) { + string ++= f"${array(i) & 0xff}%02x" + } + JsString(string.result) + } + override def read(x: JsValue): UUID = x match { + case JsString(str) if str.length == 32 => + val (msb, lsb) = str.splitAt(16) + new UUID(java.lang.Long.decode(msb), java.lang.Long.decode(lsb)) + case JsString(str) => + spray.json.deserializationError( + "128-bit id string must be exactly 32 characters long") + case other => + spray.json.deserializationError("expected 32 character hex string") + } + } + + implicit val format: JsonFormat[Trace] = jsonFormat3(Trace.apply) + +} + +case class Traces(traces: Seq[Trace]) +object Traces { + implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply) +} |