blob: d00c056909269e36665bba27892f91f6d1a32a47 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package com.softwaremill.sttp.impl.monix
import java.nio.ByteBuffer
import com.softwaremill.sttp.testing.ConvertToFuture
import com.softwaremill.sttp.testing.streaming.StreamingTest
import monix.eval.Task
import monix.reactive.Observable
abstract class MonixStreamingTest extends StreamingTest[Task, Observable[ByteBuffer]] {
override implicit val convertToFuture: ConvertToFuture[Task] = convertMonixTaskToFuture
override def bodyProducer(body: String): Observable[ByteBuffer] =
Observable
.fromIterable(
body.getBytes("utf-8")
)
.map(v => ByteBuffer.wrap(Array(v)))
override def bodyConsumer(stream: Observable[ByteBuffer]): Task[String] =
stream
.flatMap(v => Observable.fromIterable(v.array()))
.toListL
.map(bs => new String(bs.toArray, "utf8"))
}
|