aboutsummaryrefslogtreecommitdiff
path: root/core-init
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-12 17:47:25 -0700
committerJakob Odersky <jakob@odersky.com>2018-10-09 16:19:39 -0700
commitb175e287bab6ed7d40ca4c9291cb24e20d93398d (patch)
tree83859b36d1770beaf38631e006d014e0314a22f0 /core-init
parenteb6f97b4cac548999cbf192ee83d9ba9a253b7c8 (diff)
downloaddriver-core-b175e287bab6ed7d40ca4c9291cb24e20d93398d.tar.gz
driver-core-b175e287bab6ed7d40ca4c9291cb24e20d93398d.tar.bz2
driver-core-b175e287bab6ed7d40ca4c9291cb24e20d93398d.zip
Move init package to separate project
Diffstat (limited to 'core-init')
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala190
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala37
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala89
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala100
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/Platform.scala31
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala7
-rw-r--r--core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala4
7 files changed, 458 insertions, 0 deletions
diff --git a/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala
new file mode 100644
index 0000000..df6611e
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala
@@ -0,0 +1,190 @@
+package xyz.driver.core
+package init
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.server.{RequestContext, Route}
+import akka.stream.scaladsl.Source
+import akka.stream.{ActorMaterializer, Materializer}
+import akka.util.ByteString
+import com.softwaremill.sttp.SttpBackend
+import com.softwaremill.sttp.akkahttp.AkkaHttpBackend
+import com.typesafe.config.Config
+import kamon.Kamon
+import kamon.statsd.StatsDReporter
+import kamon.system.SystemMetrics
+import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext}
+import xyz.driver.core.rest.HttpRestServiceTransport
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+/** Provides standard scaffolding for applications that use Akka HTTP.
+ *
+ * Among the features provided are:
+ *
+ * - execution contexts of various kinds
+ * - basic JVM metrics collection via Kamon
+ * - startup and shutdown hooks
+ *
+ * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see
+ * Known Subclasses) in this package.
+ *
+ * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable
+ * application.
+ * I.e.
+ * {{{
+ * object Main extends AkkaBootable // this is a runnable application
+ * }}}
+ * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should
+ * be called explicitly, in order to initialize and start this application.
+ * I.e.
+ * {{{
+ * object Main {
+ * val bootable = new AkkaBootable {}
+ * def main(args: Array[String]): Unit = {
+ * bootable.main(args)
+ * }
+ * }
+ * }}}
+ *
+ * @groupname config Configuration
+ * @groupname contexts Contexts
+ * @groupname utilities Utilities
+ * @groupname hooks Overrideable Hooks
+ */
+trait AkkaBootable {
+
+ /** The application's name. This value is extracted from the build configuration.
+ * @group config
+ */
+ def name: String = BuildInfoReflection.name
+
+ /** The application's version (or git sha). This value is extracted from the build configuration.
+ * @group config
+ */
+ def version: Option[String] = BuildInfoReflection.version
+
+ /** TCP port that this application will listen on.
+ * @group config
+ */
+ def port: Int = 8080
+
+ // contexts
+ /** General-purpose actor system for this application.
+ * @group contexts
+ */
+ implicit lazy val system: ActorSystem = ActorSystem(name)
+
+ /** General-purpose stream materializer for this application.
+ * @group contexts
+ */
+ implicit lazy val materializer: Materializer = ActorMaterializer()
+
+ /** General-purpose execution context for this application.
+ *
+ * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking,
+ * a custom execution context should be defined and used. See
+ * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]]
+ * on how to configure custom execution contexts in Akka.
+ *
+ * @group contexts
+ */
+ implicit lazy val executionContext: ExecutionContext = system.dispatcher
+
+ /** Default HTTP client, backed by this application's actor system.
+ * @group contexts
+ */
+ implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system)
+
+ /** Client RPC transport abstraction.
+ * @group contexts
+ */
+ implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport(
+ applicationName = Name(name),
+ applicationVersion = version.getOrElse("<unknown>"),
+ actorSystem = system,
+ executionContext = executionContext,
+ reporter = reporter
+ )
+
+ // utilities
+ /** Default reporter instance.
+ *
+ * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a
+ * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards
+ * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will
+ * not include tracing information in any messages!
+ *
+ * @group utilities
+ */
+ def reporter: Reporter with ScalaLoggingCompat =
+ new Reporter with NoTraceReporter with ScalaLoggingCompat {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(json = false)
+ }
+
+ /** Top-level application configuration.
+ *
+ * TODO: should we expose some config wrapper rather than the typesafe config library?
+ * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala
+ * [[https://github.com/jvican/stoml]].)
+ *
+ * @group utilities
+ */
+ def config: Config = system.settings.config
+
+ /** Overridable startup hook.
+ *
+ * Invoked by [[main]] during application startup.
+ *
+ * @group hooks
+ */
+ def startup(): Unit = ()
+
+ /** Overridable shutdown hook.
+ *
+ * Invoked on an arbitrary thread when a shutdown signal is caught.
+ *
+ * @group hooks
+ */
+ def shutdown(): Unit = ()
+
+ /** Overridable HTTP route.
+ *
+ * Any services that present an HTTP interface should implement this method.
+ *
+ * @group hooks
+ * @see [[HttpApi]]
+ */
+ def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound)
+
+ private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message)
+
+ /** This application's entry point. */
+ def main(args: Array[String]): Unit = {
+ implicit val ctx = SpanContext.fresh()
+ syslog("initializing metrics collection")
+ Kamon.addReporter(new StatsDReporter())
+ SystemMetrics.startCollecting()
+
+ system.registerOnTermination {
+ syslog("running shutdown hooks")
+ shutdown()
+ syslog("bye!")
+ }
+
+ syslog("running startup hooks")
+ startup()
+
+ syslog("binding to network interface")
+ val binding = Await.result(
+ Http().bindAndHandle(route, "::", port),
+ 2.seconds
+ )
+ syslog(s"listening to ${binding.localAddress}")
+
+ syslog("startup complete")
+ }
+
+}
diff --git a/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala
new file mode 100644
index 0000000..0e53085
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala
@@ -0,0 +1,37 @@
+package xyz.driver.core
+package init
+
+import scala.reflect.runtime
+import scala.util.Try
+import scala.util.control.NonFatal
+
+/** Utility object to retrieve fields from static build configuration objects. */
+private[init] object BuildInfoReflection {
+
+ final val BuildInfoName = "xyz.driver.BuildInfo"
+
+ lazy val name: String = get[String]("name")
+ lazy val version: Option[String] = find[String]("version")
+
+ /** Lookup a given field in the build configuration. This field is required to exist. */
+ private def get[A](fieldName: String): A =
+ try {
+ val mirror = runtime.currentMirror
+ val module = mirror.staticModule(BuildInfoName)
+ val instance = mirror.reflectModule(module).instance
+ val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod
+ mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A]
+ } catch {
+ case NonFatal(err) =>
+ throw new RuntimeException(
+ s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " +
+ s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " +
+ s"caller with a static value.",
+ err
+ )
+ }
+
+ /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */
+ private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption
+
+}
diff --git a/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala
new file mode 100644
index 0000000..857dd4c
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala
@@ -0,0 +1,89 @@
+package xyz.driver.core
+package init
+
+import java.nio.file.Paths
+
+import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus}
+import xyz.driver.core.reporting._
+import xyz.driver.core.rest.DnsDiscovery
+import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage}
+
+import scala.collection.JavaConverters._
+
+/** Mixin trait that provides essential cloud utilities. */
+trait CloudServices extends AkkaBootable { self =>
+
+ /** The platform that this application is running on.
+ * @group config
+ */
+ def platform: Platform = Platform.current
+
+ /** Service discovery for the current platform.
+ * @group utilities
+ */
+ lazy val discovery: DnsDiscovery = {
+ def getOverrides(): Map[String, String] = {
+ val block = config.getObject("services.dev-overrides").unwrapped().asScala
+ for ((key, value) <- block) yield {
+ require(value.isInstanceOf[String], s"Service URL override for '$key' must be a string. Found '$value'.")
+ key -> value.toString
+ }
+ }.toMap
+ val overrides = platform match {
+ case Platform.Dev => getOverrides()
+ // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to
+ // provide a way to override deployed services as well.
+ case _ => Map.empty[String, String]
+ }
+ new DnsDiscovery(clientTransport, overrides)
+ }
+
+ /* TODO: this reporter uses the platform to determine if JSON logging should be enabled.
+ * Since the default logger uses slf4j, its settings must be specified before a logger
+ * is first accessed. This in turn leads to somewhat convoluted code,
+ * since we can't log when the platform being is determined.
+ * A potential fix would be to make the log format independent of the platform, and always log
+ * as JSON for example.
+ */
+ override lazy val reporter: Reporter with ScalaLoggingCompat = {
+ Console.println("determining platform") // scalastyle:ignore
+ val r = platform match {
+ case p @ Platform.GoogleCloud(_, _) =>
+ new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(true)
+ }
+ case Platform.Dev =>
+ new NoTraceReporter with ScalaLoggingCompat {
+ val logger = ScalaLoggingCompat.defaultScalaLogger(false)
+ }
+ }
+ r.info(s"application started on platform '${platform}'")(SpanContext.fresh())
+ r
+ }
+
+ /** Object storage.
+ *
+ * When running on a cloud platform, prepends `$project-` to bucket names, where `$project`
+ * is the project ID (for example 'driverinc-production` or `driverinc-sandbox`).
+ *
+ * @group utilities
+ */
+ def storage(bucketName: String): BlobStorage =
+ platform match {
+ case p @ Platform.GoogleCloud(keyfile, _) =>
+ GcsBlobStorage.fromKeyfile(keyfile, s"${p.project}-$bucketName")
+ case Platform.Dev =>
+ new FileSystemBlobStorage(Paths.get(s".data-$bucketName"))
+ }
+
+ /** Message bus.
+ * @group utilities
+ */
+ def messageBus: StreamBus = platform match {
+ case p @ Platform.GoogleCloud(_, namespace) =>
+ new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand
+ case Platform.Dev =>
+ new QueueBus()(self.system) with StreamBus
+ }
+
+}
diff --git a/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala
new file mode 100644
index 0000000..81428bf
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala
@@ -0,0 +1,100 @@
+package xyz.driver.core
+package init
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
+import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import xyz.driver.core.rest.Swagger
+import xyz.driver.core.rest.directives.Directives
+import akka.http.scaladsl.model.headers._
+import xyz.driver.core.reporting.Reporter.CausalRelation
+import xyz.driver.core.rest.headers.Traceparent
+
+import scala.collection.JavaConverters._
+
+/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding,
+ * and exposes an application-specific route that must be implemented by services.
+ * @see ProtobufApi
+ */
+trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self =>
+
+ /** Route that handles the application's business logic.
+ * @group hooks
+ */
+ def applicationRoute: Route
+
+ /** Classes with Swagger annotations.
+ * @group hooks
+ */
+ def swaggerRouteClasses: Set[Class[_]]
+
+ private val healthRoute = path("health") {
+ complete(Map("status" -> "good").toJson)
+ }
+
+ private val versionRoute = path("version") {
+ complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson)
+ }
+
+ private lazy val swaggerRoute = {
+ val generator = new Swagger(
+ "",
+ "https" :: "http" :: Nil,
+ self.version.getOrElse("<unknown>"),
+ swaggerRouteClasses,
+ config,
+ reporter
+ )
+ generator.routes ~ generator.swaggerUINew
+ }
+
+ private def cors(inner: Route): Route =
+ cors(
+ config.getStringList("application.cors.allowedOrigins").asScala.toSet,
+ xyz.driver.core.rest.AllowedHeaders
+ )(inner)
+
+ private def traced(inner: Route): Route = (ctx: RequestContext) => {
+ val tags = Map(
+ "service.version" -> version.getOrElse("<unknown>"),
+ // open tracing semantic tags
+ "span.kind" -> "server",
+ "service" -> name,
+ "http.url" -> ctx.request.uri.toString,
+ "http.method" -> ctx.request.method.value,
+ "peer.hostname" -> ctx.request.uri.authority.host.toString,
+ // google's tracing console provides extra search features if we define these tags
+ "/http/path" -> ctx.request.uri.path.toString,
+ "/http/method" -> ctx.request.method.value.toString,
+ "/http/url" -> ctx.request.uri.toString,
+ "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("<unknown>")
+ )
+ val parent = ctx.request.header[Traceparent].map { header =>
+ header.spanContext -> CausalRelation.Child
+ }
+ reporter
+ .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext =>
+ val header = Traceparent(spanContext)
+ val withHeader = ctx.withRequest(
+ ctx.request
+ .removeHeader(header.name)
+ .addHeader(header)
+ )
+ inner(withHeader)
+ }
+ }
+
+ /** Extended route. */
+ override lazy val route: Route = traced(
+ cors(
+ RouteConcatenation.concat(
+ healthRoute,
+ versionRoute,
+ swaggerRoute,
+ applicationRoute
+ )
+ )
+ )
+
+}
diff --git a/core-init/src/main/scala/xyz/driver/core/init/Platform.scala b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala
new file mode 100644
index 0000000..2daa2c8
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala
@@ -0,0 +1,31 @@
+package xyz.driver.core
+package init
+
+import java.nio.file.{Files, Path, Paths}
+
+import com.google.auth.oauth2.ServiceAccountCredentials
+
+sealed trait Platform
+object Platform {
+ case class GoogleCloud(keyfile: Path, namespace: String) extends Platform {
+ def credentials: ServiceAccountCredentials = ServiceAccountCredentials.fromStream(
+ Files.newInputStream(keyfile)
+ )
+ def project: String = credentials.getProjectId
+ }
+ // case object AliCloud extends Platform
+ case object Dev extends Platform
+
+ lazy val fromEnv: Platform = {
+ def isGoogle = sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").map { value =>
+ val keyfile = Paths.get(value)
+ require(Files.isReadable(keyfile), s"Google credentials file $value is not readable.")
+ val namespace = sys.env.getOrElse("SERVICE_NAMESPACE", sys.error("Namespace not set"))
+ GoogleCloud(keyfile, namespace)
+ }
+ isGoogle.getOrElse(Dev)
+ }
+
+ def current: Platform = fromEnv
+
+}
diff --git a/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala
new file mode 100644
index 0000000..284ac67
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala
@@ -0,0 +1,7 @@
+package xyz.driver.core
+package init
+
+/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC.
+ * TODO: implement
+ */
+trait ProtobufApi extends AkkaBootable
diff --git a/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala
new file mode 100644
index 0000000..61ca363
--- /dev/null
+++ b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala
@@ -0,0 +1,4 @@
+package xyz.driver.core
+package init
+
+trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices