aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/http/AsyncHttpClientUploader.scala
blob: d7bc3d34afc34cecad0185670236484814e5c1d3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package xyz.driver.pdsuicommon.http

import java.io.Closeable
import java.net.URI
import java.util.concurrent.{ExecutorService, Executors}

import xyz.driver.pdsuicommon.concurrent.MdcThreadFactory
import xyz.driver.pdsuicommon.http.AsyncHttpClientUploader._
import xyz.driver.pdsuicommon.utils.RandomUtils
import com.typesafe.scalalogging.StrictLogging
import org.asynchttpclient._
import org.slf4j.MDC

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}

/** Sends a string payload to a URI with PUT or POST through an AsyncHttpClient instance.
  *
  * The instance owns a single-thread executor (used to run the response-handling
  * callbacks) and a `DefaultAsyncHttpClient`; both are released by [[close]].
  *
  * @param settings connect/request timeouts and the headers set on every request
  */
class AsyncHttpClientUploader(settings: Settings) extends Closeable with StrictLogging {

  // Single worker thread whose factory is wrapped by MdcThreadFactory.
  // NOTE(review): presumably the wrapper propagates the creator's MDC to the worker — confirm.
  private val es: ExecutorService = {
    val threadFactory = MdcThreadFactory.from(Executors.defaultThreadFactory())
    Executors.newSingleThreadExecutor(threadFactory)
  }

  // Explicitly typed so implicit resolution does not depend on the inferred type.
  private implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(es)

  /** Converts a duration to whole milliseconds, clamped to the `Int` range.
    *
    * A plain `toMillis.toInt` silently wraps around for durations longer than
    * `Int.MaxValue` milliseconds (about 24.8 days).
    */
  private def timeoutMillis(duration: FiniteDuration): Int =
    duration.toMillis.max(Int.MinValue.toLong).min(Int.MaxValue.toLong).toInt

  /** Builds the client configuration from the timeouts in `settings`. */
  private def httpClientConfig: DefaultAsyncHttpClientConfig = {
    val builder = new DefaultAsyncHttpClientConfig.Builder()
    builder.setConnectTimeout(timeoutMillis(settings.connectTimeout))
    builder.setRequestTimeout(timeoutMillis(settings.requestTimeout))
    // builder.setThreadFactory(threadFactory) // Doesn't help to push MDC context into AsyncCompletionHandler
    builder.build()
  }

  private val httpClient = new DefaultAsyncHttpClient(httpClientConfig)

  /** Sends `data` as the request body to `uri`.
    *
    * @param method      HTTP method; its `toString` is used as the request method name
    * @param uri         target address
    * @param contentType value added as the `Content-Type` header
    * @param data        request body
    * @return a future that succeeds with `()` when the response status is in the
    *         range [200, 300); fails with `IllegalStateException` for any other
    *         status; fails with the throwable reported to the completion handler
    *         when the request itself fails
    */
  def run(method: Method, uri: URI, contentType: String, data: String): Future[Unit] = {
    // log all outgoing connections
    val fingerPrint = RandomUtils.randomString(10)
    logger.info("{}, apply(method={}, uri={}, contentType={})", fingerPrint, method, uri, contentType)
    val promise = Promise[Response]()

    val q = new RequestBuilder(method.toString)
      .setUrl(uri.toString)
      .setBody(data)

    settings.defaultHeaders.foreach {
      case (k, v) =>
        q.setHeader(k, v)
    }

    // NOTE(review): this is `addHeader`, not `setHeader`; if defaultHeaders already carries
    // a Content-Type, the request ends up with two values — confirm that is intended.
    q.addHeader("Content-Type", contentType)

    // Bridge the client's callback API to the Scala promise.
    httpClient
      .prepareRequest(q)
      .execute(new AsyncCompletionHandler[Unit] {
        override def onCompleted(response: Response): Unit = {
          promise.success(response)
        }

        override def onThrowable(t: Throwable): Unit = {
          promise.failure(t)
          super.onThrowable(t)
        }
      })

    // see AsyncHttpClientFetcher
    // Captured on the calling thread and re-applied inside the callback, which runs on `es`.
    val parentMdcContext = MDC.getCopyOfContextMap
    promise.future.flatMap { response =>
      setContextMap(parentMdcContext)

      val statusCode = response.getStatusCode
      // https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#2xx_Success
      if (statusCode >= 200 && statusCode < 300) {
        logger.debug("{}, success", fingerPrint)
        Future.successful(())
      } else {
        // Only the first 100 characters of the body are logged.
        logger.error(
          "{}, HTTP {}, BODY:\n{}",
          fingerPrint,
          Int.box(statusCode),
          response.getResponseBody.take(100)
        )
        Future.failed(new IllegalStateException("An unexpected response from the server"))
      }
    }
  }

  /** Installs `context` as the current thread's MDC, clearing the MDC when `context` is null. */
  private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
    Option(context).fold(MDC.clear())(MDC.setContextMap)

  /** Closes the HTTP client and shuts the executor down.
    *
    * The executor is shut down even when closing the client throws, so the
    * worker thread is not leaked.
    */
  override def close(): Unit =
    try httpClient.close()
    finally es.shutdown()
}

/** Configuration and HTTP-method types used by the `AsyncHttpClientUploader` class. */
object AsyncHttpClientUploader {

  /** Uploader configuration.
    *
    * @param connectTimeout connect timeout handed to the HTTP client configuration
    * @param requestTimeout request timeout handed to the HTTP client configuration
    * @param defaultHeaders headers set on every request; empty by default
    */
  final case class Settings(
      connectTimeout: FiniteDuration,
      requestTimeout: FiniteDuration,
      defaultHeaders: Map[String, String] = Map.empty
  )

  /** HTTP method of an upload request; `toString` yields the method name sent on the wire. */
  sealed trait Method

  object Method {

    /** HTTP PUT. */
    case object Put extends Method {
      override def toString: String = "PUT"
    }

    /** HTTP POST. */
    case object Post extends Method {
      override def toString: String = "POST"
    }
  }
}