From 1173aa8b702cd45afc106b3a07316a5812ffce50 Mon Sep 17 00:00:00 2001 From: adamw Date: Wed, 26 Jul 2017 22:03:53 +0200 Subject: #2: onCompleted() in async-http-client-handler sometimes wasn't called, calling it earlier from onStream() --- .../asynchttpclient/AsyncHttpClientHandler.scala | 24 +++++++++++++++++----- .../com/softwaremill/sttp/StreamingTests.scala | 21 +++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index ab35725..d634446 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -80,6 +80,7 @@ abstract class AsyncHttpClientHandler[R[_], S]( new StreamedAsyncHandler[Unit] { private val builder = new AsyncResponse.ResponseBuilder() private var publisher: Option[Publisher[ByteBuffer]] = None + private var completed = false override def onStream( p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { @@ -95,6 +96,10 @@ abstract class AsyncHttpClientHandler[R[_], S]( s.onSubscribe(v) }) }) + // #2: sometimes onCompleted() isn't called, only onStream(); this + // seems to be true esp for https sites. For these cases, completing + // the request here. + doComplete() State.CONTINUE } @@ -116,11 +121,20 @@ abstract class AsyncHttpClientHandler[R[_], S]( } override def onCompleted(): Unit = { - val baseResponse = readResponseNoBody(builder.build()) - val p = publisher.getOrElse(EmptyPublisher) - val s = publisherToStreamBody(p) - val t = responseAs.responseIsStream(s) - success(rm.unit(baseResponse.copy(body = t))) + // if the request had no body, onStream() will never be called + doComplete() + } + + private def doComplete(): Unit = { + if (!completed) { + completed = true + + val baseResponse = readResponseNoBody(builder.build()) + val p = publisher.getOrElse(EmptyPublisher) + val s = publisherToStreamBody(p) + val t = responseAs.responseIsStream(s) + success(rm.unit(baseResponse.copy(body = t))) + } } override def onThrowable(t: Throwable): Unit = { diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 5e6db17..7bc842c 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -105,5 +105,26 @@ class StreamingTests new String(bytes, "utf-8") should be(body) } + + it should "receive a stream from an https site" in { + val response = sttp + // of course, you should never rely on the internet being available + // in tests, but that's so much easier than setting up an https + // testing server + .get(uri"https://softwaremill.com") + .response(asStream[Observable[ByteBuffer]]) + .send() + .runAsync + .futureValue + + val bytes = response.body + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray + + new String(bytes, "utf-8") should include("") + } } } -- cgit v1.2.3