aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-11 13:35:03 -0700
committerJakob Odersky <jakob@driver.xyz>2018-09-12 14:17:39 -0700
commit8a21ee2028b5f11fe0b9148078b49e4000937202 (patch)
tree3a17b99676e729988ca41ba61199b33d2afbbcbe
parent8b2cd70a7189775cb23dafbbd3670b8050dd28dc (diff)
downloaddriver-core-8a21ee2028b5f11fe0b9148078b49e4000937202.tar.gz
driver-core-8a21ee2028b5f11fe0b9148078b49e4000937202.tar.bz2
driver-core-8a21ee2028b5f11fe0b9148078b49e4000937202.zip
Rearchitect reporting stack to mixin-based structure
-rw-r--r--src/main/scala/xyz/driver/core/app/DriverApp.scala3
-rw-r--r--src/main/scala/xyz/driver/core/app/init.scala9
-rw-r--r--src/main/scala/xyz/driver/core/init/AkkaBootable.scala7
-rw-r--r--src/main/scala/xyz/driver/core/init/CloudServices.scala28
-rw-r--r--src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala14
-rw-r--r--src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala17
-rw-r--r--src/main/scala/xyz/driver/core/reporting/NoReporter.scala8
-rw-r--r--src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala8
-rw-r--r--src/main/scala/xyz/driver/core/reporting/Reporter.scala46
-rw-r--r--src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala32
-rw-r--r--src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala36
-rw-r--r--src/main/scala/xyz/driver/core/reporting/SpanContext.scala4
12 files changed, 123 insertions, 89 deletions
diff --git a/src/main/scala/xyz/driver/core/app/DriverApp.scala b/src/main/scala/xyz/driver/core/app/DriverApp.scala
index 50e471c..03da72b 100644
--- a/src/main/scala/xyz/driver/core/app/DriverApp.scala
+++ b/src/main/scala/xyz/driver/core/app/DriverApp.scala
@@ -91,8 +91,7 @@ class DriverApp(
def route: Route = versionRt ~ healthRoute
}
val combinedRoute =
- Route.seal(
- modules.map(_.route).foldLeft(basicRoutes.routeWithDefaults)(_ ~ _) ~ swaggerRoute.route ~ defaultOptionsRoute)
+ Route.seal(modules.map(_.route).foldLeft(basicRoutes.routeWithDefaults)(_ ~ _) ~ swaggerRoute.route)
(extractHost & extractClientIP & trace(tracer) & handleRejections(authenticationRejectionHandler)) {
case (origin, ip) =>
ctx =>
diff --git a/src/main/scala/xyz/driver/core/app/init.scala b/src/main/scala/xyz/driver/core/app/init.scala
index 767fd0b..bb7a798 100644
--- a/src/main/scala/xyz/driver/core/app/init.scala
+++ b/src/main/scala/xyz/driver/core/app/init.scala
@@ -10,7 +10,7 @@ import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
import xyz.driver.core.logging.MdcExecutionContext
-import xyz.driver.core.reporting.{NoTraceReporter, ScalaLoggerLike}
+import xyz.driver.core.reporting.{ScalaLoggingCompat, NoTraceReporter}
import xyz.driver.core.time.provider.TimeProvider
import xyz.driver.tracing.{GoogleTracer, NoTracer, Tracer}
@@ -26,7 +26,7 @@ object init {
val gitHeadCommit: scala.Option[String]
}
- case class ApplicationContext(config: Config, clock: Clock, reporter: ScalaLoggerLike) {
+ case class ApplicationContext(config: Config, clock: Clock, reporter: ScalaLoggingCompat) {
val time: TimeProvider = clock
}
@@ -90,7 +90,10 @@ object init {
ApplicationContext(
config = getEnvironmentSpecificConfig(),
clock = Clock.systemUTC(),
- new NoTraceReporter(Logger(LoggerFactory.getLogger(classOf[DriverApp]))))
+ new NoTraceReporter with ScalaLoggingCompat {
+ val logger = Logger(LoggerFactory.getLogger(classOf[DriverApp]))
+ }
+ )
def createDefaultApplication(
modules: Seq[Module],
diff --git a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala
index 5e0b36c..df6611e 100644
--- a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala
+++ b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala
@@ -14,7 +14,7 @@ import com.typesafe.config.Config
import kamon.Kamon
import kamon.statsd.StatsDReporter
import kamon.system.SystemMetrics
-import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggerLike, SpanContext}
+import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext}
import xyz.driver.core.rest.HttpRestServiceTransport
import scala.concurrent.duration._
@@ -119,7 +119,10 @@ trait AkkaBootable {
*
* @group utilities
*/
- def reporter: Reporter with ScalaLoggerLike = new NoTraceReporter(ScalaLoggerLike.defaultScalaLogger(json = false))
+ def reporter: Reporter with ScalaLoggingCompat =
+ new Reporter with NoTraceReporter with ScalaLoggingCompat {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(json = false)
+ }
/** Top-level application configuration.
*
diff --git a/src/main/scala/xyz/driver/core/init/CloudServices.scala b/src/main/scala/xyz/driver/core/init/CloudServices.scala
index 31faf4c..a6a477a 100644
--- a/src/main/scala/xyz/driver/core/init/CloudServices.scala
+++ b/src/main/scala/xyz/driver/core/init/CloudServices.scala
@@ -3,14 +3,12 @@ package init
import java.nio.file.Paths
-import xyz.driver.core.messaging.{GoogleBus, QueueBus, StreamBus}
+import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus}
import xyz.driver.core.reporting._
-import xyz.driver.core.reporting.ScalaLoggerLike.defaultScalaLogger
import xyz.driver.core.rest.{DnsDiscovery, ServiceDescriptor}
import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage}
import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext
/** Mixin trait that provides essential cloud utilities. */
trait CloudServices extends AkkaBootable { self =>
@@ -21,9 +19,8 @@ trait CloudServices extends AkkaBootable { self =>
def platform: Platform = Platform.current
/** Service discovery for the current platform.
- *
*/
- private lazy val discovery = {
+ private lazy val discovery: DnsDiscovery = {
def getOverrides(): Map[String, String] = {
val block = config.getObject("services.dev-overrides").unwrapped().asScala
for ((key, value) <- block) yield {
@@ -33,7 +30,9 @@ trait CloudServices extends AkkaBootable { self =>
}.toMap
val overrides = platform match {
case Platform.Dev => getOverrides()
- case _ => Map.empty[String, String] // TODO we may want to provide a way to override deployed services as well
+ // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to
+ // provide a way to override deployed services as well.
+ case _ => Map.empty[String, String]
}
new DnsDiscovery(clientTransport, overrides)
}
@@ -47,13 +46,17 @@ trait CloudServices extends AkkaBootable { self =>
* A potential fix would be to make the log format independent of the platform, and always log
* as JSON for example.
*/
- override lazy val reporter: Reporter with ScalaLoggerLike = {
+ override lazy val reporter: Reporter with ScalaLoggingCompat = {
Console.println("determining platform") // scalastyle:ignore
val r = platform match {
case p @ Platform.GoogleCloud(_, _) =>
- new GoogleReporter(p.credentials, p.namespace, defaultScalaLogger(true))
+ new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(true)
+ }
case Platform.Dev =>
- new NoTraceReporter(defaultScalaLogger(false))
+ new NoTraceReporter with ScalaLoggingCompat {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(false)
+ }
}
r.info(s"application started on platform '${platform}'")(SpanContext.fresh())
r
@@ -78,11 +81,10 @@ trait CloudServices extends AkkaBootable { self =>
* @group utilities
*/
def messageBus: StreamBus = platform match {
- case Platform.GoogleCloud(keyfile, namespace) => GoogleBus.fromKeyfile(keyfile, namespace)
+ case p @ Platform.GoogleCloud(_, namespace) =>
+ new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand
case Platform.Dev =>
- new QueueBus()(self.system) with StreamBus {
- override def executionContext: ExecutionContext = self.executionContext
- }
+ new QueueBus()(self.system) with StreamBus
}
}
diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
new file mode 100644
index 0000000..f5c41cf
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
@@ -0,0 +1,14 @@
+package xyz.driver.core
+package reporting
+
+import org.slf4j.MDC
+
+trait GoogleMdcLogger extends Reporter { self: GoogleReporter =>
+
+ abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
+ implicit ctx: SpanContext): Unit = {
+ MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}")
+ super.log(severity, message, reason)
+ }
+
+}
diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
index d4d20a4..14c4954 100644
--- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
+++ b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
@@ -1,5 +1,6 @@
package xyz.driver.core
package reporting
+
import java.security.Signature
import java.time.Instant
import java.util
@@ -9,8 +10,6 @@ import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithCom
import akka.stream.{Materializer, OverflowStrategy}
import com.google.auth.oauth2.ServiceAccountCredentials
import com.softwaremill.sttp._
-import com.typesafe.scalalogging.Logger
-import org.slf4j.MDC
import spray.json.DerivedJsonProtocol._
import spray.json._
import xyz.driver.core.reporting.Reporter.CausalRelation
@@ -18,22 +17,21 @@ import xyz.driver.core.reporting.Reporter.CausalRelation
import scala.async.Async._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Random, Success, Try}
import scala.util.control.NonFatal
+import scala.util.{Failure, Random, Success, Try}
/** A reporter that collects traces and submits them to
* [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]].
*/
class GoogleReporter(
- credentials: ServiceAccountCredentials,
+ val credentials: ServiceAccountCredentials,
namespace: String,
- val logger: Logger,
buffer: Int = GoogleReporter.DefaultBufferSize,
interval: FiniteDuration = GoogleReporter.DefaultInterval)(
implicit client: SttpBackend[Future, _],
mat: Materializer,
ec: ExecutionContext
-) extends Reporter with ScalaLoggerLike {
+) extends Reporter {
import GoogleReporter._
private val getToken: () => Future[String] = Refresh.every(55.minutes) {
@@ -156,10 +154,9 @@ class GoogleReporter(
}
override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
- implicit ctx: SpanContext): Unit = {
- MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}")
- super.log(severity, message, reason)
- }
+ implicit ctx: SpanContext): Unit =
+ sys.error("Submitting logs directly to GCP is " +
+ "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly
}
diff --git a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
new file mode 100644
index 0000000..c1c81f4
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
@@ -0,0 +1,8 @@
+package xyz.driver.core
+package reporting
+
+trait NoReporter extends NoTraceReporter {
+ override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
+ implicit ctx: SpanContext): Unit = ()
+}
+object NoReporter extends NoReporter
diff --git a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
index 9179f42..b49cfda 100644
--- a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
+++ b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
@@ -1,18 +1,20 @@
package xyz.driver.core
package reporting
-import com.typesafe.scalalogging.Logger
-
import scala.concurrent.Future
-class NoTraceReporter(val logger: Logger) extends Reporter with ScalaLoggerLike {
+/** A reporter mixin that does not emit traces. */
+trait NoTraceReporter extends Reporter {
+
override def traceWithOptionalParent[A](
name: String,
tags: Map[String, String],
parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh())
+
override def traceWithOptionalParentAsync[A](
name: String,
tags: Map[String, String],
parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] =
op(SpanContext.fresh())
+
}
diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala
index 57e2310..469084c 100644
--- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala
+++ b/src/main/scala/xyz/driver/core/reporting/Reporter.scala
@@ -1,8 +1,5 @@
-package xyz.driver.core.reporting
-
-import com.typesafe.scalalogging.Logger
-import org.slf4j.helpers.NOPLogger
-import xyz.driver.core.reporting.Reporter.{CausalRelation, Severity}
+package xyz.driver.core
+package reporting
import scala.concurrent.Future
@@ -13,7 +10,7 @@ import scala.concurrent.Future
* that led to a particular message. In synchronous systems, execution contexts can easily be determined by an
* external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on
* the JVM shows all relevant information). In asynchronous systems and especially distributed systems however,
- * execution contexts are not easily determined by an external observer and hence need to be explcictly passed across
+ * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across
* service boundaries.
*
* This reporter provides tracing and logging utilities that explicitly require references to execution contexts
@@ -44,14 +41,15 @@ import scala.concurrent.Future
* }
* }}}
*
- * Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms
- * of memory and processing). It should be used in interesting and actionable code paths.
+ * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms
+ * of memory and processing). It should be used in interesting and actionable code paths.'''
*
* @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as
* implicit parameters. This method should only be used for top-level traces when no parent
* traces are available.
*/
trait Reporter {
+ import Reporter._
def traceWithOptionalParent[A](
name: String,
@@ -66,7 +64,7 @@ trait Reporter {
*
* $rootWarning
*/
- def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A =
+ final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A =
traceWithOptionalParent(
name,
tags,
@@ -79,7 +77,8 @@ trait Reporter {
*
* @see traceRoot
*/
- def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => Future[A]): Future[A] =
+ final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(
+ op: SpanContext => Future[A]): Future[A] =
traceWithOptionalParentAsync(
name,
tags,
@@ -102,8 +101,11 @@ trait Reporter {
* @tparam A Return type of the operation.
* @return The value of the child operation.
*/
- def trace[A](name: String, tags: Map[String, String] = Map.empty, relation: CausalRelation = CausalRelation.Child)(
- op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(implicit ctx: SpanContext): A =
+ final def trace[A](
+ name: String,
+ tags: Map[String, String] = Map.empty,
+ relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(
+ implicit ctx: SpanContext): A =
traceWithOptionalParent(
name,
tags,
@@ -117,7 +119,7 @@ trait Reporter {
*
* @see trace
*/
- def traceAsync[A](
+ final def traceAsync[A](
name: String,
tags: Map[String, String] = Map.empty,
relation: CausalRelation = CausalRelation.Child)(
@@ -132,31 +134,29 @@ trait Reporter {
def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit
/** Log a debug message. */
- def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None)
- def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+ final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None)
+ final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
log(Severity.Debug, message, Some(reason))
/** Log an informational message. */
- def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None)
- def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+ final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None)
+ final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
log(Severity.Informational, message, Some(reason))
/** Log a warning message. */
- def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None)
- def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+ final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None)
+ final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
log(Severity.Warning, message, Some(reason))
/** Log an error message. */
- def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None)
- def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+ final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None)
+ final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
log(Severity.Error, message, Some(reason))
}
object Reporter {
- val NoReporter: Reporter = new NoTraceReporter(Logger.apply(NOPLogger.NOP_LOGGER))
-
/** A relation in cause.
*
* Corresponds to
diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala
deleted file mode 100644
index eda81fb..0000000
--- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package xyz.driver.core.reporting
-import com.typesafe.scalalogging.Logger
-
-trait ScalaLoggerLike extends Reporter {
-
- def logger: Logger
-
- override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
- implicit ctx: SpanContext): Unit = severity match {
- case Reporter.Severity.Debug => logger.debug(message, reason.orNull)
- case Reporter.Severity.Informational => logger.info(message, reason.orNull)
- case Reporter.Severity.Warning => logger.warn(message, reason.orNull)
- case Reporter.Severity.Error => logger.error(message, reason.orNull)
- }
-
-}
-
-object ScalaLoggerLike {
- import scala.language.implicitConversions
-
- def defaultScalaLogger(json: Boolean = false): Logger = {
- if (json) {
- System.setProperty("logback.configurationFile", "deployed-logback.xml")
- } else {
- System.setProperty("logback.configurationFile", "logback.xml")
- }
- Logger.apply("application")
- }
-
- implicit def toScalaLogger(reporter: ScalaLoggerLike): Logger = reporter.logger
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
new file mode 100644
index 0000000..0ff5574
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
@@ -0,0 +1,36 @@
+package xyz.driver.core
+package reporting
+
+import com.typesafe.scalalogging.{Logger => ScalaLogger}
+
+/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */
+trait ScalaLoggingCompat extends Reporter {
+ import Reporter.Severity
+
+ def logger: ScalaLogger
+
+ override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit =
+ severity match {
+ case Severity.Debug => logger.debug(message, reason.orNull)
+ case Severity.Informational => logger.info(message, reason.orNull)
+ case Severity.Warning => logger.warn(message, reason.orNull)
+ case Severity.Error => logger.error(message, reason.orNull)
+ }
+
+}
+
+object ScalaLoggingCompat {
+ import scala.language.implicitConversions
+
+ def defaultScalaLogger(json: Boolean = false): ScalaLogger = {
+ if (json) {
+ System.setProperty("logback.configurationFile", "deployed-logback.xml")
+ } else {
+ System.setProperty("logback.configurationFile", "logback.xml")
+ }
+ ScalaLogger.apply("application")
+ }
+
+ implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger
+
+}
diff --git a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
index ecc2ba3..04a822d 100644
--- a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
+++ b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
@@ -1,10 +1,12 @@
package xyz.driver.core
package reporting
+
import scala.util.Random
case class SpanContext private[core] (traceId: String, spanId: String)
+
object SpanContext {
- def fresh() = SpanContext(
+ def fresh(): SpanContext = SpanContext(
f"${Random.nextLong()}%016x${Random.nextLong()}%016x",
f"${Random.nextLong()}%016x"
)