From 6e9f40e4cacedfab43c92248d425866d73ea700e Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Mon, 16 Oct 2017 09:46:20 -0700 Subject: Split up app package into separate files --- src/main/scala/xyz/driver/core/app.scala | 426 --------------------- src/main/scala/xyz/driver/core/app/DriverApp.scala | 361 +++++++++++++++++ src/main/scala/xyz/driver/core/app/Module.scala | 53 +++ src/main/scala/xyz/driver/core/rest/package.scala | 49 ++- 4 files changed, 452 insertions(+), 437 deletions(-) delete mode 100644 src/main/scala/xyz/driver/core/app.scala create mode 100644 src/main/scala/xyz/driver/core/app/DriverApp.scala create mode 100644 src/main/scala/xyz/driver/core/app/Module.scala (limited to 'src') diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala deleted file mode 100644 index 19eef52..0000000 --- a/src/main/scala/xyz/driver/core/app.scala +++ /dev/null @@ -1,426 +0,0 @@ -package xyz.driver.core - -import java.sql.SQLException - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers._ -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.RouteResult._ -import akka.http.scaladsl.server._ -import akka.stream.ActorMaterializer -import com.github.swagger.akka.SwaggerHttpService._ -import com.typesafe.config.Config -import com.typesafe.scalalogging.Logger -import io.swagger.models.Scheme -import io.swagger.util.Json -import org.slf4j.{LoggerFactory, MDC} -import xyz.driver.core -import xyz.driver.core.rest._ -import xyz.driver.core.stats.SystemStats -import xyz.driver.core.time.Time -import xyz.driver.core.time.provider.{SystemTimeProvider, TimeProvider} -import xyz.driver.tracing.TracingDirectives._ -import xyz.driver.tracing._ - -import scala.compat.Platform.ConcurrentModificationException -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.reflect.runtime.universe._ -import scala.util.Try -import scala.util.control.NonFatal -import scalaz.Scalaz.stringInstance -import scalaz.syntax.equal._ - -object app { - - class DriverApp(appName: String, - version: String, - gitHash: String, - modules: Seq[Module], - time: TimeProvider = new SystemTimeProvider(), - log: Logger = Logger(LoggerFactory.getLogger(classOf[DriverApp])), - config: Config = core.config.loadDefaultConfig, - interface: String = "::0", - baseUrl: String = "localhost:8080", - scheme: String = "http", - port: Int = 8080, - tracer: Tracer = NoTracer)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) { - - implicit private lazy val materializer = ActorMaterializer()(actorSystem) - private lazy val http = Http()(actorSystem) - val appEnvironment = config.getString("application.environment") - - def run(): Unit = { - activateServices(modules) - scheduleServicesDeactivation(modules) - bindHttp(modules) - Console.print(s"${this.getClass.getName} App is started\n") - } - - def stop(): Unit = { - http.shutdownAllConnectionPools().onComplete { _ => - Await.result(tracer.close(), 15.seconds) // flush out any remaining traces from the buffer - val _ = actorSystem.terminate() - val terminated = Await.result(actorSystem.whenTerminated, 30.seconds) - val addressTerminated = if (terminated.addressTerminated) "is" else "is not" - Console.print(s"${this.getClass.getName} App $addressTerminated stopped ") - } - } - - private def extractHeader(request: HttpRequest)(headerName: String): Option[String] = - request.headers.find(_.name().toLowerCase === headerName).map(_.value()) - - private val allowedHeaders = - Seq( - "Origin", - "X-Requested-With", - "Content-Type", - "Content-Length", - "Accept", - "X-Trace", - "Access-Control-Allow-Methods", - "Access-Control-Allow-Origin", - "Access-Control-Allow-Headers", - "Server", - "Date", - ContextHeaders.TrackingIdHeader, - ContextHeaders.TraceHeaderName, - ContextHeaders.SpanHeaderName, - ContextHeaders.StacktraceHeader, - ContextHeaders.AuthenticationTokenHeader, - "X-Frame-Options", - "X-Content-Type-Options", - "Strict-Transport-Security", - AuthProvider.SetAuthenticationTokenHeader, - AuthProvider.SetPermissionsTokenHeader - ) - - private def allowOrigin(originHeader: Option[Origin]) = - `Access-Control-Allow-Origin`( - originHeader.fold[HttpOriginRange](HttpOriginRange.*)(h => HttpOriginRange(h.origins: _*))) - - protected implicit def rejectionHandler = - RejectionHandler - .newBuilder() - .handleAll[MethodRejection] { rejections => - val methods = rejections map (_.supported) - lazy val names = methods map (_.name) mkString ", " - - options { ctx => - optionalHeaderValueByType[Origin](()) { originHeader => - respondWithHeaders(List[HttpHeader]( - Allow(methods), - `Access-Control-Allow-Methods`(methods), - allowOrigin(originHeader), - `Access-Control-Allow-Headers`(allowedHeaders: _*), - `Access-Control-Expose-Headers`(allowedHeaders: _*) - )) { - complete(s"Supported methods: $names.") - } - }(ctx) - } ~ - complete(MethodNotAllowed -> s"HTTP method not allowed, supported methods: $names!") - } - .result() - - protected def bindHttp(modules: Seq[Module]): Unit = { - val serviceTypes = modules.flatMap(_.routeTypes) - val swaggerService = swaggerOverride(serviceTypes) - val swaggerRoutes = swaggerService.routes ~ swaggerService.swaggerUI - val versionRt = versionRoute(version, gitHash, time.currentTime()) - - val _ = Future { - http.bindAndHandle( - route2HandlerFlow(extractHost { origin => - trace(tracer) { - extractClientIP { - ip => - optionalHeaderValueByType[Origin](()) { - originHeader => - { - ctx => - val trackingId = rest.extractTrackingId(ctx.request) - MDC.put("trackingId", trackingId) - - val updatedStacktrace = - (rest.extractStacktrace(ctx.request) ++ Array(appName)).mkString("->") - MDC.put("stack", updatedStacktrace) - - storeRequestContextToMdc(ctx.request, origin, ip) - - def requestLogging: Future[Unit] = Future { - log.info( - s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""") - } - - val contextWithTrackingId = - ctx.withRequest( - ctx.request - .addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)) - .addHeader(RawHeader(ContextHeaders.StacktraceHeader, updatedStacktrace))) - - handleExceptions(ExceptionHandler(exceptionHandler))({ - c => - requestLogging.flatMap { _ => - val trackingHeader = RawHeader(ContextHeaders.TrackingIdHeader, trackingId) - - val responseHeaders = List[HttpHeader]( - trackingHeader, - allowOrigin(originHeader), - `Access-Control-Allow-Headers`(allowedHeaders: _*), - `Access-Control-Expose-Headers`(allowedHeaders: _*) - ) - - respondWithHeaders(responseHeaders) { - modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) - }(c) - } - })(contextWithTrackingId) - } - } - } - } - }), - interface, - port - )(materializer) - } - } - - private def storeRequestContextToMdc(request: HttpRequest, origin: String, ip: RemoteAddress) = { - - MDC.put("origin", origin) - MDC.put("ip", ip.toOption.map(_.getHostAddress).getOrElse("unknown")) - MDC.put("remoteHost", ip.toOption.map(_.getHostName).getOrElse("unknown")) - - MDC.put("xForwardedFor", - extractHeader(request)("x-forwarded-for") - .orElse(extractHeader(request)("x_forwarded_for")) - .getOrElse("unknown")) - MDC.put("remoteAddress", extractHeader(request)("remote-address").getOrElse("unknown")) - MDC.put("userAgent", extractHeader(request)("user-agent").getOrElse("unknown")) - } - - protected def swaggerOverride(apiTypes: Seq[Type]) = { - new Swagger(baseUrl, Scheme.forValue(scheme), version, actorSystem, apiTypes, config) { - override def generateSwaggerJson: String = { - import io.swagger.models.Swagger - - import scala.collection.JavaConverters._ - - try { - val swagger: Swagger = reader.read(toJavaTypeSet(apiTypes).asJava) - - // Removing trailing spaces - swagger.setPaths( - swagger.getPaths.asScala - .map { - case (key, path) => - key.trim -> path - } - .toMap - .asJava) - - Json.pretty().writeValueAsString(swagger) - } catch { - case NonFatal(t) => { - logger.error("Issue with creating swagger.json", t) - throw t - } - } - } - } - } - - /** - * Override me for custom exception handling - * - * @return Exception handling route for exception type - */ - protected def exceptionHandler = PartialFunction[Throwable, Route] { - - case is: IllegalStateException => - ctx => - log.warn(s"Request is not allowed to ${ctx.request.method} ${ctx.request.uri}", is) - errorResponse(ctx, BadRequest, message = is.getMessage, is)(ctx) - - case cm: ConcurrentModificationException => - ctx => - log.warn(s"Concurrent modification of the resource ${ctx.request.method} ${ctx.request.uri}", cm) - errorResponse(ctx, Conflict, "Resource was changed concurrently, try requesting a newer version", cm)(ctx) - - case se: SQLException => - ctx => - log.warn(s"Database exception for the resource ${ctx.request.method} ${ctx.request.uri}", se) - errorResponse(ctx, InternalServerError, "Data access error", se)(ctx) - - case t: Throwable => - ctx => - log.warn(s"Request to ${ctx.request.method} ${ctx.request.uri} could not be handled normally", t) - errorResponse(ctx, InternalServerError, t.getMessage, t)(ctx) - } - - protected def errorResponse[T <: Throwable](ctx: RequestContext, - statusCode: StatusCode, - message: String, - exception: T): Route = { - - val trackingId = rest.extractTrackingId(ctx.request) - val tracingHeader = RawHeader(ContextHeaders.TrackingIdHeader, rest.extractTrackingId(ctx.request)) - - MDC.put("trackingId", trackingId) - - optionalHeaderValueByType[Origin](()) { originHeader => - val responseHeaders = List[HttpHeader](tracingHeader, - allowOrigin(originHeader), - `Access-Control-Allow-Headers`(allowedHeaders: _*), - `Access-Control-Expose-Headers`(allowedHeaders: _*)) - - respondWithHeaders(responseHeaders) { - complete(HttpResponse(statusCode, entity = message)) - } - } - } - - protected def versionRoute(version: String, gitHash: String, startupTime: Time): Route = { - import spray.json._ - import DefaultJsonProtocol._ - import SprayJsonSupport._ - - path("version") { - val currentTime = time.currentTime().millis - complete( - Map( - "version" -> version.toJson, - "gitHash" -> gitHash.toJson, - "modules" -> modules.map(_.name).toJson, - "dependencies" -> collectAppDependencies().toJson, - "startupTime" -> startupTime.millis.toString.toJson, - "serverTime" -> currentTime.toString.toJson, - "uptime" -> (currentTime - startupTime.millis).toString.toJson - ).toJson) - } - } - - protected def collectAppDependencies(): Map[String, String] = { - - def serviceWithLocation(serviceName: String): (String, String) = - serviceName -> Try(config.getString(s"services.$serviceName.baseUrl")).getOrElse("not-detected") - - modules.flatMap(module => module.serviceDiscovery.getUsedServices.map(serviceWithLocation).toSeq).toMap - } - - protected def healthRoute: Route = { - import spray.json._ - import DefaultJsonProtocol._ - import SprayJsonSupport._ - import spray.json._ - - val memoryUsage = SystemStats.memoryUsage - val gcStats = SystemStats.garbageCollectorStats - - path("health") { - complete( - Map( - "availableProcessors" -> SystemStats.availableProcessors.toJson, - "memoryUsage" -> Map( - "free" -> memoryUsage.free.toJson, - "total" -> memoryUsage.total.toJson, - "max" -> memoryUsage.max.toJson - ).toJson, - "gcStats" -> Map( - "garbageCollectionTime" -> gcStats.garbageCollectionTime.toJson, - "totalGarbageCollections" -> gcStats.totalGarbageCollections.toJson - ).toJson, - "fileSystemSpace" -> SystemStats.fileSystemSpace.map { f => - Map("path" -> f.path.toJson, - "freeSpace" -> f.freeSpace.toJson, - "totalSpace" -> f.totalSpace.toJson, - "usableSpace" -> f.usableSpace.toJson) - }.toJson, - "operatingSystem" -> SystemStats.operatingSystemStats.toJson - )) - } - } - - /** - * Initializes services - */ - protected def activateServices(services: Seq[Module]): Unit = { - services.foreach { service => - Console.print(s"Service ${service.name} starts ...") - try { - service.activate() - } catch { - case t: Throwable => - log.error(s"Service ${service.name} failed to activate", t) - Console.print(" Failed! (check log)") - } - Console.print(" Done\n") - } - } - - /** - * Schedules services to be deactivated on the app shutdown - */ - protected def scheduleServicesDeactivation(services: Seq[Module]) = { - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { - services.foreach { service => - Console.print(s"Service ${service.name} shutting down ...\n") - try { - service.deactivate() - } catch { - case t: Throwable => - log.error(s"Service ${service.name} failed to deactivate", t) - Console.print(" Failed! (check log)") - } - Console.print(s"Service ${service.name} is shut down\n") - } - } - }) - } - } - - trait Module { - val name: String - def route: Route - def routeTypes: Seq[Type] - - val serviceDiscovery: ServiceDiscovery with SavingUsedServiceDiscovery = new NoServiceDiscovery() - - def activate(): Unit = {} - def deactivate(): Unit = {} - } - - class EmptyModule extends Module { - val name = "Nothing" - def route: Route = complete(StatusCodes.OK) - def routeTypes = Seq.empty[Type] - } - - class SimpleModule(val name: String, val route: Route, routeType: Type) extends Module { - def routeTypes: Seq[Type] = Seq(routeType) - } - - /** - * Module implementation which may be used to composed a few - * - * @param name more general name of the composite module, - * must be provided as there is no good way to automatically - * generalize the name from the composed modules' names - * @param modules modules to compose into a single one - */ - class CompositeModule(val name: String, modules: Seq[Module]) extends Module with RouteConcatenation { - - def route: Route = RouteConcatenation.concat(modules.map(_.route): _*) - def routeTypes = modules.flatMap(_.routeTypes) - - override def activate() = modules.foreach(_.activate()) - override def deactivate() = modules.reverse.foreach(_.deactivate()) - } -} diff --git a/src/main/scala/xyz/driver/core/app/DriverApp.scala b/src/main/scala/xyz/driver/core/app/DriverApp.scala new file mode 100644 index 0000000..b73f426 --- /dev/null +++ b/src/main/scala/xyz/driver/core/app/DriverApp.scala @@ -0,0 +1,361 @@ +package xyz.driver.core.app + +import java.sql.SQLException + +import akka.actor.ActorSystem +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import akka.http.scaladsl.model.StatusCodes.{BadRequest, Conflict, InternalServerError, MethodNotAllowed} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.RouteResult.route2HandlerFlow +import akka.http.scaladsl.server._ +import akka.http.scaladsl.{Http, HttpExt} +import akka.stream.ActorMaterializer +import com.github.swagger.akka.SwaggerHttpService.{logger, toJavaTypeSet} +import com.typesafe.config.Config +import com.typesafe.scalalogging.Logger +import io.swagger.models.Scheme +import io.swagger.util.Json +import org.slf4j.{LoggerFactory, MDC} +import xyz.driver.core +import xyz.driver.core.rest +import xyz.driver.core.rest.{ContextHeaders, Swagger} +import xyz.driver.core.stats.SystemStats +import xyz.driver.core.time.Time +import xyz.driver.core.time.provider.{SystemTimeProvider, TimeProvider} +import xyz.driver.tracing.TracingDirectives.trace +import xyz.driver.tracing.{NoTracer, Tracer} + +import scala.compat.Platform.ConcurrentModificationException +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.reflect.runtime.universe._ +import scala.util.Try +import scala.util.control.NonFatal +import scalaz.Scalaz.stringInstance +import scalaz.syntax.equal._ + +class DriverApp(appName: String, + version: String, + gitHash: String, + modules: Seq[Module], + time: TimeProvider = new SystemTimeProvider(), + log: Logger = Logger(LoggerFactory.getLogger(classOf[DriverApp])), + config: Config = core.config.loadDefaultConfig, + interface: String = "::0", + baseUrl: String = "localhost:8080", + scheme: String = "http", + port: Int = 8080, + tracer: Tracer = NoTracer)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) { + + implicit private lazy val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + private lazy val http: HttpExt = Http()(actorSystem) + val appEnvironment: String = config.getString("application.environment") + + def run(): Unit = { + activateServices(modules) + scheduleServicesDeactivation(modules) + bindHttp(modules) + Console.print(s"${this.getClass.getName} App is started\n") + } + + def stop(): Unit = { + http.shutdownAllConnectionPools().onComplete { _ => + Await.result(tracer.close(), 15.seconds) // flush out any remaining traces from the buffer + val _ = actorSystem.terminate() + val terminated = Await.result(actorSystem.whenTerminated, 30.seconds) + val addressTerminated = if (terminated.addressTerminated) "is" else "is not" + Console.print(s"${this.getClass.getName} App $addressTerminated stopped ") + } + } + + private def extractHeader(request: HttpRequest)(headerName: String): Option[String] = + request.headers.find(_.name().toLowerCase === headerName).map(_.value()) + + private def allowOrigin(originHeader: Option[Origin]) = + `Access-Control-Allow-Origin`( + originHeader.fold[HttpOriginRange](HttpOriginRange.*)(h => HttpOriginRange(h.origins: _*))) + + protected implicit def rejectionHandler: RejectionHandler = + RejectionHandler + .newBuilder() + .handleAll[MethodRejection] { rejections => + val methods = rejections map (_.supported) + lazy val names = methods map (_.name) mkString ", " + + options { ctx => + optionalHeaderValueByType[Origin](()) { originHeader => + respondWithHeaders(List[HttpHeader]( + Allow(methods), + `Access-Control-Allow-Methods`(methods), + allowOrigin(originHeader), + `Access-Control-Allow-Headers`(rest.AllowedHeaders: _*), + `Access-Control-Expose-Headers`(rest.AllowedHeaders: _*) + )) { + complete(s"Supported methods: $names.") + } + }(ctx) + } ~ + complete(MethodNotAllowed -> s"HTTP method not allowed, supported methods: $names!") + } + .result() + + protected def bindHttp(modules: Seq[Module]): Unit = { + val serviceTypes = modules.flatMap(_.routeTypes) + val swaggerService = swaggerOverride(serviceTypes) + val swaggerRoutes = swaggerService.routes ~ swaggerService.swaggerUI + val versionRt = versionRoute(version, gitHash, time.currentTime()) + + val _ = Future { + http.bindAndHandle( + route2HandlerFlow(extractHost { origin => + trace(tracer) { + extractClientIP { ip => + optionalHeaderValueByType[Origin](()) { + originHeader => + { + ctx => + val trackingId = rest.extractTrackingId(ctx.request) + MDC.put("trackingId", trackingId) + + val updatedStacktrace = + (rest.extractStacktrace(ctx.request) ++ Array(appName)).mkString("->") + MDC.put("stack", updatedStacktrace) + + storeRequestContextToMdc(ctx.request, origin, ip) + + def requestLogging: Future[Unit] = Future { + log.info( + s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""") + } + + val contextWithTrackingId = + ctx.withRequest( + ctx.request + .addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)) + .addHeader(RawHeader(ContextHeaders.StacktraceHeader, updatedStacktrace))) + + handleExceptions(ExceptionHandler(exceptionHandler))({ + c => + requestLogging.flatMap { _ => + val trackingHeader = RawHeader(ContextHeaders.TrackingIdHeader, trackingId) + + val responseHeaders = List[HttpHeader]( + trackingHeader, + allowOrigin(originHeader), + `Access-Control-Allow-Headers`(rest.AllowedHeaders: _*), + `Access-Control-Expose-Headers`(rest.AllowedHeaders: _*) + ) + + respondWithHeaders(responseHeaders) { + modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) + }(c) + } + })(contextWithTrackingId) + } + } + } + } + }), + interface, + port + )(materializer) + } + } + + private def storeRequestContextToMdc(request: HttpRequest, origin: String, ip: RemoteAddress): Unit = { + + MDC.put("origin", origin) + MDC.put("ip", ip.toOption.map(_.getHostAddress).getOrElse("unknown")) + MDC.put("remoteHost", ip.toOption.map(_.getHostName).getOrElse("unknown")) + + MDC.put("xForwardedFor", + extractHeader(request)("x-forwarded-for") + .orElse(extractHeader(request)("x_forwarded_for")) + .getOrElse("unknown")) + MDC.put("remoteAddress", extractHeader(request)("remote-address").getOrElse("unknown")) + MDC.put("userAgent", extractHeader(request)("user-agent").getOrElse("unknown")) + } + + protected def swaggerOverride(apiTypes: Seq[Type]): Swagger = { + new Swagger(baseUrl, Scheme.forValue(scheme), version, actorSystem, apiTypes, config) { + override def generateSwaggerJson: String = { + import io.swagger.models.Swagger + + import scala.collection.JavaConverters._ + + try { + val swagger: Swagger = reader.read(toJavaTypeSet(apiTypes).asJava) + + // Removing trailing spaces + swagger.setPaths( + swagger.getPaths.asScala + .map { + case (key, path) => + key.trim -> path + } + .toMap + .asJava) + + Json.pretty().writeValueAsString(swagger) + } catch { + case NonFatal(t) => + logger.error("Issue with creating swagger.json", t) + throw t + } + } + } + } + + /** + * Override me for custom exception handling + * + * @return Exception handling route for exception type + */ + protected def exceptionHandler: PartialFunction[Throwable, Route] = { + + case is: IllegalStateException => + ctx => + log.warn(s"Request is not allowed to ${ctx.request.method} ${ctx.request.uri}", is) + errorResponse(ctx, BadRequest, message = is.getMessage, is)(ctx) + + case cm: ConcurrentModificationException => + ctx => + log.warn(s"Concurrent modification of the resource ${ctx.request.method} ${ctx.request.uri}", cm) + errorResponse(ctx, Conflict, "Resource was changed concurrently, try requesting a newer version", cm)(ctx) + + case se: SQLException => + ctx => + log.warn(s"Database exception for the resource ${ctx.request.method} ${ctx.request.uri}", se) + errorResponse(ctx, InternalServerError, "Data access error", se)(ctx) + + case t: Throwable => + ctx => + log.warn(s"Request to ${ctx.request.method} ${ctx.request.uri} could not be handled normally", t) + errorResponse(ctx, InternalServerError, t.getMessage, t)(ctx) + } + + protected def errorResponse[T <: Throwable](ctx: RequestContext, + statusCode: StatusCode, + message: String, + exception: T): Route = { + + val trackingId = rest.extractTrackingId(ctx.request) + val tracingHeader = RawHeader(ContextHeaders.TrackingIdHeader, rest.extractTrackingId(ctx.request)) + + MDC.put("trackingId", trackingId) + + optionalHeaderValueByType[Origin](()) { originHeader => + val responseHeaders = List[HttpHeader]( + tracingHeader, + allowOrigin(originHeader), + `Access-Control-Allow-Headers`(rest.AllowedHeaders: _*), + `Access-Control-Expose-Headers`(rest.AllowedHeaders: _*) + ) + + respondWithHeaders(responseHeaders) { + complete(HttpResponse(statusCode, entity = message)) + } + } + } + + protected def versionRoute(version: String, gitHash: String, startupTime: Time): Route = { + import spray.json._ + import DefaultJsonProtocol._ + import SprayJsonSupport._ + + path("version") { + val currentTime = time.currentTime().millis + complete( + Map( + "version" -> version.toJson, + "gitHash" -> gitHash.toJson, + "modules" -> modules.map(_.name).toJson, + "dependencies" -> collectAppDependencies().toJson, + "startupTime" -> startupTime.millis.toString.toJson, + "serverTime" -> currentTime.toString.toJson, + "uptime" -> (currentTime - startupTime.millis).toString.toJson + ).toJson) + } + } + + protected def collectAppDependencies(): Map[String, String] = { + + def serviceWithLocation(serviceName: String): (String, String) = + serviceName -> Try(config.getString(s"services.$serviceName.baseUrl")).getOrElse("not-detected") + + modules.flatMap(module => module.serviceDiscovery.getUsedServices.map(serviceWithLocation).toSeq).toMap + } + + protected def healthRoute: Route = { + import spray.json._ + import DefaultJsonProtocol._ + import SprayJsonSupport._ + import spray.json._ + + val memoryUsage = SystemStats.memoryUsage + val gcStats = SystemStats.garbageCollectorStats + + path("health") { + complete( + Map( + "availableProcessors" -> SystemStats.availableProcessors.toJson, + "memoryUsage" -> Map( + "free" -> memoryUsage.free.toJson, + "total" -> memoryUsage.total.toJson, + "max" -> memoryUsage.max.toJson + ).toJson, + "gcStats" -> Map( + "garbageCollectionTime" -> gcStats.garbageCollectionTime.toJson, + "totalGarbageCollections" -> gcStats.totalGarbageCollections.toJson + ).toJson, + "fileSystemSpace" -> SystemStats.fileSystemSpace.map { f => + Map("path" -> f.path.toJson, + "freeSpace" -> f.freeSpace.toJson, + "totalSpace" -> f.totalSpace.toJson, + "usableSpace" -> f.usableSpace.toJson) + }.toJson, + "operatingSystem" -> SystemStats.operatingSystemStats.toJson + )) + } + } + + /** + * Initializes services + */ + protected def activateServices(services: Seq[Module]): Unit = { + services.foreach { service => + Console.print(s"Service ${service.name} starts ...") + try { + service.activate() + } catch { + case t: Throwable => + log.error(s"Service ${service.name} failed to activate", t) + Console.print(" Failed! (check log)") + } + Console.print(" Done\n") + } + } + + /** + * Schedules services to be deactivated on the app shutdown + */ + protected def scheduleServicesDeactivation(services: Seq[Module]): Unit = { + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run(): Unit = { + services.foreach { service => + Console.print(s"Service ${service.name} shutting down ...\n") + try { + service.deactivate() + } catch { + case t: Throwable => + log.error(s"Service ${service.name} failed to deactivate", t) + Console.print(" Failed! (check log)") + } + Console.print(s"Service ${service.name} is shut down\n") + } + } + }) + } +} diff --git a/src/main/scala/xyz/driver/core/app/Module.scala b/src/main/scala/xyz/driver/core/app/Module.scala new file mode 100644 index 0000000..933b408 --- /dev/null +++ b/src/main/scala/xyz/driver/core/app/Module.scala @@ -0,0 +1,53 @@ +package xyz.driver.core.app + +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.server.Directives.complete +import akka.http.scaladsl.server.{Route, RouteConcatenation} +import xyz.driver.core.rest.{NoServiceDiscovery, SavingUsedServiceDiscovery, ServiceDiscovery} + +import scala.reflect.runtime.universe._ + +trait Module { + val name: String + def route: Route + def routeTypes: Seq[Type] + + val serviceDiscovery: ServiceDiscovery with SavingUsedServiceDiscovery = new NoServiceDiscovery() + + def activate(): Unit = {} + def deactivate(): Unit = {} +} + +object Module { + + class EmptyModule extends Module { + override val name: String = "Nothing" + + override def route: Route = complete(StatusCodes.OK) + + override def routeTypes: Seq[Type] = Seq.empty[Type] + } + + class SimpleModule(override val name: String, override val route: Route, routeType: Type) extends Module { + def routeTypes: Seq[Type] = Seq(routeType) + } + + /** + * Module implementation which may be used to composed a few + * + * @param name more general name of the composite module, + * must be provided as there is no good way to automatically + * generalize the name from the composed modules' names + * @param modules modules to compose into a single one + */ + class CompositeModule(override val name: String, modules: Seq[Module]) extends Module with RouteConcatenation { + + override def route: Route = RouteConcatenation.concat(modules.map(_.route): _*) + + override def routeTypes: Seq[Type] = modules.flatMap(_.routeTypes) + + override def activate(): Unit = modules.foreach(_.activate()) + + override def deactivate(): Unit = modules.reverse.foreach(_.deactivate()) + } +} diff --git a/src/main/scala/xyz/driver/core/rest/package.scala b/src/main/scala/xyz/driver/core/rest/package.scala index 4c8e13c..e6eb8d6 100644 --- a/src/main/scala/xyz/driver/core/rest/package.scala +++ b/src/main/scala/xyz/driver/core/rest/package.scala @@ -5,27 +5,54 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.stream.scaladsl.Flow import akka.util.ByteString +import xyz.driver.tracing.TracingDirectives import scalaz.Scalaz.{intInstance, stringInstance} import scalaz.syntax.equal._ package object rest { object ContextHeaders { - val AuthenticationTokenHeader = "Authorization" - val PermissionsTokenHeader = "Permissions" - val AuthenticationHeaderPrefix = "Bearer" - val TrackingIdHeader = "X-Trace" - val StacktraceHeader = "X-Stacktrace" - val TracingHeader = trace.TracingHeaderKey + val AuthenticationTokenHeader: String = "Authorization" + val PermissionsTokenHeader: String = "Permissions" + val AuthenticationHeaderPrefix: String = "Bearer" + val TrackingIdHeader: String = "X-Trace" + val StacktraceHeader: String = "X-Stacktrace" + val TraceHeaderName: String = TracingDirectives.TraceHeaderName + val SpanHeaderName: String = TracingDirectives.SpanHeaderName } object AuthProvider { - val AuthenticationTokenHeader = ContextHeaders.AuthenticationTokenHeader - val PermissionsTokenHeader = ContextHeaders.PermissionsTokenHeader - val SetAuthenticationTokenHeader = "set-authorization" - val SetPermissionsTokenHeader = "set-permissions" + val AuthenticationTokenHeader: String = ContextHeaders.AuthenticationTokenHeader + val PermissionsTokenHeader: String = ContextHeaders.PermissionsTokenHeader + val SetAuthenticationTokenHeader: String = "set-authorization" + val SetPermissionsTokenHeader: String = "set-permissions" } + val AllowedHeaders: Seq[String] = + Seq( + "Origin", + "X-Requested-With", + "Content-Type", + "Content-Length", + "Accept", + "X-Trace", + "Access-Control-Allow-Methods", + "Access-Control-Allow-Origin", + "Access-Control-Allow-Headers", + "Server", + "Date", + ContextHeaders.TrackingIdHeader, + ContextHeaders.TraceHeaderName, + ContextHeaders.SpanHeaderName, + ContextHeaders.StacktraceHeader, + ContextHeaders.AuthenticationTokenHeader, + "X-Frame-Options", + "X-Content-Type-Options", + "Strict-Transport-Security", + AuthProvider.SetAuthenticationTokenHeader, + AuthProvider.SetPermissionsTokenHeader + ) + def serviceContext: Directive1[ServiceRequestContext] = extract(ctx => extractServiceContext(ctx.request)) def extractServiceContext(request: HttpRequest): ServiceRequestContext = @@ -44,7 +71,7 @@ package object rest { request.headers.filter { h => h.name === ContextHeaders.AuthenticationTokenHeader || h.name === ContextHeaders.TrackingIdHeader || h.name === ContextHeaders.PermissionsTokenHeader || h.name === ContextHeaders.StacktraceHeader || - h.name === ContextHeaders.TracingHeader + h.name === ContextHeaders.TraceHeaderName || h.name === ContextHeaders.SpanHeaderName } map { header => if (header.name === ContextHeaders.AuthenticationTokenHeader) { header.name -> header.value.stripPrefix(ContextHeaders.AuthenticationHeaderPrefix).trim -- cgit v1.2.3