From b175e287bab6ed7d40ca4c9291cb24e20d93398d Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 17:47:25 -0700 Subject: Move init package to separate project --- .../scala/xyz/driver/core/init/AkkaBootable.scala | 190 +++++++++++++++++++++ .../xyz/driver/core/init/BuildInfoReflection.scala | 37 ++++ .../scala/xyz/driver/core/init/CloudServices.scala | 89 ++++++++++ .../main/scala/xyz/driver/core/init/HttpApi.scala | 100 +++++++++++ .../main/scala/xyz/driver/core/init/Platform.scala | 31 ++++ .../scala/xyz/driver/core/init/ProtobufApi.scala | 7 + .../scala/xyz/driver/core/init/SimpleHttpApp.scala | 4 + .../scala/xyz/driver/core/init/AkkaBootable.scala | 190 --------------------- .../xyz/driver/core/init/BuildInfoReflection.scala | 37 ---- .../scala/xyz/driver/core/init/CloudServices.scala | 89 ---------- src/main/scala/xyz/driver/core/init/HttpApi.scala | 100 ----------- src/main/scala/xyz/driver/core/init/Platform.scala | 31 ---- .../scala/xyz/driver/core/init/ProtobufApi.scala | 7 - .../scala/xyz/driver/core/init/SimpleHttpApp.scala | 4 - 14 files changed, 458 insertions(+), 458 deletions(-) create mode 100644 core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/Platform.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala delete mode 100644 src/main/scala/xyz/driver/core/init/AkkaBootable.scala delete mode 100644 src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala delete mode 100644 src/main/scala/xyz/driver/core/init/CloudServices.scala delete mode 100644 src/main/scala/xyz/driver/core/init/HttpApi.scala delete mode 100644 src/main/scala/xyz/driver/core/init/Platform.scala delete mode 100644 src/main/scala/xyz/driver/core/init/ProtobufApi.scala delete mode 100644 src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala diff --git a/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala new file mode 100644 index 0000000..df6611e --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala @@ -0,0 +1,190 @@ +package xyz.driver.core +package init + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.server.{RequestContext, Route} +import akka.stream.scaladsl.Source +import akka.stream.{ActorMaterializer, Materializer} +import akka.util.ByteString +import com.softwaremill.sttp.SttpBackend +import com.softwaremill.sttp.akkahttp.AkkaHttpBackend +import com.typesafe.config.Config +import kamon.Kamon +import kamon.statsd.StatsDReporter +import kamon.system.SystemMetrics +import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext} +import xyz.driver.core.rest.HttpRestServiceTransport + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** Provides standard scaffolding for applications that use Akka HTTP. + * + * Among the features provided are: + * + * - execution contexts of various kinds + * - basic JVM metrics collection via Kamon + * - startup and shutdown hooks + * + * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see + * Known Subclasses) in this package. + * + * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable + * application. + * I.e. + * {{{ + * object Main extends AkkaBootable // this is a runnable application + * }}} + * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should + * be called explicitly, in order to initialize and start this application. + * I.e. + * {{{ + * object Main { + * val bootable = new AkkaBootable {} + * def main(args: Array[String]): Unit = { + * bootable.main(args) + * } + * } + * }}} + * + * @groupname config Configuration + * @groupname contexts Contexts + * @groupname utilities Utilities + * @groupname hooks Overrideable Hooks + */ +trait AkkaBootable { + + /** The application's name. This value is extracted from the build configuration. + * @group config + */ + def name: String = BuildInfoReflection.name + + /** The application's version (or git sha). This value is extracted from the build configuration. + * @group config + */ + def version: Option[String] = BuildInfoReflection.version + + /** TCP port that this application will listen on. + * @group config + */ + def port: Int = 8080 + + // contexts + /** General-purpose actor system for this application. + * @group contexts + */ + implicit lazy val system: ActorSystem = ActorSystem(name) + + /** General-purpose stream materializer for this application. + * @group contexts + */ + implicit lazy val materializer: Materializer = ActorMaterializer() + + /** General-purpose execution context for this application. + * + * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking, + * a custom execution context should be defined and used. See + * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]] + * on how to configure custom execution contexts in Akka. + * + * @group contexts + */ + implicit lazy val executionContext: ExecutionContext = system.dispatcher + + /** Default HTTP client, backed by this application's actor system. + * @group contexts + */ + implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system) + + /** Client RPC transport abstraction. + * @group contexts + */ + implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport( + applicationName = Name(name), + applicationVersion = version.getOrElse(""), + actorSystem = system, + executionContext = executionContext, + reporter = reporter + ) + + // utilities + /** Default reporter instance. + * + * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a + * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards + * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will + * not include tracing information in any messages! + * + * @group utilities + */ + def reporter: Reporter with ScalaLoggingCompat = + new Reporter with NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(json = false) + } + + /** Top-level application configuration. + * + * TODO: should we expose some config wrapper rather than the typesafe config library? + * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala + * [[https://github.com/jvican/stoml]].) + * + * @group utilities + */ + def config: Config = system.settings.config + + /** Overridable startup hook. + * + * Invoked by [[main]] during application startup. + * + * @group hooks + */ + def startup(): Unit = () + + /** Overridable shutdown hook. + * + * Invoked on an arbitrary thread when a shutdown signal is caught. + * + * @group hooks + */ + def shutdown(): Unit = () + + /** Overridable HTTP route. + * + * Any services that present an HTTP interface should implement this method. + * + * @group hooks + * @see [[HttpApi]] + */ + def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound) + + private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message) + + /** This application's entry point. */ + def main(args: Array[String]): Unit = { + implicit val ctx = SpanContext.fresh() + syslog("initializing metrics collection") + Kamon.addReporter(new StatsDReporter()) + SystemMetrics.startCollecting() + + system.registerOnTermination { + syslog("running shutdown hooks") + shutdown() + syslog("bye!") + } + + syslog("running startup hooks") + startup() + + syslog("binding to network interface") + val binding = Await.result( + Http().bindAndHandle(route, "::", port), + 2.seconds + ) + syslog(s"listening to ${binding.localAddress}") + + syslog("startup complete") + } + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala new file mode 100644 index 0000000..0e53085 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala @@ -0,0 +1,37 @@ +package xyz.driver.core +package init + +import scala.reflect.runtime +import scala.util.Try +import scala.util.control.NonFatal + +/** Utility object to retrieve fields from static build configuration objects. */ +private[init] object BuildInfoReflection { + + final val BuildInfoName = "xyz.driver.BuildInfo" + + lazy val name: String = get[String]("name") + lazy val version: Option[String] = find[String]("version") + + /** Lookup a given field in the build configuration. This field is required to exist. */ + private def get[A](fieldName: String): A = + try { + val mirror = runtime.currentMirror + val module = mirror.staticModule(BuildInfoName) + val instance = mirror.reflectModule(module).instance + val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod + mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A] + } catch { + case NonFatal(err) => + throw new RuntimeException( + s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " + + s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " + + s"caller with a static value.", + err + ) + } + + /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */ + private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala new file mode 100644 index 0000000..857dd4c --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala @@ -0,0 +1,89 @@ +package xyz.driver.core +package init + +import java.nio.file.Paths + +import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus} +import xyz.driver.core.reporting._ +import xyz.driver.core.rest.DnsDiscovery +import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage} + +import scala.collection.JavaConverters._ + +/** Mixin trait that provides essential cloud utilities. */ +trait CloudServices extends AkkaBootable { self => + + /** The platform that this application is running on. + * @group config + */ + def platform: Platform = Platform.current + + /** Service discovery for the current platform. + * @group utilities + */ + lazy val discovery: DnsDiscovery = { + def getOverrides(): Map[String, String] = { + val block = config.getObject("services.dev-overrides").unwrapped().asScala + for ((key, value) <- block) yield { + require(value.isInstanceOf[String], s"Service URL override for '$key' must be a string. Found '$value'.") + key -> value.toString + } + }.toMap + val overrides = platform match { + case Platform.Dev => getOverrides() + // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to + // provide a way to override deployed services as well. + case _ => Map.empty[String, String] + } + new DnsDiscovery(clientTransport, overrides) + } + + /* TODO: this reporter uses the platform to determine if JSON logging should be enabled. + * Since the default logger uses slf4j, its settings must be specified before a logger + * is first accessed. This in turn leads to somewhat convoluted code, + * since we can't log when the platform being is determined. + * A potential fix would be to make the log format independent of the platform, and always log + * as JSON for example. + */ + override lazy val reporter: Reporter with ScalaLoggingCompat = { + Console.println("determining platform") // scalastyle:ignore + val r = platform match { + case p @ Platform.GoogleCloud(_, _) => + new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger { + val logger = ScalaLoggingCompat.defaultScalaLogger(true) + } + case Platform.Dev => + new NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(false) + } + } + r.info(s"application started on platform '${platform}'")(SpanContext.fresh()) + r + } + + /** Object storage. + * + * When running on a cloud platform, prepends `$project-` to bucket names, where `$project` + * is the project ID (for example 'driverinc-production` or `driverinc-sandbox`). + * + * @group utilities + */ + def storage(bucketName: String): BlobStorage = + platform match { + case p @ Platform.GoogleCloud(keyfile, _) => + GcsBlobStorage.fromKeyfile(keyfile, s"${p.project}-$bucketName") + case Platform.Dev => + new FileSystemBlobStorage(Paths.get(s".data-$bucketName")) + } + + /** Message bus. + * @group utilities + */ + def messageBus: StreamBus = platform match { + case p @ Platform.GoogleCloud(_, namespace) => + new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand + case Platform.Dev => + new QueueBus()(self.system) with StreamBus + } + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala new file mode 100644 index 0000000..81428bf --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala @@ -0,0 +1,100 @@ +package xyz.driver.core +package init + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation} +import spray.json.DefaultJsonProtocol._ +import spray.json._ +import xyz.driver.core.rest.Swagger +import xyz.driver.core.rest.directives.Directives +import akka.http.scaladsl.model.headers._ +import xyz.driver.core.reporting.Reporter.CausalRelation +import xyz.driver.core.rest.headers.Traceparent + +import scala.collection.JavaConverters._ + +/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding, + * and exposes an application-specific route that must be implemented by services. + * @see ProtobufApi + */ +trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self => + + /** Route that handles the application's business logic. + * @group hooks + */ + def applicationRoute: Route + + /** Classes with Swagger annotations. + * @group hooks + */ + def swaggerRouteClasses: Set[Class[_]] + + private val healthRoute = path("health") { + complete(Map("status" -> "good").toJson) + } + + private val versionRoute = path("version") { + complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson) + } + + private lazy val swaggerRoute = { + val generator = new Swagger( + "", + "https" :: "http" :: Nil, + self.version.getOrElse(""), + swaggerRouteClasses, + config, + reporter + ) + generator.routes ~ generator.swaggerUINew + } + + private def cors(inner: Route): Route = + cors( + config.getStringList("application.cors.allowedOrigins").asScala.toSet, + xyz.driver.core.rest.AllowedHeaders + )(inner) + + private def traced(inner: Route): Route = (ctx: RequestContext) => { + val tags = Map( + "service.version" -> version.getOrElse(""), + // open tracing semantic tags + "span.kind" -> "server", + "service" -> name, + "http.url" -> ctx.request.uri.toString, + "http.method" -> ctx.request.method.value, + "peer.hostname" -> ctx.request.uri.authority.host.toString, + // google's tracing console provides extra search features if we define these tags + "/http/path" -> ctx.request.uri.path.toString, + "/http/method" -> ctx.request.method.value.toString, + "/http/url" -> ctx.request.uri.toString, + "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("") + ) + val parent = ctx.request.header[Traceparent].map { header => + header.spanContext -> CausalRelation.Child + } + reporter + .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext => + val header = Traceparent(spanContext) + val withHeader = ctx.withRequest( + ctx.request + .removeHeader(header.name) + .addHeader(header) + ) + inner(withHeader) + } + } + + /** Extended route. */ + override lazy val route: Route = traced( + cors( + RouteConcatenation.concat( + healthRoute, + versionRoute, + swaggerRoute, + applicationRoute + ) + ) + ) + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/Platform.scala b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala new file mode 100644 index 0000000..2daa2c8 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala @@ -0,0 +1,31 @@ +package xyz.driver.core +package init + +import java.nio.file.{Files, Path, Paths} + +import com.google.auth.oauth2.ServiceAccountCredentials + +sealed trait Platform +object Platform { + case class GoogleCloud(keyfile: Path, namespace: String) extends Platform { + def credentials: ServiceAccountCredentials = ServiceAccountCredentials.fromStream( + Files.newInputStream(keyfile) + ) + def project: String = credentials.getProjectId + } + // case object AliCloud extends Platform + case object Dev extends Platform + + lazy val fromEnv: Platform = { + def isGoogle = sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").map { value => + val keyfile = Paths.get(value) + require(Files.isReadable(keyfile), s"Google credentials file $value is not readable.") + val namespace = sys.env.getOrElse("SERVICE_NAMESPACE", sys.error("Namespace not set")) + GoogleCloud(keyfile, namespace) + } + isGoogle.getOrElse(Dev) + } + + def current: Platform = fromEnv + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala new file mode 100644 index 0000000..284ac67 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala @@ -0,0 +1,7 @@ +package xyz.driver.core +package init + +/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC. + * TODO: implement + */ +trait ProtobufApi extends AkkaBootable diff --git a/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala new file mode 100644 index 0000000..61ca363 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala @@ -0,0 +1,4 @@ +package xyz.driver.core +package init + +trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices diff --git a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/src/main/scala/xyz/driver/core/init/AkkaBootable.scala deleted file mode 100644 index df6611e..0000000 --- a/src/main/scala/xyz/driver/core/init/AkkaBootable.scala +++ /dev/null @@ -1,190 +0,0 @@ -package xyz.driver.core -package init - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.server.{RequestContext, Route} -import akka.stream.scaladsl.Source -import akka.stream.{ActorMaterializer, Materializer} -import akka.util.ByteString -import com.softwaremill.sttp.SttpBackend -import com.softwaremill.sttp.akkahttp.AkkaHttpBackend -import com.typesafe.config.Config -import kamon.Kamon -import kamon.statsd.StatsDReporter -import kamon.system.SystemMetrics -import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext} -import xyz.driver.core.rest.HttpRestServiceTransport - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} - -/** Provides standard scaffolding for applications that use Akka HTTP. - * - * Among the features provided are: - * - * - execution contexts of various kinds - * - basic JVM metrics collection via Kamon - * - startup and shutdown hooks - * - * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see - * Known Subclasses) in this package. - * - * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable - * application. - * I.e. - * {{{ - * object Main extends AkkaBootable // this is a runnable application - * }}} - * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should - * be called explicitly, in order to initialize and start this application. - * I.e. - * {{{ - * object Main { - * val bootable = new AkkaBootable {} - * def main(args: Array[String]): Unit = { - * bootable.main(args) - * } - * } - * }}} - * - * @groupname config Configuration - * @groupname contexts Contexts - * @groupname utilities Utilities - * @groupname hooks Overrideable Hooks - */ -trait AkkaBootable { - - /** The application's name. This value is extracted from the build configuration. - * @group config - */ - def name: String = BuildInfoReflection.name - - /** The application's version (or git sha). This value is extracted from the build configuration. - * @group config - */ - def version: Option[String] = BuildInfoReflection.version - - /** TCP port that this application will listen on. - * @group config - */ - def port: Int = 8080 - - // contexts - /** General-purpose actor system for this application. - * @group contexts - */ - implicit lazy val system: ActorSystem = ActorSystem(name) - - /** General-purpose stream materializer for this application. - * @group contexts - */ - implicit lazy val materializer: Materializer = ActorMaterializer() - - /** General-purpose execution context for this application. - * - * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking, - * a custom execution context should be defined and used. See - * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]] - * on how to configure custom execution contexts in Akka. - * - * @group contexts - */ - implicit lazy val executionContext: ExecutionContext = system.dispatcher - - /** Default HTTP client, backed by this application's actor system. - * @group contexts - */ - implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system) - - /** Client RPC transport abstraction. - * @group contexts - */ - implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport( - applicationName = Name(name), - applicationVersion = version.getOrElse(""), - actorSystem = system, - executionContext = executionContext, - reporter = reporter - ) - - // utilities - /** Default reporter instance. - * - * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a - * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards - * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will - * not include tracing information in any messages! - * - * @group utilities - */ - def reporter: Reporter with ScalaLoggingCompat = - new Reporter with NoTraceReporter with ScalaLoggingCompat { - val logger = ScalaLoggingCompat.defaultScalaLogger(json = false) - } - - /** Top-level application configuration. - * - * TODO: should we expose some config wrapper rather than the typesafe config library? - * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala - * [[https://github.com/jvican/stoml]].) - * - * @group utilities - */ - def config: Config = system.settings.config - - /** Overridable startup hook. - * - * Invoked by [[main]] during application startup. - * - * @group hooks - */ - def startup(): Unit = () - - /** Overridable shutdown hook. - * - * Invoked on an arbitrary thread when a shutdown signal is caught. - * - * @group hooks - */ - def shutdown(): Unit = () - - /** Overridable HTTP route. - * - * Any services that present an HTTP interface should implement this method. - * - * @group hooks - * @see [[HttpApi]] - */ - def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound) - - private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message) - - /** This application's entry point. */ - def main(args: Array[String]): Unit = { - implicit val ctx = SpanContext.fresh() - syslog("initializing metrics collection") - Kamon.addReporter(new StatsDReporter()) - SystemMetrics.startCollecting() - - system.registerOnTermination { - syslog("running shutdown hooks") - shutdown() - syslog("bye!") - } - - syslog("running startup hooks") - startup() - - syslog("binding to network interface") - val binding = Await.result( - Http().bindAndHandle(route, "::", port), - 2.seconds - ) - syslog(s"listening to ${binding.localAddress}") - - syslog("startup complete") - } - -} diff --git a/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala b/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala deleted file mode 100644 index 0e53085..0000000 --- a/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala +++ /dev/null @@ -1,37 +0,0 @@ -package xyz.driver.core -package init - -import scala.reflect.runtime -import scala.util.Try -import scala.util.control.NonFatal - -/** Utility object to retrieve fields from static build configuration objects. */ -private[init] object BuildInfoReflection { - - final val BuildInfoName = "xyz.driver.BuildInfo" - - lazy val name: String = get[String]("name") - lazy val version: Option[String] = find[String]("version") - - /** Lookup a given field in the build configuration. This field is required to exist. */ - private def get[A](fieldName: String): A = - try { - val mirror = runtime.currentMirror - val module = mirror.staticModule(BuildInfoName) - val instance = mirror.reflectModule(module).instance - val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod - mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A] - } catch { - case NonFatal(err) => - throw new RuntimeException( - s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " + - s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " + - s"caller with a static value.", - err - ) - } - - /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */ - private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption - -} diff --git a/src/main/scala/xyz/driver/core/init/CloudServices.scala b/src/main/scala/xyz/driver/core/init/CloudServices.scala deleted file mode 100644 index 857dd4c..0000000 --- a/src/main/scala/xyz/driver/core/init/CloudServices.scala +++ /dev/null @@ -1,89 +0,0 @@ -package xyz.driver.core -package init - -import java.nio.file.Paths - -import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus} -import xyz.driver.core.reporting._ -import xyz.driver.core.rest.DnsDiscovery -import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage} - -import scala.collection.JavaConverters._ - -/** Mixin trait that provides essential cloud utilities. */ -trait CloudServices extends AkkaBootable { self => - - /** The platform that this application is running on. - * @group config - */ - def platform: Platform = Platform.current - - /** Service discovery for the current platform. - * @group utilities - */ - lazy val discovery: DnsDiscovery = { - def getOverrides(): Map[String, String] = { - val block = config.getObject("services.dev-overrides").unwrapped().asScala - for ((key, value) <- block) yield { - require(value.isInstanceOf[String], s"Service URL override for '$key' must be a string. Found '$value'.") - key -> value.toString - } - }.toMap - val overrides = platform match { - case Platform.Dev => getOverrides() - // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to - // provide a way to override deployed services as well. - case _ => Map.empty[String, String] - } - new DnsDiscovery(clientTransport, overrides) - } - - /* TODO: this reporter uses the platform to determine if JSON logging should be enabled. - * Since the default logger uses slf4j, its settings must be specified before a logger - * is first accessed. This in turn leads to somewhat convoluted code, - * since we can't log when the platform being is determined. - * A potential fix would be to make the log format independent of the platform, and always log - * as JSON for example. - */ - override lazy val reporter: Reporter with ScalaLoggingCompat = { - Console.println("determining platform") // scalastyle:ignore - val r = platform match { - case p @ Platform.GoogleCloud(_, _) => - new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger { - val logger = ScalaLoggingCompat.defaultScalaLogger(true) - } - case Platform.Dev => - new NoTraceReporter with ScalaLoggingCompat { - val logger = ScalaLoggingCompat.defaultScalaLogger(false) - } - } - r.info(s"application started on platform '${platform}'")(SpanContext.fresh()) - r - } - - /** Object storage. - * - * When running on a cloud platform, prepends `$project-` to bucket names, where `$project` - * is the project ID (for example 'driverinc-production` or `driverinc-sandbox`). - * - * @group utilities - */ - def storage(bucketName: String): BlobStorage = - platform match { - case p @ Platform.GoogleCloud(keyfile, _) => - GcsBlobStorage.fromKeyfile(keyfile, s"${p.project}-$bucketName") - case Platform.Dev => - new FileSystemBlobStorage(Paths.get(s".data-$bucketName")) - } - - /** Message bus. - * @group utilities - */ - def messageBus: StreamBus = platform match { - case p @ Platform.GoogleCloud(_, namespace) => - new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand - case Platform.Dev => - new QueueBus()(self.system) with StreamBus - } - -} diff --git a/src/main/scala/xyz/driver/core/init/HttpApi.scala b/src/main/scala/xyz/driver/core/init/HttpApi.scala deleted file mode 100644 index 81428bf..0000000 --- a/src/main/scala/xyz/driver/core/init/HttpApi.scala +++ /dev/null @@ -1,100 +0,0 @@ -package xyz.driver.core -package init - -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation} -import spray.json.DefaultJsonProtocol._ -import spray.json._ -import xyz.driver.core.rest.Swagger -import xyz.driver.core.rest.directives.Directives -import akka.http.scaladsl.model.headers._ -import xyz.driver.core.reporting.Reporter.CausalRelation -import xyz.driver.core.rest.headers.Traceparent - -import scala.collection.JavaConverters._ - -/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding, - * and exposes an application-specific route that must be implemented by services. - * @see ProtobufApi - */ -trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self => - - /** Route that handles the application's business logic. - * @group hooks - */ - def applicationRoute: Route - - /** Classes with Swagger annotations. - * @group hooks - */ - def swaggerRouteClasses: Set[Class[_]] - - private val healthRoute = path("health") { - complete(Map("status" -> "good").toJson) - } - - private val versionRoute = path("version") { - complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson) - } - - private lazy val swaggerRoute = { - val generator = new Swagger( - "", - "https" :: "http" :: Nil, - self.version.getOrElse(""), - swaggerRouteClasses, - config, - reporter - ) - generator.routes ~ generator.swaggerUINew - } - - private def cors(inner: Route): Route = - cors( - config.getStringList("application.cors.allowedOrigins").asScala.toSet, - xyz.driver.core.rest.AllowedHeaders - )(inner) - - private def traced(inner: Route): Route = (ctx: RequestContext) => { - val tags = Map( - "service.version" -> version.getOrElse(""), - // open tracing semantic tags - "span.kind" -> "server", - "service" -> name, - "http.url" -> ctx.request.uri.toString, - "http.method" -> ctx.request.method.value, - "peer.hostname" -> ctx.request.uri.authority.host.toString, - // google's tracing console provides extra search features if we define these tags - "/http/path" -> ctx.request.uri.path.toString, - "/http/method" -> ctx.request.method.value.toString, - "/http/url" -> ctx.request.uri.toString, - "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("") - ) - val parent = ctx.request.header[Traceparent].map { header => - header.spanContext -> CausalRelation.Child - } - reporter - .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext => - val header = Traceparent(spanContext) - val withHeader = ctx.withRequest( - ctx.request - .removeHeader(header.name) - .addHeader(header) - ) - inner(withHeader) - } - } - - /** Extended route. */ - override lazy val route: Route = traced( - cors( - RouteConcatenation.concat( - healthRoute, - versionRoute, - swaggerRoute, - applicationRoute - ) - ) - ) - -} diff --git a/src/main/scala/xyz/driver/core/init/Platform.scala b/src/main/scala/xyz/driver/core/init/Platform.scala deleted file mode 100644 index 2daa2c8..0000000 --- a/src/main/scala/xyz/driver/core/init/Platform.scala +++ /dev/null @@ -1,31 +0,0 @@ -package xyz.driver.core -package init - -import java.nio.file.{Files, Path, Paths} - -import com.google.auth.oauth2.ServiceAccountCredentials - -sealed trait Platform -object Platform { - case class GoogleCloud(keyfile: Path, namespace: String) extends Platform { - def credentials: ServiceAccountCredentials = ServiceAccountCredentials.fromStream( - Files.newInputStream(keyfile) - ) - def project: String = credentials.getProjectId - } - // case object AliCloud extends Platform - case object Dev extends Platform - - lazy val fromEnv: Platform = { - def isGoogle = sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").map { value => - val keyfile = Paths.get(value) - require(Files.isReadable(keyfile), s"Google credentials file $value is not readable.") - val namespace = sys.env.getOrElse("SERVICE_NAMESPACE", sys.error("Namespace not set")) - GoogleCloud(keyfile, namespace) - } - isGoogle.getOrElse(Dev) - } - - def current: Platform = fromEnv - -} diff --git a/src/main/scala/xyz/driver/core/init/ProtobufApi.scala b/src/main/scala/xyz/driver/core/init/ProtobufApi.scala deleted file mode 100644 index 284ac67..0000000 --- a/src/main/scala/xyz/driver/core/init/ProtobufApi.scala +++ /dev/null @@ -1,7 +0,0 @@ -package xyz.driver.core -package init - -/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC. - * TODO: implement - */ -trait ProtobufApi extends AkkaBootable diff --git a/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala b/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala deleted file mode 100644 index 61ca363..0000000 --- a/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala +++ /dev/null @@ -1,4 +0,0 @@ -package xyz.driver.core -package init - -trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices -- cgit v1.2.3