From 40288a1aaddfc27e141771371d69122ce222a8d0 Mon Sep 17 00:00:00 2001 From: Sam Guymer Date: Thu, 17 May 2018 20:07:04 +1000 Subject: Extract MonadAsyncError implementations Extract MonadAsyncError implementations into their own projects to allow reuse by multiple backends. --- .../sttp/impl/monix/TaskMonadAsyncError.scala | 31 ++++++++++++++++++++++ .../impl/monix/MonixTestStreamingBackend.scala | 22 +++++++++++++++ .../com/softwaremill/sttp/impl/monix/package.scala | 15 +++++++++++ 3 files changed, 68 insertions(+) create mode 100644 implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala create mode 100644 implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala create mode 100644 implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala (limited to 'implementations/monix/src') diff --git a/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala new file mode 100644 index 0000000..1806100 --- /dev/null +++ b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala @@ -0,0 +1,31 @@ +package com.softwaremill.sttp.impl.monix + +import com.softwaremill.sttp.MonadAsyncError +import _root_.monix.eval.Task +import _root_.monix.execution.Cancelable + +import scala.util.{Failure, Success} + +object TaskMonadAsyncError extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) + + override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = + rt.onErrorRecoverWith(h) +} diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala new file mode 100644 index 0000000..c6e44da --- /dev/null +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala @@ -0,0 +1,22 @@ +package com.softwaremill.sttp.impl.monix + +import java.nio.ByteBuffer + +import _root_.monix.eval.Task +import _root_.monix.reactive.Observable +import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} + +trait MonixTestStreamingBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] { + + override implicit def convertToFuture: ConvertToFuture[Task] = com.softwaremill.sttp.impl.monix.convertToFuture + + override def bodyProducer(body: String): Observable[ByteBuffer] = + Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] = + stream + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .map(bs => new String(bs.toArray, "utf8")) + +} diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala new file mode 100644 index 0000000..25cf7e1 --- /dev/null +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala @@ -0,0 +1,15 @@ +package com.softwaremill.sttp.impl + +import scala.concurrent.Future + +import _root_.monix.eval.Task +import com.softwaremill.sttp.testing.streaming.ConvertToFuture + +package object monix { + + val convertToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { + import _root_.monix.execution.Scheduler.Implicits.global + + override def toFuture[T](value: Task[T]): Future[T] = value.runAsync + } +} -- cgit v1.2.3 From 5980017ece9be1ebf30775e5babf81e0e2f1fcd9 Mon Sep 17 00:00:00 2001 From: Sam Guymer Date: Thu, 17 May 2018 20:07:04 +1000 Subject: Code review updates --- .../softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala | 2 +- .../scala/com/softwaremill/sttp/impl/cats/package.scala | 2 +- .../softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala | 4 ++-- .../sttp/impl/monix/MonixTestStreamingBackend.scala | 6 +++--- .../scala/com/softwaremill/sttp/impl/monix/package.scala | 2 +- .../scala/com/softwaremill/sttp/impl/scalaz/package.scala | 2 +- tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala | 13 +++++++------ .../sttp/streaming/AsyncHttpClientFs2StreamingTests.scala | 4 ++-- 8 files changed, 18 insertions(+), 17 deletions(-) (limited to 'implementations/monix/src') diff --git a/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala index 02f9643..902422e 100644 --- a/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala +++ b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala @@ -1,6 +1,6 @@ package com.softwaremill.sttp.impl.cats -import _root_.cats.effect.{Async, Effect} +import cats.effect.{Async, Effect} import com.softwaremill.sttp.MonadAsyncError import scala.language.higherKinds diff --git a/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala index 65be9f2..abffc90 100644 --- a/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala +++ b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala @@ -7,7 +7,7 @@ import scala.concurrent.Future package object cats { - val convertToFuture: ConvertToFuture[IO] = new ConvertToFuture[IO] { + val convertCatsIOToFuture: ConvertToFuture[IO] = new ConvertToFuture[IO] { override def toFuture[T](value: IO[T]): Future[T] = value.unsafeToFuture() } } diff --git a/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala index 1806100..75992d4 100644 --- a/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala +++ b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala @@ -1,8 +1,8 @@ package com.softwaremill.sttp.impl.monix import com.softwaremill.sttp.MonadAsyncError -import _root_.monix.eval.Task -import _root_.monix.execution.Cancelable +import monix.eval.Task +import monix.execution.Cancelable import scala.util.{Failure, Success} diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala index c6e44da..3f84ec3 100644 --- a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala @@ -2,13 +2,13 @@ package com.softwaremill.sttp.impl.monix import java.nio.ByteBuffer -import _root_.monix.eval.Task -import _root_.monix.reactive.Observable import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} +import monix.eval.Task +import monix.reactive.Observable trait MonixTestStreamingBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] { - override implicit def convertToFuture: ConvertToFuture[Task] = com.softwaremill.sttp.impl.monix.convertToFuture + override implicit def convertToFuture: ConvertToFuture[Task] = convertMonixTaskToFuture override def bodyProducer(body: String): Observable[ByteBuffer] = Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala index 25cf7e1..f77aa93 100644 --- a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala @@ -7,7 +7,7 @@ import com.softwaremill.sttp.testing.streaming.ConvertToFuture package object monix { - val convertToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { + val convertMonixTaskToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { import _root_.monix.execution.Scheduler.Implicits.global override def toFuture[T](value: Task[T]): Future[T] = value.runAsync diff --git a/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala index 72dbf31..8ac6446 100644 --- a/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala +++ b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala @@ -9,7 +9,7 @@ import scala.util.{Failure, Success} package object scalaz { - val convertToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { + val convertScalazTaskToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { // from https://github.com/Verizon/delorean override def toFuture[T](value: Task[T]): Future[T] = { val p = Promise[T]() diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala index 821b951..55e21b8 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala @@ -19,6 +19,9 @@ import com.softwaremill.sttp.asynchttpclient.cats.AsyncHttpClientCatsBackend import com.softwaremill.sttp.asynchttpclient.future.AsyncHttpClientFutureBackend import com.softwaremill.sttp.asynchttpclient.monix.AsyncHttpClientMonixBackend import com.softwaremill.sttp.asynchttpclient.scalaz.AsyncHttpClientScalazBackend +import com.softwaremill.sttp.impl.cats.convertCatsIOToFuture +import com.softwaremill.sttp.impl.monix.convertMonixTaskToFuture +import com.softwaremill.sttp.impl.scalaz.convertScalazTaskToFuture import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend import com.softwaremill.sttp.okhttp.{OkHttpFutureBackend, OkHttpSyncBackend} import com.softwaremill.sttp.testing.streaming.ConvertToFuture @@ -185,14 +188,12 @@ class BasicTests runTests("TryHttpURLConnection")(TryHttpURLConnectionBackend(), ConvertToFuture.scalaTry) runTests("Akka HTTP")(AkkaHttpBackend.usingActorSystem(actorSystem), ConvertToFuture.future) runTests("Async Http Client - Future")(AsyncHttpClientFutureBackend(), ConvertToFuture.future) - runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(), - com.softwaremill.sttp.impl.scalaz.convertToFuture) - runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), com.softwaremill.sttp.impl.monix.convertToFuture) - runTests("Async Http Client - Cats Effect")(AsyncHttpClientCatsBackend[cats.effect.IO](), - com.softwaremill.sttp.impl.cats.convertToFuture) + runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(), convertScalazTaskToFuture) + runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), convertMonixTaskToFuture) + runTests("Async Http Client - Cats IO")(AsyncHttpClientCatsBackend[cats.effect.IO](), convertCatsIOToFuture) runTests("OkHttpSyncClientHandler")(OkHttpSyncBackend(), ConvertToFuture.id) runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureBackend(), ConvertToFuture.future) - runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), com.softwaremill.sttp.impl.monix.convertToFuture) + runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), convertMonixTaskToFuture) def runTests[R[_]](name: String)(implicit backend: SttpBackend[R, Nothing], diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala index 680e91a..8959ccc 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala @@ -6,6 +6,7 @@ import cats.effect._ import cats.instances.string._ import com.softwaremill.sttp.SttpBackend import com.softwaremill.sttp.asynchttpclient.fs2.AsyncHttpClientFs2Backend +import com.softwaremill.sttp.impl.cats.convertCatsIOToFuture import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} import fs2.{Chunk, Stream, text} @@ -14,8 +15,7 @@ class AsyncHttpClientFs2StreamingTests extends TestStreamingBackend[IO, Stream[I override implicit val backend: SttpBackend[IO, Stream[IO, ByteBuffer]] = AsyncHttpClientFs2Backend[IO]() - override implicit val convertToFuture: ConvertToFuture[IO] = - com.softwaremill.sttp.impl.cats.convertToFuture + override implicit val convertToFuture: ConvertToFuture[IO] = convertCatsIOToFuture override def bodyProducer(body: String): Stream[IO, ByteBuffer] = Stream.emits(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) -- cgit v1.2.3