diff options
-rw-r--r-- | async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala | 88 | ||||
-rw-r--r-- | build.sbt | 18 |
2 files changed, 104 insertions, 2 deletions
diff --git a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala new file mode 100644 index 0000000..91a3109 --- /dev/null +++ b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/Fs2AsyncHttpClientHandler.scala @@ -0,0 +1,88 @@ +package com.softwaremill.sttp.asynchttpclient.fs2 + +import java.nio.ByteBuffer + +import cats.effect._ +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler +import com.softwaremill.sttp.{MonadAsyncError, SttpHandler} +import fs2._ +import fs2.interop.reactivestreams._ +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.ExecutionContext +import scala.language.higherKinds + +class Fs2AsyncHttpClientHandler[F[_]: Effect] private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit ec: ExecutionContext) + extends AsyncHttpClientHandler[F, Stream[F, ByteBuffer]]( + asyncHttpClient, + new EffectMonad, + closeClient + ) { + + override protected def streamBodyToPublisher( + s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = + s.toUnicastPublisher + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = + p.toStream[F] +} + +object Fs2AsyncHttpClientHandler { + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply[F[_]: Effect]()( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpHandler[F, Stream[F, ByteBuffer]] = + new Fs2AsyncHttpClientHandler[F](new DefaultAsyncHttpClient(), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpHandler[F, Stream[F, ByteBuffer]] = + new Fs2AsyncHttpClientHandler[F](new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingClient[F[_]: Effect](client: AsyncHttpClient)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpHandler[F, Stream[F, ByteBuffer]] = + new Fs2AsyncHttpClientHandler[F](client, closeClient = false) +} + +private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) + extends MonadAsyncError[F] { + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + F.async(register) + + override def unit[T](t: T): F[T] = F.pure(t) + + override def map[T, T2](fa: F[T], f: (T) => T2): F[T2] = F.map(fa)(f) + + override def flatMap[T, T2](fa: F[T], f: (T) => F[T2]): F[T2] = + F.flatMap(fa)(f) + + override def error[T](t: Throwable): F[T] = F.raiseError(t) +} @@ -51,6 +51,7 @@ lazy val rootProject = (project in file(".")) scalazAsyncHttpClientHandler, monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, + fs2AsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler, circe, @@ -121,6 +122,16 @@ lazy val catsAsyncHttpClientHandler: Project = (project in file( ) ) dependsOn asyncHttpClientHandler +lazy val fs2AsyncHttpClientHandler: Project = (project in file( + "async-http-client-handler/fs2")) + .settings(commonSettings: _*) + .settings( + name := "async-http-client-handler-fs2", + libraryDependencies ++= Seq( + "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.2.2" + ) + ) dependsOn asyncHttpClientHandler + lazy val okhttpClientHandler: Project = (project in file( "okhttp-client-handler")) .settings(commonSettings: _*) @@ -163,5 +174,8 @@ lazy val tests: Project = (project in file("tests")) "ch.qos.logback" % "logback-classic" % "1.2.3" ).map(_ % "test"), libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" - ) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler, -monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler) + ) dependsOn ( + core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler, + monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, fs2AsyncHttpClientHandler, + okhttpClientHandler, okhttpMonixClientHandler +) |