diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/app.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/app.scala | 68 |
1 files changed, 42 insertions, 26 deletions
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index deb0e6a..12c6027 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -6,27 +6,27 @@ import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model.{HttpResponse, StatusCodes} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.RouteResult._ import akka.http.scaladsl.server.{ExceptionHandler, Route, RouteConcatenation} -import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink +import akka.stream.{ActorMaterializer, Materializer} import com.typesafe.config.Config import io.swagger.models.Scheme -import org.slf4j.LoggerFactory +import org.slf4j.{LoggerFactory, MDC} import spray.json.DefaultJsonProtocol import xyz.driver.core import xyz.driver.core.logging.{Logger, TypesafeScalaLogger} -import xyz.driver.core.rest.ContextHeaders -import xyz.driver.core.rest.Swagger +import xyz.driver.core.rest.{ContextHeaders, Swagger} import xyz.driver.core.stats.SystemStats import xyz.driver.core.time.Time import xyz.driver.core.time.provider.{SystemTimeProvider, TimeProvider} import scala.compat.Platform.ConcurrentModificationException import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} object app { @@ -40,12 +40,10 @@ object app { interface: String = "::0", baseUrl: String = "localhost:8080", scheme: String = "http", - port: Int = 8080) { + port: Int = 8080)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) { - implicit private lazy val actorSystem = ActorSystem("spray-routing", config) - implicit private lazy val executionContext = actorSystem.dispatcher - implicit private lazy val materializer = ActorMaterializer()(actorSystem) - private lazy val http = Http()(actorSystem) + implicit private lazy val materializer = ActorMaterializer()(actorSystem) + private lazy val http = Http()(actorSystem) def run(): Unit = { activateServices(modules) @@ -69,17 +67,35 @@ object app { val swaggerRoutes = swaggerService.routes ~ swaggerService.swaggerUI val versionRt = versionRoute(version, gitHash, time.currentTime()) + def entityAsString(entity: HttpEntity)(implicit m: Materializer, ex: ExecutionContext): Future[String] = + entity.dataBytes + .map(_.decodeString(entity.contentType.charsetOption.fold("UTF-8")(_.value))) + .runWith(Sink.head) + val _ = Future { - http.bindAndHandle(route2HandlerFlow(handleExceptions(ExceptionHandler(exceptionHandler)) { ctx => + http.bindAndHandle(route2HandlerFlow({ ctx => val trackingId = rest.extractTrackingId(ctx.request) - log.audit(s"Received request ${ctx.request} with tracking id $trackingId") + MDC.put("trackingId", trackingId) + + def requestLogging: Future[Unit] = { + entityAsString(ctx.request.entity).map { data => + s"""{"method":"${ctx.request.method.value}","url": "${ctx.request.uri}","entity":"$data"""".stripMargin + } map { requestJson => + MDC.put("message", "Received request") + log.audit(requestJson) + } + } val contextWithTrackingId = ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) - respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) { - modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) - }(contextWithTrackingId) + handleExceptions(ExceptionHandler(exceptionHandler))({ c => + requestLogging.flatMap { _ => + respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) { + modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) + }(c) + } + })(contextWithTrackingId) }), interface, port)(materializer) } } @@ -93,27 +109,27 @@ object app { case is: IllegalStateException => ctx => - val trackingId = rest.extractTrackingId(ctx.request) - log.debug(s"Request is not allowed to ${ctx.request.uri} ($trackingId)", is) + MDC.put("trackingId", rest.extractTrackingId(ctx.request)) + log.error(s"Request is not allowed to ${ctx.request.uri}", is) complete(HttpResponse(BadRequest, entity = is.getMessage))(ctx) case cm: ConcurrentModificationException => ctx => - val trackingId = rest.extractTrackingId(ctx.request) - log.audit(s"Concurrent modification of the resource ${ctx.request.uri} ($trackingId)", cm) + MDC.put("trackingId", rest.extractTrackingId(ctx.request)) + log.error(s"Concurrent modification of the resource ${ctx.request.uri}", cm) complete( HttpResponse(Conflict, entity = "Resource was changed concurrently, try requesting a newer version"))(ctx) case sex: SQLException => ctx => - val trackingId = rest.extractTrackingId(ctx.request) - log.audit(s"Database exception for the resource ${ctx.request.uri} ($trackingId)", sex) + MDC.put("trackingId", rest.extractTrackingId(ctx.request)) + log.error(s"Database exception for the resource ${ctx.request.uri}", sex) complete(HttpResponse(InternalServerError, entity = "Data access error"))(ctx) case t: Throwable => ctx => - val trackingId = rest.extractTrackingId(ctx.request) - log.error(s"Request to ${ctx.request.uri} could not be handled normally ($trackingId)", t) + MDC.put("trackingId", rest.extractTrackingId(ctx.request)) + log.error(s"Request to ${ctx.request.uri} could not be handled normally", t) complete(HttpResponse(InternalServerError, entity = t.getMessage))(ctx) } @@ -191,7 +207,7 @@ object app { Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { services.foreach { service => - Console.print(s"Service ${service.name} shutting down ...") + Console.print(s"Service ${service.name} shutting down ...\n") try { service.deactivate() } catch { @@ -199,7 +215,7 @@ object app { log.fatal(s"Service ${service.name} failed to deactivate", t) Console.print(" Failed! (check log)") } - Console.print(" Done\n") + Console.print(s"Service ${service.name} is shut down\n") } } }) |