From 8a21ee2028b5f11fe0b9148078b49e4000937202 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 11 Sep 2018 13:35:03 -0700 Subject: Rearchitect reporting stack to mixin-based structure --- src/main/scala/xyz/driver/core/app/DriverApp.scala | 3 +- src/main/scala/xyz/driver/core/app/init.scala | 9 +++-- .../scala/xyz/driver/core/init/AkkaBootable.scala | 7 +++- .../scala/xyz/driver/core/init/CloudServices.scala | 28 +++++++------ .../driver/core/reporting/GoogleMdcLogger.scala | 14 +++++++ .../xyz/driver/core/reporting/GoogleReporter.scala | 17 ++++---- .../xyz/driver/core/reporting/NoReporter.scala | 8 ++++ .../driver/core/reporting/NoTraceReporter.scala | 8 ++-- .../scala/xyz/driver/core/reporting/Reporter.scala | 46 +++++++++++----------- .../driver/core/reporting/ScalaLoggerLike.scala | 32 --------------- .../driver/core/reporting/ScalaLoggingCompat.scala | 36 +++++++++++++++++ .../xyz/driver/core/reporting/SpanContext.scala | 4 +- 12 files changed, 123 insertions(+), 89 deletions(-) create mode 100644 src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala create mode 100644 src/main/scala/xyz/driver/core/reporting/NoReporter.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala create mode 100644 src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala diff --git a/src/main/scala/xyz/driver/core/app/DriverApp.scala b/src/main/scala/xyz/driver/core/app/DriverApp.scala index 50e471c..03da72b 100644 --- a/src/main/scala/xyz/driver/core/app/DriverApp.scala +++ b/src/main/scala/xyz/driver/core/app/DriverApp.scala @@ -91,8 +91,7 @@ class DriverApp( def route: Route = versionRt ~ healthRoute } val combinedRoute = - Route.seal( - modules.map(_.route).foldLeft(basicRoutes.routeWithDefaults)(_ ~ _) ~ swaggerRoute.route ~ defaultOptionsRoute) + Route.seal(modules.map(_.route).foldLeft(basicRoutes.routeWithDefaults)(_ ~ _) ~ swaggerRoute.route) (extractHost & extractClientIP & trace(tracer) & handleRejections(authenticationRejectionHandler)) { case (origin, ip) => ctx => diff --git a/src/main/scala/xyz/driver/core/app/init.scala b/src/main/scala/xyz/driver/core/app/init.scala index 767fd0b..bb7a798 100644 --- a/src/main/scala/xyz/driver/core/app/init.scala +++ b/src/main/scala/xyz/driver/core/app/init.scala @@ -10,7 +10,7 @@ import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.Logger import org.slf4j.LoggerFactory import xyz.driver.core.logging.MdcExecutionContext -import xyz.driver.core.reporting.{NoTraceReporter, ScalaLoggerLike} +import xyz.driver.core.reporting.{ScalaLoggingCompat, NoTraceReporter} import xyz.driver.core.time.provider.TimeProvider import xyz.driver.tracing.{GoogleTracer, NoTracer, Tracer} @@ -26,7 +26,7 @@ object init { val gitHeadCommit: scala.Option[String] } - case class ApplicationContext(config: Config, clock: Clock, reporter: ScalaLoggerLike) { + case class ApplicationContext(config: Config, clock: Clock, reporter: ScalaLoggingCompat) { val time: TimeProvider = clock } @@ -90,7 +90,10 @@ object init { ApplicationContext( config = getEnvironmentSpecificConfig(), clock = Clock.systemUTC(), - new NoTraceReporter(Logger(LoggerFactory.getLogger(classOf[DriverApp])))) + new NoTraceReporter with ScalaLoggingCompat { + val logger = Logger(LoggerFactory.getLogger(classOf[DriverApp])) + } + ) def createDefaultApplication( modules: Seq[Module], diff --git a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala index 5e0b36c..df6611e 100644 --- a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala +++ b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala @@ -14,7 +14,7 @@ import com.typesafe.config.Config import kamon.Kamon import kamon.statsd.StatsDReporter import kamon.system.SystemMetrics -import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggerLike, SpanContext} +import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext} import xyz.driver.core.rest.HttpRestServiceTransport import scala.concurrent.duration._ @@ -119,7 +119,10 @@ trait AkkaBootable { * * @group utilities */ - def reporter: Reporter with ScalaLoggerLike = new NoTraceReporter(ScalaLoggerLike.defaultScalaLogger(json = false)) + def reporter: Reporter with ScalaLoggingCompat = + new Reporter with NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(json = false) + } /** Top-level application configuration. * diff --git a/src/main/scala/xyz/driver/core/init/CloudServices.scala b/src/main/scala/xyz/driver/core/init/CloudServices.scala index 31faf4c..a6a477a 100644 --- a/src/main/scala/xyz/driver/core/init/CloudServices.scala +++ b/src/main/scala/xyz/driver/core/init/CloudServices.scala @@ -3,14 +3,12 @@ package init import java.nio.file.Paths -import xyz.driver.core.messaging.{GoogleBus, QueueBus, StreamBus} +import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus} import xyz.driver.core.reporting._ -import xyz.driver.core.reporting.ScalaLoggerLike.defaultScalaLogger import xyz.driver.core.rest.{DnsDiscovery, ServiceDescriptor} import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage} import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext /** Mixin trait that provides essential cloud utilities. */ trait CloudServices extends AkkaBootable { self => @@ -21,9 +19,8 @@ trait CloudServices extends AkkaBootable { self => def platform: Platform = Platform.current /** Service discovery for the current platform. - * */ - private lazy val discovery = { + private lazy val discovery: DnsDiscovery = { def getOverrides(): Map[String, String] = { val block = config.getObject("services.dev-overrides").unwrapped().asScala for ((key, value) <- block) yield { @@ -33,7 +30,9 @@ trait CloudServices extends AkkaBootable { self => }.toMap val overrides = platform match { case Platform.Dev => getOverrides() - case _ => Map.empty[String, String] // TODO we may want to provide a way to override deployed services as well + // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to + // provide a way to override deployed services as well. + case _ => Map.empty[String, String] } new DnsDiscovery(clientTransport, overrides) } @@ -47,13 +46,17 @@ trait CloudServices extends AkkaBootable { self => * A potential fix would be to make the log format independent of the platform, and always log * as JSON for example. */ - override lazy val reporter: Reporter with ScalaLoggerLike = { + override lazy val reporter: Reporter with ScalaLoggingCompat = { Console.println("determining platform") // scalastyle:ignore val r = platform match { case p @ Platform.GoogleCloud(_, _) => - new GoogleReporter(p.credentials, p.namespace, defaultScalaLogger(true)) + new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger { + val logger = ScalaLoggingCompat.defaultScalaLogger(true) + } case Platform.Dev => - new NoTraceReporter(defaultScalaLogger(false)) + new NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(false) + } } r.info(s"application started on platform '${platform}'")(SpanContext.fresh()) r @@ -78,11 +81,10 @@ trait CloudServices extends AkkaBootable { self => * @group utilities */ def messageBus: StreamBus = platform match { - case Platform.GoogleCloud(keyfile, namespace) => GoogleBus.fromKeyfile(keyfile, namespace) + case p @ Platform.GoogleCloud(_, namespace) => + new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand case Platform.Dev => - new QueueBus()(self.system) with StreamBus { - override def executionContext: ExecutionContext = self.executionContext - } + new QueueBus()(self.system) with StreamBus } } diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala new file mode 100644 index 0000000..f5c41cf --- /dev/null +++ b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala @@ -0,0 +1,14 @@ +package xyz.driver.core +package reporting + +import org.slf4j.MDC + +trait GoogleMdcLogger extends Reporter { self: GoogleReporter => + + abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = { + MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") + super.log(severity, message, reason) + } + +} diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala index d4d20a4..14c4954 100644 --- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala +++ b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala @@ -1,5 +1,6 @@ package xyz.driver.core package reporting + import java.security.Signature import java.time.Instant import java.util @@ -9,8 +10,6 @@ import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithCom import akka.stream.{Materializer, OverflowStrategy} import com.google.auth.oauth2.ServiceAccountCredentials import com.softwaremill.sttp._ -import com.typesafe.scalalogging.Logger -import org.slf4j.MDC import spray.json.DerivedJsonProtocol._ import spray.json._ import xyz.driver.core.reporting.Reporter.CausalRelation @@ -18,22 +17,21 @@ import xyz.driver.core.reporting.Reporter.CausalRelation import scala.async.Async._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Random, Success, Try} import scala.util.control.NonFatal +import scala.util.{Failure, Random, Success, Try} /** A reporter that collects traces and submits them to * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]]. */ class GoogleReporter( - credentials: ServiceAccountCredentials, + val credentials: ServiceAccountCredentials, namespace: String, - val logger: Logger, buffer: Int = GoogleReporter.DefaultBufferSize, interval: FiniteDuration = GoogleReporter.DefaultInterval)( implicit client: SttpBackend[Future, _], mat: Materializer, ec: ExecutionContext -) extends Reporter with ScalaLoggerLike { +) extends Reporter { import GoogleReporter._ private val getToken: () => Future[String] = Refresh.every(55.minutes) { @@ -156,10 +154,9 @@ class GoogleReporter( } override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = { - MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") - super.log(severity, message, reason) - } + implicit ctx: SpanContext): Unit = + sys.error("Submitting logs directly to GCP is " + + "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly } diff --git a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala new file mode 100644 index 0000000..c1c81f4 --- /dev/null +++ b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala @@ -0,0 +1,8 @@ +package xyz.driver.core +package reporting + +trait NoReporter extends NoTraceReporter { + override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = () +} +object NoReporter extends NoReporter diff --git a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala index 9179f42..b49cfda 100644 --- a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala +++ b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala @@ -1,18 +1,20 @@ package xyz.driver.core package reporting -import com.typesafe.scalalogging.Logger - import scala.concurrent.Future -class NoTraceReporter(val logger: Logger) extends Reporter with ScalaLoggerLike { +/** A reporter mixin that does not emit traces. */ +trait NoTraceReporter extends Reporter { + override def traceWithOptionalParent[A]( name: String, tags: Map[String, String], parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh()) + override def traceWithOptionalParentAsync[A]( name: String, tags: Map[String, String], parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] = op(SpanContext.fresh()) + } diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala index 57e2310..469084c 100644 --- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala +++ b/src/main/scala/xyz/driver/core/reporting/Reporter.scala @@ -1,8 +1,5 @@ -package xyz.driver.core.reporting - -import com.typesafe.scalalogging.Logger -import org.slf4j.helpers.NOPLogger -import xyz.driver.core.reporting.Reporter.{CausalRelation, Severity} +package xyz.driver.core +package reporting import scala.concurrent.Future @@ -13,7 +10,7 @@ import scala.concurrent.Future * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however, - * execution contexts are not easily determined by an external observer and hence need to be explcictly passed across + * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across * service boundaries. * * This reporter provides tracing and logging utilities that explicitly require references to execution contexts @@ -44,14 +41,15 @@ import scala.concurrent.Future * } * }}} * - * Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms - * of memory and processing). It should be used in interesting and actionable code paths. + * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms + * of memory and processing). It should be used in interesting and actionable code paths.''' * * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as * implicit parameters. This method should only be used for top-level traces when no parent * traces are available. */ trait Reporter { + import Reporter._ def traceWithOptionalParent[A]( name: String, @@ -66,7 +64,7 @@ trait Reporter { * * $rootWarning */ - def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A = + final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A = traceWithOptionalParent( name, tags, @@ -79,7 +77,8 @@ trait Reporter { * * @see traceRoot */ - def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => Future[A]): Future[A] = + final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)( + op: SpanContext => Future[A]): Future[A] = traceWithOptionalParentAsync( name, tags, @@ -102,8 +101,11 @@ trait Reporter { * @tparam A Return type of the operation. * @return The value of the child operation. */ - def trace[A](name: String, tags: Map[String, String] = Map.empty, relation: CausalRelation = CausalRelation.Child)( - op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(implicit ctx: SpanContext): A = + final def trace[A]( + name: String, + tags: Map[String, String] = Map.empty, + relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)( + implicit ctx: SpanContext): A = traceWithOptionalParent( name, tags, @@ -117,7 +119,7 @@ trait Reporter { * * @see trace */ - def traceAsync[A]( + final def traceAsync[A]( name: String, tags: Map[String, String] = Map.empty, relation: CausalRelation = CausalRelation.Child)( @@ -132,31 +134,29 @@ trait Reporter { def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit /** Log a debug message. */ - def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) - def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) + final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, Some(reason)) /** Log an informational message. */ - def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) - def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) + final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, Some(reason)) /** Log a warning message. */ - def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) - def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) + final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, Some(reason)) /** Log an error message. */ - def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) - def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) + final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, Some(reason)) } object Reporter { - val NoReporter: Reporter = new NoTraceReporter(Logger.apply(NOPLogger.NOP_LOGGER)) - /** A relation in cause. * * Corresponds to diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala deleted file mode 100644 index eda81fb..0000000 --- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala +++ /dev/null @@ -1,32 +0,0 @@ -package xyz.driver.core.reporting -import com.typesafe.scalalogging.Logger - -trait ScalaLoggerLike extends Reporter { - - def logger: Logger - - override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = severity match { - case Reporter.Severity.Debug => logger.debug(message, reason.orNull) - case Reporter.Severity.Informational => logger.info(message, reason.orNull) - case Reporter.Severity.Warning => logger.warn(message, reason.orNull) - case Reporter.Severity.Error => logger.error(message, reason.orNull) - } - -} - -object ScalaLoggerLike { - import scala.language.implicitConversions - - def defaultScalaLogger(json: Boolean = false): Logger = { - if (json) { - System.setProperty("logback.configurationFile", "deployed-logback.xml") - } else { - System.setProperty("logback.configurationFile", "logback.xml") - } - Logger.apply("application") - } - - implicit def toScalaLogger(reporter: ScalaLoggerLike): Logger = reporter.logger - -} diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala new file mode 100644 index 0000000..0ff5574 --- /dev/null +++ b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala @@ -0,0 +1,36 @@ +package xyz.driver.core +package reporting + +import com.typesafe.scalalogging.{Logger => ScalaLogger} + +/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */ +trait ScalaLoggingCompat extends Reporter { + import Reporter.Severity + + def logger: ScalaLogger + + override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit = + severity match { + case Severity.Debug => logger.debug(message, reason.orNull) + case Severity.Informational => logger.info(message, reason.orNull) + case Severity.Warning => logger.warn(message, reason.orNull) + case Severity.Error => logger.error(message, reason.orNull) + } + +} + +object ScalaLoggingCompat { + import scala.language.implicitConversions + + def defaultScalaLogger(json: Boolean = false): ScalaLogger = { + if (json) { + System.setProperty("logback.configurationFile", "deployed-logback.xml") + } else { + System.setProperty("logback.configurationFile", "logback.xml") + } + ScalaLogger.apply("application") + } + + implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger + +} diff --git a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala index ecc2ba3..04a822d 100644 --- a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala +++ b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala @@ -1,10 +1,12 @@ package xyz.driver.core package reporting + import scala.util.Random case class SpanContext private[core] (traceId: String, spanId: String) + object SpanContext { - def fresh() = SpanContext( + def fresh(): SpanContext = SpanContext( f"${Random.nextLong()}%016x${Random.nextLong()}%016x", f"${Random.nextLong()}%016x" ) -- cgit v1.2.3