diff options
author | Stewart Stewart <stewinsalot@gmail.com> | 2017-04-03 11:54:55 -0700 |
---|---|---|
committer | Stewart Stewart <stewinsalot@gmail.com> | 2017-04-03 11:54:55 -0700 |
commit | 18b978b49649faf15ccb7c3a1f28c61d68bae20a (patch) | |
tree | a5058ccd44e673f9f4cb999bcc012b45cc3f88ce | |
parent | 5831ed23235daccc773362bfb5bb5dda70f7dc1b (diff) | |
parent | 0ed008ab290074eae0f11fa0149a736e8abd7064 (diff) | |
download | driver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.tar.gz driver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.tar.bz2 driver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.zip |
Merge branch 'master' of github.com:drivergroup/driver-core
-rw-r--r-- | src/main/scala/xyz/driver/core/app.scala | 50 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/core/execution.scala | 45 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/core/rest.scala | 8 |
3 files changed, 78 insertions, 25 deletions
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index 4966b9d..361fb74 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -67,24 +67,32 @@ object app { val versionRt = versionRoute(version, gitHash, time.currentTime()) val _ = Future { - http.bindAndHandle(route2HandlerFlow({ ctx => - val trackingId = rest.extractTrackingId(ctx.request) - MDC.put("trackingId", trackingId) - - def requestLogging: Future[Unit] = Future { - log.audit(s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""") + http.bindAndHandle(route2HandlerFlow(extractHost { origin => + extractClientIP { + ip => + { ctx => + val trackingId = rest.extractTrackingId(ctx.request) + MDC.put("trackingId", trackingId) + MDC.put("origin", origin) + MDC.put("ip", ip.toOption.map(_.getHostAddress).getOrElse("unknown")) + + def requestLogging: Future[Unit] = Future { + log.audit( + s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""") + } + + val contextWithTrackingId = + ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) + + handleExceptions(ExceptionHandler(exceptionHandler))({ c => + requestLogging.flatMap { _ => + respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) { + modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) + }(c) + } + })(contextWithTrackingId) + } } - - val contextWithTrackingId = - ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) - - handleExceptions(ExceptionHandler(exceptionHandler))({ c => - requestLogging.flatMap { _ => - respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) { - modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _) - }(c) - } - })(contextWithTrackingId) }), interface, port)(materializer) } } @@ -99,26 +107,26 @@ object app { case is: IllegalStateException => ctx => MDC.put("trackingId", rest.extractTrackingId(ctx.request)) - log.error(s"Request is not allowed to ${ctx.request.uri}", is) + log.error(s"Request is not allowed to ${ctx.request.method} ${ctx.request.uri}", is) complete(HttpResponse(BadRequest, entity = is.getMessage))(ctx) case cm: ConcurrentModificationException => ctx => MDC.put("trackingId", rest.extractTrackingId(ctx.request)) - log.error(s"Concurrent modification of the resource ${ctx.request.uri}", cm) + log.error(s"Concurrent modification of the resource ${ctx.request.method} ${ctx.request.uri}", cm) complete( HttpResponse(Conflict, entity = "Resource was changed concurrently, try requesting a newer version"))(ctx) case sex: SQLException => ctx => MDC.put("trackingId", rest.extractTrackingId(ctx.request)) - log.error(s"Database exception for the resource ${ctx.request.uri}", sex) + log.error(s"Database exception for the resource ${ctx.request.method} ${ctx.request.uri}", sex) complete(HttpResponse(InternalServerError, entity = "Data access error"))(ctx) case t: Throwable => ctx => MDC.put("trackingId", rest.extractTrackingId(ctx.request)) - log.error(s"Request to ${ctx.request.uri} could not be handled normally", t) + log.error(s"Request to ${ctx.request.method} ${ctx.request.uri} could not be handled normally", t) complete(HttpResponse(InternalServerError, entity = t.getMessage))(ctx) } diff --git a/src/main/scala/xyz/driver/core/execution.scala b/src/main/scala/xyz/driver/core/execution.scala new file mode 100644 index 0000000..0cf92fd --- /dev/null +++ b/src/main/scala/xyz/driver/core/execution.scala @@ -0,0 +1,45 @@ +package xyz.driver.core + +import scala.concurrent.{ExecutionContext, Future} +import scalaz.OptionT + +object execution { + + implicit class FutureOptionTExtensions[T](future: Future[T])(implicit executionContext: ExecutionContext) { + + def toOptionT: OptionT[Future, T] = + OptionT.optionT[Future](future.map(value => Option(value))) + + def returnUnit: Future[Unit] = + future.map(_ => Option(())) + + def returnUnitOpt: OptionT[Future, Unit] = + OptionT.optionT[Future](future.map(_ => Option(()))) + + def andEffect[E](effect: Future[E]): Future[T] = + for { + result <- future + _ <- effect + } yield result + + def andEffect[E](effect: OptionT[Future, E]): Future[T] = + andEffect(effect.run) + } + + def illegalState[T](message: String): OptionT[Future, T] = + failure[T](new IllegalStateException(message)) + + def illegalArgument[T](message: String): OptionT[Future, T] = + failure[T](new IllegalArgumentException(message)) + + def failure[T](throwable: Throwable): OptionT[Future, T] = + OptionT.optionT(Future.failed[Option[T]](throwable)) + + def collectOrNone[T, R](value: T)(f: PartialFunction[T, OptionT[Future, R]]): OptionT[Future, R] = + f.lift(value).getOrElse(OptionT.optionT(Future.successful(Option.empty[R]))) + + def collectOrDoNothing[T](value: T)(f: PartialFunction[T, OptionT[Future, Unit]]): OptionT[Future, Unit] = + f.lift(value).getOrElse(doNothing) + + val doNothing = OptionT.optionT(Future.successful(Option(()))) +} diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala index b3a9116..29c965c 100644 --- a/src/main/scala/xyz/driver/core/rest.scala +++ b/src/main/scala/xyz/driver/core/rest.scala @@ -277,7 +277,7 @@ package rest { RawHeader(h._1, h._2): HttpHeader }: _*) - log.audit(s"Sending to ${request.uri} request $request with tracking id ${context.trackingId}") + log.audit(s"Sending request to ${request.method} ${request.uri}") val response = Http()(actorSystem).singleRequest(request)(materializer) @@ -289,8 +289,8 @@ package rest { case Failure(t: Throwable) => val responseTime = time.currentTime() - log.audit(s"Failed to receive response from ${request.uri} to request $requestStub", t) - log.error(s"Failed to receive response from ${request.uri} to request $requestStub", t) + log.audit(s"Failed to receive response from ${request.method} ${request.uri}", t) + log.error(s"Failed to receive response from ${request.method} ${request.uri}", t) stats.recordStats(Seq("request", request.uri.toString, "fail"), TimeRange(requestTime, responseTime), 1) }(executionContext) @@ -303,7 +303,7 @@ package rest { if (response.status == StatusCodes.NotFound) { Unmarshal(HttpEntity.Empty: ResponseEntity) } else if (response.status.isFailure()) { - throw new Exception(s"Http status is failure ${response.status}") + throw new Exception(s"Http status is failure ${response.status} for ${requestStub.method} ${requestStub.uri}") } else { Unmarshal(response.entity) } |