aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/http
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/http')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala90
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala115
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/Directives.scala114
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/package.scala9
4 files changed, 0 insertions, 328 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
deleted file mode 100644
index 085dcd8..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-package xyz.driver.pdsuicommon.http
-
-import java.io.Closeable
-import java.net.URL
-import java.util.concurrent.{ExecutorService, Executors}
-
-import com.typesafe.scalalogging.StrictLogging
-import org.asynchttpclient._
-import org.slf4j.MDC
-import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
-import xyz.driver.pdsuicommon.utils.RandomUtils
-
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-class AsyncHttpClientFetcher(settings: AsyncHttpClientFetcher.Settings)
- extends HttpFetcher with Closeable with StrictLogging {
-
- private val es: ExecutorService = {
- val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
- Executors.newSingleThreadExecutor(threadFactory)
- }
-
- private implicit val executionContext = ExecutionContext.fromExecutor(es)
-
- private def httpClientConfig: DefaultAsyncHttpClientConfig = {
- val builder = new DefaultAsyncHttpClientConfig.Builder()
- builder.setConnectTimeout(settings.connectTimeout.toMillis.toInt)
- builder.setReadTimeout(settings.readTimeout.toMillis.toInt)
- // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
- builder.build()
- }
-
- private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)
-
- override def apply(url: URL): Future[Array[Byte]] = {
- val fingerPrint = RandomUtils.randomString(10)
-
- // log all outcome connections
- logger.info("{}, apply({})", fingerPrint, url)
- val promise = Promise[Response]()
-
- httpClient
- .prepareGet(url.toString)
- .execute(new AsyncCompletionHandler[Response] {
- override def onCompleted(response: Response): Response = {
- promise.success(response)
- response
- }
-
- override def onThrowable(t: Throwable): Unit = {
- promise.failure(t)
- super.onThrowable(t)
- }
- })
-
- // Promises have their own ExecutionContext
- // So, we have to hack it.
- val parentMdcContext = MDC.getCopyOfContextMap
- promise.future.flatMap { response =>
- setContextMap(parentMdcContext)
-
- if (response.getStatusCode == 200) {
- // DO NOT LOG body, it could be PHI
- val bytes = response.getResponseBodyAsBytes
- logger.debug("{}, size is {}B", fingerPrint, bytes.size.asInstanceOf[AnyRef])
- Future.successful(bytes)
- } else {
- logger.error("{}, HTTP {}", fingerPrint, response.getStatusCode.asInstanceOf[AnyRef])
- logger.trace(response.getResponseBody().take(100))
- Future.failed(new IllegalStateException("An unexpected response from the server"))
- }
- }
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
- override def close(): Unit = {
- httpClient.close()
- es.shutdown()
- }
-
-}
-
-object AsyncHttpClientFetcher {
-
- final case class Settings(connectTimeout: FiniteDuration, readTimeout: FiniteDuration)
-
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
deleted file mode 100644
index d7bc3d3..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-package xyz.driver.pdsuicommon.http
-
-import java.io.Closeable
-import java.net.URI
-import java.util.concurrent.{ExecutorService, Executors}
-
-import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
-import xyz.driver.pdsuicommon.http.AsyncHttpClientUploader._
-import xyz.driver.pdsuicommon.utils.RandomUtils
-import com.typesafe.scalalogging.StrictLogging
-import org.asynchttpclient._
-import org.slf4j.MDC
-
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-class AsyncHttpClientUploader(settings: Settings) extends Closeable with StrictLogging {
-
- private val es: ExecutorService = {
- val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
- Executors.newSingleThreadExecutor(threadFactory)
- }
-
- private implicit val executionContext = ExecutionContext.fromExecutor(es)
-
- private def httpClientConfig: DefaultAsyncHttpClientConfig = {
- val builder = new DefaultAsyncHttpClientConfig.Builder()
- builder.setConnectTimeout(settings.connectTimeout.toMillis.toInt)
- builder.setRequestTimeout(settings.requestTimeout.toMillis.toInt)
- // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
- builder.build()
- }
-
- private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)
-
- def run(method: Method, uri: URI, contentType: String, data: String): Future[Unit] = {
- // log all outcome connections
- val fingerPrint = RandomUtils.randomString(10)
- logger.info("{}, apply(method={}, uri={}, contentType={})", fingerPrint, method, uri, contentType)
- val promise = Promise[Response]()
-
- val q = new RequestBuilder(method.toString)
- .setUrl(uri.toString)
- .setBody(data)
-
- settings.defaultHeaders.foreach {
- case (k, v) =>
- q.setHeader(k, v)
- }
-
- q.addHeader("Content-Type", contentType)
-
- httpClient
- .prepareRequest(q)
- .execute(new AsyncCompletionHandler[Unit] {
- override def onCompleted(response: Response): Unit = {
- promise.success(response)
- }
-
- override def onThrowable(t: Throwable): Unit = {
- promise.failure(t)
- super.onThrowable(t)
- }
- })
-
- // see AsyncHttpClientFetcher
- val parentMdcContext = MDC.getCopyOfContextMap
- promise.future.flatMap { response =>
- setContextMap(parentMdcContext)
-
- val statusCode = response.getStatusCode
- // https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#2xx_Success
- if (statusCode >= 200 && statusCode < 300) {
- logger.debug("{}, success", fingerPrint)
- Future.successful(())
- } else {
- logger.error(
- "{}, HTTP {}, BODY:\n{}",
- fingerPrint,
- response.getStatusCode.asInstanceOf[AnyRef],
- response.getResponseBody.take(100)
- )
- Future.failed(new IllegalStateException("An unexpected response from the server"))
- }
- }
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
- override def close(): Unit = {
- httpClient.close()
- es.shutdown()
- }
-}
-
-object AsyncHttpClientUploader {
-
- final case class Settings(connectTimeout: FiniteDuration,
- requestTimeout: FiniteDuration,
- defaultHeaders: Map[String, String] = Map.empty)
-
- sealed trait Method
-
- object Method {
-
- case object Put extends Method {
- override val toString = "PUT"
- }
-
- case object Post extends Method {
- override val toString = "POST"
- }
- }
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/Directives.scala b/src/main/scala/xyz/driver/pdsuicommon/http/Directives.scala
deleted file mode 100644
index 46b86a6..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/http/Directives.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package xyz.driver.pdsuicommon.http
-
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.model._
-import xyz.driver.core.app.DriverApp
-import xyz.driver.pdsuicommon.error._
-import xyz.driver.pdsuicommon.error.DomainError._
-import xyz.driver.pdsuicommon.error.ErrorsResponse.ResponseError
-import xyz.driver.pdsuicommon.parsers._
-import xyz.driver.pdsuicommon.db.{Pagination, SearchFilterExpr, Sorting}
-import xyz.driver.pdsuicommon.domain._
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import xyz.driver.core.generators
-import xyz.driver.core.rest.ContextHeaders
-import xyz.driver.core.rest.errors.{InvalidActionException, InvalidInputException, ResourceNotFoundException}
-
-import scala.util.control._
-import scala.util._
-
-trait Directives {
-
- val paginated: Directive1[Pagination] = parameterSeq.flatMap { params =>
- PaginationParser.parse(params) match {
- case Success(pagination) => provide(pagination)
- case Failure(ex) =>
- reject(ValidationRejection("invalid pagination parameter", Some(ex)))
- }
- }
-
- def sorted(validDimensions: Set[String] = Set.empty): Directive1[Sorting] = parameterSeq.flatMap { params =>
- SortingParser.parse(validDimensions, params) match {
- case Success(sorting) => provide(sorting)
- case Failure(ex) =>
- reject(ValidationRejection("invalid sorting parameter", Some(ex)))
- }
- }
-
- val dimensioned: Directive1[Dimensions] = parameterSeq.flatMap { params =>
- DimensionsParser.tryParse(params) match {
- case Success(dims) => provide(dims)
- case Failure(ex) =>
- reject(ValidationRejection("invalid dimension parameter", Some(ex)))
- }
- }
-
- val searchFiltered: Directive1[SearchFilterExpr] = parameterSeq.flatMap { params =>
- SearchFilterParser.parse(params) match {
- case Success(sorting) => provide(sorting)
- case Failure(ex) =>
- reject(ValidationRejection("invalid filter parameter", Some(ex)))
- }
- }
-
- def StringIdInPath[T]: PathMatcher1[StringId[T]] =
- PathMatchers.Segment.map((id) => StringId(id.toString))
-
- def LongIdInPath[T]: PathMatcher1[LongId[T]] =
- PathMatchers.LongNumber.map((id) => LongId(id))
-
- def UuidIdInPath[T]: PathMatcher1[UuidId[T]] =
- PathMatchers.JavaUUID.map((id) => UuidId(id))
-
- def failFast[A](reply: A): A = reply match {
- case err: NotFoundError => throw ResourceNotFoundException(err.getMessage)
- case err: AuthorizationError => throw InvalidActionException(err.getMessage)
- case err: DomainError => throw InvalidInputException(err.getMessage)
- case other => other
- }
-
- def domainExceptionHandler(req: String): ExceptionHandler = {
- def errorResponse(msg: String, code: Int) =
- ErrorsResponse(Seq(ResponseError(None, msg, code)), req)
- ExceptionHandler {
- case ex: InvalidActionException =>
- complete(StatusCodes.Forbidden -> errorResponse(ex.message, 403))
-
- case ex: ResourceNotFoundException =>
- complete(StatusCodes.NotFound -> errorResponse(ex.message, 404))
-
- case ex: InvalidInputException =>
- complete(StatusCodes.BadRequest -> errorResponse(ex.message, 400))
-
- case NonFatal(ex) =>
- complete(StatusCodes.InternalServerError -> errorResponse(ex.getMessage, 500))
- }
- }
-
- def domainRejectionHandler(req: String): RejectionHandler = {
- def wrapContent(message: String) = {
- import ErrorsResponse._
- val err: ErrorsResponse = ErrorsResponse(Seq(ResponseError(None, message, 1)), req)
- val text = errorsResponseJsonFormat.write(err).toString()
- HttpEntity(ContentTypes.`application/json`, text)
- }
- DriverApp.rejectionHandler.mapRejectionResponse {
- case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
- res.copy(entity = wrapContent(ent.data.utf8String))
- case x => x // pass through all other types of responses
- }
- }
-
- val tracked: Directive1[String] = optionalHeaderValueByName(ContextHeaders.TrackingIdHeader) flatMap {
- case Some(id) => provide(id)
- case None => provide(generators.nextUuid().toString)
- }
-
- val domainResponse: Directive0 = tracked.flatMap { id =>
- handleExceptions(domainExceptionHandler(id)) & handleRejections(domainRejectionHandler(id))
- }
-
-}
-
-object Directives extends Directives
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/package.scala b/src/main/scala/xyz/driver/pdsuicommon/http/package.scala
deleted file mode 100644
index 20b1c97..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/http/package.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package xyz.driver.pdsuicommon
-
-import java.net.URL
-
-import scala.concurrent.Future
-
-package object http {
- type HttpFetcher = URL => Future[Array[Byte]]
-}