From 065d207a9b9b17e370b93bf17c2a8edf5ef472fe Mon Sep 17 00:00:00 2001 From: Omar Alejandro Mainegra Sarduy Date: Sun, 13 Aug 2017 17:06:39 -0400 Subject: Generalize streaming test cases --- .../com/softwaremill/sttp/StreamingTests.scala | 182 ++++++--------------- 1 file changed, 52 insertions(+), 130 deletions(-) (limited to 'tests/src/test/scala') diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 5820153..3238c3c 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -10,9 +10,12 @@ import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler import com.typesafe.scalalogging.StrictLogging +import monix.execution.Scheduler.Implicits.global import monix.reactive.Observable -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.language.higherKinds class StreamingTests extends FlatSpec @@ -21,6 +24,7 @@ class StreamingTests with ScalaFutures with StrictLogging with IntegrationPatience + with ForceWrapped with TestHttpServer { override val serverRoutes: Route = @@ -34,122 +38,56 @@ class StreamingTests } } + type BodyProducer[S] = String => S + type BodyConsumer[S] = S => String + override def port = 51824 + val body = "streaming test" val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) val monixAsyncHttpClient = MonixAsyncHttpClientHandler() val monixOkHttpClient = OkHttpMonixClientHandler() - akkaStreamingTests() - monixAsyncHttpClientStreamingTests() - monixOkHttpClientStreamingTests() - - val body = "streaming test" - - def akkaStreamingTests(): Unit = { - implicit val handler = akkaHandler - - "Akka HTTP" should "stream request body" in { - val response = sttp - .post(uri"$endpoint/echo") - .streamBody(Source.single(ByteString(body))) - .send() - .futureValue - - response.body should be(body) - } - - it should "receive a stream" in { - val response = sttp - .post(uri"$endpoint/echo") - .body(body) - .response(asStream[Source[ByteString, Any]]) - .send() - .futureValue - - val responseBody = response.body.runReduce(_ ++ _).futureValue.utf8String - - responseBody should be(body) - } - } - - def monixOkHttpClientStreamingTests(): Unit = { - implicit val handler = monixOkHttpClient - import monix.execution.Scheduler.Implicits.global - - val body = "streaming test" - - "Monix Async Http Client" should "stream request body" in { - val source = Observable.fromIterable( - body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) - - val response = sttp - .post(uri"$endpoint/echo") - .streamBody(source) - .send() - .runAsync - .futureValue - - response.body should be(body) - } - - it should "receive a stream" in { + val akkaHttpBodyProducer: BodyProducer[Source[ByteString, Any]] = s => + Source.single(ByteString(s)) + val akkaHttpBodyConsumer: BodyConsumer[Source[ByteString, Any]] = + _.runReduce(_ ++ _).futureValue.utf8String + + val monixBodyProducer: BodyProducer[Observable[ByteBuffer]] = + s => + Observable.fromIterable( + s.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + val monixBodyConsumer: BodyConsumer[Observable[ByteBuffer]] = stream => + new String(stream + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray, + "utf-8") + + runTests("Akka HTTP", akkaHttpBodyProducer, akkaHttpBodyConsumer)( + akkaHandler, + ForceWrappedValue.future) + runTests("Monix Async Http Client", monixBodyProducer, monixBodyConsumer)( + monixAsyncHttpClient, + ForceWrappedValue.monixTask) + runTests("Monix OkHttp Client", monixBodyProducer, monixBodyConsumer)( + monixOkHttpClient, + ForceWrappedValue.monixTask) + + def runTests[R[_], S](name: String, + bodyProducer: BodyProducer[S], + bodyConsumer: BodyConsumer[S])( + implicit handler: SttpHandler[R, S], + forceResponse: ForceWrappedValue[R]): Unit = { + name should "stream request body" in { val response = sttp .post(uri"$endpoint/echo") - .body(body) - .response(asStream[Observable[ByteBuffer]]) + .streamBody(bodyProducer(body)) .send() - .runAsync - .futureValue - - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray - - new String(bytes, "utf-8") should be(body) - } - - it should "receive a stream from an https site" in { - val response = sttp - // of course, you should never rely on the internet being available - // in tests, but that's so much easier than setting up an https - // testing server - .get(uri"https://softwaremill.com") - .response(asStream[Observable[ByteBuffer]]) - .send() - .runAsync - .futureValue - - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray - - new String(bytes, "utf-8") should include("") - } - } - - def monixAsyncHttpClientStreamingTests(): Unit = { - implicit val handler = monixAsyncHttpClient - import monix.execution.Scheduler.Implicits.global - - val body = "streaming test" - - "Monix OkHttp Client" should "stream request body" in { - val source = Observable.fromIterable( - body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) - - val response = sttp - .post(uri"$endpoint/echo") - .streamBody(source) - .send() - .runAsync - .futureValue + .force() response.body should be(body) } @@ -158,19 +96,11 @@ class StreamingTests val response = sttp .post(uri"$endpoint/echo") .body(body) - .response(asStream[Observable[ByteBuffer]]) + .response(asStream[S]) .send() - .runAsync - .futureValue - - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray + .force() - new String(bytes, "utf-8") should be(body) + bodyConsumer(response.body) should be(body) } it should "receive a stream from an https site" in { @@ -179,19 +109,11 @@ class StreamingTests // in tests, but that's so much easier than setting up an https // testing server .get(uri"https://softwaremill.com") - .response(asStream[Observable[ByteBuffer]]) + .response(asStream[S]) .send() - .runAsync - .futureValue - - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray + .force() - new String(bytes, "utf-8") should include("") + bodyConsumer(response.body) should include("") } } -- cgit v1.2.3