diff options
author | Bjørn Madsen <bm@aeons.dk> | 2017-08-30 14:00:19 +0200 |
---|---|---|
committer | Bjørn Madsen <bm@aeons.dk> | 2017-08-30 14:00:19 +0200 |
commit | a0491dc1f48c82904b7865ce9c0e2d8b11d4dca8 (patch) | |
tree | 3fbb79a9d961a068bd01ad5f87c67b175ac2831d /tests/src/test/scala/com/softwaremill/sttp/streaming | |
parent | dba1836f72dc38ab14ab4bb614b4101a80e97552 (diff) | |
download | sttp-a0491dc1f48c82904b7865ce9c0e2d8b11d4dca8.tar.gz sttp-a0491dc1f48c82904b7865ce9c0e2d8b11d4dca8.tar.bz2 sttp-a0491dc1f48c82904b7865ce9c0e2d8b11d4dca8.zip |
Add tests for fs2 module and refactor streaming tests
Diffstat (limited to 'tests/src/test/scala/com/softwaremill/sttp/streaming')
6 files changed, 132 insertions, 0 deletions
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaStreamingTests.scala new file mode 100644 index 0000000..53fe63e --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaStreamingTests.scala @@ -0,0 +1,29 @@ +package com.softwaremill.sttp.streaming + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.Materializer +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.sttp.{ForceWrappedValue, SttpHandler} +import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler + +import scala.concurrent.Future + +class AkkaStreamingTests(actorSystem: ActorSystem)( + implicit materializer: Materializer) + extends TestStreamingHandler[Future, Source[ByteString, Any]] { + + override implicit val handler: SttpHandler[Future, Source[ByteString, Any]] = + AkkaHttpSttpHandler.usingActorSystem(actorSystem) + + override implicit val forceResponse: ForceWrappedValue[Future] = + ForceWrappedValue.future + + override def bodyProducer(body: String): Source[ByteString, NotUsed] = + Source.single(ByteString(body)) + + override def bodyConsumer(stream: Source[ByteString, Any]): Future[String] = + stream.map(_.utf8String).runReduce(_ + _) + +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/Fs2StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/Fs2StreamingTests.scala new file mode 100644 index 0000000..b9d249b --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/Fs2StreamingTests.scala @@ -0,0 +1,29 @@ +package com.softwaremill.sttp.streaming + +import java.nio.ByteBuffer + +import cats.effect._ +import cats.implicits._ +import com.softwaremill.sttp.asynchttpclient.fs2.Fs2AsyncHttpClientHandler +import com.softwaremill.sttp.{ForceWrappedValue, SttpHandler} +import fs2._ + +class Fs2StreamingTests + extends TestStreamingHandler[IO, Stream[IO, ByteBuffer]] { + + override implicit val handler: SttpHandler[IO, Stream[IO, ByteBuffer]] = + Fs2AsyncHttpClientHandler[IO]() + + override implicit val forceResponse: ForceWrappedValue[IO] = + ForceWrappedValue.catsIo + + override def bodyProducer(body: String): Stream[IO, ByteBuffer] = + Stream.emits(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + override def bodyConsumer(stream: Stream[IO, ByteBuffer]): IO[String] = + stream + .map(bb => Chunk.array(bb.array)) + .through(text.utf8DecodeC) + .runFoldMonoid + +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixAHCStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixAHCStreamingTests.scala new file mode 100644 index 0000000..4a4ff96 --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixAHCStreamingTests.scala @@ -0,0 +1,17 @@ +package com.softwaremill.sttp.streaming + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.SttpHandler +import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler +import monix.eval.Task +import monix.reactive.Observable + +class MonixAHCStreamingTests extends MonixBaseHandler { + + import monix.execution.Scheduler.Implicits.global + + override implicit val handler: SttpHandler[Task, Observable[ByteBuffer]] = + MonixAsyncHttpClientHandler() + +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseHandler.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseHandler.scala new file mode 100644 index 0000000..b34a4ae --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseHandler.scala @@ -0,0 +1,25 @@ +package com.softwaremill.sttp.streaming + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.ForceWrappedValue +import monix.eval.Task +import monix.reactive.Observable + +trait MonixBaseHandler + extends TestStreamingHandler[Task, Observable[ByteBuffer]] { + + override implicit def forceResponse: ForceWrappedValue[Task] = + ForceWrappedValue.monixTask + + override def bodyProducer(body: String): Observable[ByteBuffer] = + Observable.fromIterable( + body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] = + stream + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .map(bs => new String(bs.toArray, "utf8")) + +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixOKHStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixOKHStreamingTests.scala new file mode 100644 index 0000000..04666be --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixOKHStreamingTests.scala @@ -0,0 +1,17 @@ +package com.softwaremill.sttp.streaming + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.SttpHandler +import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler +import monix.eval.Task +import monix.reactive.Observable + +class MonixOKHStreamingTests extends MonixBaseHandler { + + import monix.execution.Scheduler.Implicits.global + + override implicit val handler: SttpHandler[Task, Observable[ByteBuffer]] = + OkHttpMonixClientHandler() + +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingHandler.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingHandler.scala new file mode 100644 index 0000000..b786e49 --- /dev/null +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingHandler.scala @@ -0,0 +1,15 @@ +package com.softwaremill.sttp.streaming + +import com.softwaremill.sttp.{ForceWrappedValue, SttpHandler} + +import scala.language.higherKinds + +trait TestStreamingHandler[R[_], S] { + implicit def handler: SttpHandler[R, S] + + implicit def forceResponse: ForceWrappedValue[R] + + def bodyProducer(body: String): S + + def bodyConsumer(stream: S): R[String] +} |