aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala')
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala75
1 files changed, 75 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
new file mode 100644
index 0000000..7fed3c7
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
@@ -0,0 +1,75 @@
+package xyz.driver.core.trace
+
+import akka.http.scaladsl.model.HttpRequest
+import com.google.cloud.trace.core._
+import com.google.cloud.trace.sink.TraceSink
+import com.google.cloud.trace.v1.TraceSinkV1
+import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer}
+import com.google.cloud.trace.v1.producer.TraceProducer
+import com.google.cloud.trace.v1.util.RoughTraceSizer
+import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer}
+
+import scala.compat.java8.OptionConverters._
+
+final class GoogleStackdriverTraceWithConsumer(projectId: String,
+ appName: String,
+ appEnvironment: String,
+ traceConsumer: TraceConsumer)
+ extends GoogleServiceTracer {
+
+ private val traceProducer: TraceProducer = new TraceProducer()
+ private val threadSafeBufferingTraceConsumer =
+ new SizedBufferingTraceConsumer(traceConsumer, new RoughTraceSizer(), 100)
+
+ private val traceSink: TraceSink = new TraceSinkV1(projectId, traceProducer, threadSafeBufferingTraceConsumer)
+
+ private val spanContextFactory: SpanContextFactory = new SpanContextFactory(
+ new ConstantTraceOptionsFactory(true, true))
+ private val timestampFactory: TimestampFactory = new JavaTimestampFactory()
+
+ override def startSpan(httpRequest: HttpRequest): TracerSpanPayload = {
+ val parentHeaderOption: Option[akka.http.javadsl.model.HttpHeader] =
+ httpRequest.getHeader(TracingHeaderKey).asScala
+ val (spanContext: SpanContext, spanKind: SpanKind) = parentHeaderOption.fold {
+ (spanContextFactory.initialContext(), SpanKind.RPC_CLIENT)
+ } { parentHeader =>
+ (spanContextFactory.fromHeader(parentHeader.value()), SpanKind.RPC_SERVER)
+ }
+
+ val contextHandler: SpanContextHandler = new SimpleSpanContextHandler(spanContext)
+ val httpMethod = httpRequest.method.value
+ val httpHost = httpRequest.uri.authority.host.address()
+ val httpRelative = httpRequest.uri.toRelative.toString()
+ val tracer: Tracer = new SpanContextHandlerTracer(traceSink, contextHandler, spanContextFactory, timestampFactory)
+ // Create a span using the given timestamps.
+ // https://cloud.google.com/trace/docs/reference/v1/rest/v1/projects.traces#TraceSpan
+ val spanOptions: StartSpanOptions = (new StartSpanOptions()).setSpanKind(spanKind)
+
+ val spanLabelBuilder = Labels
+ .builder()
+ .add("/http/method", httpMethod)
+ .add("/http/url", httpRelative)
+ .add("/http/host", httpHost)
+ .add("/component", appName)
+ .add("/environment", appEnvironment)
+
+ parentHeaderOption.foreach { parentHeader =>
+ spanLabelBuilder.add("/span/parent", parentHeader.value())
+ }
+
+ // The cloudTrace analysis reporting UI makes it easy to query by name prefix.
+ // this spanName gives us the ability to grab things that are specific to a particular UDE/env, as well as all
+ // endpoints in that service, as well as a particular endpoint in a particular environment/service.
+ val spanName: String = s"($appEnvironment->$appName)$httpRelative"
+
+ val context: TraceContext = tracer.startSpan(spanName, spanOptions)
+ tracer.annotateSpan(context, spanLabelBuilder.build())
+ GoogleStackdriverTraceSpan(tracer, context)
+ }
+
+ override def endSpan(span: TracerSpanPayload): Unit = {
+ span.tracer.endSpan(span.context)
+ threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer
+ }
+
+}