aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/GoogleTracer.scala
blob: f01b070f28869ae7c0d839b0d77b8b2798cb35dc (plain) (tree)
1
2
3
4
5
6
7
8
9
                          
 
                         
 
                

                             
                                                                  
                                       
                                 



                                  
                         
                                  
                           
                   
 


                                            

                                                                                
                                                                              
                    
 
                          




                                                       
                                             














                                                           
                                                                            


                                                        
                                                            
                                                         
                      


                                                                                   


                           
       






                                                          
                            


                                        
                                                                              

           
                                    
            


                                                           




                          
 
package xyz.driver.tracing

import java.nio.file.Path

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.scaladsl._
import xyz.driver.tracing.google._

import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control._
import scala.util._

/** A [[Tracer]] that buffers submitted spans, groups them into traces and
  * sends them in batches to the Google Cloud Trace REST endpoint
  * (`PATCH https://cloudtrace.googleapis.com/v1/projects/<projectId>/traces`).
  *
  * The underlying stream is materialized lazily, on the first call to
  * [[submit]] or [[close]].
  *
  * @param projectId             project identifier; used in the request URI and
  *                              attached to every `Trace` in a batch
  * @param serviceAccountFile    file handed to `OAuth2.authenticatedFlow` to
  *                              authenticate outgoing requests (presumably a
  *                              service account key file; confirm against
  *                              `OAuth2`)
  * @param bufferSize            capacity of the in-memory span queue and also
  *                              the maximum number of spans in one batch
  * @param bufferDelay           maximum time to wait before a non-empty batch
  *                              is emitted, even if it holds fewer than
  *                              `bufferSize` spans
  * @param concurrentConnections parallelism of the stage that marshals batches
  *                              into HTTP requests. NOTE(review): despite its
  *                              name, it is not passed to the connection pool
  *                              in this class; confirm the intent.
  */
class GoogleTracer(projectId: String,
                   serviceAccountFile: Path,
                   bufferSize: Int = 1000,
                   bufferDelay: FiniteDuration = 15.seconds,
                   concurrentConnections: Int = 1)(implicit system: ActorSystem,
                                                   materializer: Materializer)
    extends Tracer {

  import system.dispatcher

  /** Connection pool flow used to send the trace requests. */
  lazy val connectionPool = Http().superPool[Unit]()

  /** Collects spans into batches (by count or by time, whichever comes first)
    * and regroups every batch by trace id into a single `Traces` payload.
    */
  private val batchingPipeline: Flow[Span, Traces, _] =
    Flow[Span]
      .groupedWithin(bufferSize, bufferDelay)
      .map { spans =>
        val traces: Seq[Trace] = spans
          .groupBy(_.traceId)
          .map {
            case (traceId, spans) =>
              Trace(
                traceId,
                projectId,
                spans.map(span => TraceSpan.fromSpan(span))
              )
          }
          .toSeq
        Traces(traces)
      }

  /** `queue` is the entry point of the submission stream; `completed` is the
    * future that finishes once the stream has terminated.
    */
  lazy val (queue: SourceQueueWithComplete[Span], completed: Future[Done]) =
    Source
      // When the buffer is full, newly offered spans are dropped rather than
      // back-pressuring the caller.
      .queue[Span](bufferSize, OverflowStrategy.dropNew)
      .viaMat(batchingPipeline)(Keep.left)
      .mapAsync(concurrentConnections) { (traces: Traces) =>
        Marshal(traces).to[RequestEntity].map { entity =>
          HttpRequest(
            method = HttpMethods.PATCH,
            uri =
              s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
            entity = entity
          )
        }
      }
      .viaMat(
        OAuth2.authenticatedFlow(
          Http(),
          serviceAccountFile,
          Seq(
            "https://www.googleapis.com/auth/trace.append"
          )))(Keep.left)
      .map(req => (req, ()))
      .viaMat(connectionPool)(Keep.left)
      .map {
        case (Success(response), _) =>
          // Response entities must always be consumed or discarded; otherwise
          // the pooled connection cannot be reused and the pool may stall.
          response.discardEntityBytes(materializer)
          if (response.status.isFailure) {
            system.log.error(
              s"Trace submission was rejected with status ${response.status}")
          }
        case (Failure(e), _) =>
          // Per-request failures are delivered as elements, not as stream
          // errors, so they have to be reported here explicitly.
          system.log.error(e, "Exception encountered while submitting trace")
      }
      .mapError {
        case NonFatal(e) =>
          system.log.error(e, "Exception encountered while submitting trace")
          e
      }
      .toMat(Sink.ignore)(Keep.both)
      .run()

  /** Enqueues a span for asynchronous submission.
    *
    * This is fire-and-forget: the outcome of the enqueue operation is
    * intentionally not awaited, and spans offered while the buffer is full are
    * dropped.
    */
  override def submit(span: Span): Unit = {
    queue.offer(span)
    ()
  }

  /** Completes the span queue and returns a future that finishes once the
    * submission stream has terminated.
    */
  override def close(): Future[Done] = {
    queue.complete()
    completed
  }

}