aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/GoogleTracer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/GoogleTracer.scala')
-rw-r--r--src/main/scala/GoogleTracer.scala84
1 files changed, 84 insertions, 0 deletions
diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala
new file mode 100644
index 0000000..5b3c9df
--- /dev/null
+++ b/src/main/scala/GoogleTracer.scala
@@ -0,0 +1,84 @@
+package xyz.driver.tracing
+
+import google._
+import java.nio.file.Path
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.actor.ActorSystem
+import akka.http.scaladsl._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server._
+import scala.util.control._
+import scala.concurrent.duration._
+import spray.json.DefaultJsonProtocol._
+import java.util.UUID
+
+class GoogleTracer(projectId: String,
+ serviceAccountFile: Path,
+ bufferSize: Int = 1000,
+ concurrentConnections: Int = 1)(implicit system: ActorSystem,
+ materializer: Materializer)
+ extends Tracer {
+
+ import system.dispatcher
+
+ lazy val connectionPool = Http().superPool[Unit]()
+
+ private val batchingPipeline: Flow[Span, Traces, _] =
+ Flow[Span]
+ .groupedWithin(bufferSize, 1.second)
+ .map { spans =>
+ val traces: Seq[Trace] = spans
+ .groupBy(_.traceId)
+ .map {
+ case (traceId, spans) =>
+ Trace(
+ traceId,
+ projectId,
+ spans.map(span => TraceSpan.fromSpan(span))
+ )
+ }
+ .toSeq
+ Traces(traces)
+ }
+
+ lazy val queue: SourceQueueWithComplete[Span] = {
+ Source
+ .queue[Span](bufferSize, OverflowStrategy.dropNew)
+ .log("debug")
+ .viaMat(batchingPipeline)(Keep.left)
+ .mapAsync(concurrentConnections) { (traces: Traces) =>
+ println(traces)
+ Marshal(traces).to[RequestEntity].map { entity =>
+ HttpRequest(
+ method = HttpMethods.PATCH,
+ uri =
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
+ entity = entity
+ )
+ }
+ }
+ .viaMat(
+ OAuth2.authenticatedFlow(
+ Http(),
+ serviceAccountFile,
+ Seq(
+ "https://www.googleapis.com/auth/trace.append"
+ )))(Keep.left)
+ .map(req => (req, ()))
+ .viaMat(connectionPool)(Keep.left)
+ .mapError {
+ case NonFatal(e) =>
+ system.log.error(s"Exception encountered while submitting trace", e)
+ e.printStackTrace
+ e
+ }
+ .to(Sink.ignore)
+ .run()
+ }
+
+ override def submit(span: Span): Unit = queue.offer(span)
+
+}