aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjørn Madsen <bm@aeons.dk>2017-08-28 21:56:18 +0200
committerBjørn Madsen <bm@aeons.dk>2017-08-30 13:50:12 +0200
commitdba1836f72dc38ab14ab4bb614b4101a80e97552 (patch)
tree419013cd265bf8b404d502cf47323efb28edd3d4
parent18d3ae2a0c2c7b73b747004127d7362edfbeee8c (diff)
downloadsttp-dba1836f72dc38ab14ab4bb614b4101a80e97552.tar.gz
sttp-dba1836f72dc38ab14ab4bb614b4101a80e97552.tar.bz2
sttp-dba1836f72dc38ab14ab4bb614b4101a80e97552.zip
Add fs2 streaming module
-rw-r--r--async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala88
-rw-r--r--build.sbt18
2 files changed, 104 insertions, 2 deletions
diff --git a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala
new file mode 100644
index 0000000..91a3109
--- /dev/null
+++ b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala
@@ -0,0 +1,88 @@
+package com.softwaremill.sttp.asynchttpclient.fs2
+
+import java.nio.ByteBuffer
+
+import cats.effect._
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
+import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
+import fs2._
+import fs2.interop.reactivestreams._
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.ExecutionContext
+import scala.language.higherKinds
+
+class Fs2AsyncHttpClientHandler[F[_]: Effect] private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit ec: ExecutionContext)
+ extends AsyncHttpClientHandler[F, Stream[F, ByteBuffer]](
+ asyncHttpClient,
+ new EffectMonad,
+ closeClient
+ ) {
+
+ override protected def streamBodyToPublisher(
+ s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
+ s.toUnicastPublisher
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
+ p.toStream[F]
+}
+
+object Fs2AsyncHttpClientHandler {
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def apply[F[_]: Effect]()(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpHandler[F, Stream[F, ByteBuffer]] =
+ new Fs2AsyncHttpClientHandler[F](new DefaultAsyncHttpClient(),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpHandler[F, Stream[F, ByteBuffer]] =
+ new Fs2AsyncHttpClientHandler[F](new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingClient[F[_]: Effect](client: AsyncHttpClient)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpHandler[F, Stream[F, ByteBuffer]] =
+ new Fs2AsyncHttpClientHandler[F](client, closeClient = false)
+}
+
+private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
+ extends MonadAsyncError[F] {
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ F.async(register)
+
+ override def unit[T](t: T): F[T] = F.pure(t)
+
+ override def map[T, T2](fa: F[T], f: (T) => T2): F[T2] = F.map(fa)(f)
+
+ override def flatMap[T, T2](fa: F[T], f: (T) => F[T2]): F[T2] =
+ F.flatMap(fa)(f)
+
+ override def error[T](t: Throwable): F[T] = F.raiseError(t)
+}
diff --git a/build.sbt b/build.sbt
index a1896fa..3e2f310 100644
--- a/build.sbt
+++ b/build.sbt
@@ -51,6 +51,7 @@ lazy val rootProject = (project in file("."))
scalazAsyncHttpClientHandler,
monixAsyncHttpClientHandler,
catsAsyncHttpClientHandler,
+ fs2AsyncHttpClientHandler,
okhttpClientHandler,
okhttpMonixClientHandler,
circe,
@@ -121,6 +122,16 @@ lazy val catsAsyncHttpClientHandler: Project = (project in file(
)
) dependsOn asyncHttpClientHandler
+lazy val fs2AsyncHttpClientHandler: Project = (project in file(
+ "async-http-client-handler/fs2"))
+ .settings(commonSettings: _*)
+ .settings(
+ name := "async-http-client-handler-fs2",
+ libraryDependencies ++= Seq(
+ "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.2.2"
+ )
+ ) dependsOn asyncHttpClientHandler
+
lazy val okhttpClientHandler: Project = (project in file(
"okhttp-client-handler"))
.settings(commonSettings: _*)
@@ -163,5 +174,8 @@ lazy val tests: Project = (project in file("tests"))
"ch.qos.logback" % "logback-classic" % "1.2.3"
).map(_ % "test"),
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test"
- ) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler,
-monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler)
+ ) dependsOn (
+ core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler,
+ monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, fs2AsyncHttpClientHandler,
+ okhttpClientHandler, okhttpMonixClientHandler
+)