package xyz.driver.tracing package google import akka.stream._ import akka.stream.scaladsl._ import akka.actor.ActorSystem import akka.http.scaladsl._ import akka.http.scaladsl.model._ import akka.http.scaladsl.server._ import scala.util.control._ import scala.concurrent.duration._ import java.util.UUID class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( implicit system: ActorSystem, materializer: Materializer) extends Tracer { lazy val connectionPool = Http().superPool[Unit]() private val batchingPipeline: Flow[Span, Traces, _] = Flow[Span] .groupedWithin(bufferSize, 1.second) .map { spans => val traces: Seq[Trace] = spans .groupBy(_.traceId) .map { case (traceId, spans) => Trace( traceId, projectId, spans.map(span => TraceSpan.fromSpan(span)) ) } .toSeq Traces(traces) } lazy val queue: SourceQueueWithComplete[Span] = { Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) .map { traces => val entity = HttpEntity val req = HttpRequest( HttpMethods.PATCH, s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" ) (req, ()) } .viaMat(connectionPool)(Keep.left) .mapError { case NonFatal(e) => system.log.warning( s"Exception encountered while submitting trace: $e") e } .to(Sink.ignore) .run() } override def submit(span: Span): Unit = queue.offer(span) }