aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala')
-rw-r--r--async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala79
1 files changed, 21 insertions, 58 deletions
diff --git a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
index fb0f780..d804f08 100644
--- a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
+++ b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
@@ -8,25 +8,8 @@ import com.softwaremill.sttp._
import org.asynchttpclient.AsyncHandler.State
import org.asynchttpclient.handler.StreamedAsyncHandler
import org.asynchttpclient.proxy.ProxyServer
-import org.asynchttpclient.request.body.multipart.{
- ByteArrayPart,
- FilePart,
- StringPart
-}
-import org.asynchttpclient.{
- AsyncCompletionHandler,
- AsyncHandler,
- AsyncHttpClient,
- DefaultAsyncHttpClient,
- DefaultAsyncHttpClientConfig,
- HttpResponseBodyPart,
- HttpResponseHeaders,
- HttpResponseStatus,
- Param,
- RequestBuilder,
- Request => AsyncRequest,
- Response => AsyncResponse
-}
+import org.asynchttpclient.request.body.multipart.{ByteArrayPart, FilePart, StringPart}
+import org.asynchttpclient.{AsyncCompletionHandler, AsyncHandler, AsyncHttpClient, DefaultAsyncHttpClient, DefaultAsyncHttpClientConfig, HttpResponseBodyPart, HttpResponseHeaders, HttpResponseStatus, Param, RequestBuilder, Request => AsyncRequest, Response => AsyncResponse}
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import scala.collection.JavaConverters._
@@ -66,10 +49,9 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
protected def publisherToString(p: Publisher[ByteBuffer]): R[String]
- private def eagerAsyncHandler[T](
- responseAs: ResponseAs[T, S],
- success: R[Response[T]] => Unit,
- error: Throwable => Unit): AsyncHandler[Unit] = {
+ private def eagerAsyncHandler[T](responseAs: ResponseAs[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
new AsyncCompletionHandler[Unit] {
override def onCompleted(response: AsyncResponse): Unit =
@@ -79,17 +61,15 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
}
}
- private def streamingAsyncHandler[T](
- responseAs: ResponseAsStream[T, S],
- success: R[Response[T]] => Unit,
- error: Throwable => Unit): AsyncHandler[Unit] = {
+ private def streamingAsyncHandler[T](responseAs: ResponseAsStream[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
new StreamedAsyncHandler[Unit] {
private val builder = new AsyncResponse.ResponseBuilder()
private var publisher: Option[Publisher[ByteBuffer]] = None
private var completed = false
- override def onStream(
- p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
+ override def onStream(p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
// Sadly we don't have .map on Publisher
publisher = Some(new Publisher[ByteBuffer] {
override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit =
@@ -109,19 +89,15 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
State.CONTINUE
}
- override def onBodyPartReceived(
- bodyPart: HttpResponseBodyPart): AsyncHandler.State =
- throw new IllegalStateException(
- "Requested a streaming backend, unexpected eager body parts.")
+ override def onBodyPartReceived(bodyPart: HttpResponseBodyPart): AsyncHandler.State =
+ throw new IllegalStateException("Requested a streaming backend, unexpected eager body parts.")
- override def onHeadersReceived(
- headers: HttpResponseHeaders): AsyncHandler.State = {
+ override def onHeadersReceived(headers: HttpResponseHeaders): AsyncHandler.State = {
builder.accumulate(headers)
State.CONTINUE
}
- override def onStatusReceived(
- responseStatus: HttpResponseStatus): AsyncHandler.State = {
+ override def onStatusReceived(responseStatus: HttpResponseStatus): AsyncHandler.State = {
builder.accumulate(responseStatus)
State.CONTINUE
}
@@ -160,16 +136,13 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
val readTimeout = r.options.readTimeout
val rb = new RequestBuilder(r.method.m)
.setUrl(r.uri.toString)
- .setRequestTimeout(
- if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1)
+ .setRequestTimeout(if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1)
r.headers.foreach { case (k, v) => rb.setHeader(k, v) }
setBody(r, r.body, rb)
rb.build()
}
- private def setBody(r: Request[_, S],
- body: RequestBody[S],
- rb: RequestBuilder): Unit = {
+ private def setBody(r: Request[_, S], body: RequestBody[S], rb: RequestBuilder): Unit = {
body match {
case NoBody => // skip
@@ -211,10 +184,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
val bodyPart = mp.body match {
case StringBody(b, encoding, _) =>
- new StringPart(nameWithFilename,
- b,
- mp.contentType.getOrElse(TextPlainContentType),
- Charset.forName(encoding))
+ new StringPart(nameWithFilename, b, mp.contentType.getOrElse(TextPlainContentType), Charset.forName(encoding))
case ByteArrayBody(b, _) =>
new ByteArrayPart(nameWithFilename, b)
case ByteBufferBody(b, _) =>
@@ -227,15 +197,12 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull)
}
- bodyPart.setCustomHeaders(
- mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava)
+ bodyPart.setCustomHeaders(mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava)
rb.addBodyPart(bodyPart)
}
- private def readEagerResponse[T](
- response: AsyncResponse,
- responseAs: ResponseAs[T, S]): R[Response[T]] = {
+ private def readEagerResponse[T](response: AsyncResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = {
val base = readResponseNoBody(response)
val body = if (codeIsSuccess(base.code)) {
@@ -280,9 +247,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
Try(response.getResponseBodyAsBytes)
case ResponseAsStream() =>
- Failure(
- new IllegalStateException(
- "Requested a streaming response, trying to read eagerly."))
+ Failure(new IllegalStateException("Requested a streaming response, trying to read eagerly."))
case ResponseAsFile(file, overwrite) =>
Try(
@@ -299,8 +264,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
object AsyncHttpClientBackend {
- private[asynchttpclient] def defaultClient(
- options: SttpBackendOptions): AsyncHttpClient = {
+ private[asynchttpclient] def defaultClient(options: SttpBackendOptions): AsyncHttpClient = {
var configBuilder = new DefaultAsyncHttpClientConfig.Builder()
.setConnectTimeout(options.connectionTimeout.toMillis.toInt)
@@ -308,8 +272,7 @@ object AsyncHttpClientBackend {
configBuilder = options.proxy match {
case None => configBuilder
case Some(p) =>
- configBuilder.setProxyServer(
- new ProxyServer.Builder(p.host, p.port).build())
+ configBuilder.setProxyServer(new ProxyServer.Builder(p.host, p.port).build())
}
new DefaultAsyncHttpClient(configBuilder.build())