aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Guymer <sam@guymer.me>2018-05-17 20:07:04 +1000
committerSam Guymer <sam@guymer.me>2018-05-18 22:28:28 +1000
commit40288a1aaddfc27e141771371d69122ce222a8d0 (patch)
tree14177a51769ce42e88ec3d20435abe505b44aac5
parent96ff655f906f2e3f4e9ba906c42e96506f4668b9 (diff)
downloadsttp-40288a1aaddfc27e141771371d69122ce222a8d0.tar.gz
sttp-40288a1aaddfc27e141771371d69122ce222a8d0.tar.bz2
sttp-40288a1aaddfc27e141771371d69122ce222a8d0.zip
Extract MonadAsyncError implementations
Extract MonadAsyncError implementations into their own projects to allow reuse by multiple backends.
-rw-r--r--async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala25
-rw-r--r--async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala1
-rw-r--r--async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala40
-rw-r--r--async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala28
-rw-r--r--build.sbt176
-rw-r--r--core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala26
-rw-r--r--core/src/test/scala/com/softwaremill/sttp/testing/streaming/TestStreamingBackend.scala (renamed from tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingBackend.scala)6
-rw-r--r--implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala26
-rw-r--r--implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala13
-rw-r--r--implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala31
-rw-r--r--implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala22
-rw-r--r--implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala15
-rw-r--r--implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala28
-rw-r--r--implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala25
-rw-r--r--okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala35
-rw-r--r--tests/src/test/resources/logback.xml12
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala30
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala1
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala7
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala11
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala3
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala23
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala3
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala59
24 files changed, 361 insertions, 285 deletions
diff --git a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala
index 506ad42..0e5955a 100644
--- a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala
+++ b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala
@@ -2,9 +2,10 @@ package com.softwaremill.sttp.asynchttpclient.cats
import java.nio.ByteBuffer
-import cats.effect._
+import cats.effect.Async
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
-import com.softwaremill.sttp.{FollowRedirectsBackend, MonadAsyncError, SttpBackend, SttpBackendOptions}
+import com.softwaremill.sttp.impl.cats.AsyncMonadAsyncError
+import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import io.netty.buffer.ByteBuf
import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient}
import org.reactivestreams.Publisher
@@ -16,7 +17,7 @@ class AsyncHttpClientCatsBackend[F[_]: Async] private (
closeClient: Boolean
) extends AsyncHttpClientBackend[F, Nothing](
asyncHttpClient,
- new AsyncMonad,
+ new AsyncMonadAsyncError,
closeClient
) {
override protected def streamBodyToPublisher(s: Nothing): Publisher[ByteBuf] =
@@ -43,21 +44,3 @@ object AsyncHttpClientCatsBackend {
def usingClient[F[_]: Async](client: AsyncHttpClient): SttpBackend[F, Nothing] =
AsyncHttpClientCatsBackend(client, closeClient = false)
}
-
-private[cats] class AsyncMonad[F[_]](implicit F: Async[F]) extends MonadAsyncError[F] {
-
- override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
- F.async(register)
-
- override def unit[T](t: T): F[T] = F.pure(t)
-
- override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
-
- override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
- F.flatMap(fa)(f)
-
- override def error[T](t: Throwable): F[T] = F.raiseError(t)
-
- override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] =
- F.recoverWith(rt)(h)
-}
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
index 35366c1..50d9057 100644
--- a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
+++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
@@ -79,6 +79,7 @@ object AsyncHttpClientFs2Backend {
AsyncHttpClientFs2Backend[F](client, closeClient = false)
}
+// TODO replace with EffectMonadAsyncError when cats-effect versions are bin compat
private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) extends MonadAsyncError[F] {
override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
diff --git a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala
index 915b325..a181517 100644
--- a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala
+++ b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala
@@ -3,26 +3,18 @@ package com.softwaremill.sttp.asynchttpclient.monix
import java.nio.ByteBuffer
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
-import com.softwaremill.sttp.{
- FollowRedirectsBackend,
- MonadAsyncError,
- SttpBackend,
- SttpBackendOptions,
- Utf8,
- concatByteBuffers
-}
+import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError
+import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions, Utf8, concatByteBuffers}
import io.netty.buffer.{ByteBuf, Unpooled}
import monix.eval.Task
-import monix.execution.{Cancelable, Scheduler}
+import monix.execution.Scheduler
import monix.reactive.Observable
import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient}
import org.reactivestreams.Publisher
-import scala.util.{Failure, Success}
-
class AsyncHttpClientMonixBackend private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
implicit scheduler: Scheduler)
- extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]](asyncHttpClient, TaskMonad, closeClient) {
+ extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]](asyncHttpClient, TaskMonadAsyncError, closeClient) {
override protected def streamBodyToPublisher(s: Observable[ByteBuffer]): Publisher[ByteBuf] =
s.map(Unpooled.wrappedBuffer).toReactivePublisher
@@ -70,27 +62,3 @@ object AsyncHttpClientMonixBackend {
implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] =
AsyncHttpClientMonixBackend(client, closeClient = false)
}
-
-private[monix] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.now(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { (_, cb) =>
- register {
- case Left(t) => cb(Failure(t))
- case Right(t) => cb(Success(t))
- }
-
- Cancelable.empty
- }
-
- override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
-
- override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
- rt.onErrorRecoverWith(h)
-}
diff --git a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala
index 7c2343d..db8731e 100644
--- a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala
+++ b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala
@@ -3,16 +3,16 @@ package com.softwaremill.sttp.asynchttpclient.scalaz
import java.nio.ByteBuffer
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
-import com.softwaremill.sttp.{FollowRedirectsBackend, MonadAsyncError, SttpBackend, SttpBackendOptions}
+import com.softwaremill.sttp.impl.scalaz.TaskMonadAsyncError
+import com.softwaremill.sttp.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import io.netty.buffer.ByteBuf
import org.asynchttpclient.{AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient}
import org.reactivestreams.Publisher
import scalaz.concurrent.Task
-import scalaz.{-\/, \/-}
class AsyncHttpClientScalazBackend private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean)
- extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient, TaskMonad, closeClient) {
+ extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient, TaskMonadAsyncError, closeClient) {
override protected def streamBodyToPublisher(s: Nothing): Publisher[ByteBuf] =
s // nothing is everything
@@ -37,25 +37,3 @@ object AsyncHttpClientScalazBackend {
def usingClient(client: AsyncHttpClient): SttpBackend[Task, Nothing] =
AsyncHttpClientScalazBackend(client, closeClient = false)
}
-
-private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.point(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { cb =>
- register {
- case Left(t) => cb(-\/(t))
- case Right(t) => cb(\/-(t))
- }
- }
-
- override def error[T](t: Throwable): Task[T] = Task.fail(t)
-
- override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
- rt.handleWith(h)
-}
diff --git a/build.sbt b/build.sbt
index ce57190..1f510e1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -14,13 +14,9 @@ val commonSettings = Seq(
),
publishArtifact in Test := false,
publishMavenStyle := true,
- scmInfo := Some(
- ScmInfo(url("https://github.com/softwaremill/sttp"),
- "scm:git:git@github.com/softwaremill/sttp.git")),
- developers := List(
- Developer("adamw", "Adam Warski", "", url("https://softwaremill.com"))),
- licenses := ("Apache-2.0",
- url("http://www.apache.org/licenses/LICENSE-2.0.txt")) :: Nil,
+ scmInfo := Some(ScmInfo(url("https://github.com/softwaremill/sttp"), "scm:git:git@github.com/softwaremill/sttp.git")),
+ developers := List(Developer("adamw", "Adam Warski", "", url("https://softwaremill.com"))),
+ licenses := ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.txt")) :: Nil,
homepage := Some(url("http://softwaremill.com/open-source")),
sonatypeProfileName := "com.softwaremill",
// sbt-release
@@ -36,9 +32,6 @@ val commonSettings = Seq(
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.1"
val akkaStreams = "com.typesafe.akka" %% "akka-stream" % "2.5.12"
-val monixVersion = "3.0.0-RC1"
-val monix = "io.monix" %% "monix" % monixVersion
-
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5"
lazy val rootProject = (project in file("."))
@@ -46,6 +39,9 @@ lazy val rootProject = (project in file("."))
.settings(publishArtifact := false, name := "sttp")
.aggregate(
core,
+ cats,
+ monix,
+ scalaz,
akkaHttpBackend,
asyncHttpClientBackend,
asyncHttpClientFutureBackend,
@@ -72,73 +68,90 @@ lazy val core: Project = (project in file("core"))
)
)
-lazy val akkaHttpBackend: Project = (project in file("akka-http-backend"))
+//----- implementations
+lazy val cats: Project = (project in file("implementations/cats"))
.settings(commonSettings: _*)
.settings(
- name := "akka-http-backend",
- libraryDependencies ++= Seq(
- akkaHttp,
- // provided as we don't want to create a transitive dependency on a specific streams version,
- // just as akka-http doesn't
- akkaStreams % "provided"
- )
- ) dependsOn core
+ name := "cats",
+ libraryDependencies ++= Seq("org.typelevel" %% "cats-effect" % "1.0.0-RC")
+ )
+ .dependsOn(core % "compile->compile;test->test")
-lazy val asyncHttpClientBackend: Project = (project in file(
- "async-http-client-backend"))
+lazy val monix: Project = (project in file("implementations/monix"))
.settings(commonSettings: _*)
.settings(
- name := "async-http-client-backend",
- libraryDependencies ++= Seq(
- "org.asynchttpclient" % "async-http-client" % "2.4.7"
- )
- ) dependsOn core
+ name := "monix",
+ libraryDependencies ++= Seq("io.monix" %% "monix" % "3.0.0-RC1")
+ )
+ .dependsOn(core % "compile->compile;test->test")
-lazy val asyncHttpClientFutureBackend: Project = (project in file(
- "async-http-client-backend/future"))
+lazy val scalaz: Project = (project in file("implementations/scalaz"))
.settings(commonSettings: _*)
.settings(
- name := "async-http-client-backend-future"
- ) dependsOn asyncHttpClientBackend
+ name := "scalaz",
+ libraryDependencies ++= Seq("org.scalaz" %% "scalaz-concurrent" % "7.2.22")
+ )
+ .dependsOn(core % "compile->compile;test->test")
-lazy val asyncHttpClientScalazBackend: Project = (project in file(
- "async-http-client-backend/scalaz"))
+//----- backends
+//-- akka
+lazy val akkaHttpBackend: Project = (project in file("akka-http-backend"))
.settings(commonSettings: _*)
.settings(
- name := "async-http-client-backend-scalaz",
+ name := "akka-http-backend",
libraryDependencies ++= Seq(
- "org.scalaz" %% "scalaz-concurrent" % "7.2.23"
+ akkaHttp,
+ // provided as we don't want to create a transitive dependency on a specific streams version,
+ // just as akka-http doesn't
+ akkaStreams % "provided"
)
- ) dependsOn asyncHttpClientBackend
-
-lazy val asyncHttpClientMonixBackend: Project = (project in file(
- "async-http-client-backend/monix"))
- .settings(commonSettings: _*)
- .settings(
- name := "async-http-client-backend-monix",
- libraryDependencies ++= Seq(monix)
- ) dependsOn asyncHttpClientBackend
+ )
+ .dependsOn(core)
-lazy val asyncHttpClientCatsBackend: Project = (project in file(
- "async-http-client-backend/cats"))
- .settings(commonSettings: _*)
- .settings(
- name := "async-http-client-backend-cats",
- libraryDependencies ++= Seq(
- "org.typelevel" %% "cats-effect" % "1.0.0-RC"
+//-- async http client
+lazy val asyncHttpClientBackend: Project = {
+ (project in file("async-http-client-backend"))
+ .settings(commonSettings: _*)
+ .settings(
+ name := "async-http-client-backend",
+ libraryDependencies ++= Seq(
+ "org.asynchttpclient" % "async-http-client" % "2.4.7"
+ )
)
- ) dependsOn asyncHttpClientBackend
-
-lazy val asyncHttpClientFs2Backend: Project = (project in file(
- "async-http-client-backend/fs2"))
- .settings(commonSettings: _*)
- .settings(
- name := "async-http-client-backend-fs2",
- libraryDependencies ++= Seq(
- "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1"
+ .dependsOn(core)
+}
+
+def asyncHttpClientBackendProject(proj: String): Project = {
+ Project(s"asyncHttpClientBackend${proj.capitalize}", file(s"async-http-client-backend/$proj"))
+ .settings(commonSettings: _*)
+ .settings(name := s"async-http-client-backend-$proj")
+ .dependsOn(asyncHttpClientBackend)
+}
+
+lazy val asyncHttpClientFutureBackend: Project =
+ asyncHttpClientBackendProject("future")
+
+lazy val asyncHttpClientScalazBackend: Project =
+ asyncHttpClientBackendProject("scalaz")
+ .dependsOn(scalaz)
+
+lazy val asyncHttpClientMonixBackend: Project =
+ asyncHttpClientBackendProject("monix")
+ .dependsOn(monix)
+
+lazy val asyncHttpClientCatsBackend: Project =
+ asyncHttpClientBackendProject("cats")
+ .dependsOn(cats)
+
+lazy val asyncHttpClientFs2Backend: Project =
+ asyncHttpClientBackendProject("fs2")
+ .settings(
+ libraryDependencies ++= Seq(
+ "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1"
+ )
)
- ) dependsOn asyncHttpClientBackend
+//-- okhttp
lazy val okhttpBackend: Project = (project in file("okhttp-backend"))
.settings(commonSettings: _*)
.settings(
@@ -146,17 +159,23 @@ lazy val okhttpBackend: Project = (project in file("okhttp-backend"))
libraryDependencies ++= Seq(
"com.squareup.okhttp3" % "okhttp" % "3.10.0"
)
- ) dependsOn core
+ )
+ .dependsOn(core)
-lazy val okhttpMonixBackend: Project = (project in file("okhttp-backend/monix"))
- .settings(commonSettings: _*)
- .settings(
- name := "okhttp-backend-monix",
- libraryDependencies ++= Seq(monix)
- ) dependsOn okhttpBackend
+def okhttpBackendProject(proj: String): Project = {
+ Project(s"okhttpBackend${proj.capitalize}", file(s"okhttp-backend/$proj"))
+ .settings(commonSettings: _*)
+ .settings(name := s"okhttp-backend-$proj")
+ .dependsOn(okhttpBackend)
+}
+
+lazy val okhttpMonixBackend: Project =
+ okhttpBackendProject("monix")
+ .dependsOn(monix)
lazy val circeVersion = "0.9.3"
+//----- json
lazy val circe: Project = (project in file("json/circe"))
.settings(commonSettings: _*)
.settings(
@@ -190,7 +209,8 @@ lazy val braveBackend: Project = (project in file("metrics/brave-backend"))
"io.zipkin.brave" % "brave-instrumentation-http-tests" % braveVersion % "test",
scalaTest % "test"
)
- ).dependsOn(core)
+ )
+ .dependsOn(core)
lazy val prometheusBackend: Project = (project in file("metrics/prometheus-backend"))
.settings(commonSettings: _*)
@@ -210,12 +230,24 @@ lazy val tests: Project = (project in file("tests"))
name := "tests",
libraryDependencies ++= Seq(
akkaHttp,
+ akkaStreams,
scalaTest,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"com.github.pathikrit" %% "better-files" % "3.4.0",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scala-lang" % "scala-compiler" % scalaVersion.value
- ).map(_ % "test"),
- libraryDependencies += akkaStreams,
- ) dependsOn (core, akkaHttpBackend, asyncHttpClientFutureBackend, asyncHttpClientScalazBackend,
-asyncHttpClientMonixBackend, asyncHttpClientCatsBackend, asyncHttpClientFs2Backend, okhttpMonixBackend)
+ ).map(_ % "test")
+ )
+ .dependsOn(
+ core % "compile->compile;test->test",
+ cats % "compile->compile;test->test",
+ monix % "compile->compile;test->test",
+ scalaz % "compile->compile;test->test",
+ akkaHttpBackend,
+ asyncHttpClientFutureBackend,
+ asyncHttpClientScalazBackend,
+ asyncHttpClientMonixBackend,
+ asyncHttpClientCatsBackend,
+ asyncHttpClientFs2Backend,
+ okhttpMonixBackend
+ )
diff --git a/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala
new file mode 100644
index 0000000..9438890
--- /dev/null
+++ b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/ConvertToFuture.scala
@@ -0,0 +1,26 @@
+package com.softwaremill.sttp.testing.streaming
+
+import com.softwaremill.sttp.Id
+import scala.concurrent.Future
+import scala.language.higherKinds
+import scala.util.Try
+
+trait ConvertToFuture[R[_]] {
+ def toFuture[T](value: R[T]): Future[T]
+}
+
+object ConvertToFuture {
+
+ val id: ConvertToFuture[Id] = new ConvertToFuture[Id] {
+ override def toFuture[T](value: Id[T]): Future[T] =
+ Future.successful(value)
+ }
+
+ val future: ConvertToFuture[Future] = new ConvertToFuture[Future] {
+ override def toFuture[T](value: Future[T]): Future[T] = value
+ }
+
+ val scalaTry: ConvertToFuture[Try] = new ConvertToFuture[Try] {
+ override def toFuture[T](value: Try[T]): Future[T] = Future.fromTry(value)
+ }
+}
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingBackend.scala b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/TestStreamingBackend.scala
index 5fc1c57..266c402 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/TestStreamingBackend.scala
+++ b/core/src/test/scala/com/softwaremill/sttp/testing/streaming/TestStreamingBackend.scala
@@ -1,13 +1,13 @@
-package com.softwaremill.sttp.streaming
+package com.softwaremill.sttp.testing.streaming
-import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend}
+import com.softwaremill.sttp.SttpBackend
import scala.language.higherKinds
trait TestStreamingBackend[R[_], S] {
implicit def backend: SttpBackend[R, S]
- implicit def forceResponse: ForceWrappedValue[R]
+ implicit def convertToFuture: ConvertToFuture[R]
def bodyProducer(body: String): S
diff --git a/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala
new file mode 100644
index 0000000..02f9643
--- /dev/null
+++ b/implementations/cats/src/main/scala/com/softwaremill/sttp/impl/cats/AsyncMonadAsyncError.scala
@@ -0,0 +1,26 @@
+package com.softwaremill.sttp.impl.cats
+
+import _root_.cats.effect.{Async, Effect}
+import com.softwaremill.sttp.MonadAsyncError
+
+import scala.language.higherKinds
+
+class AsyncMonadAsyncError[F[_]](implicit F: Async[F]) extends MonadAsyncError[F] {
+
+ override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ F.async(register)
+
+ override def unit[T](t: T): F[T] = F.pure(t)
+
+ override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
+
+ override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
+ F.flatMap(fa)(f)
+
+ override def error[T](t: Throwable): F[T] = F.raiseError(t)
+
+ override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] =
+ F.recoverWith(rt)(h)
+}
+
+class EffectMonadAsyncError[F[_]](implicit F: Effect[F]) extends AsyncMonadAsyncError[F]
diff --git a/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala
new file mode 100644
index 0000000..65be9f2
--- /dev/null
+++ b/implementations/cats/src/test/scala/com/softwaremill/sttp/impl/cats/package.scala
@@ -0,0 +1,13 @@
+package com.softwaremill.sttp.impl
+
+import _root_.cats.effect.IO
+import com.softwaremill.sttp.testing.streaming.ConvertToFuture
+
+import scala.concurrent.Future
+
+package object cats {
+
+ val convertToFuture: ConvertToFuture[IO] = new ConvertToFuture[IO] {
+ override def toFuture[T](value: IO[T]): Future[T] = value.unsafeToFuture()
+ }
+}
diff --git a/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala
new file mode 100644
index 0000000..1806100
--- /dev/null
+++ b/implementations/monix/src/main/scala/com/softwaremill/sttp/impl/monix/TaskMonadAsyncError.scala
@@ -0,0 +1,31 @@
+package com.softwaremill.sttp.impl.monix
+
+import com.softwaremill.sttp.MonadAsyncError
+import _root_.monix.eval.Task
+import _root_.monix.execution.Cancelable
+
+import scala.util.{Failure, Success}
+
+object TaskMonadAsyncError extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+
+ override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
+ rt.onErrorRecoverWith(h)
+}
diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala
new file mode 100644
index 0000000..c6e44da
--- /dev/null
+++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/MonixTestStreamingBackend.scala
@@ -0,0 +1,22 @@
+package com.softwaremill.sttp.impl.monix
+
+import java.nio.ByteBuffer
+
+import _root_.monix.eval.Task
+import _root_.monix.reactive.Observable
+import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend}
+
+trait MonixTestStreamingBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] {
+
+ override implicit def convertToFuture: ConvertToFuture[Task] = com.softwaremill.sttp.impl.monix.convertToFuture
+
+ override def bodyProducer(body: String): Observable[ByteBuffer] =
+ Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
+
+ override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] =
+ stream
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .map(bs => new String(bs.toArray, "utf8"))
+
+}
diff --git a/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala
new file mode 100644
index 0000000..25cf7e1
--- /dev/null
+++ b/implementations/monix/src/test/scala/com/softwaremill/sttp/impl/monix/package.scala
@@ -0,0 +1,15 @@
+package com.softwaremill.sttp.impl
+
+import scala.concurrent.Future
+
+import _root_.monix.eval.Task
+import com.softwaremill.sttp.testing.streaming.ConvertToFuture
+
+package object monix {
+
+ val convertToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] {
+ import _root_.monix.execution.Scheduler.Implicits.global
+
+ override def toFuture[T](value: Task[T]): Future[T] = value.runAsync
+ }
+}
diff --git a/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala b/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala
new file mode 100644
index 0000000..8535321
--- /dev/null
+++ b/implementations/scalaz/src/main/scala/com/softwaremill/sttp/impl/scalaz/TaskMonadAsyncError.scala
@@ -0,0 +1,28 @@
+package com.softwaremill.sttp.impl.scalaz
+
+import com.softwaremill.sttp.MonadAsyncError
+
+import scalaz.concurrent.Task
+import scalaz.{-\/, \/-}
+
+object TaskMonadAsyncError extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.point(t)
+
+ override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { cb =>
+ register {
+ case Left(t) => cb(-\/(t))
+ case Right(t) => cb(\/-(t))
+ }
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.fail(t)
+
+ override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
+ rt.handleWith(h)
+}
diff --git a/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala
new file mode 100644
index 0000000..72dbf31
--- /dev/null
+++ b/implementations/scalaz/src/test/scala/com/softwaremill/sttp/impl/scalaz/package.scala
@@ -0,0 +1,25 @@
+package com.softwaremill.sttp.impl
+
+import com.softwaremill.sttp.testing.streaming.ConvertToFuture
+
+import _root_.scalaz.concurrent.Task
+import _root_.scalaz.{-\/, \/-}
+import scala.concurrent.{Future, Promise}
+import scala.util.{Failure, Success}
+
+package object scalaz {
+
+ val convertToFuture: ConvertToFuture[Task] = new ConvertToFuture[Task] {
+ // from https://github.com/Verizon/delorean
+ override def toFuture[T](value: Task[T]): Future[T] = {
+ val p = Promise[T]()
+
+ value.unsafePerformAsync {
+ case \/-(a) => p.complete(Success(a)); ()
+ case -\/(t) => p.complete(Failure(t)); ()
+ }
+
+ p.future
+ }
+ }
+}
diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
index bda8959..332d8d0 100644
--- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
+++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
@@ -3,21 +3,22 @@ package com.softwaremill.sttp.okhttp.monix
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue
+import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError
import com.softwaremill.sttp.{SttpBackend, _}
import com.softwaremill.sttp.okhttp.{OkHttpAsyncBackend, OkHttpBackend}
import monix.eval.Task
import monix.execution.Ack.Continue
-import monix.execution.{Ack, Cancelable, Scheduler}
+import monix.execution.{Ack, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
import okio.BufferedSink
import scala.concurrent.Future
-import scala.util.{Failure, Success, Try}
+import scala.util.{Success, Try}
class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler)
- extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) {
+ extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonadAsyncError, closeClient) {
override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
@@ -40,7 +41,7 @@ class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(im
val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1)
- observable.executeWithFork.subscribe(new Subscriber[T] {
+ observable.executeAsync.subscribe(new Subscriber[T] {
override implicit def scheduler: Scheduler = s
override def onError(ex: Throwable): Unit = {
@@ -86,29 +87,3 @@ object OkHttpMonixBackend {
implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] =
OkHttpMonixBackend(client, closeClient = false)(s)
}
-
-private[monix] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.now(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { (_, cb) =>
- register {
- case Left(t) => cb(Failure(t))
- case Right(t) => cb(Success(t))
- }
-
- Cancelable.empty
- }
-
- override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
-
- override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
- rt.onErrorRecoverWith {
- case t => h(t)
- }
-}
diff --git a/tests/src/test/resources/logback.xml b/tests/src/test/resources/logback.xml
new file mode 100644
index 0000000..1470b51
--- /dev/null
+++ b/tests/src/test/resources/logback.xml
@@ -0,0 +1,12 @@
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
index 392f6d8..821b951 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
@@ -21,6 +21,7 @@ import com.softwaremill.sttp.asynchttpclient.monix.AsyncHttpClientMonixBackend
import com.softwaremill.sttp.asynchttpclient.scalaz.AsyncHttpClientScalazBackend
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import com.softwaremill.sttp.okhttp.{OkHttpFutureBackend, OkHttpSyncBackend}
+import com.softwaremill.sttp.testing.streaming.ConvertToFuture
import com.typesafe.scalalogging.StrictLogging
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.{path => _, _}
@@ -180,19 +181,22 @@ class BasicTests
var closeBackends: List[() => Unit] = Nil
- runTests("HttpURLConnection")(HttpURLConnectionBackend(), ForceWrappedValue.id)
- runTests("TryHttpURLConnection")(TryHttpURLConnectionBackend(), ForceWrappedValue.scalaTry)
- runTests("Akka HTTP")(AkkaHttpBackend.usingActorSystem(actorSystem), ForceWrappedValue.future)
- runTests("Async Http Client - Future")(AsyncHttpClientFutureBackend(), ForceWrappedValue.future)
- runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(), ForceWrappedValue.scalazTask)
- runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), ForceWrappedValue.monixTask)
- runTests("Async Http Client - Cats Effect")(AsyncHttpClientCatsBackend[cats.effect.IO](), ForceWrappedValue.catsIo)
- runTests("OkHttpSyncClientHandler")(OkHttpSyncBackend(), ForceWrappedValue.id)
- runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureBackend(), ForceWrappedValue.future)
- runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), ForceWrappedValue.monixTask)
-
- def runTests[R[_]](name: String)(implicit backend: SttpBackend[R, Nothing],
- forceResponse: ForceWrappedValue[R]): Unit = {
+ runTests("HttpURLConnection")(HttpURLConnectionBackend(), ConvertToFuture.id)
+ runTests("TryHttpURLConnection")(TryHttpURLConnectionBackend(), ConvertToFuture.scalaTry)
+ runTests("Akka HTTP")(AkkaHttpBackend.usingActorSystem(actorSystem), ConvertToFuture.future)
+ runTests("Async Http Client - Future")(AsyncHttpClientFutureBackend(), ConvertToFuture.future)
+ runTests("Async Http Client - Scalaz")(AsyncHttpClientScalazBackend(),
+ com.softwaremill.sttp.impl.scalaz.convertToFuture)
+ runTests("Async Http Client - Monix")(AsyncHttpClientMonixBackend(), com.softwaremill.sttp.impl.monix.convertToFuture)
+ runTests("Async Http Client - Cats Effect")(AsyncHttpClientCatsBackend[cats.effect.IO](),
+ com.softwaremill.sttp.impl.cats.convertToFuture)
+ runTests("OkHttpSyncClientHandler")(OkHttpSyncBackend(), ConvertToFuture.id)
+ runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureBackend(), ConvertToFuture.future)
+ runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixBackend(), com.softwaremill.sttp.impl.monix.convertToFuture)
+
+ def runTests[R[_]](name: String)(implicit
+ backend: SttpBackend[R, Nothing],
+ convertToFuture: ConvertToFuture[R]): Unit = {
closeBackends = (() => backend.close()) :: closeBackends
diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
index e3dbe2c..b2b44a2 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
@@ -3,6 +3,7 @@ package com.softwaremill.sttp
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import com.softwaremill.sttp.streaming._
+import com.softwaremill.sttp.testing.streaming.TestStreamingBackend
import com.typesafe.scalalogging.StrictLogging
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala
index d8fbb82..691df81 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AkkaHttpStreamingTests.scala
@@ -5,8 +5,9 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
+import com.softwaremill.sttp.SttpBackend
import com.softwaremill.sttp.akkahttp.AkkaHttpBackend
-import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend}
+import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend}
import scala.concurrent.Future
@@ -16,8 +17,8 @@ class AkkaHttpStreamingTests(actorSystem: ActorSystem)(implicit materializer: Ma
override implicit val backend: SttpBackend[Future, Source[ByteString, Any]] =
AkkaHttpBackend.usingActorSystem(actorSystem)
- override implicit val forceResponse: ForceWrappedValue[Future] =
- ForceWrappedValue.future
+ override implicit val convertToFuture: ConvertToFuture[Future] =
+ ConvertToFuture.future
override def bodyProducer(body: String): Source[ByteString, NotUsed] =
Source.single(ByteString(body))
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala
index 82f36be..680e91a 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientFs2StreamingTests.scala
@@ -3,18 +3,19 @@ package com.softwaremill.sttp.streaming
import java.nio.ByteBuffer
import cats.effect._
-import cats.implicits._
+import cats.instances.string._
+import com.softwaremill.sttp.SttpBackend
import com.softwaremill.sttp.asynchttpclient.fs2.AsyncHttpClientFs2Backend
-import com.softwaremill.sttp.{ForceWrappedValue, SttpBackend}
-import fs2._
+import com.softwaremill.sttp.testing.streaming.{ConvertToFuture, TestStreamingBackend}
+import fs2.{Chunk, Stream, text}
class AsyncHttpClientFs2StreamingTests extends TestStreamingBackend[IO, Stream[IO, ByteBuffer]] {
override implicit val backend: SttpBackend[IO, Stream[IO, ByteBuffer]] =
AsyncHttpClientFs2Backend[IO]()
- override implicit val forceResponse: ForceWrappedValue[IO] =
- ForceWrappedValue.catsIo
+ override implicit val convertToFuture: ConvertToFuture[IO] =
+ com.softwaremill.sttp.impl.cats.convertToFuture
override def bodyProducer(body: String): Stream[IO, ByteBuffer] =
Stream.emits(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala
index 0357668..faebf8b 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/AsyncHttpClientMonixStreamingTests.scala
@@ -4,10 +4,11 @@ import java.nio.ByteBuffer
import com.softwaremill.sttp.SttpBackend
import com.softwaremill.sttp.asynchttpclient.monix.AsyncHttpClientMonixBackend
+import com.softwaremill.sttp.impl.monix.MonixTestStreamingBackend
import monix.eval.Task
import monix.reactive.Observable
-class AsyncHttpClientMonixStreamingTests extends MonixBaseBackend {
+class AsyncHttpClientMonixStreamingTests extends MonixTestStreamingBackend {
import monix.execution.Scheduler.Implicits.global
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala
deleted file mode 100644
index acd67a7..0000000
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.softwaremill.sttp.streaming
-
-import java.nio.ByteBuffer
-
-import com.softwaremill.sttp.ForceWrappedValue
-import monix.eval.Task
-import monix.reactive.Observable
-
-trait MonixBaseBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] {
-
- override implicit def forceResponse: ForceWrappedValue[Task] =
- ForceWrappedValue.monixTask
-
- override def bodyProducer(body: String): Observable[ByteBuffer] =
- Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
-
- override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] =
- stream
- .flatMap(bb => Observable.fromIterable(bb.array()))
- .toListL
- .map(bs => new String(bs.toArray, "utf8"))
-
-}
diff --git a/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala
index 9bfcc26..27a4517 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/streaming/OkHttpMonixStreamingTests.scala
@@ -3,11 +3,12 @@ package com.softwaremill.sttp.streaming
import java.nio.ByteBuffer
import com.softwaremill.sttp.SttpBackend
+import com.softwaremill.sttp.impl.monix.MonixTestStreamingBackend
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable
-class OkHttpMonixStreamingTests extends MonixBaseBackend {
+class OkHttpMonixStreamingTests extends MonixTestStreamingBackend {
import monix.execution.Scheduler.Implicits.global
diff --git a/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala b/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala
index 1a57b7e..12fe770 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/testHelpers.scala
@@ -12,11 +12,8 @@ import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.{MatchResult, Matcher}
import org.scalatest.{BeforeAndAfterAll, Suite}
-import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.higherKinds
-import scala.util.Try
-import scalaz._
trait TestHttpServer extends BeforeAndAfterAll with ScalaFutures with TestingPatience {
this: Suite =>
@@ -38,60 +35,18 @@ trait TestHttpServer extends BeforeAndAfterAll with ScalaFutures with TestingPat
def port: Int
}
-trait ForceWrappedValue[R[_]] {
- def force[T](wrapped: R[T]): T
-}
-
-object ForceWrappedValue extends ScalaFutures with TestingPatience {
-
- val id = new ForceWrappedValue[Id] {
- override def force[T](wrapped: Id[T]): T =
- wrapped
- }
-
- val scalaTry = new ForceWrappedValue[Try] {
- override def force[T](wrapped: Try[T]): T = wrapped.get
- }
-
- val future = new ForceWrappedValue[Future] {
-
- override def force[T](wrapped: Future[T]): T =
- try {
- wrapped.futureValue
- } catch {
- case e: TestFailedException if e.getCause != null => throw e.getCause
- }
- }
- val scalazTask = new ForceWrappedValue[scalaz.concurrent.Task] {
- override def force[T](wrapped: scalaz.concurrent.Task[T]): T =
- wrapped.unsafePerformSyncAttempt match {
- case -\/(error) => throw error
- case \/-(value) => value
- }
- }
- val monixTask = new ForceWrappedValue[monix.eval.Task] {
- import monix.execution.Scheduler.Implicits.global
+trait ForceWrapped extends ScalaFutures with TestingPatience { this: Suite =>
+ type ConvertToFuture[R[_]] =
+ com.softwaremill.sttp.testing.streaming.ConvertToFuture[R]
- override def force[T](wrapped: monix.eval.Task[T]): T =
+ implicit class ForceDecorator[R[_], T](wrapped: R[T]) {
+ def force()(implicit ctf: ConvertToFuture[R]): T = {
try {
- wrapped.runAsync.futureValue
+ ctf.toFuture(wrapped).futureValue
} catch {
case e: TestFailedException if e.getCause != null => throw e.getCause
}
- }
- val catsIo = new ForceWrappedValue[cats.effect.IO] {
- override def force[T](wrapped: cats.effect.IO[T]): T =
- wrapped.unsafeRunSync
- }
-}
-
-trait ForceWrapped extends ScalaFutures with TestingPatience { this: Suite =>
- type ForceWrappedValue[R[_]] = com.softwaremill.sttp.ForceWrappedValue[R]
- val ForceWrappedValue: com.softwaremill.sttp.ForceWrappedValue.type =
- com.softwaremill.sttp.ForceWrappedValue
-
- implicit class ForceDecorator[R[_], T](wrapped: R[T]) {
- def force()(implicit fwv: ForceWrappedValue[R]): T = fwv.force(wrapped)
+ }
}
}