diff options
23 files changed, 344 insertions, 267 deletions
diff --git a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala index 506ad42..0e5955a 100644 --- a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala +++ b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala @@ -2,9 +2,10 @@ package com.softwaremill.sttp.asynchttpclient.cats import java.nio.ByteBuffer -import cats.effect._ +import cats.effect.Async import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend -import com.softwaremill.sttp.{FollowRedirectsBackend, MonadAsyncError, SttpBackend, SttpBackendOptions} +import com.softwaremill.sttp.impl.cats.AsyncMonadAsyncError +import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} import io.netty.buffer.ByteBuf import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient} import org.reactivestreams.Publisher @@ -16,7 +17,7 @@ class AsyncHttpClientCatsBackend[F[_]: Async] private ( closeClient: Boolean ) extends AsyncHttpClientBackend[F, Nothing]( asyncHttpClient, - new AsyncMonad, + new AsyncMonadAsyncError, closeClient ) { override protected def streamBodyToPublisher(s: Nothing): Publisher[ByteBuf] = @@ -43,21 +44,3 @@ object AsyncHttpClientCatsBackend { def usingClient[F[_]: Async](client: AsyncHttpClient): SttpBackend[F, Nothing] = AsyncHttpClientCatsBackend(client, closeClient = false) } - -private[cats] class AsyncMonad[F[_]](implicit F: Async[F]) extends MonadAsyncError[F] { - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = - F.async(register) - - override def unit[T](t: T): F[T] = F.pure(t) - - override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) - - override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = - F.flatMap(fa)(f) - - override def error[T](t: Throwable): F[T] = F.raiseError(t) - - override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] = - F.recoverWith(rt)(h) -} diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala index 35366c1..50d9057 100644 --- a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala +++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala @@ -79,6 +79,7 @@ object AsyncHttpClientFs2Backend { AsyncHttpClientFs2Backend[F](client, closeClient = false) } +// TODO replace with EffectMonadAsyncError when cats-effect versions are bin compat private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) extends MonadAsyncError[F] { override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = diff --git a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala index 915b325..a181517 100644 --- a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala +++ b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala @@ -3,26 +3,18 @@ package com.softwaremill.sttp.asynchttpclient.monix import java.nio.ByteBuffer import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend -import com.softwaremill.sttp.{ - FollowRedirectsBackend, - MonadAsyncError, - SttpBackend, - SttpBackendOptions, - Utf8, - concatByteBuffers -} +import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError +import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions, Utf8, concatByteBuffers} import io.netty.buffer.{ByteBuf, Unpooled} import monix.eval.Task -import monix.execution.{Cancelable, Scheduler} +import monix.execution.Scheduler import monix.reactive.Observable import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient} import org.reactivestreams.Publisher -import scala.util.{Failure, Success} - class AsyncHttpClientMonixBackend private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( implicit scheduler: Scheduler) - extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]](asyncHttpClient, TaskMonad, closeClient) { + extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]](asyncHttpClient, TaskMonadAsyncError, closeClient) { override protected def streamBodyToPublisher(s: Observable[ByteBuffer]): Publisher[ByteBuf] = s.map(Unpooled.wrappedBuffer).toReactivePublisher @@ -70,27 +62,3 @@ object AsyncHttpClientMonixBackend { implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = AsyncHttpClientMonixBackend(client, closeClient = false) } - -private[monix] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.now(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { (_, cb) => - register { - case Left(t) => cb(Failure(t)) - case Right(t) => cb(Success(t)) - } - - Cancelable.empty - } - - override def error[T](t: Throwable): Task[T] = Task.raiseError(t) - - override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = - rt.onErrorRecoverWith(h) -} diff --git a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala index 7c2343d..db8731e 100644 --- a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala +++ b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala @@ -3,16 +3,16 @@ package com.softwaremill.sttp.asynchttpclient.scalaz import java.nio.ByteBuffer import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend -import com.softwaremill.sttp.{FollowRedirectsBackend, MonadAsyncError, SttpBackend, SttpBackendOptions} +import com.softwaremill.sttp.impl.scalaz.TaskMonadAsyncError +import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} import io.netty.buffer.ByteBuf import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient} import org.reactivestreams.Publisher import scalaz.concurrent.Task -import scalaz.{-\/, \/-} class AsyncHttpClientScalazBackend private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean) - extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient, TaskMonad, closeClient) { + extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient, TaskMonadAsyncError, closeClient) { override protected def streamBodyToPublisher(s: Nothing): Publisher[ByteBuf] = s // nothing is everything @@ -37,25 +37,3 @@ object AsyncHttpClientScalazBackend { def usingClient(client: AsyncHttpClient): SttpBackend[Task, Nothing] = AsyncHttpClientScalazBackend(client, closeClient = false) } - -private[scalaz] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.point(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { cb => - register { - case Left(t) => cb(-\/(t)) - case Right(t) => cb(\/-(t)) - } - } - - override def error[T](t: Throwable): Task[T] = Task.fail(t) - - override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = - rt.handleWith(h) -} @@ -14,13 +14,9 @@ val commonSettings = Seq( ), publishArtifact in Test := false, publishMavenStyle := true, - scmInfo := Some( - ScmInfo(url("https://github.com/softwaremill/sttp"), - "scm:git:git@github.com/softwaremill/sttp.git")), - developers := List( - Developer("adamw", "Adam Warski", "", url("https://softwaremill.com"))), - licenses := ("Apache-2.0", - url("http://www.apache.org/licenses/LICENSE-2.0.txt")) :: Nil, + scmInfo := Some(ScmInfo(url("https://github.com/softwaremill/sttp"), "scm:git:git@github.com/softwaremill/sttp.git")), + developers := List(Developer("adamw", "Adam Warski", "", url("https://softwaremill.com"))), + licenses := ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.txt")) :: Nil, homepage := Some(url("http://softwaremill.com/open-source")), sonatypeProfileName := "com.softwaremill", // sbt-release @@ -36,9 +32,6 @@ val commonSettings = Seq( val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.1" val akkaStreams = "com.typesafe.akka" %% "akka-stream" % "2.5.12" -val monixVersion = "3.0.0-RC1" -val monix = "io.monix" %% "monix" % monixVersion - val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5" lazy val rootProject = (project in file(".")) @@ -46,6 +39,9 @@ lazy val rootProject = (project in file(".")) .settings(publishArtifact := false, name := "sttp") .aggregate( core, + cats, + monix, + scalaz, akkaHttpBackend, asyncHttpClientBackend, asyncHttpClientFutureBackend, @@ -72,73 +68,90 @@ lazy val core: Project = (project in file("core")) ) ) -lazy val akkaHttpBackend: Project = (project in file("akka-http-backend")) +//----- implementations +lazy val cats: Project = (project in file("implementations/cats")) .settings(commonSettings: _*) .settings( - name := "akka-http-backend", - libraryDependencies ++= Seq( - akkaHttp, - // provided as we don't want to create a transitive dependency on a specific streams version, - // just as akka-http doesn't - akkaStreams % "provided" - ) - ) dependsOn core + name := "cats", + libraryDependencies ++= Seq("org.typelevel" %% "cats-effect" % "1.0.0-RC") + ) + .dependsOn(core % "compile->compile;test->test") -lazy val asyncHttpClientBackend: Project = (project in file( - "async-http-client-backend")) +lazy val monix: Project = (project in file("implementations/monix")) .settings(commonSettings: _*) .settings( - name := "async-http-client-backend", - libraryDependencies ++= Seq( - "org.asynchttpclient" % "async-http-client" % "2.4.7" - ) - ) dependsOn core + name := "monix", + libraryDependencies ++= Seq("io.monix" %% "monix" % "3.0.0-RC1") + ) + .dependsOn(core % "compile->compile;test->test") -lazy val asyncHttpClientFutureBackend: Project = (project in file( - "async-http-client-backend/future")) +lazy val scalaz: Project = (project in file("implementations/scalaz")) .settings(commonSettings: _*) .settings( - name := "async-http-client-backend-future" - ) dependsOn asyncHttpClientBackend + name := "scalaz", + libraryDependencies ++= Seq("org.scalaz" %% "scalaz-concurrent" % "7.2.22") + ) + .dependsOn(core % "compile->compile;test->test") -lazy val asyncHttpClientScalazBackend: Project = (project in file( - "async-http-client-backend/scalaz")) +//----- backends +//-- akka +lazy val akkaHttpBackend: Project = (project in file("akka-http-backend")) .settings(commonSettings: _*) .settings( - name := "async-http-client-backend-scalaz", + name := "akka-http-backend", libraryDependencies ++= Seq( - "org.scalaz" %% "scalaz-concurrent" % "7.2.23" + akkaHttp, + // provided as we don't want to create a transitive dependency on a specific streams version, + // just as akka-http doesn't + akkaStreams % "provided" ) - ) dependsOn asyncHttpClientBackend - -lazy val asyncHttpClientMonixBackend: Project = (project in file( - "async-http-client-backend/monix")) - .settings(commonSettings: _*) - .settings( - name := "async-http-client-backend-monix", - libraryDependencies ++= Seq(monix) - ) dependsOn asyncHttpClientBackend + ) + .dependsOn(core) -lazy val asyncHttpClientCatsBackend: Project = (project in file( - "async-http-client-backend/cats")) - .settings(commonSettings: _*) - .settings( - name := "async-http-client-backend-cats", - libraryDependencies ++= Seq( - "org.typelevel" %% "cats-effect" % "1.0.0-RC" +//-- async http client +lazy val asyncHttpClientBackend: Project = { + (project in file("async-http-client-backend")) + .settings(commonSettings: _*) + .settings( + name := "async-http-client-backend", + libraryDependencies ++= Seq( + "org.asynchttpclient" % "async-http-client" % "2.4.7" + ) ) - ) dependsOn asyncHttpClientBackend - -lazy val asyncHttpClientFs2Backend: Project = (project in file( - "async-http-client-backend/fs2")) - .settings(commonSettings: _*) - .settings( - name := "async-http-client-backend-fs2", - libraryDependencies ++= Seq( - "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1" + .dependsOn(core) +} + +def asyncHttpClientBackendProject(proj: String): Project = { + Project(s"asyncHttpClientBackend${proj.capitalize}", file(s"async-http-client-backend/$proj")) + .settings(commonSettings: _*) + .settings(name := s"async-http-client-backend-$proj") + .dependsOn(asyncHttpClientBackend) +} + +lazy val asyncHttpClientFutureBackend: Project = + asyncHttpClientBackendProject("future") + +lazy val asyncHttpClientScalazBackend: Project = + asyncHttpClientBackendProject("scalaz") + .dependsOn(scalaz) + +lazy val asyncHttpClientMonixBackend: Project = + asyncHttpClientBackendProject("monix") + .dependsOn(monix) + +lazy val asyncHttpClientCatsBackend: Project = + asyncHttpClientBackendProject("cats") + .dependsOn(cats) + +lazy val asyncHttpClientFs2Backend: Project = + asyncHttpClientBackendProject("fs2") + .settings( + libraryDependencies ++= Seq( + "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1" + ) ) - ) dependsOn asyncHttpClientBackend +//-- okhttp lazy val okhttpBackend: Project = (project in file("okhttp-backend")) .settings(commonSettings: _*) .settings( @@ -146,17 +159,23 @@ lazy val okhttpBackend: Project = (project in file("okhttp-backend")) libraryDependencies ++= Seq( "com.squareup.okhttp3" % "okhttp" % "3.10.0" ) - ) dependsOn core + ) + .dependsOn(core) -lazy val okhttpMonixBackend: Project = (project in file("okhttp-backend/monix")) - .settings(commonSettings: _*) - .settings( - name := "okhttp-backend-monix", - libraryDependencies ++= Seq(monix) - ) dependsOn okhttpBackend +def okhttpBackendProject(proj: String): Project = { + Project(s"okhttpBackend${proj.capitalize}", file(s"okhttp-backend/$proj")) + .settings(commonSettings: _*) + .settings(name := s"okhttp-backend-$proj") + .dependsOn(okhttpBackend) +} + +lazy val okhttpMonixBackend: Project = + okhttpBackendProject("monix") + .dependsOn(monix) lazy val circeVersion = "0.9.3" +//----- json lazy val circe: Project = (project in file("json/circe")) .settings(commonSettings: _*) .settings( @@ -190,7 +209,8 @@ lazy val braveBackend: Project = (project in file("metrics/brave-backend")) "io.zipkin.brave" % "brave-instrumentation-http-tests" % braveVersion % "test", scalaTest % "test" ) - ).dependsOn(core) + ) + .dependsOn(core) lazy val prometheusBackend: Project = (project in file("metrics/prometheus-backend")) .settings(commonSettings: _*) @@ -210,12 +230,24 @@ lazy val tests: Project = (project in file("tests")) name := "tests", libraryDependencies ++= Seq( akkaHttp, + akkaStreams, scalaTest, "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", "com.github.pathikrit" %% "better-files" % "3.4.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "org.scala-lang" % "scala-compiler" % scalaVersion.value - ).map(_ % "test"), - libraryDependencies += akkaStreams, - ) dependsOn (core, akkaHttpBackend, asyncHttpClientFutureBackend, asyncHttpClientScalazBackend, -asyncHttpClientMonixBackend, asyncHttpClientCatsBackend, asyncHttpClientFs2Backend, okhttpMonixBackend) + ).map(_ % "test") + ) + .dependsOn( + core % "compile->compile;test->test", + cats % "compile->compile;test->test", + monix % "compile->compile;test->test", + scalaz % "compile->compile;test->test", + akkaHttpBackend, + asyncHttpClientFutureBackend, + asyncHttpClientScalazBackend, + asyncHttpClientMonixBackend, + asyncHttpClientCatsBackend, + asyncHttpClientFs2Backend, + okhttpMonixBackend + ) diff --git a/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala new file mode 100644 index 0000000..9438890 --- /dev/null +++ b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala @@ -0,0 +1,26 @@ +package com.softwaremill.sttp.testing.streaming + +import com.softwaremill.sttp.Id +import scala.concurrent.Future +import scala.language.higherKinds +import scala.util.Try + +trait ConvertToFuture[R[_]] { + def toFuture[T](value: R[T]): Future[T] +} + +object ConvertToFuture { + + val id: ConvertToFuture[Id] = new ConvertToFuture[Id] { + override def toFuture[T](value: Id[T]): Future[T] = + Future.successful(value) + } + + val future: ConvertToFuture[Future] = new ConvertToFuture[Future] { + override def toFuture[T](value: Future[T]): Future[T] = value + } + + val scalaTry: ConvertToFuture[Try] = new ConvertToFuture[Try] { + override def toFuture[T](value: Try[T]): Future[T] = Future.fromTry(value) + } +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingBackend.scala b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/TestStreamingBackend.scala index 5fc1c57..266c402 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingBackend.scala +++ b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/TestStreamingBackend.scala @@ -1,13 +1,13 @@ -package com.softwaremill.sttp.streaming +package com.softwaremill.sttp.testing.streaming -import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend} +import com.softwaremill.sttp.SttpBackend import scala.language.higherKinds trait TestStreamingBackend[R[_], S] { implicit def backend: SttpBackend[R, S] - implicit def forceResponse: ForceWrappedValue[R] + implicit def convertToFuture: ConvertToFuture[R] def bodyProducer(body: String): S diff --git a/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala new file mode 100644 index 0000000..902422e --- /dev/null +++ b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala @@ -0,0 +1,26 @@ +package com.softwaremill.sttp.impl.cats + +import cats.effect.{Async, Effect} +import com.softwaremill.sttp.MonadAsyncError + +import scala.language.higherKinds + +class AsyncMonadAsyncError[F[_]](implicit F: Async[F]) extends MonadAsyncError[F] { + + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + F.async(register) + + override def unit[T](t: T): F[T] = F.pure(t) + + override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) + + override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = + F.flatMap(fa)(f) + + override def error[T](t: Throwable): F[T] = F.raiseError(t) + + override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] = + F.recoverWith(rt)(h) +} + +class EffectMonadAsyncError[F[_]](implicit F: Effect[F]) extends AsyncMonadAsyncError[F] diff --git a/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala new file mode 100644 index 0000000..abffc90 --- /dev/null +++ b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala @@ -0,0 +1,13 @@ +package com.softwaremill.sttp.impl + +import _root_.cats.effect.IO +import com.softwaremill.sttp.testing.streaming.ConvertToFuture + +import scala.concurrent.Future + +package object cats { + + val convertCatsIOToFuture: ConvertToFuture[IO] = new ConvertToFuture[IO] { + override def toFuture[T](value: IO[T]): Future[T] = value.unsafeToFuture() + } +} diff --git a/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala new file mode 100644 index 0000000..75992d4 --- /dev/null +++ b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala @@ -0,0 +1,31 @@ +package com.softwaremill.sttp.impl.monix + +import com.softwaremill.sttp.MonadAsyncError +import monix.eval.Task +import monix.execution.Cancelable + +import scala.util.{Failure, Success} + +object TaskMonadAsyncError extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) + + override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = + rt.onErrorRecoverWith(h) +} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala index acd67a7..3f84ec3 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala @@ -1,15 +1,14 @@ -package com.softwaremill.sttp.streaming +package com.softwaremill.sttp.impl.monix import java.nio.ByteBuffer -import com.softwaremill.sttp.ForceWrappedValue +import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} import monix.eval.Task import monix.reactive.Observable -trait MonixBaseBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] { +trait MonixTestStreamingBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] { - override implicit def forceResponse: ForceWrappedValue[Task] = - ForceWrappedValue.monixTask + override implicit def convertToFuture: ConvertToFuture[Task] = convertMonixTaskToFuture override def bodyProducer(body: String): Observable[ByteBuffer] = Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala new file mode 100644 index 0000000..f77aa93 --- /dev/null +++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala @@ -0,0 +1,15 @@ +package com.softwaremill.sttp.impl + +import scala.concurrent.Future + +import _root_.monix.eval.Task +import com.softwaremill.sttp.testing.streaming.ConvertToFuture + +package object monix { + + val convertMonixTaskToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { + import _root_.monix.execution.Scheduler.Implicits.global + + override def toFuture[T](value: Task[T]): Future[T] = value.runAsync + } +} diff --git a/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala b/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala new file mode 100644 index 0000000..8535321 --- /dev/null +++ b/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala @@ -0,0 +1,28 @@ +package com.softwaremill.sttp.impl.scalaz + +import com.softwaremill.sttp.MonadAsyncError + +import scalaz.concurrent.Task +import scalaz.{-\/, \/-} + +object TaskMonadAsyncError extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.point(t) + + override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { cb => + register { + case Left(t) => cb(-\/(t)) + case Right(t) => cb(\/-(t)) + } + } + + override def error[T](t: Throwable): Task[T] = Task.fail(t) + + override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = + rt.handleWith(h) +} diff --git a/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala new file mode 100644 index 0000000..8ac6446 --- /dev/null +++ b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala @@ -0,0 +1,25 @@ +package com.softwaremill.sttp.impl + +import com.softwaremill.sttp.testing.streaming.ConvertToFuture + +import _root_.scalaz.concurrent.Task +import _root_.scalaz.{-\/, \/-} +import scala.concurrent.{Future, Promise} +import scala.util.{Failure, Success} + +package object scalaz { + + val convertScalazTaskToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] { + // from https://github.com/Verizon/delorean + override def toFuture[T](value: Task[T]): Future[T] = { + val p = Promise[T]() + + value.unsafePerformAsync { + case \/-(a) => p.complete(Success(a)); () + case -\/(t) => p.complete(Failure(t)); () + } + + p.future + } + } +} diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala index bda8959..332d8d0 100644 --- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala +++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala @@ -3,21 +3,22 @@ package com.softwaremill.sttp.okhttp.monix import java.nio.ByteBuffer import java.util.concurrent.ArrayBlockingQueue +import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError import com.softwaremill.sttp.{SttpBackend, _} import com.softwaremill.sttp.okhttp.{OkHttpAsyncBackend, OkHttpBackend} import monix.eval.Task import monix.execution.Ack.Continue -import monix.execution.{Ack, Cancelable, Scheduler} +import monix.execution.{Ack, Scheduler} import monix.reactive.Observable import monix.reactive.observers.Subscriber import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink import scala.concurrent.Future -import scala.util.{Failure, Success, Try} +import scala.util.{Success, Try} class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler) - extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) { + extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonadAsyncError, closeClient) { override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { @@ -40,7 +41,7 @@ class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(im val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1) - observable.executeWithFork.subscribe(new Subscriber[T] { + observable.executeAsync.subscribe(new Subscriber[T] { override implicit def scheduler: Scheduler = s override def onError(ex: Throwable): Unit = { @@ -86,29 +87,3 @@ object OkHttpMonixBackend { implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = OkHttpMonixBackend(client, closeClient = false)(s) } - -private[monix] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.now(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { (_, cb) => - register { - case Left(t) => cb(Failure(t)) - case Right(t) => cb(Success(t)) - } - - Cancelable.empty - } - - override def error[T](t: Throwable): Task[T] = Task.raiseError(t) - - override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = - rt.onErrorRecoverWith { - case t => h(t) - } -} diff --git a/tests/src/test/resources/logback.xml b/tests/src/test/resources/logback.xml new file mode 100644 index 0000000..1470b51 --- /dev/null +++ b/tests/src/test/resources/logback.xml @@ -0,0 +1,12 @@ +<configuration> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala index 392f6d8..55e21b8 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala @@ -19,8 +19,12 @@ import com.softwaremill.sttp.asynchttpclient.cats.AsyncHttpClientCatsBackend import com.softwaremill.sttp.asynchttpclient.future.AsyncHttpClientFutureBackend import com.softwaremill.sttp.asynchttpclient.monix.AsyncHttpClientMonixBackend import com.softwaremill.sttp.asynchttpclient.scalaz.AsyncHttpClientScalazBackend +import com.softwaremill.sttp.impl.cats.convertCatsIOToFuture +import com.softwaremill.sttp.impl.monix.convertMonixTaskToFuture +import com.softwaremill.sttp.impl.scalaz.convertScalazTaskToFuture import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend import com.softwaremill.sttp.okhttp.{OkHttpFutureBackend, OkHttpSyncBackend} +import com.softwaremill.sttp.testing.streaming.ConvertToFuture import com.typesafe.scalalogging.StrictLogging import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} import org.scalatest.{path => _, _} @@ -180,19 +184,20 @@ class BasicTests var closeBackends: List[() => Unit] = Nil - runTests("HttpURLConnection")(HttpURLConnectionBackend(), ForceWrappedValue.id) - runTests("TryHttpURLConnection")(TryHttpURLConnectionBackend(), ForceWrappedValue.scalaTry) - runTests("Akka HTTP")(AkkaHttpBackend.usingActorSystem(actorSystem), ForceWrappedValue.future) - runTests("Async Http Client - Future")(AsyncHttpClientFutureBackend(), ForceWrappedValue.future) - runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(), ForceWrappedValue.scalazTask) - runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), ForceWrappedValue.monixTask) - runTests("Async Http Client - Cats Effect")(AsyncHttpClientCatsBackend[cats.effect.IO](), ForceWrappedValue.catsIo) - runTests("OkHttpSyncClientHandler")(OkHttpSyncBackend(), ForceWrappedValue.id) - runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureBackend(), ForceWrappedValue.future) - runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), ForceWrappedValue.monixTask) - - def runTests[R[_]](name: String)(implicit backend: SttpBackend[R, Nothing], - forceResponse: ForceWrappedValue[R]): Unit = { + runTests("HttpURLConnection")(HttpURLConnectionBackend(), ConvertToFuture.id) + runTests("TryHttpURLConnection")(TryHttpURLConnectionBackend(), ConvertToFuture.scalaTry) + runTests("Akka HTTP")(AkkaHttpBackend.usingActorSystem(actorSystem), ConvertToFuture.future) + runTests("Async Http Client - Future")(AsyncHttpClientFutureBackend(), ConvertToFuture.future) + runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(), convertScalazTaskToFuture) + runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), convertMonixTaskToFuture) + runTests("Async Http Client - Cats IO")(AsyncHttpClientCatsBackend[cats.effect.IO](), convertCatsIOToFuture) + runTests("OkHttpSyncClientHandler")(OkHttpSyncBackend(), ConvertToFuture.id) + runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureBackend(), ConvertToFuture.future) + runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), convertMonixTaskToFuture) + + def runTests[R[_]](name: String)(implicit + backend: SttpBackend[R, Nothing], + convertToFuture: ConvertToFuture[R]): Unit = { closeBackends = (() => backend.close()) :: closeBackends diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index e3dbe2c..b2b44a2 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -3,6 +3,7 @@ package com.softwaremill.sttp import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import com.softwaremill.sttp.streaming._ +import com.softwaremill.sttp.testing.streaming.TestStreamingBackend import com.typesafe.scalalogging.StrictLogging import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala index d8fbb82..691df81 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala @@ -5,8 +5,9 @@ import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.Source import akka.util.ByteString +import com.softwaremill.sttp.SttpBackend import com.softwaremill.sttp.akkahttp.AkkaHttpBackend -import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend} +import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} import scala.concurrent.Future @@ -16,8 +17,8 @@ class AkkaHttpStreamingTests(actorSystem: ActorSystem)(implicit materializer: Ma override implicit val backend: SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend.usingActorSystem(actorSystem) - override implicit val forceResponse: ForceWrappedValue[Future] = - ForceWrappedValue.future + override implicit val convertToFuture: ConvertToFuture[Future] = + ConvertToFuture.future override def bodyProducer(body: String): Source[ByteString, NotUsed] = Source.single(ByteString(body)) diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala index 82f36be..8959ccc 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala @@ -3,18 +3,19 @@ package com.softwaremill.sttp.streaming import java.nio.ByteBuffer import cats.effect._ -import cats.implicits._ +import cats.instances.string._ +import com.softwaremill.sttp.SttpBackend import com.softwaremill.sttp.asynchttpclient.fs2.AsyncHttpClientFs2Backend -import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend} -import fs2._ +import com.softwaremill.sttp.impl.cats.convertCatsIOToFuture +import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend} +import fs2.{Chunk, Stream, text} class AsyncHttpClientFs2StreamingTests extends TestStreamingBackend[IO, Stream[IO, ByteBuffer]] { override implicit val backend: SttpBackend[IO, Stream[IO, ByteBuffer]] = AsyncHttpClientFs2Backend[IO]() - override implicit val forceResponse: ForceWrappedValue[IO] = - ForceWrappedValue.catsIo + override implicit val convertToFuture: ConvertToFuture[IO] = convertCatsIOToFuture override def bodyProducer(body: String): Stream[IO, ByteBuffer] = Stream.emits(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala index 0357668..faebf8b 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala @@ -4,10 +4,11 @@ import java.nio.ByteBuffer import com.softwaremill.sttp.SttpBackend import com.softwaremill.sttp.asynchttpclient.monix.AsyncHttpClientMonixBackend +import com.softwaremill.sttp.impl.monix.MonixTestStreamingBackend import monix.eval.Task import monix.reactive.Observable -class AsyncHttpClientMonixStreamingTests extends MonixBaseBackend { +class AsyncHttpClientMonixStreamingTests extends MonixTestStreamingBackend { import monix.execution.Scheduler.Implicits.global diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala index 9bfcc26..27a4517 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala @@ -3,11 +3,12 @@ package com.softwaremill.sttp.streaming import java.nio.ByteBuffer import com.softwaremill.sttp.SttpBackend +import com.softwaremill.sttp.impl.monix.MonixTestStreamingBackend import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend import monix.eval.Task import monix.reactive.Observable -class OkHttpMonixStreamingTests extends MonixBaseBackend { +class OkHttpMonixStreamingTests extends MonixTestStreamingBackend { import monix.execution.Scheduler.Implicits.global diff --git a/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala b/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala index 1a57b7e..12fe770 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala @@ -12,11 +12,8 @@ import org.scalatest.exceptions.TestFailedException import org.scalatest.matchers.{MatchResult, Matcher} import org.scalatest.{BeforeAndAfterAll, Suite} -import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.higherKinds -import scala.util.Try -import scalaz._ trait TestHttpServer extends BeforeAndAfterAll with ScalaFutures with TestingPatience { this: Suite => @@ -38,60 +35,18 @@ trait TestHttpServer extends BeforeAndAfterAll with ScalaFutures with TestingPat def port: Int } -trait ForceWrappedValue[R[_]] { - def force[T](wrapped: R[T]): T -} - -object ForceWrappedValue extends ScalaFutures with TestingPatience { - - val id = new ForceWrappedValue[Id] { - override def force[T](wrapped: Id[T]): T = - wrapped - } - - val scalaTry = new ForceWrappedValue[Try] { - override def force[T](wrapped: Try[T]): T = wrapped.get - } - - val future = new ForceWrappedValue[Future] { - - override def force[T](wrapped: Future[T]): T = - try { - wrapped.futureValue - } catch { - case e: TestFailedException if e.getCause != null => throw e.getCause - } - } - val scalazTask = new ForceWrappedValue[scalaz.concurrent.Task] { - override def force[T](wrapped: scalaz.concurrent.Task[T]): T = - wrapped.unsafePerformSyncAttempt match { - case -\/(error) => throw error - case \/-(value) => value - } - } - val monixTask = new ForceWrappedValue[monix.eval.Task] { - import monix.execution.Scheduler.Implicits.global +trait ForceWrapped extends ScalaFutures with TestingPatience { this: Suite => + type ConvertToFuture[R[_]] = + com.softwaremill.sttp.testing.streaming.ConvertToFuture[R] - override def force[T](wrapped: monix.eval.Task[T]): T = + implicit class ForceDecorator[R[_], T](wrapped: R[T]) { + def force()(implicit ctf: ConvertToFuture[R]): T = { try { - wrapped.runAsync.futureValue + ctf.toFuture(wrapped).futureValue } catch { case e: TestFailedException if e.getCause != null => throw e.getCause } - } - val catsIo = new ForceWrappedValue[cats.effect.IO] { - override def force[T](wrapped: cats.effect.IO[T]): T = - wrapped.unsafeRunSync - } -} - -trait ForceWrapped extends ScalaFutures with TestingPatience { this: Suite => - type ForceWrappedValue[R[_]] = com.softwaremill.sttp.ForceWrappedValue[R] - val ForceWrappedValue: com.softwaremill.sttp.ForceWrappedValue.type = - com.softwaremill.sttp.ForceWrappedValue - - implicit class ForceDecorator[R[_], T](wrapped: R[T]) { - def force()(implicit fwv: ForceWrappedValue[R]): T = fwv.force(wrapped) + } } } |