diff options
-rw-r--r-- | async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala | 24 | ||||
-rw-r--r-- | tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala | 21 |
2 files changed, 40 insertions, 5 deletions
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index ab35725..d634446 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -80,6 +80,7 @@ abstract class AsyncHttpClientHandler[R[_], S]( new StreamedAsyncHandler[Unit] { private val builder = new AsyncResponse.ResponseBuilder() private var publisher: Option[Publisher[ByteBuffer]] = None + private var completed = false override def onStream( p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { @@ -95,6 +96,10 @@ abstract class AsyncHttpClientHandler[R[_], S]( s.onSubscribe(v) }) }) + // #2: sometimes onCompleted() isn't called, only onStream(); this + // seems to be true esp for https sites. For these cases, completing + // the request here. + doComplete() State.CONTINUE } @@ -116,11 +121,20 @@ abstract class AsyncHttpClientHandler[R[_], S]( } override def onCompleted(): Unit = { - val baseResponse = readResponseNoBody(builder.build()) - val p = publisher.getOrElse(EmptyPublisher) - val s = publisherToStreamBody(p) - val t = responseAs.responseIsStream(s) - success(rm.unit(baseResponse.copy(body = t))) + // if the request had no body, onStream() will never be called + doComplete() + } + + private def doComplete(): Unit = { + if (!completed) { + completed = true + + val baseResponse = readResponseNoBody(builder.build()) + val p = publisher.getOrElse(EmptyPublisher) + val s = publisherToStreamBody(p) + val t = responseAs.responseIsStream(s) + success(rm.unit(baseResponse.copy(body = t))) + } } override def onThrowable(t: Throwable): Unit = { diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 5e6db17..7bc842c 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -105,5 +105,26 @@ class StreamingTests new String(bytes, "utf-8") should be(body) } + + it should "receive a stream from an https site" in { + val response = sttp + // of course, you should never rely on the internet being available + // in tests, but that's so much easier than setting up an https + // testing server + .get(uri"https://softwaremill.com") + .response(asStream[Observable[ByteBuffer]]) + .send() + .runAsync + .futureValue + + val bytes = response.body + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray + + new String(bytes, "utf-8") should include("</div>") + } } } |