aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-06-13 16:12:20 -0700
committervlad <vlad@driver.xyz>2017-06-13 16:12:20 -0700
commitcd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c (patch)
tree062e8dad1a1513e26b0fd08b1742d6ff2ee874f7 /src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
parent0000a65ab4479a2a40e2d6468036438e9705b4aa (diff)
downloadrest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.gz
rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.bz2
rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.zip
Adding domain entitiesv0.1.0
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala113
1 files changed, 113 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
new file mode 100644
index 0000000..6ad404f
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
@@ -0,0 +1,113 @@
+package xyz.driver.pdsuicommon.http
+
+import java.io.Closeable
+import java.net.URI
+import java.util.concurrent.{ExecutorService, Executors}
+
+import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
+import xyz.driver.pdsuicommon.http.AsyncHttpClientUploader._
+import xyz.driver.pdsuicommon.utils.RandomUtils
+import com.typesafe.scalalogging.StrictLogging
+import org.asynchttpclient._
+import org.slf4j.MDC
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+class AsyncHttpClientUploader(settings: Settings) extends Closeable with StrictLogging {
+
+ private val es: ExecutorService = {
+ val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
+ Executors.newSingleThreadExecutor(threadFactory)
+ }
+
+ private implicit val executionContext = ExecutionContext.fromExecutor(es)
+
+ private def httpClientConfig: DefaultAsyncHttpClientConfig = {
+ val builder = new DefaultAsyncHttpClientConfig.Builder()
+ builder.setConnectTimeout(settings.connectTimeout.toMillis.toInt)
+ builder.setRequestTimeout(settings.requestTimeout.toMillis.toInt)
+ // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
+ builder.build()
+ }
+
+ private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)
+
+ def run(method: Method, uri: URI, contentType: String, data: String): Future[Unit] = {
+ // log all outcome connections
+ val fingerPrint = RandomUtils.randomString(10)
+ logger.info("{}, apply(method={}, uri={}, contentType={})", fingerPrint, method, uri, contentType)
+ val promise = Promise[Response]()
+
+ val q = new RequestBuilder(method.toString)
+ .setUrl(uri.toString)
+ .setBody(data)
+
+ settings.defaultHeaders.foreach {
+ case (k, v) =>
+ q.setHeader(k, v)
+ }
+
+ q.addHeader("Content-Type", contentType)
+
+ httpClient.prepareRequest(q).execute(new AsyncCompletionHandler[Unit] {
+ override def onCompleted(response: Response): Unit = {
+ promise.success(response)
+ }
+
+ override def onThrowable(t: Throwable): Unit = {
+ promise.failure(t)
+ super.onThrowable(t)
+ }
+ })
+
+ // see AsyncHttpClientFetcher
+ val parentMdcContext = MDC.getCopyOfContextMap
+ promise.future.flatMap { response =>
+ setContextMap(parentMdcContext)
+
+ val statusCode = response.getStatusCode
+ // https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#2xx_Success
+ if (statusCode >= 200 && statusCode < 300) {
+ logger.debug("{}, success", fingerPrint)
+ Future.successful(())
+ } else {
+ logger.error(
+ "{}, HTTP {}, BODY:\n{}",
+ fingerPrint,
+ response.getStatusCode.asInstanceOf[AnyRef],
+ response.getResponseBody.take(100)
+ )
+ Future.failed(new IllegalStateException("An unexpected response from the server"))
+ }
+ }
+ }
+
+ private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
+ Option(context).fold(MDC.clear())(MDC.setContextMap)
+
+ override def close(): Unit = {
+ httpClient.close()
+ es.shutdown()
+ }
+}
+
+object AsyncHttpClientUploader {
+
+ final case class Settings(connectTimeout: FiniteDuration,
+ requestTimeout: FiniteDuration,
+ defaultHeaders: Map[String, String] = Map.empty)
+
+ sealed trait Method
+
+ object Method {
+
+ case object Put extends Method {
+ override val toString = "PUT"
+ }
+
+ case object Post extends Method {
+ override val toString = "POST"
+ }
+ }
+}