aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala24
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala21
2 files changed, 40 insertions, 5 deletions
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
index ab35725..d634446 100644
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -80,6 +80,7 @@ abstract class AsyncHttpClientHandler[R[_], S](
new StreamedAsyncHandler[Unit] {
private val builder = new AsyncResponse.ResponseBuilder()
private var publisher: Option[Publisher[ByteBuffer]] = None
+ private var completed = false
override def onStream(
p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
@@ -95,6 +96,10 @@ abstract class AsyncHttpClientHandler[R[_], S](
s.onSubscribe(v)
})
})
+ // #2: sometimes onCompleted() isn't called, only onStream(); this
+ // seems to be true esp for https sites. For these cases, completing
+ // the request here.
+ doComplete()
State.CONTINUE
}
@@ -116,11 +121,20 @@ abstract class AsyncHttpClientHandler[R[_], S](
}
override def onCompleted(): Unit = {
- val baseResponse = readResponseNoBody(builder.build())
- val p = publisher.getOrElse(EmptyPublisher)
- val s = publisherToStreamBody(p)
- val t = responseAs.responseIsStream(s)
- success(rm.unit(baseResponse.copy(body = t)))
+ // if the request had no body, onStream() will never be called
+ doComplete()
+ }
+
+ private def doComplete(): Unit = {
+ if (!completed) {
+ completed = true
+
+ val baseResponse = readResponseNoBody(builder.build())
+ val p = publisher.getOrElse(EmptyPublisher)
+ val s = publisherToStreamBody(p)
+ val t = responseAs.responseIsStream(s)
+ success(rm.unit(baseResponse.copy(body = t)))
+ }
}
override def onThrowable(t: Throwable): Unit = {
diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
index 5e6db17..7bc842c 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
@@ -105,5 +105,26 @@ class StreamingTests
new String(bytes, "utf-8") should be(body)
}
+
+ it should "receive a stream from an https site" in {
+ val response = sttp
+ // of course, you should never rely on the internet being available
+ // in tests, but that's so much easier than setting up an https
+ // testing server
+ .get(uri"https://softwaremill.com")
+ .response(asStream[Observable[ByteBuffer]])
+ .send()
+ .runAsync
+ .futureValue
+
+ val bytes = response.body
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .runAsync
+ .futureValue
+ .toArray
+
+ new String(bytes, "utf-8") should include("</div>")
+ }
}
}