aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-06-13 16:12:20 -0700
committervlad <vlad@driver.xyz>2017-06-13 16:12:20 -0700
commitcd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c (patch)
tree062e8dad1a1513e26b0fd08b1742d6ff2ee874f7 /src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
parent0000a65ab4479a2a40e2d6468036438e9705b4aa (diff)
downloadrest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.gz
rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.bz2
rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.zip
Adding domain entitiesv0.1.0
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala90
1 files changed, 90 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
new file mode 100644
index 0000000..4b0d897
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientFetcher.scala
@@ -0,0 +1,90 @@
+package xyz.driver.pdsuicommon.http
+
+import java.io.Closeable
+import java.net.URL
+import java.util.concurrent.{ExecutorService, Executors}
+
+import com.typesafe.scalalogging.StrictLogging
+import org.asynchttpclient._
+import org.slf4j.MDC
+import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
+import xyz.driver.pdsuicommon.utils.RandomUtils
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+class AsyncHttpClientFetcher(settings: AsyncHttpClientFetcher.Settings)
+ extends HttpFetcher with Closeable with StrictLogging {
+
+ private val es: ExecutorService = {
+ val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
+ Executors.newSingleThreadExecutor(threadFactory)
+ }
+
+ private implicit val executionContext = ExecutionContext.fromExecutor(es)
+
+ private def httpClientConfig: DefaultAsyncHttpClientConfig = {
+ val builder = new DefaultAsyncHttpClientConfig.Builder()
+ builder.setConnectTimeout(settings.connectTimeout.toMillis.toInt)
+ builder.setReadTimeout(settings.readTimeout.toMillis.toInt)
+ // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
+ builder.build()
+ }
+
+ private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)
+
+ override def apply(url: URL): Future[Array[Byte]] = {
+ val fingerPrint = RandomUtils.randomString(10)
+
+ // log all outcome connections
+ logger.info("{}, apply({})", fingerPrint, url)
+ val promise = Promise[Response]()
+
+ httpClient.prepareGet(url.toString).execute(new AsyncCompletionHandler[Response] {
+ override def onCompleted(response: Response): Response = {
+ promise.success(response)
+ response
+ }
+
+ override def onThrowable(t: Throwable): Unit = {
+ promise.failure(t)
+ super.onThrowable(t)
+ }
+ })
+
+ // Promises have their own ExecutionContext
+ // So, we have to hack it.
+ val parentMdcContext = MDC.getCopyOfContextMap
+ promise.future.flatMap { response =>
+ setContextMap(parentMdcContext)
+
+ if (response.getStatusCode == 200) {
+ // DO NOT LOG body, it could be PHI
+ // logger.trace(response.getResponseBody())
+ val bytes = response.getResponseBodyAsBytes
+ logger.debug("{}, size is {}B", fingerPrint, bytes.size.asInstanceOf[AnyRef])
+ Future.successful(bytes)
+ } else {
+ logger.error("{}, HTTP {}", fingerPrint, response.getStatusCode.asInstanceOf[AnyRef])
+ logger.trace(response.getResponseBody().take(100))
+ Future.failed(new IllegalStateException("An unexpected response from the server"))
+ }
+ }
+ }
+
+ private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
+ Option(context).fold(MDC.clear())(MDC.setContextMap)
+
+ override def close(): Unit = {
+ httpClient.close()
+ es.shutdown()
+ }
+
+}
+
+object AsyncHttpClientFetcher {
+
+ case class Settings(connectTimeout: FiniteDuration,
+ readTimeout: FiniteDuration)
+
+}