From b175e287bab6ed7d40ca4c9291cb24e20d93398d Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 17:47:25 -0700 Subject: Move init package to separate project --- .../scala/xyz/driver/core/init/AkkaBootable.scala | 190 +++++++++++++++++++++ .../xyz/driver/core/init/BuildInfoReflection.scala | 37 ++++ .../scala/xyz/driver/core/init/CloudServices.scala | 89 ++++++++++ .../main/scala/xyz/driver/core/init/HttpApi.scala | 100 +++++++++++ .../main/scala/xyz/driver/core/init/Platform.scala | 31 ++++ .../scala/xyz/driver/core/init/ProtobufApi.scala | 7 + .../scala/xyz/driver/core/init/SimpleHttpApp.scala | 4 + 7 files changed, 458 insertions(+) create mode 100644 core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/Platform.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala create mode 100644 core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala (limited to 'core-init') diff --git a/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala new file mode 100644 index 0000000..df6611e --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/AkkaBootable.scala @@ -0,0 +1,190 @@ +package xyz.driver.core +package init + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.server.{RequestContext, Route} +import akka.stream.scaladsl.Source +import akka.stream.{ActorMaterializer, Materializer} +import akka.util.ByteString +import com.softwaremill.sttp.SttpBackend +import com.softwaremill.sttp.akkahttp.AkkaHttpBackend +import com.typesafe.config.Config +import kamon.Kamon +import kamon.statsd.StatsDReporter +import kamon.system.SystemMetrics +import xyz.driver.core.reporting.{NoTraceReporter, Reporter, ScalaLoggingCompat, SpanContext} +import xyz.driver.core.rest.HttpRestServiceTransport + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** Provides standard scaffolding for applications that use Akka HTTP. + * + * Among the features provided are: + * + * - execution contexts of various kinds + * - basic JVM metrics collection via Kamon + * - startup and shutdown hooks + * + * This trait provides a minimal, runnable application. It is designed to be extended by various mixins (see + * Known Subclasses) in this package. + * + * By implementing a "main" method, mixing this trait into a singleton object will result in a runnable + * application. + * I.e. + * {{{ + * object Main extends AkkaBootable // this is a runnable application + * }}} + * In case this trait isn't mixed into a top-level singleton object, the [[AkkaBootable#main main]] method should + * be called explicitly, in order to initialize and start this application. + * I.e. + * {{{ + * object Main { + * val bootable = new AkkaBootable {} + * def main(args: Array[String]): Unit = { + * bootable.main(args) + * } + * } + * }}} + * + * @groupname config Configuration + * @groupname contexts Contexts + * @groupname utilities Utilities + * @groupname hooks Overrideable Hooks + */ +trait AkkaBootable { + + /** The application's name. This value is extracted from the build configuration. + * @group config + */ + def name: String = BuildInfoReflection.name + + /** The application's version (or git sha). This value is extracted from the build configuration. + * @group config + */ + def version: Option[String] = BuildInfoReflection.version + + /** TCP port that this application will listen on. + * @group config + */ + def port: Int = 8080 + + // contexts + /** General-purpose actor system for this application. + * @group contexts + */ + implicit lazy val system: ActorSystem = ActorSystem(name) + + /** General-purpose stream materializer for this application. + * @group contexts + */ + implicit lazy val materializer: Materializer = ActorMaterializer() + + /** General-purpose execution context for this application. + * + * Note that no thread-blocking tasks should be submitted to this context. In cases that do require blocking, + * a custom execution context should be defined and used. See + * [[https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html this guide]] + * on how to configure custom execution contexts in Akka. + * + * @group contexts + */ + implicit lazy val executionContext: ExecutionContext = system.dispatcher + + /** Default HTTP client, backed by this application's actor system. + * @group contexts + */ + implicit lazy val httpClient: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(system) + + /** Client RPC transport abstraction. + * @group contexts + */ + implicit lazy val clientTransport: HttpRestServiceTransport = new HttpRestServiceTransport( + applicationName = Name(name), + applicationVersion = version.getOrElse(""), + actorSystem = system, + executionContext = executionContext, + reporter = reporter + ) + + // utilities + /** Default reporter instance. + * + * Note that this is currently defined to be a ScalaLoggerLike, so that it can be implicitly converted to a + * [[com.typesafe.scalalogging.Logger]] when necessary. This conversion is provided to ensure backwards + * compatibility with code that requires such a logger. Warning: using a logger instead of a reporter will + * not include tracing information in any messages! + * + * @group utilities + */ + def reporter: Reporter with ScalaLoggingCompat = + new Reporter with NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(json = false) + } + + /** Top-level application configuration. + * + * TODO: should we expose some config wrapper rather than the typesafe config library? + * (Author's note: I'm a fan of TOML since it's so simple. There's already an implementation for Scala + * [[https://github.com/jvican/stoml]].) + * + * @group utilities + */ + def config: Config = system.settings.config + + /** Overridable startup hook. + * + * Invoked by [[main]] during application startup. + * + * @group hooks + */ + def startup(): Unit = () + + /** Overridable shutdown hook. + * + * Invoked on an arbitrary thread when a shutdown signal is caught. + * + * @group hooks + */ + def shutdown(): Unit = () + + /** Overridable HTTP route. + * + * Any services that present an HTTP interface should implement this method. + * + * @group hooks + * @see [[HttpApi]] + */ + def route: Route = (ctx: RequestContext) => ctx.complete(StatusCodes.NotFound) + + private def syslog(message: String)(implicit ctx: SpanContext) = reporter.info(s"application: " + message) + + /** This application's entry point. */ + def main(args: Array[String]): Unit = { + implicit val ctx = SpanContext.fresh() + syslog("initializing metrics collection") + Kamon.addReporter(new StatsDReporter()) + SystemMetrics.startCollecting() + + system.registerOnTermination { + syslog("running shutdown hooks") + shutdown() + syslog("bye!") + } + + syslog("running startup hooks") + startup() + + syslog("binding to network interface") + val binding = Await.result( + Http().bindAndHandle(route, "::", port), + 2.seconds + ) + syslog(s"listening to ${binding.localAddress}") + + syslog("startup complete") + } + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala new file mode 100644 index 0000000..0e53085 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/BuildInfoReflection.scala @@ -0,0 +1,37 @@ +package xyz.driver.core +package init + +import scala.reflect.runtime +import scala.util.Try +import scala.util.control.NonFatal + +/** Utility object to retrieve fields from static build configuration objects. */ +private[init] object BuildInfoReflection { + + final val BuildInfoName = "xyz.driver.BuildInfo" + + lazy val name: String = get[String]("name") + lazy val version: Option[String] = find[String]("version") + + /** Lookup a given field in the build configuration. This field is required to exist. */ + private def get[A](fieldName: String): A = + try { + val mirror = runtime.currentMirror + val module = mirror.staticModule(BuildInfoName) + val instance = mirror.reflectModule(module).instance + val accessor = module.info.decl(mirror.universe.TermName(fieldName)).asMethod + mirror.reflect(instance).reflectMethod(accessor).apply().asInstanceOf[A] + } catch { + case NonFatal(err) => + throw new RuntimeException( + s"Cannot find field name '$fieldName' in $BuildInfoName. Please define (or generate) a singleton " + + s"object with that field. Alternatively, in order to avoid runtime reflection, you may override the " + + s"caller with a static value.", + err + ) + } + + /** Try finding a given field in the build configuration. If the field does not exist, None is returned. */ + private def find[A](fieldName: String): Option[A] = Try { get[A](fieldName) }.toOption + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala new file mode 100644 index 0000000..857dd4c --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/CloudServices.scala @@ -0,0 +1,89 @@ +package xyz.driver.core +package init + +import java.nio.file.Paths + +import xyz.driver.core.messaging.{CreateOnDemand, GoogleBus, QueueBus, StreamBus} +import xyz.driver.core.reporting._ +import xyz.driver.core.rest.DnsDiscovery +import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage, GcsBlobStorage} + +import scala.collection.JavaConverters._ + +/** Mixin trait that provides essential cloud utilities. */ +trait CloudServices extends AkkaBootable { self => + + /** The platform that this application is running on. + * @group config + */ + def platform: Platform = Platform.current + + /** Service discovery for the current platform. + * @group utilities + */ + lazy val discovery: DnsDiscovery = { + def getOverrides(): Map[String, String] = { + val block = config.getObject("services.dev-overrides").unwrapped().asScala + for ((key, value) <- block) yield { + require(value.isInstanceOf[String], s"Service URL override for '$key' must be a string. Found '$value'.") + key -> value.toString + } + }.toMap + val overrides = platform match { + case Platform.Dev => getOverrides() + // TODO: currently, deployed services must be configured via Kubernetes DNS resolver. Maybe we may want to + // provide a way to override deployed services as well. + case _ => Map.empty[String, String] + } + new DnsDiscovery(clientTransport, overrides) + } + + /* TODO: this reporter uses the platform to determine if JSON logging should be enabled. + * Since the default logger uses slf4j, its settings must be specified before a logger + * is first accessed. This in turn leads to somewhat convoluted code, + * since we can't log when the platform being is determined. + * A potential fix would be to make the log format independent of the platform, and always log + * as JSON for example. + */ + override lazy val reporter: Reporter with ScalaLoggingCompat = { + Console.println("determining platform") // scalastyle:ignore + val r = platform match { + case p @ Platform.GoogleCloud(_, _) => + new GoogleReporter(p.credentials, p.namespace) with ScalaLoggingCompat with GoogleMdcLogger { + val logger = ScalaLoggingCompat.defaultScalaLogger(true) + } + case Platform.Dev => + new NoTraceReporter with ScalaLoggingCompat { + val logger = ScalaLoggingCompat.defaultScalaLogger(false) + } + } + r.info(s"application started on platform '${platform}'")(SpanContext.fresh()) + r + } + + /** Object storage. + * + * When running on a cloud platform, prepends `$project-` to bucket names, where `$project` + * is the project ID (for example 'driverinc-production` or `driverinc-sandbox`). + * + * @group utilities + */ + def storage(bucketName: String): BlobStorage = + platform match { + case p @ Platform.GoogleCloud(keyfile, _) => + GcsBlobStorage.fromKeyfile(keyfile, s"${p.project}-$bucketName") + case Platform.Dev => + new FileSystemBlobStorage(Paths.get(s".data-$bucketName")) + } + + /** Message bus. + * @group utilities + */ + def messageBus: StreamBus = platform match { + case p @ Platform.GoogleCloud(_, namespace) => + new GoogleBus(p.credentials, namespace) with StreamBus with CreateOnDemand + case Platform.Dev => + new QueueBus()(self.system) with StreamBus + } + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala new file mode 100644 index 0000000..81428bf --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/HttpApi.scala @@ -0,0 +1,100 @@ +package xyz.driver.core +package init + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import akka.http.scaladsl.server.{RequestContext, Route, RouteConcatenation} +import spray.json.DefaultJsonProtocol._ +import spray.json._ +import xyz.driver.core.rest.Swagger +import xyz.driver.core.rest.directives.Directives +import akka.http.scaladsl.model.headers._ +import xyz.driver.core.reporting.Reporter.CausalRelation +import xyz.driver.core.rest.headers.Traceparent + +import scala.collection.JavaConverters._ + +/** Mixin trait that provides some well-known HTTP endpoints, diagnostic header injection and forwarding, + * and exposes an application-specific route that must be implemented by services. + * @see ProtobufApi + */ +trait HttpApi extends CloudServices with Directives with SprayJsonSupport { self => + + /** Route that handles the application's business logic. + * @group hooks + */ + def applicationRoute: Route + + /** Classes with Swagger annotations. + * @group hooks + */ + def swaggerRouteClasses: Set[Class[_]] + + private val healthRoute = path("health") { + complete(Map("status" -> "good").toJson) + } + + private val versionRoute = path("version") { + complete(Map("name" -> self.name.toJson, "version" -> self.version.toJson).toJson) + } + + private lazy val swaggerRoute = { + val generator = new Swagger( + "", + "https" :: "http" :: Nil, + self.version.getOrElse(""), + swaggerRouteClasses, + config, + reporter + ) + generator.routes ~ generator.swaggerUINew + } + + private def cors(inner: Route): Route = + cors( + config.getStringList("application.cors.allowedOrigins").asScala.toSet, + xyz.driver.core.rest.AllowedHeaders + )(inner) + + private def traced(inner: Route): Route = (ctx: RequestContext) => { + val tags = Map( + "service.version" -> version.getOrElse(""), + // open tracing semantic tags + "span.kind" -> "server", + "service" -> name, + "http.url" -> ctx.request.uri.toString, + "http.method" -> ctx.request.method.value, + "peer.hostname" -> ctx.request.uri.authority.host.toString, + // google's tracing console provides extra search features if we define these tags + "/http/path" -> ctx.request.uri.path.toString, + "/http/method" -> ctx.request.method.value.toString, + "/http/url" -> ctx.request.uri.toString, + "/http/user_agent" -> ctx.request.header[`User-Agent`].map(_.value).getOrElse("") + ) + val parent = ctx.request.header[Traceparent].map { header => + header.spanContext -> CausalRelation.Child + } + reporter + .traceWithOptionalParentAsync(s"http_handle_rpc", tags, parent) { spanContext => + val header = Traceparent(spanContext) + val withHeader = ctx.withRequest( + ctx.request + .removeHeader(header.name) + .addHeader(header) + ) + inner(withHeader) + } + } + + /** Extended route. */ + override lazy val route: Route = traced( + cors( + RouteConcatenation.concat( + healthRoute, + versionRoute, + swaggerRoute, + applicationRoute + ) + ) + ) + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/Platform.scala b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala new file mode 100644 index 0000000..2daa2c8 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/Platform.scala @@ -0,0 +1,31 @@ +package xyz.driver.core +package init + +import java.nio.file.{Files, Path, Paths} + +import com.google.auth.oauth2.ServiceAccountCredentials + +sealed trait Platform +object Platform { + case class GoogleCloud(keyfile: Path, namespace: String) extends Platform { + def credentials: ServiceAccountCredentials = ServiceAccountCredentials.fromStream( + Files.newInputStream(keyfile) + ) + def project: String = credentials.getProjectId + } + // case object AliCloud extends Platform + case object Dev extends Platform + + lazy val fromEnv: Platform = { + def isGoogle = sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").map { value => + val keyfile = Paths.get(value) + require(Files.isReadable(keyfile), s"Google credentials file $value is not readable.") + val namespace = sys.env.getOrElse("SERVICE_NAMESPACE", sys.error("Namespace not set")) + GoogleCloud(keyfile, namespace) + } + isGoogle.getOrElse(Dev) + } + + def current: Platform = fromEnv + +} diff --git a/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala new file mode 100644 index 0000000..284ac67 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/ProtobufApi.scala @@ -0,0 +1,7 @@ +package xyz.driver.core +package init + +/** Mixin trait for services that implement an API based on Protocol Buffers and gRPC. + * TODO: implement + */ +trait ProtobufApi extends AkkaBootable diff --git a/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala new file mode 100644 index 0000000..61ca363 --- /dev/null +++ b/core-init/src/main/scala/xyz/driver/core/init/SimpleHttpApp.scala @@ -0,0 +1,4 @@ +package xyz.driver.core +package init + +trait SimpleHttpApp extends AkkaBootable with HttpApi with CloudServices -- cgit v1.2.3