aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala')
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala20
1 files changed, 17 insertions, 3 deletions
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
index cd9170a..ca1eab2 100644
--- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
@@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest
import com.google.cloud.trace.core._
import com.google.cloud.trace.sink.TraceSink
import com.google.cloud.trace.v1.TraceSinkV1
-import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer}
+import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer}
import com.google.cloud.trace.v1.producer.TraceProducer
import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer}
import com.typesafe.scalalogging.Logger
+import java.util.concurrent.ScheduledThreadPoolExecutor
+import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
@@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String,
appEnvironment: String,
traceConsumer: TraceConsumer,
log: Logger,
- bufferSize: Int)
+ bufferSize: Int,
+ scheduledDelay: Int)
extends GoogleServiceTracer {
private val traceProducer: TraceProducer = new TraceProducer()
// use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing
+
+ // now set up the scheduled executor for time-based flushing of our tracing
+ // see https://goo.gl/HrPLuC
+ private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution
+ scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS)
+ scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true)
+ private val executorService = scheduledThreadPoolExecutor
private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer(
- new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize),
+ new ScheduledBufferingTraceConsumer(traceConsumer,
+ new UnitTraceSizer(),
+ bufferSize,
+ scheduledDelay,
+ executorService),
log
)