aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/app.scala
diff options
context:
space:
mode:
authorVlad Uspensky <v.uspenskiy@icloud.com>2017-03-14 12:17:55 -0700
committerGitHub <noreply@github.com>2017-03-14 12:17:55 -0700
commitd30510d9976f0e140646a476d523d1aaa63d8192 (patch)
tree99af43977a6661a1a2152237bb015c7804a01053 /src/main/scala/xyz/driver/core/app.scala
parentc4ccac2705346d2c150612783a53bf6df50fece7 (diff)
parent8c4ccfbb428d236799a0178867906a6fee800a1d (diff)
downloaddriver-core-d30510d9976f0e140646a476d523d1aaa63d8192.tar.gz
driver-core-d30510d9976f0e140646a476d523d1aaa63d8192.tar.bz2
driver-core-d30510d9976f0e140646a476d523d1aaa63d8192.zip
Merge pull request #24 from drivergroup/mdc-revisionv0.10.27
Added writing tracking Id to MDC so it can be logged for all messages…
Diffstat (limited to 'src/main/scala/xyz/driver/core/app.scala')
-rw-r--r--src/main/scala/xyz/driver/core/app.scala68
1 files changed, 42 insertions, 26 deletions
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala
index deb0e6a..12c6027 100644
--- a/src/main/scala/xyz/driver/core/app.scala
+++ b/src/main/scala/xyz/driver/core/app.scala
@@ -6,27 +6,27 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
-import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult._
import akka.http.scaladsl.server.{ExceptionHandler, Route, RouteConcatenation}
-import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.Config
import io.swagger.models.Scheme
-import org.slf4j.LoggerFactory
+import org.slf4j.{LoggerFactory, MDC}
import spray.json.DefaultJsonProtocol
import xyz.driver.core
import xyz.driver.core.logging.{Logger, TypesafeScalaLogger}
-import xyz.driver.core.rest.ContextHeaders
-import xyz.driver.core.rest.Swagger
+import xyz.driver.core.rest.{ContextHeaders, Swagger}
import xyz.driver.core.stats.SystemStats
import xyz.driver.core.time.Time
import xyz.driver.core.time.provider.{SystemTimeProvider, TimeProvider}
import scala.compat.Platform.ConcurrentModificationException
import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
object app {
@@ -40,12 +40,10 @@ object app {
interface: String = "::0",
baseUrl: String = "localhost:8080",
scheme: String = "http",
- port: Int = 8080) {
+ port: Int = 8080)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) {
- implicit private lazy val actorSystem = ActorSystem("spray-routing", config)
- implicit private lazy val executionContext = actorSystem.dispatcher
- implicit private lazy val materializer = ActorMaterializer()(actorSystem)
- private lazy val http = Http()(actorSystem)
+ implicit private lazy val materializer = ActorMaterializer()(actorSystem)
+ private lazy val http = Http()(actorSystem)
def run(): Unit = {
activateServices(modules)
@@ -69,17 +67,35 @@ object app {
val swaggerRoutes = swaggerService.routes ~ swaggerService.swaggerUI
val versionRt = versionRoute(version, gitHash, time.currentTime())
+ def entityAsString(entity: HttpEntity)(implicit m: Materializer, ex: ExecutionContext): Future[String] =
+ entity.dataBytes
+ .map(_.decodeString(entity.contentType.charsetOption.fold("UTF-8")(_.value)))
+ .runWith(Sink.head)
+
val _ = Future {
- http.bindAndHandle(route2HandlerFlow(handleExceptions(ExceptionHandler(exceptionHandler)) { ctx =>
+ http.bindAndHandle(route2HandlerFlow({ ctx =>
val trackingId = rest.extractTrackingId(ctx.request)
- log.audit(s"Received request ${ctx.request} with tracking id $trackingId")
+ MDC.put("trackingId", trackingId)
+
+ def requestLogging: Future[Unit] = {
+ entityAsString(ctx.request.entity).map { data =>
+ s"""{"method":"${ctx.request.method.value}","url": "${ctx.request.uri}","entity":"$data"""".stripMargin
+ } map { requestJson =>
+ MDC.put("message", "Received request")
+ log.audit(requestJson)
+ }
+ }
val contextWithTrackingId =
ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)))
- respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) {
- modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _)
- }(contextWithTrackingId)
+ handleExceptions(ExceptionHandler(exceptionHandler))({ c =>
+ requestLogging.flatMap { _ =>
+ respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) {
+ modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _)
+ }(c)
+ }
+ })(contextWithTrackingId)
}), interface, port)(materializer)
}
}
@@ -93,27 +109,27 @@ object app {
case is: IllegalStateException =>
ctx =>
- val trackingId = rest.extractTrackingId(ctx.request)
- log.debug(s"Request is not allowed to ${ctx.request.uri} ($trackingId)", is)
+ MDC.put("trackingId", rest.extractTrackingId(ctx.request))
+ log.error(s"Request is not allowed to ${ctx.request.uri}", is)
complete(HttpResponse(BadRequest, entity = is.getMessage))(ctx)
case cm: ConcurrentModificationException =>
ctx =>
- val trackingId = rest.extractTrackingId(ctx.request)
- log.audit(s"Concurrent modification of the resource ${ctx.request.uri} ($trackingId)", cm)
+ MDC.put("trackingId", rest.extractTrackingId(ctx.request))
+ log.error(s"Concurrent modification of the resource ${ctx.request.uri}", cm)
complete(
HttpResponse(Conflict, entity = "Resource was changed concurrently, try requesting a newer version"))(ctx)
case sex: SQLException =>
ctx =>
- val trackingId = rest.extractTrackingId(ctx.request)
- log.audit(s"Database exception for the resource ${ctx.request.uri} ($trackingId)", sex)
+ MDC.put("trackingId", rest.extractTrackingId(ctx.request))
+ log.error(s"Database exception for the resource ${ctx.request.uri}", sex)
complete(HttpResponse(InternalServerError, entity = "Data access error"))(ctx)
case t: Throwable =>
ctx =>
- val trackingId = rest.extractTrackingId(ctx.request)
- log.error(s"Request to ${ctx.request.uri} could not be handled normally ($trackingId)", t)
+ MDC.put("trackingId", rest.extractTrackingId(ctx.request))
+ log.error(s"Request to ${ctx.request.uri} could not be handled normally", t)
complete(HttpResponse(InternalServerError, entity = t.getMessage))(ctx)
}
@@ -191,7 +207,7 @@ object app {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
services.foreach { service =>
- Console.print(s"Service ${service.name} shutting down ...")
+ Console.print(s"Service ${service.name} shutting down ...\n")
try {
service.deactivate()
} catch {
@@ -199,7 +215,7 @@ object app {
log.fatal(s"Service ${service.name} failed to deactivate", t)
Console.print(" Failed! (check log)")
}
- Console.print(" Done\n")
+ Console.print(s"Service ${service.name} is shut down\n")
}
}
})