aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala')
-rw-r--r--src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala64
1 files changed, 64 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
new file mode 100644
index 0000000..fb8d9e6
--- /dev/null
+++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
@@ -0,0 +1,64 @@
+package xyz.driver.tracing
+package google
+
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.actor.ActorSystem
+import akka.http.scaladsl._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server._
+import scala.util.control._
+import scala.concurrent.duration._
+import java.util.UUID
+
+class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)(
+ implicit system: ActorSystem,
+ materializer: Materializer)
+ extends Tracer {
+
+ lazy val connectionPool = Http().superPool[Unit]()
+
+ private val batchingPipeline: Flow[Span, Traces, _] =
+ Flow[Span]
+ .groupedWithin(bufferSize, 1.second)
+ .map { spans =>
+ val traces: Seq[Trace] = spans
+ .groupBy(_.traceId)
+ .map {
+ case (traceId, spans) =>
+ Trace(
+ traceId,
+ projectId,
+ spans.map(span => TraceSpan.fromSpan(span))
+ )
+ }
+ .toSeq
+ Traces(traces)
+ }
+
+ lazy val queue: SourceQueueWithComplete[Span] = {
+ Source
+ .queue[Span](bufferSize, OverflowStrategy.dropNew)
+ .viaMat(batchingPipeline)(Keep.left)
+ .map { traces =>
+ val entity = HttpEntity
+ val req = HttpRequest(
+ HttpMethods.PATCH,
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces"
+ )
+ (req, ())
+ }
+ .viaMat(connectionPool)(Keep.left)
+ .mapError {
+ case NonFatal(e) =>
+ system.log.warning(
+ s"Exception encountered while submitting trace: $e")
+ e
+ }
+ .to(Sink.ignore)
+ .run()
+ }
+
+ override def submit(span: Span): Unit = queue.offer(span)
+
+}