From 2c08b51411be5b0cce57f876377fcd52bee99990 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 1 Oct 2017 20:24:02 -0700 Subject: Flatten file hierarchy and implement OAUTH2 authentication --- src/main/scala/GoogleTracer.scala | 84 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/main/scala/GoogleTracer.scala (limited to 'src/main/scala/GoogleTracer.scala') diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala new file mode 100644 index 0000000..5b3c9df --- /dev/null +++ b/src/main/scala/GoogleTracer.scala @@ -0,0 +1,84 @@ +package xyz.driver.tracing + +import google._ +import java.nio.file.Path +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID + +class GoogleTracer(projectId: String, + serviceAccountFile: Path, + bufferSize: Int = 1000, + concurrentConnections: Int = 1)(implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + import system.dispatcher + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .log("debug") + .viaMat(batchingPipeline)(Keep.left) + .mapAsync(concurrentConnections) { (traces: Traces) => + println(traces) + Marshal(traces).to[RequestEntity].map { entity => + HttpRequest( + method = HttpMethods.PATCH, + uri = + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } + } + .viaMat( + OAuth2.authenticatedFlow( + Http(), + serviceAccountFile, + Seq( + "https://www.googleapis.com/auth/trace.append" + )))(Keep.left) + .map(req => (req, ())) + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.error(s"Exception encountered while submitting trace", e) + e.printStackTrace + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} -- cgit v1.2.3