aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler/src/main/scala/com
diff options
context:
space:
mode:
Diffstat (limited to 'async-http-client-handler/src/main/scala/com')
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala113
1 files changed, 113 insertions, 0 deletions
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
new file mode 100644
index 0000000..ecd49cd
--- /dev/null
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -0,0 +1,113 @@
+package com.softwaremill.sttp.asynchttpclient
+
+import java.nio.charset.Charset
+
+import com.softwaremill.sttp.model._
+import com.softwaremill.sttp.{Request, Response, SttpHandler}
+import org.asynchttpclient.{
+ AsyncCompletionHandler,
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient,
+ RequestBuilder,
+ Request => AsyncRequest,
+ Response => AsyncResponse
+}
+
+import scala.concurrent.{Future, Promise}
+import scala.collection.JavaConverters._
+
+class AsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
+ extends SttpHandler[Future, Nothing] {
+ def this() = this(new DefaultAsyncHttpClient())
+ def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
+
+ override def send[T](r: Request[T, Nothing]): Future[Response[T]] = {
+ val p = Promise[Response[T]]()
+ asyncHttpClient
+ .prepareRequest(requestToAsync(r))
+ .execute(new AsyncCompletionHandler[AsyncResponse] {
+ override def onCompleted(response: AsyncResponse): AsyncResponse = {
+ p.success(readResponse(response, r.responseAs))
+ response
+ }
+ override def onThrowable(t: Throwable): Unit = p.failure(t)
+ })
+
+ p.future
+ }
+
+ private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = {
+ val rb = new RequestBuilder(r.method.m).setUrl(r.uri.toString)
+ r.headers.foreach { case (k, v) => rb.setHeader(k, v) }
+ setBody(r.body, rb)
+ rb.build()
+ }
+
+ private def setBody(body: RequestBody[Nothing], rb: RequestBuilder): Unit = {
+ body match {
+ case NoBody => // skip
+
+ case StringBody(b, encoding) =>
+ rb.setBody(b.getBytes(encoding))
+
+ case ByteArrayBody(b) =>
+ rb.setBody(b)
+
+ case ByteBufferBody(b) =>
+ rb.setBody(b)
+
+ case InputStreamBody(b) =>
+ rb.setBody(b)
+
+ case PathBody(b) =>
+ rb.setBody(b.toFile)
+
+ case SerializableBody(f, t) =>
+ setBody(f(t), rb)
+
+ case StreamBody(s) =>
+ // we have an instance of nothing - everything's possible!
+ s
+ }
+ }
+
+ private def readResponse[T](
+ response: AsyncResponse,
+ responseAs: ResponseAs[T, Nothing]): Response[T] = {
+ Response(readResponseBody(response, responseAs),
+ response.getStatusCode,
+ response.getHeaders
+ .iterator()
+ .asScala
+ .map(e => (e.getKey, e.getValue))
+ .toList)
+ }
+
+ private def readResponseBody[T](response: AsyncResponse,
+ responseAs: ResponseAs[T, Nothing]): T = {
+
+ def asString(enc: String) = response.getResponseBody(Charset.forName(enc))
+
+ responseAs match {
+ case IgnoreResponse(g) =>
+ // getting the body and discarding it
+ response.getResponseBodyAsBytes
+ g(())
+
+ case ResponseAsString(enc, g) =>
+ g(asString(enc))
+
+ case ResponseAsByteArray(g) =>
+ g(response.getResponseBodyAsBytes)
+
+ case r @ ResponseAsParams(enc, g) =>
+ g(r.parse(asString(enc)))
+
+ case ResponseAsStream(_) =>
+ // only possible when the user requests the response as a stream of
+ // Nothing. Oh well ...
+ throw new IllegalStateException()
+ }
+ }
+}