diff options
author | Jakob Odersky <jakob@driver.xyz> | 2017-10-01 20:24:02 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@driver.xyz> | 2017-10-01 20:24:29 -0700 |
commit | 2c08b51411be5b0cce57f876377fcd52bee99990 (patch) | |
tree | fee56a21e6a5f3d2dd459b51e5afb355db6c7f02 /src/main/scala/GoogleTracer.scala | |
parent | 5bd947dd08eec1d6c64a9549566f3ce0e91fe74f (diff) | |
download | tracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.gz tracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.bz2 tracing-2c08b51411be5b0cce57f876377fcd52bee99990.zip |
Flatten file hierarchy and implement OAUTH2 authentication
Diffstat (limited to 'src/main/scala/GoogleTracer.scala')
-rw-r--r-- | src/main/scala/GoogleTracer.scala | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala new file mode 100644 index 0000000..5b3c9df --- /dev/null +++ b/src/main/scala/GoogleTracer.scala @@ -0,0 +1,84 @@ +package xyz.driver.tracing + +import google._ +import java.nio.file.Path +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ +import java.util.UUID + +class GoogleTracer(projectId: String, + serviceAccountFile: Path, + bufferSize: Int = 1000, + concurrentConnections: Int = 1)(implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + import system.dispatcher + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .log("debug") + .viaMat(batchingPipeline)(Keep.left) + .mapAsync(concurrentConnections) { (traces: Traces) => + println(traces) + Marshal(traces).to[RequestEntity].map { entity => + HttpRequest( + method = HttpMethods.PATCH, + uri = + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } + } + .viaMat( + OAuth2.authenticatedFlow( + Http(), + serviceAccountFile, + Seq( + "https://www.googleapis.com/auth/trace.append" + )))(Keep.left) + .map(req => (req, ())) + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.error(s"Exception encountered while submitting trace", e) + e.printStackTrace + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} |