package xyz.driver.tracing
import java.nio.file.Path
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.scaladsl._
import xyz.driver.tracing.google._
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control._
import scala.util._
class GoogleTracer(projectId: String,
serviceAccountFile: Path,
bufferSize: Int = 1000,
bufferDelay: FiniteDuration = 15.seconds,
concurrentConnections: Int = 1)(implicit system: ActorSystem,
materializer: Materializer)
extends Tracer {
import system.dispatcher
lazy val connectionPool = Http().superPool[Unit]()
private val batchingPipeline: Flow[Span, Traces, _] =
Flow[Span]
.groupedWithin(bufferSize, bufferDelay)
.map { spans =>
val traces: Seq[Trace] = spans
.groupBy(_.traceId)
.map {
case (traceId, spans) =>
Trace(
traceId,
projectId,
spans.map(span => TraceSpan.fromSpan(span))
)
}
.toSeq
Traces(traces)
}
lazy val (queue: SourceQueueWithComplete[Span], completed: Future[Done]) =
Source
.queue[Span](bufferSize, OverflowStrategy.dropNew)
.viaMat(batchingPipeline)(Keep.left)
.mapAsync(concurrentConnections) { (traces: Traces) =>
Marshal(traces).to[RequestEntity].map { entity =>
HttpRequest(
method = HttpMethods.PATCH,
uri =
s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
entity = entity
)
}
}
.viaMat(
OAuth2.authenticatedFlow(
Http(),
serviceAccountFile,
Seq(
"https://www.googleapis.com/auth/trace.append"
)))(Keep.left)
.map(req => (req, ()))
.viaMat(connectionPool)(Keep.left)
.mapError {
case NonFatal(e) =>
system.log.error(e, s"Exception encountered while submitting trace")
e
}
.toMat(Sink.ignore)(Keep.both)
.run()
override def submit(span: Span): Unit = queue.offer(span)
override def close() = {
queue.complete()
completed
}
}