diff options
author | adamw <adam@warski.org> | 2017-09-14 11:03:21 +0100 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-09-14 11:03:21 +0100 |
commit | fbc71ee712635ed64c50ca694735a84ec794eb11 (patch) | |
tree | bf1dd7335306b7f320262d45d0d5b6d02f5a0b27 /async-http-client-backend/fs2 | |
parent | a971d409cb1063a2089d936abf3d3ab70bbbabb6 (diff) | |
download | sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.gz sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.bz2 sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.zip |
Renaming "handler" to "backend"
Diffstat (limited to 'async-http-client-backend/fs2')
-rw-r--r-- | async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala new file mode 100644 index 0000000..90db69c --- /dev/null +++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala @@ -0,0 +1,112 @@ +package com.softwaremill.sttp.asynchttpclient.fs2 + +import java.nio.ByteBuffer + +import cats.effect._ +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import com.softwaremill.sttp.{ + FollowRedirectsBackend, + MonadAsyncError, + SttpBackend, + Utf8, + concatByteBuffers +} +import fs2._ +import fs2.interop.reactivestreams._ +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import scala.language.higherKinds + +class AsyncHttpClientFs2Backend[F[_]: Effect] private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit ec: ExecutionContext) + extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]]( + asyncHttpClient, + new EffectMonad, + closeClient + ) { + + override protected def streamBodyToPublisher( + s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = + s.toUnicastPublisher + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = + p.toStream[F] + + override protected def publisherToString( + p: Publisher[ByteBuffer]): F[String] = { + val bytes = p + .toStream[F] + .runFold(ByteBuffer.allocate(0))(concatByteBuffers) + + implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8)) + } +} + +object AsyncHttpClientFs2Backend { + + private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)( + implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] = + new FollowRedirectsBackend( + new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient)) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply[F[_]: Effect](connectionTimeout: FiniteDuration = + SttpBackend.DefaultConnectionTimeout)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F]( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingClient[F[_]: Effect](client: AsyncHttpClient)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](client, closeClient = false) +} + +private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) + extends MonadAsyncError[F] { + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + F.async(register) + + override def unit[T](t: T): F[T] = F.pure(t) + + override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) + + override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = + F.flatMap(fa)(f) + + override def error[T](t: Throwable): F[T] = F.raiseError(t) +} |