aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
blob: fb8d9e6c6b00ee43a5436a03f819f87bc39c73fa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package xyz.driver.tracing
package google

import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server._
import scala.util.control._
import scala.concurrent.duration._
import java.util.UUID

class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)(
    implicit system: ActorSystem,
    materializer: Materializer)
    extends Tracer {

  lazy val connectionPool = Http().superPool[Unit]()

  private val batchingPipeline: Flow[Span, Traces, _] =
    Flow[Span]
      .groupedWithin(bufferSize, 1.second)
      .map { spans =>
        val traces: Seq[Trace] = spans
          .groupBy(_.traceId)
          .map {
            case (traceId, spans) =>
              Trace(
                traceId,
                projectId,
                spans.map(span => TraceSpan.fromSpan(span))
              )
          }
          .toSeq
        Traces(traces)
      }

  lazy val queue: SourceQueueWithComplete[Span] = {
    Source
      .queue[Span](bufferSize, OverflowStrategy.dropNew)
      .viaMat(batchingPipeline)(Keep.left)
      .map { traces =>
        val entity = HttpEntity
        val req = HttpRequest(
          HttpMethods.PATCH,
          s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces"
        )
        (req, ())
      }
      .viaMat(connectionPool)(Keep.left)
      .mapError {
        case NonFatal(e) =>
          system.log.warning(
            s"Exception encountered while submitting trace: $e")
          e
      }
      .to(Sink.ignore)
      .run()
  }

  override def submit(span: Span): Unit = queue.offer(span)

}