diff options
Diffstat (limited to 'async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala')
-rw-r--r-- | async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala | 47 |
1 files changed, 16 insertions, 31 deletions
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala index 4c6dc71..e608499 100644 --- a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala +++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala @@ -13,25 +13,21 @@ import org.reactivestreams.Publisher import scala.concurrent.ExecutionContext import scala.language.higherKinds -class AsyncHttpClientFs2Backend[F[_]: Effect] private ( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)(implicit ec: ExecutionContext) +class AsyncHttpClientFs2Backend[F[_]: Effect] private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( + implicit ec: ExecutionContext) extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]]( asyncHttpClient, new EffectMonad, closeClient ) { - override protected def streamBodyToPublisher( - s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = + override protected def streamBodyToPublisher(s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = s.toUnicastPublisher - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = + override protected def publisherToStreamBody(p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = p.toStream[F] - override protected def publisherToString( - p: Publisher[ByteBuffer]): F[String] = { + override protected def publisherToString(p: Publisher[ByteBuffer]): F[String] = { val bytes = p .toStream[F] .compile @@ -43,23 +39,18 @@ class AsyncHttpClientFs2Backend[F[_]: Effect] private ( object AsyncHttpClientFs2Backend { - private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)( + private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] = - new FollowRedirectsBackend( - new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient)) + new FollowRedirectsBackend(new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient)) /** * @param ec The execution context for running non-network related operations, * e.g. mapping responses. Defaults to the global execution * context. */ - def apply[F[_]: Effect](options: SttpBackendOptions = - SttpBackendOptions.Default)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpBackend[F, Stream[F, ByteBuffer]] = - AsyncHttpClientFs2Backend[F](AsyncHttpClientBackend.defaultClient(options), - closeClient = true) + def apply[F[_]: Effect](options: SttpBackendOptions = SttpBackendOptions.Default)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](AsyncHttpClientBackend.defaultClient(options), closeClient = true) /** * @param ec The execution context for running non-network related operations, @@ -67,10 +58,8 @@ object AsyncHttpClientFs2Backend { * context. */ def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpBackend[F, Stream[F, ByteBuffer]] = - AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg), - closeClient = true) + implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg), closeClient = true) /** * @param ec The execution context for running non-network related operations, @@ -78,16 +67,13 @@ object AsyncHttpClientFs2Backend { * context. */ def usingClient[F[_]: Effect](client: AsyncHttpClient)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpBackend[F, Stream[F, ByteBuffer]] = + implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] = AsyncHttpClientFs2Backend[F](client, closeClient = false) } -private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) - extends MonadAsyncError[F] { +private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) extends MonadAsyncError[F] { - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = F.async(register) override def unit[T](t: T): F[T] = F.pure(t) @@ -99,7 +85,6 @@ private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) override def error[T](t: Throwable): F[T] = F.raiseError(t) - override protected def handleWrappedError[T](rt: F[T])( - h: PartialFunction[Throwable, F[T]]): F[T] = + override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] = F.recoverWith(rt)(h) } |