package xyz.driver.pdsuicommon.http

import java.io.Closeable
import java.net.URI
import java.util.concurrent.{ExecutorService, Executors}

import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
import xyz.driver.pdsuicommon.http.AsyncHttpClientUploader._
import xyz.driver.pdsuicommon.utils.RandomUtils
import com.typesafe.scalalogging.StrictLogging
import org.asynchttpclient._
import org.slf4j.MDC

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}

/**
  * Sends a string payload to a URI with PUT or POST through an asynchronous HTTP client.
  *
  * The instance owns both the HTTP client and a single-thread executor on which response
  * handling runs; call [[close]] to release them.
  *
  * @param settings timeouts and default headers applied to every request
  */
class AsyncHttpClientUploader(settings: Settings) extends Closeable with StrictLogging {

  // Single worker thread used to run the response-handling continuation of `run`.
  // NOTE(review): MdcThreadFactory presumably propagates the SLF4J MDC to the worker thread - confirm.
  private val es: ExecutorService = {
    val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
    Executors.newSingleThreadExecutor(threadFactory)
  }

  private implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(es)

  /** Converts a duration to whole milliseconds, clamped to `Int.MaxValue` instead of overflowing. */
  private def timeoutMillis(duration: FiniteDuration): Int =
    math.min(duration.toMillis, Int.MaxValue.toLong).toInt

  /** Builds the client configuration from the connect and request timeouts in `settings`. */
  private def httpClientConfig: DefaultAsyncHttpClientConfig = {
    val builder = new DefaultAsyncHttpClientConfig.Builder()
    builder.setConnectTimeout(timeoutMillis(settings.connectTimeout))
    builder.setRequestTimeout(timeoutMillis(settings.requestTimeout))
    // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
    builder.build()
  }

  private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)

  /**
    * Sends `data` as the request body to `uri`.
    *
    * @param method      HTTP method to use
    * @param uri         target of the request
    * @param contentType value of the `Content-Type` header; replaces any value from the default headers
    * @param data        request body
    * @return a future that succeeds when the server answers with a 2xx status; it fails with
    *         `IllegalStateException` on any other status, or with the throwable reported by the
    *         HTTP client when the request itself fails
    */
  def run(method: Method, uri: URI, contentType: String, data: String): Future[Unit] = {
    // log all outgoing connections
    val fingerPrint = RandomUtils.randomString(10)
    logger.info("{}, apply(method={}, uri={}, contentType={})", fingerPrint, method, uri, contentType)
    val promise = Promise[Response]()

    val q = new RequestBuilder(method.toString)
      .setUrl(uri.toString)
      .setBody(data)

    settings.defaultHeaders.foreach {
      case (k, v) =>
        q.setHeader(k, v)
    }

    // setHeader (not addHeader) so that a Content-Type present in the default headers is
    // replaced rather than duplicated.
    q.setHeader("Content-Type", contentType)

    httpClient
      .prepareRequest(q)
      .execute(new AsyncCompletionHandler[Unit] {
        // try* variants: completing the promise a second time must not throw inside the
        // client's callback if the handler happens to be notified more than once.
        override def onCompleted(response: Response): Unit = {
          promise.trySuccess(response)
          ()
        }

        override def onThrowable(t: Throwable): Unit = {
          promise.tryFailure(t)
          super.onThrowable(t)
        }
      })

    // see AsyncHttpClientFetcher
    // Capture the caller's MDC now so it can be re-applied on the worker thread below.
    val parentMdcContext = MDC.getCopyOfContextMap
    promise.future.flatMap { response =>
      setContextMap(parentMdcContext)

      val statusCode = response.getStatusCode
      // https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#2xx_Success
      if (statusCode >= 200 && statusCode < 300) {
        logger.debug("{}, success", fingerPrint)
        Future.successful(())
      } else {
        // Only the first 100 characters of the body are logged.
        logger.error(
          "{}, HTTP {}, BODY:\n{}",
          fingerPrint,
          Int.box(statusCode),
          response.getResponseBody.take(100)
        )
        Future.failed(new IllegalStateException("An unexpected response from the server"))
      }
    }
  }

  /** Installs `context` as the current thread's MDC, or clears the MDC when `context` is null. */
  private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
    Option(context).fold(MDC.clear())(MDC.setContextMap)

  /** Closes the HTTP client and shuts the executor down, even if closing the client fails. */
  override def close(): Unit = {
    try {
      httpClient.close()
    } finally {
      es.shutdown()
    }
  }

}

object AsyncHttpClientUploader {

  /**
    * Uploader configuration.
    *
    * @param connectTimeout connect timeout passed to the HTTP client (millisecond precision)
    * @param requestTimeout request timeout passed to the HTTP client (millisecond precision)
    * @param defaultHeaders headers set on every request
    */
  final case class Settings(connectTimeout: FiniteDuration,
                            requestTimeout: FiniteDuration,
                            defaultHeaders: Map[String, String] = Map.empty)

  /** HTTP methods supported by the uploader; `toString` yields the method name sent on the wire. */
  sealed trait Method

  object Method {
    case object Put extends Method {
      override val toString: String = "PUT"
    }

    case object Post extends Method {
      override val toString: String = "POST"
    }
  }
}