aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/storage/channelStreams.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/storage/channelStreams.scala')
-rw-r--r--src/main/scala/xyz/driver/core/storage/channelStreams.scala112
1 files changed, 0 insertions, 112 deletions
diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala
deleted file mode 100644
index fc652be..0000000
--- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package xyz.driver.core.storage
-
-import java.nio.ByteBuffer
-import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
-
-import akka.stream._
-import akka.stream.scaladsl.{Sink, Source}
-import akka.stream.stage._
-import akka.util.ByteString
-import akka.{Done, NotUsed}
-
-import scala.concurrent.{Future, Promise}
-import scala.util.control.NonFatal
-
-class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int)
- extends GraphStage[SourceShape[ByteString]] {
-
- val out = Outlet[ByteString]("ChannelSource.out")
- val shape = SourceShape(out)
-
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends OutHandler {
- override def onPull(): Unit = {
- try {
- val buffer = ByteBuffer.allocate(chunkSize)
- if (channel.read(buffer) > 0) {
- buffer.flip()
- push(out, ByteString.fromByteBuffer(buffer))
- } else {
- completeStage()
- }
- } catch {
- case NonFatal(_) =>
- channel.close()
- }
- }
- override def onDownstreamFinish(): Unit = {
- channel.close()
- }
- }
-
- setHandler(out, Handler)
- }
-
-}
-
-class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int)
- extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
-
- val in = Inlet[ByteString]("ChannelSink.in")
- val shape = SinkShape(in)
-
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- val promise = Promise[Done]()
- val logic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends InHandler {
- override def onPush(): Unit = {
- try {
- val data = grab(in)
- channel.write(data.asByteBuffer)
- pull(in)
- } catch {
- case NonFatal(e) =>
- channel.close()
- promise.failure(e)
- }
- }
-
- override def onUpstreamFinish(): Unit = {
- channel.close()
- completeStage()
- promise.success(Done)
- }
-
- override def onUpstreamFailure(ex: Throwable): Unit = {
- channel.close()
- promise.failure(ex)
- }
- }
-
- setHandler(in, Handler)
-
- override def preStart(): Unit = {
- pull(in)
- }
- }
- (logic, promise.future)
- }
-
-}
-
-object ChannelStream {
-
- def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = {
- Source
- .fromGraph(new ChannelSource(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
- def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = {
- Sink
- .fromGraph(new ChannelSink(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
-}