From cacf57dc4af5b0104b28b6417715c660aef613f3 Mon Sep 17 00:00:00 2001 From: John St John Date: Wed, 4 Oct 2017 10:39:21 -0700 Subject: Follow google example to implement a scheduled flusing trace consumer by default --- .../trace/GoogleStackdriverTraceWithConsumer.scala | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala') diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala index cd9170a..ca1eab2 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.core._ import com.google.cloud.trace.sink.TraceSink import com.google.cloud.trace.v1.TraceSinkV1 -import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer} +import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer} import com.google.cloud.trace.v1.producer.TraceProducer import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer} import com.typesafe.scalalogging.Logger +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.TimeUnit import scala.compat.java8.OptionConverters._ @@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, appEnvironment: String, traceConsumer: TraceConsumer, log: Logger, - bufferSize: Int) + bufferSize: Int, + scheduledDelay: Int) extends GoogleServiceTracer { private val traceProducer: TraceProducer = new TraceProducer() // use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing + + // now set up the scheduled executor for time-based flushing of our tracing + // see https://goo.gl/HrPLuC + private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution + scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS) + scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true) + private val executorService = scheduledThreadPoolExecutor private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer( - new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize), + new ScheduledBufferingTraceConsumer(traceConsumer, + new UnitTraceSizer(), + bufferSize, + scheduledDelay, + executorService), log ) -- cgit v1.2.3