diff options
Diffstat (limited to 'async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala')
-rw-r--r-- | async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala | 79 |
1 files changed, 21 insertions, 58 deletions
diff --git a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala index fb0f780..d804f08 100644 --- a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala +++ b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala @@ -8,25 +8,8 @@ import com.softwaremill.sttp._ import org.asynchttpclient.AsyncHandler.State import org.asynchttpclient.handler.StreamedAsyncHandler import org.asynchttpclient.proxy.ProxyServer -import org.asynchttpclient.request.body.multipart.{ - ByteArrayPart, - FilePart, - StringPart -} -import org.asynchttpclient.{ - AsyncCompletionHandler, - AsyncHandler, - AsyncHttpClient, - DefaultAsyncHttpClient, - DefaultAsyncHttpClientConfig, - HttpResponseBodyPart, - HttpResponseHeaders, - HttpResponseStatus, - Param, - RequestBuilder, - Request => AsyncRequest, - Response => AsyncResponse -} +import org.asynchttpclient.request.body.multipart.{ByteArrayPart, FilePart, StringPart} +import org.asynchttpclient.{AsyncCompletionHandler, AsyncHandler, AsyncHttpClient, DefaultAsyncHttpClient, DefaultAsyncHttpClientConfig, HttpResponseBodyPart, HttpResponseHeaders, HttpResponseStatus, Param, RequestBuilder, Request => AsyncRequest, Response => AsyncResponse} import org.reactivestreams.{Publisher, Subscriber, Subscription} import scala.collection.JavaConverters._ @@ -66,10 +49,9 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, protected def publisherToString(p: Publisher[ByteBuffer]): R[String] - private def eagerAsyncHandler[T]( - responseAs: ResponseAs[T, S], - success: R[Response[T]] => Unit, - error: Throwable => Unit): AsyncHandler[Unit] = { + private def eagerAsyncHandler[T](responseAs: ResponseAs[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { new AsyncCompletionHandler[Unit] { override def onCompleted(response: AsyncResponse): Unit = @@ -79,17 +61,15 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, } } - private def streamingAsyncHandler[T]( - responseAs: ResponseAsStream[T, S], - success: R[Response[T]] => Unit, - error: Throwable => Unit): AsyncHandler[Unit] = { + private def streamingAsyncHandler[T](responseAs: ResponseAsStream[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { new StreamedAsyncHandler[Unit] { private val builder = new AsyncResponse.ResponseBuilder() private var publisher: Option[Publisher[ByteBuffer]] = None private var completed = false - override def onStream( - p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { + override def onStream(p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { // Sadly we don't have .map on Publisher publisher = Some(new Publisher[ByteBuffer] { override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = @@ -109,19 +89,15 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, State.CONTINUE } - override def onBodyPartReceived( - bodyPart: HttpResponseBodyPart): AsyncHandler.State = - throw new IllegalStateException( - "Requested a streaming backend, unexpected eager body parts.") + override def onBodyPartReceived(bodyPart: HttpResponseBodyPart): AsyncHandler.State = + throw new IllegalStateException("Requested a streaming backend, unexpected eager body parts.") - override def onHeadersReceived( - headers: HttpResponseHeaders): AsyncHandler.State = { + override def onHeadersReceived(headers: HttpResponseHeaders): AsyncHandler.State = { builder.accumulate(headers) State.CONTINUE } - override def onStatusReceived( - responseStatus: HttpResponseStatus): AsyncHandler.State = { + override def onStatusReceived(responseStatus: HttpResponseStatus): AsyncHandler.State = { builder.accumulate(responseStatus) State.CONTINUE } @@ -160,16 +136,13 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, val readTimeout = r.options.readTimeout val rb = new RequestBuilder(r.method.m) .setUrl(r.uri.toString) - .setRequestTimeout( - if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1) + .setRequestTimeout(if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1) r.headers.foreach { case (k, v) => rb.setHeader(k, v) } setBody(r, r.body, rb) rb.build() } - private def setBody(r: Request[_, S], - body: RequestBody[S], - rb: RequestBuilder): Unit = { + private def setBody(r: Request[_, S], body: RequestBody[S], rb: RequestBuilder): Unit = { body match { case NoBody => // skip @@ -211,10 +184,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, val bodyPart = mp.body match { case StringBody(b, encoding, _) => - new StringPart(nameWithFilename, - b, - mp.contentType.getOrElse(TextPlainContentType), - Charset.forName(encoding)) + new StringPart(nameWithFilename, b, mp.contentType.getOrElse(TextPlainContentType), Charset.forName(encoding)) case ByteArrayBody(b, _) => new ByteArrayPart(nameWithFilename, b) case ByteBufferBody(b, _) => @@ -227,15 +197,12 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull) } - bodyPart.setCustomHeaders( - mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava) + bodyPart.setCustomHeaders(mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava) rb.addBodyPart(bodyPart) } - private def readEagerResponse[T]( - response: AsyncResponse, - responseAs: ResponseAs[T, S]): R[Response[T]] = { + private def readEagerResponse[T](response: AsyncResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { val base = readResponseNoBody(response) val body = if (codeIsSuccess(base.code)) { @@ -280,9 +247,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, Try(response.getResponseBodyAsBytes) case ResponseAsStream() => - Failure( - new IllegalStateException( - "Requested a streaming response, trying to read eagerly.")) + Failure(new IllegalStateException("Requested a streaming response, trying to read eagerly.")) case ResponseAsFile(file, overwrite) => Try( @@ -299,8 +264,7 @@ abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, object AsyncHttpClientBackend { - private[asynchttpclient] def defaultClient( - options: SttpBackendOptions): AsyncHttpClient = { + private[asynchttpclient] def defaultClient(options: SttpBackendOptions): AsyncHttpClient = { var configBuilder = new DefaultAsyncHttpClientConfig.Builder() .setConnectTimeout(options.connectionTimeout.toMillis.toInt) @@ -308,8 +272,7 @@ object AsyncHttpClientBackend { configBuilder = options.proxy match { case None => configBuilder case Some(p) => - configBuilder.setProxyServer( - new ProxyServer.Builder(p.host, p.port).build()) + configBuilder.setProxyServer(new ProxyServer.Builder(p.host, p.port).build()) } new DefaultAsyncHttpClient(configBuilder.build()) |