diff options
author | Jakob Odersky <jakob@odersky.com> | 2016-01-24 20:21:17 -0800 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2016-02-03 20:46:28 -0800 |
commit | f865a76c2f441f619b069505b73fcbd1cba1a67c (patch) | |
tree | 3f53c519f4575037bdebf8c8399ca25d50649543 /flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala | |
parent | 46c30908f827e27b58166f56efa4f15917c1af4f (diff) | |
download | akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.gz akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.bz2 akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.zip |
Add support for Akka streams
Diffstat (limited to 'flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala')
-rw-r--r-- | flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala new file mode 100644 index 0000000..2efe3e9 --- /dev/null +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala @@ -0,0 +1,48 @@ +package com.github.jodersky.flow +package stream +package impl + +import akka.actor._ +import akka.util._ +import akka.stream._ +import akka.stream.stage._ +import scala.concurrent._ + +/** + * Graph stage that establishes and thereby materializes a serial connection. + * The actual connection logic is deferred to [[SerialConnectionLogic]]. + */ +private[stream] class SerialConnectionStage( + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int +) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { + + val in: Inlet[ByteString] = Inlet("Serial.in") + val out: Outlet[ByteString] = Outlet("Serial.out") + + val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): + (GraphStageLogic, Future[Serial.Connection]) = { + + val connectionPromise = Promise[Serial.Connection] + + val logic = new SerialConnectionLogic( + shape, + manager, + port, + settings, + failOnOverflow, + bufferSize, + connectionPromise + ) + + (logic, connectionPromise.future) + } + + override def toString = s"Serial($port)" + +} |