aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/trace
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/trace')
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala11
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala20
-rw-r--r--src/main/scala/xyz/driver/core/trace/LoggingTrace.scala5
3 files changed, 29 insertions, 7 deletions
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
index ce84f9d..fe4bb5c 100644
--- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
@@ -15,7 +15,8 @@ final class GoogleStackdriverTrace(projectId: String,
appName: String,
appEnvironment: String,
log: Logger,
- bufferSize: Int = 10)
+ bufferSize: Int = 1024,
+ scheduledDelay: Int = 15)
extends GoogleServiceTracer {
// initialize our various tracking storage systems
@@ -38,7 +39,13 @@ final class GoogleStackdriverTrace(projectId: String,
}
private val googleServiceTracer =
- new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize)
+ new GoogleStackdriverTraceWithConsumer(projectId,
+ appName,
+ appEnvironment,
+ traceConsumer,
+ log,
+ bufferSize,
+ scheduledDelay)
override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan =
googleServiceTracer.startSpan(httpRequest)
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
index cd9170a..ca1eab2 100644
--- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
@@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest
import com.google.cloud.trace.core._
import com.google.cloud.trace.sink.TraceSink
import com.google.cloud.trace.v1.TraceSinkV1
-import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer}
+import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer}
import com.google.cloud.trace.v1.producer.TraceProducer
import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer}
import com.typesafe.scalalogging.Logger
+import java.util.concurrent.ScheduledThreadPoolExecutor
+import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
@@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String,
appEnvironment: String,
traceConsumer: TraceConsumer,
log: Logger,
- bufferSize: Int)
+ bufferSize: Int,
+ scheduledDelay: Int)
extends GoogleServiceTracer {
private val traceProducer: TraceProducer = new TraceProducer()
// use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing
+
+ // now set up the scheduled executor for time-based flushing of our tracing
+ // see https://goo.gl/HrPLuC
+ private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution
+ scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS)
+ scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true)
+ private val executorService = scheduledThreadPoolExecutor
private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer(
- new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize),
+ new ScheduledBufferingTraceConsumer(traceConsumer,
+ new UnitTraceSizer(),
+ bufferSize,
+ scheduledDelay,
+ executorService),
log
)
diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
index cd920f0..6a13708 100644
--- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
+++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
@@ -4,7 +4,7 @@ import akka.http.scaladsl.model.HttpRequest
import com.google.cloud.trace.v1.consumer.TraceConsumer
import com.typesafe.scalalogging.Logger
-final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int)
+final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int, scheduledDelay: Int)
extends GoogleServiceTracer {
private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log)
@@ -14,7 +14,8 @@ final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, b
appEnvironment,
traceConsumer,
log,
- bufferSize
+ bufferSize,
+ scheduledDelay
)
override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan =