diff options
author | Jakob Odersky <jakob@driver.xyz> | 2017-09-29 13:41:52 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@driver.xyz> | 2017-09-29 13:44:10 -0700 |
commit | 03503f6be54cbde22261c56919f5ef45ebdcb21d (patch) | |
tree | fd85cb40860674bd404476346b9ed8ea84a1cfe1 /src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala | |
download | tracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.tar.gz tracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.tar.bz2 tracing-03503f6be54cbde22261c56919f5ef45ebdcb21d.zip |
Initial commit
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala')
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala new file mode 100644 index 0000000..fb8d9e6 --- /dev/null +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -0,0 +1,64 @@ +package xyz.driver.tracing +package google + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.actor.ActorSystem +import akka.http.scaladsl._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ +import scala.util.control._ +import scala.concurrent.duration._ +import java.util.UUID + +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( + implicit system: ActorSystem, + materializer: Materializer) + extends Tracer { + + lazy val connectionPool = Http().superPool[Unit]() + + private val batchingPipeline: Flow[Span, Traces, _] = + Flow[Span] + .groupedWithin(bufferSize, 1.second) + .map { spans => + val traces: Seq[Trace] = spans + .groupBy(_.traceId) + .map { + case (traceId, spans) => + Trace( + traceId, + projectId, + spans.map(span => TraceSpan.fromSpan(span)) + ) + } + .toSeq + Traces(traces) + } + + lazy val queue: SourceQueueWithComplete[Span] = { + Source + .queue[Span](bufferSize, OverflowStrategy.dropNew) + .viaMat(batchingPipeline)(Keep.left) + .map { traces => + val entity = HttpEntity + val req = HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" + ) + (req, ()) + } + .viaMat(connectionPool)(Keep.left) + .mapError { + case NonFatal(e) => + system.log.warning( + s"Exception encountered while submitting trace: $e") + e + } + .to(Sink.ignore) + .run() + } + + override def submit(span: Span): Unit = queue.offer(span) + +} |