aboutsummaryrefslogtreecommitdiff
path: root/tests/src/test/scala/com/softwaremill/sttp/streaming/MonixBaseBackend.scala
blob: acd67a7ac632879848c77a3bd61f0ab32e86e997 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.softwaremill.sttp.streaming

import java.nio.ByteBuffer

import com.softwaremill.sttp.ForceWrappedValue
import monix.eval.Task
import monix.reactive.Observable

trait MonixBaseBackend extends TestStreamingBackend[Task, Observable[ByteBuffer]] {

  override implicit def forceResponse: ForceWrappedValue[Task] =
    ForceWrappedValue.monixTask

  override def bodyProducer(body: String): Observable[ByteBuffer] =
    Observable.fromIterable(body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))

  override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] =
    stream
      .flatMap(bb => Observable.fromIterable(bb.array()))
      .toListL
      .map(bs => new String(bs.toArray, "utf8"))

}