Trait-based initialization and other utilities
Adds the concept of a 'platform', a centralized place in which environment-specific information will be managed, and provides common initialization logic for most "standard" apps. As part of the common initialization, other parts of core have also been reworked: - HTTP-related unmarshallers and path matchers have been factored out from core.json to a new core.rest.directives package (core.json extends those unmarshallers and matchers for backwards compatibility) - CORS handling has also been moved to a dedicated utility trait - Some custom headers have been moved from raw headers to typed ones in core.rest.headers - The concept of a "reporter" has been introduced. A reporter is a context-aware combination of tracing and logging. It is intended to issue diagnostic messages that can be traced across service boundaries. Closes #192 Closes #195
6 files changed, 413 insertions, 0 deletions
+package xyz.driver.core
+package init
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.server.{RequestContext, Route}
+import akka.stream.scaladsl.Source
+import akka.stream.{ActorMaterializer, Materializer}
+import akka.util.ByteString
+import com.softwaremill.sttp.SttpBackend
+import com.softwaremill.sttp.akkahttp.AkkaHttpBackend
+import com.typesafe.config.Config
+import kamon.Kamon
+import kamon.statsd.StatsDReporter
+import kamon.system.SystemMetrics
+import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggerLike, SpanContext}
+import xyz.driver.core.rest.HttpRestServiceTransport
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+/** Provides standard scaffolding for applications that use Akka HTTP.
+ *
+ * Among the features provided are:
+ *
+ * - execution contexts of various kinds
+ * - basic JVM metrics collection via Kamon
+ * - startup and shutdown hooks
+ *
+ * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see
+ * Known Subclasses) in this package.
+ *
+ * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable
+ * application.
+ * I.e.
+ * {{{
+ * object Main extends AkkaBootable // this is a runnable application
+ * }}}
+ * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should
+ * be called explicitly, in order to initialize and start this application.
+ * I.e.
+ * {{{
+ * object Main {
+ * val bootable = new AkkaBootable {}
+ * def main(args: Array[String]): Unit = {
+ * bootable.main(args)
+ * }
+ * }
+ * }}}
+ *
+ * @groupname config Configuration
+ * @groupname contexts Contexts
+ * @groupname utilities Utilities
+ * @groupname hooks Overrideable Hooks
+ */
+trait AkkaBootable {
+ /** The application's name. This value is extracted from the build configuration.
+ * @group config
+ */
+ def name: String = BuildInfoReflection.name
+ /** The application's version (or git sha). This value is extracted from the build configuration.
+ * @group config
+ */
+ def version: Option[String] = BuildInfoReflection.version
+ /** TCP port that this application will listen on.
+ * @group config
+ */
+ def port: Int = 8080
+ // contexts
+ /** General-purpose actor system for this application.
+ * @group contexts
+ */
+ implicit lazy val system: ActorSystem = ActorSystem("app")
+ /** General-purpose stream materializer for this application.
+ * @group contexts
+ */
+ implicit lazy val materializer: Materializer = ActorMaterializer()
+ /** General-purpose execution context for this application.
+ *
+ * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking,
+ * a custom execution context should be defined and used. See
+ * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]]
+ * on how to configure custom execution contexts in Akka.
+ *
+ * @group contexts
+ */
+ implicit lazy val executionContext: ExecutionContext = system.dispatcher
+ /** Default HTTP client, backed by this application's actor system.
+ * @group contexts
+ */
+ implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system)
+ /** Old HTTP client system. Prefer using an sttp backend for new service clients.
+ * @group contexts
+ * @see httpClient
+ */
+ implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport(
+ applicationName = Name(name),
+ applicationVersion = version.getOrElse("<unknown>"),
+ actorSystem = system,
+ executionContext = executionContext,
+ log = reporter
+ )
+ // utilities
+ /** Default reporter instance.
+ *
+ * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a
+ * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards
+ * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will
+ * not include tracing information in any messages!
+ *
+ * @group utilities
+ */
+ def reporter: Reporter with ScalaLoggerLike = new NoTraceReporter(ScalaLoggerLike.defaultScalaLogger(json = false))
+ /** Top-level application configuration.
+ *
+ * TODO: should we expose some config wrapper rather than the typesafe config library?
+ * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala
+ * [[https://github.com/jvican/stoml]].)
+ *
+ * @group utilities
+ */
+ def config: Config = system.settings.config
+ /** Overridable startup hook.
+ *
+ * Invoked by [[main]] during application startup.
+ *
+ * @group hooks
+ */
+ def startup(): Unit = ()
+ /** Overridable shutdown hook.
+ *
+ * Invoked on an arbitrary thread when a shutdown signal is caught.
+ *
+ * @group hooks
+ */
+ def shutdown(): Unit = ()
+ /** Overridable HTTP route.
+ *
+ * Any services that present an HTTP interface should implement this method.
+ *
+ * @group hooks
+ * @see [[HttpApi]]
+ */
+ def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound)
+ private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message)
+ /** This application's entry point. */
+ def main(args: Array[String]): Unit = {
+ implicit val ctx = SpanContext.fresh()
+ syslog("initializing metrics collection")
+ Kamon.addReporter(new StatsDReporter())
+ SystemMetrics.startCollecting()
+ system.registerOnTermination {
+ syslog("running shutdown hooks")
+ shutdown()
+ syslog("bye!")
+ }
+ syslog("running startup hooks")
+ startup()
+ syslog("binding to network interface")
+ val binding = Await.result(
+ Http().bindAndHandle(route, "::", port),
+ 2.seconds
+ )
+ syslog(s"listening to ${binding.localAddress}")
+ syslog("startup complete")
+ }
+package xyz.driver.core
+package init
+import scala.reflect.runtime
+import scala.util.Try
+import scala.util.control.NonFatal
+/** Utility object to retrieve fields from static build configuration objects. */
+private[init] object BuildInfoReflection {
+ final val BuildInfoName = "xyz.driver.BuildInfo"
+ lazy val name: String = get[String]("name")
+ lazy val version: Option[String] = find[String]("version")
+ /** Lookup a given field in the build configuration. This field is required to exist. */
+ private def get[A](fieldName: String): A =
+ try {
+ val mirror = runtime.currentMirror
+ val module = mirror.staticModule(BuildInfoName)
+ val instance = mirror.reflectModule(module).instance
+ val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod
+ mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A]
+ } catch {
+ case NonFatal(err) =>
+ throw new RuntimeException(
+ s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " +
+ s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " +
+ s"caller with a static value.",
+ err
+ )
+ }
+ /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */
+ private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption
+package xyz.driver.core
+package init
+import java.nio.file.Paths
+import xyz.driver.core.discovery.CanDiscoverService
+import xyz.driver.core.messaging.{GoogleBus, QueueBus, StreamBus}
+import xyz.driver.core.reporting._
+import xyz.driver.core.reporting.ScalaLoggerLike.defaultScalaLogger
+import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage}
+import scala.concurrent.ExecutionContext
+/** Mixin trait that provides essential cloud utilities. */
+trait CloudServices extends AkkaBootable { self =>
+ /** The platform that this application is running on.
+ * @group config
+ */
+ def platform: Platform = Platform.current
+ /** Service discovery for the current platform.
+ *
+ * Define a service trait and companion object:
+ * {{{
+ * trait MyService {
+ * def call(): Int
+ * }
+ * object MyService {
+ * implicit val isDiscoverable = new xyz.driver.core.discovery.CanDiscoverService[MyService] {
+ * def discover(p: xyz.driver.core.Platform): MyService = new MyService {
+ * def call() = 42
+ * }
+ * }
+ * }
+ * }}}
+ *
+ * Then discover and use it:
+ * {{{
+ * discover[MyService].call()
+ * }}}
+ *
+ * @group utilities
+ */
+ def discover[A](implicit cds: CanDiscoverService[A]): A = cds.discover(platform)
+ /* TODO: this reporter uses the platform to determine if JSON logging should be enabled.
+ * Since the default logger uses slf4j, its settings must be specified before a logger
+ * is first accessed. This in turn leads to somewhat convoluted code,
+ * since we can't log when the platform being is determined.
+ * A potential fix would be to make the log format independent of the platform, and always log
+ * as JSON for example.
+ */
+ override lazy val reporter: Reporter with ScalaLoggerLike = {
+ Console.println("determining platform") // scalastyle:ignore
+ val r = platform match {
+ case p @ Platform.GoogleCloud(_, _) =>
+ new GoogleReporter(p.credentials, p.namespace, defaultScalaLogger(true))
+ case Platform.Dev =>
+ new NoTraceReporter(defaultScalaLogger(false))
+ }
+ r.info(s"application started on platform '${platform}'")(SpanContext.fresh())
+ r
+ }
+ /** Object storage.
+ * @group utilities
+ */
+ def storage(bucketId: String): BlobStorage =
+ platform match {
+ case Platform.GoogleCloud(keyfile, _) =>
+ GcsBlobStorage.fromKeyfile(keyfile, bucketId)
+ case Platform.Dev =>
+ new FileSystemBlobStorage(Paths.get(s".data-$bucketId"))
+ }
+ /** Message bus.
+ * @group utilities
+ */
+ def messageBus: StreamBus = platform match {
+ case Platform.GoogleCloud(keyfile, namespace) => GoogleBus.fromKeyfile(keyfile, namespace)
+ case Platform.Dev =>
+ new QueueBus()(self.system) with StreamBus {
+ override def executionContext: ExecutionContext = self.executionContext
+ }
+ }
+package xyz.driver.core
+package init
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
+import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import xyz.driver.core.rest.Swagger
+import xyz.driver.core.rest.directives.Directives
+import akka.http.scaladsl.model.headers._
+import xyz.driver.core.reporting.Reporter.CausalRelation
+import xyz.driver.core.reporting.SpanContext
+import xyz.driver.core.rest.headers.Traceparent
+import scala.collection.JavaConverters._
+/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding,
+ * and exposes an application-specific route that must be implemented by services.
+ * @see ProtobufApi
+ */
+trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self =>
+ /** Route that handles the application's business logic.
+ * @group hooks
+ */
+ def applicationRoute: Route
+ /** Classes with Swagger annotations.
+ * @group hooks
+ */
+ def swaggerRouteClasses: Set[Class[_]]
+ private val healthRoute = path("health") {
+ complete(Map("status" -> "good").toJson)
+ }
+ private val versionRoute = path("version") {
+ complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson)
+ }
+ private lazy val swaggerRoute = {
+ val generator = new Swagger(
+ "",
+ "https" :: "http" :: Nil,
+ self.version.getOrElse("<unknown>"),
+ swaggerRouteClasses,
+ config,
+ reporter
+ )
+ generator.routes ~ generator.swaggerUI
+ }
+ private def cors(inner: Route): Route =
+ cors(
+ config.getStringList("application.cors.allowedOrigins").asScala.toSet,
+ xyz.driver.core.rest.AllowedHeaders
+ )(inner)
+ private def traced(inner: Route): Route = (ctx: RequestContext) => {
+ val tags = Map(
+ "service_name" -> name,
+ "service_version" -> version.getOrElse("<unknown>"),
+ "http_user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("<unknown>"),
+ "http_uri" -> ctx.request.uri.toString,
+ "http_path" -> ctx.request.uri.path.toString
+ )
+ val parent = ctx.request.header[Traceparent].map { p =>
+ SpanContext(p.traceId, p.spanId) -> CausalRelation.Child
+ }
+ reporter.traceWithOptionalParentAsync("handle_service_request", tags, parent) { sctx =>
+ val header = Traceparent(sctx.traceId, sctx.spanId)
+ val withHeader = ctx.withRequest(ctx.request.withHeaders(header))
+ inner(withHeader)
+ }
+ }
+ /** Extended route. */
+ override lazy val route: Route = traced(
+ cors(
+ RouteConcatenation.concat(
+ healthRoute,
+ versionRoute,
+ swaggerRoute,
+ applicationRoute
+ )
+ )
+ )
+package xyz.driver.core
+package init
+/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC.
+ * TODO: implement
+ */
+trait ProtobufApi extends AkkaBootable
+package xyz.driver.core
+package init
+trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices