aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStewart Stewart <stewinsalot@gmail.com>2017-04-03 11:54:55 -0700
committerStewart Stewart <stewinsalot@gmail.com>2017-04-03 11:54:55 -0700
commit18b978b49649faf15ccb7c3a1f28c61d68bae20a (patch)
treea5058ccd44e673f9f4cb999bcc012b45cc3f88ce
parent5831ed23235daccc773362bfb5bb5dda70f7dc1b (diff)
parent0ed008ab290074eae0f11fa0149a736e8abd7064 (diff)
downloaddriver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.tar.gz
driver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.tar.bz2
driver-core-18b978b49649faf15ccb7c3a1f28c61d68bae20a.zip
Merge branch 'master' of github.com:drivergroup/driver-core
-rw-r--r--src/main/scala/xyz/driver/core/app.scala50
-rw-r--r--src/main/scala/xyz/driver/core/execution.scala45
-rw-r--r--src/main/scala/xyz/driver/core/rest.scala8
3 files changed, 78 insertions, 25 deletions
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala
index 4966b9d..361fb74 100644
--- a/src/main/scala/xyz/driver/core/app.scala
+++ b/src/main/scala/xyz/driver/core/app.scala
@@ -67,24 +67,32 @@ object app {
val versionRt = versionRoute(version, gitHash, time.currentTime())
val _ = Future {
- http.bindAndHandle(route2HandlerFlow({ ctx =>
- val trackingId = rest.extractTrackingId(ctx.request)
- MDC.put("trackingId", trackingId)
-
- def requestLogging: Future[Unit] = Future {
- log.audit(s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""")
+ http.bindAndHandle(route2HandlerFlow(extractHost { origin =>
+ extractClientIP {
+ ip =>
+ { ctx =>
+ val trackingId = rest.extractTrackingId(ctx.request)
+ MDC.put("trackingId", trackingId)
+ MDC.put("origin", origin)
+ MDC.put("ip", ip.toOption.map(_.getHostAddress).getOrElse("unknown"))
+
+ def requestLogging: Future[Unit] = Future {
+ log.audit(
+ s"""Received request {"method":"${ctx.request.method.value}","url": "${ctx.request.uri}"}""")
+ }
+
+ val contextWithTrackingId =
+ ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)))
+
+ handleExceptions(ExceptionHandler(exceptionHandler))({ c =>
+ requestLogging.flatMap { _ =>
+ respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) {
+ modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _)
+ }(c)
+ }
+ })(contextWithTrackingId)
+ }
}
-
- val contextWithTrackingId =
- ctx.withRequest(ctx.request.addHeader(RawHeader(ContextHeaders.TrackingIdHeader, trackingId)))
-
- handleExceptions(ExceptionHandler(exceptionHandler))({ c =>
- requestLogging.flatMap { _ =>
- respondWithHeaders(List(RawHeader(ContextHeaders.TrackingIdHeader, trackingId))) {
- modules.map(_.route).foldLeft(versionRt ~ healthRoute ~ swaggerRoutes)(_ ~ _)
- }(c)
- }
- })(contextWithTrackingId)
}), interface, port)(materializer)
}
}
@@ -99,26 +107,26 @@ object app {
case is: IllegalStateException =>
ctx =>
MDC.put("trackingId", rest.extractTrackingId(ctx.request))
- log.error(s"Request is not allowed to ${ctx.request.uri}", is)
+ log.error(s"Request is not allowed to ${ctx.request.method} ${ctx.request.uri}", is)
complete(HttpResponse(BadRequest, entity = is.getMessage))(ctx)
case cm: ConcurrentModificationException =>
ctx =>
MDC.put("trackingId", rest.extractTrackingId(ctx.request))
- log.error(s"Concurrent modification of the resource ${ctx.request.uri}", cm)
+ log.error(s"Concurrent modification of the resource ${ctx.request.method} ${ctx.request.uri}", cm)
complete(
HttpResponse(Conflict, entity = "Resource was changed concurrently, try requesting a newer version"))(ctx)
case sex: SQLException =>
ctx =>
MDC.put("trackingId", rest.extractTrackingId(ctx.request))
- log.error(s"Database exception for the resource ${ctx.request.uri}", sex)
+ log.error(s"Database exception for the resource ${ctx.request.method} ${ctx.request.uri}", sex)
complete(HttpResponse(InternalServerError, entity = "Data access error"))(ctx)
case t: Throwable =>
ctx =>
MDC.put("trackingId", rest.extractTrackingId(ctx.request))
- log.error(s"Request to ${ctx.request.uri} could not be handled normally", t)
+ log.error(s"Request to ${ctx.request.method} ${ctx.request.uri} could not be handled normally", t)
complete(HttpResponse(InternalServerError, entity = t.getMessage))(ctx)
}
diff --git a/src/main/scala/xyz/driver/core/execution.scala b/src/main/scala/xyz/driver/core/execution.scala
new file mode 100644
index 0000000..0cf92fd
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/execution.scala
@@ -0,0 +1,45 @@
+package xyz.driver.core
+
+import scala.concurrent.{ExecutionContext, Future}
+import scalaz.OptionT
+
+object execution {
+
+ implicit class FutureOptionTExtensions[T](future: Future[T])(implicit executionContext: ExecutionContext) {
+
+ def toOptionT: OptionT[Future, T] =
+ OptionT.optionT[Future](future.map(value => Option(value)))
+
+ def returnUnit: Future[Unit] =
+ future.map(_ => Option(()))
+
+ def returnUnitOpt: OptionT[Future, Unit] =
+ OptionT.optionT[Future](future.map(_ => Option(())))
+
+ def andEffect[E](effect: Future[E]): Future[T] =
+ for {
+ result <- future
+ _ <- effect
+ } yield result
+
+ def andEffect[E](effect: OptionT[Future, E]): Future[T] =
+ andEffect(effect.run)
+ }
+
+ def illegalState[T](message: String): OptionT[Future, T] =
+ failure[T](new IllegalStateException(message))
+
+ def illegalArgument[T](message: String): OptionT[Future, T] =
+ failure[T](new IllegalArgumentException(message))
+
+ def failure[T](throwable: Throwable): OptionT[Future, T] =
+ OptionT.optionT(Future.failed[Option[T]](throwable))
+
+ def collectOrNone[T, R](value: T)(f: PartialFunction[T, OptionT[Future, R]]): OptionT[Future, R] =
+ f.lift(value).getOrElse(OptionT.optionT(Future.successful(Option.empty[R])))
+
+ def collectOrDoNothing[T](value: T)(f: PartialFunction[T, OptionT[Future, Unit]]): OptionT[Future, Unit] =
+ f.lift(value).getOrElse(doNothing)
+
+ val doNothing = OptionT.optionT(Future.successful(Option(())))
+}
diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala
index b3a9116..29c965c 100644
--- a/src/main/scala/xyz/driver/core/rest.scala
+++ b/src/main/scala/xyz/driver/core/rest.scala
@@ -277,7 +277,7 @@ package rest {
RawHeader(h._1, h._2): HttpHeader
}: _*)
- log.audit(s"Sending to ${request.uri} request $request with tracking id ${context.trackingId}")
+ log.audit(s"Sending request to ${request.method} ${request.uri}")
val response = Http()(actorSystem).singleRequest(request)(materializer)
@@ -289,8 +289,8 @@ package rest {
case Failure(t: Throwable) =>
val responseTime = time.currentTime()
- log.audit(s"Failed to receive response from ${request.uri} to request $requestStub", t)
- log.error(s"Failed to receive response from ${request.uri} to request $requestStub", t)
+ log.audit(s"Failed to receive response from ${request.method} ${request.uri}", t)
+ log.error(s"Failed to receive response from ${request.method} ${request.uri}", t)
stats.recordStats(Seq("request", request.uri.toString, "fail"), TimeRange(requestTime, responseTime), 1)
}(executionContext)
@@ -303,7 +303,7 @@ package rest {
if (response.status == StatusCodes.NotFound) {
Unmarshal(HttpEntity.Empty: ResponseEntity)
} else if (response.status.isFailure()) {
- throw new Exception(s"Http status is failure ${response.status}")
+ throw new Exception(s"Http status is failure ${response.status} for ${requestStub.method} ${requestStub.uri}")
} else {
Unmarshal(response.entity)
}