aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala')
-rw-r--r--async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala47
1 files changed, 16 insertions, 31 deletions
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
index 4c6dc71..e608499 100644
--- a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
+++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
@@ -13,25 +13,21 @@ import org.reactivestreams.Publisher
import scala.concurrent.ExecutionContext
import scala.language.higherKinds
-class AsyncHttpClientFs2Backend[F[_]: Effect] private (
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(implicit ec: ExecutionContext)
+class AsyncHttpClientFs2Backend[F[_]: Effect] private (asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
+ implicit ec: ExecutionContext)
extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]](
asyncHttpClient,
new EffectMonad,
closeClient
) {
- override protected def streamBodyToPublisher(
- s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
+ override protected def streamBodyToPublisher(s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
s.toUnicastPublisher
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
+ override protected def publisherToStreamBody(p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
p.toStream[F]
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): F[String] = {
+ override protected def publisherToString(p: Publisher[ByteBuffer]): F[String] = {
val bytes = p
.toStream[F]
.compile
@@ -43,23 +39,18 @@ class AsyncHttpClientFs2Backend[F[_]: Effect] private (
object AsyncHttpClientFs2Backend {
- private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(
+ private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] =
- new FollowRedirectsBackend(
- new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient))
+ new FollowRedirectsBackend(new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient))
/**
* @param ec The execution context for running non-network related operations,
* e.g. mapping responses. Defaults to the global execution
* context.
*/
- def apply[F[_]: Effect](options: SttpBackendOptions =
- SttpBackendOptions.Default)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpBackend[F, Stream[F, ByteBuffer]] =
- AsyncHttpClientFs2Backend[F](AsyncHttpClientBackend.defaultClient(options),
- closeClient = true)
+ def apply[F[_]: Effect](options: SttpBackendOptions = SttpBackendOptions.Default)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](AsyncHttpClientBackend.defaultClient(options), closeClient = true)
/**
* @param ec The execution context for running non-network related operations,
@@ -67,10 +58,8 @@ object AsyncHttpClientFs2Backend {
* context.
*/
def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpBackend[F, Stream[F, ByteBuffer]] =
- AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg),
- closeClient = true)
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg), closeClient = true)
/**
* @param ec The execution context for running non-network related operations,
@@ -78,16 +67,13 @@ object AsyncHttpClientFs2Backend {
* context.
*/
def usingClient[F[_]: Effect](client: AsyncHttpClient)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpBackend[F, Stream[F, ByteBuffer]] =
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[F, Stream[F, ByteBuffer]] =
AsyncHttpClientFs2Backend[F](client, closeClient = false)
}
-private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
- extends MonadAsyncError[F] {
+private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) extends MonadAsyncError[F] {
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
F.async(register)
override def unit[T](t: T): F[T] = F.pure(t)
@@ -99,7 +85,6 @@ private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
override def error[T](t: Throwable): F[T] = F.raiseError(t)
- override protected def handleWrappedError[T](rt: F[T])(
- h: PartialFunction[Throwable, F[T]]): F[T] =
+ override protected def handleWrappedError[T](rt: F[T])(h: PartialFunction[Throwable, F[T]]): F[T] =
F.recoverWith(rt)(h)
}