aboutsummaryrefslogtreecommitdiff
path: root/flow-stream
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-03-11 22:37:57 -0800
committerJakob Odersky <jakob@odersky.com>2016-03-11 22:37:57 -0800
commitf514d9d013b8059adeb4e326845f0b2b5e93ab02 (patch)
tree0f046791437f1c92df7ce3bb65f923286bf8be08 /flow-stream
parent0d319d22fc77df39563b743829d969f161d40e53 (diff)
downloadakka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.tar.gz
akka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.tar.bz2
akka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.zip
Clean up streaming code
Closes #30
Diffstat (limited to 'flow-stream')
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala12
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala31
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala11
3 files changed, 26 insertions, 28 deletions
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
index 26efcc9..6d250df 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
@@ -1,16 +1,14 @@
package com.github.jodersky.flow
package stream
-import akka.actor._
+import scala.concurrent.Future
+
+import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
+import akka.io.IO
import akka.stream.scaladsl.Flow
-import akka.stream._
-import akka.stream.stage._
-import akka.dispatch.ExecutionContexts
import akka.util.ByteString
-import com.github.jodersky.flow.{Serial => CoreSerial, _}
-import scala.concurrent._
-import akka.io._
+import com.github.jodersky.flow.{Serial => CoreSerial}
import impl._
object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
index 35be464..526ea40 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
@@ -2,21 +2,19 @@ package com.github.jodersky.flow
package stream
package impl
-import akka.actor._
-import akka.stream.stage.GraphStageLogic.StageActorRef
-import akka.util._
-import akka.stream._
-import akka.stream.stage._
-import scala.concurrent._
-import akka.io._
-import com.github.jodersky.flow.{Serial => CoreSerial}
-import akka.stream.impl.ReactiveStreamsCompliance
+import scala.concurrent.Promise
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.{FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
+import akka.util.ByteString
+
+import com.github.jodersky.flow.{Serial => CoreSerial, SerialSettings}
/**
* Graph logic that handles establishing and forwarding serial communication.
* The underlying stream is closed when downstream (output) finishes,
- * conversely upstream (input) closes are ignored.
+ * upstream (input) closes are ignored.
*/
private[stream] class SerialConnectionLogic(
shape: FlowShape[ByteString, ByteString],
@@ -40,8 +38,9 @@ private[stream] class SerialConnectionLogic(
* explicitly specifying a sender. */
implicit private def self = stageActor.ref
- /** Input handler for an established connection.
- * @param operator ther operator actor of the established connection
+ /**
+ * Input handler for an established connection.
+ * @param operator the operator actor of the established connection
*/
class ConnectedInHandler(operator: ActorRef) extends InHandler {
@@ -60,13 +59,13 @@ private[stream] class SerialConnectionLogic(
}
class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
- // alias stage actor as implicit to it will be used in "!" calls
+ // implicit alias to stage actor, so it will be used in "!" calls
implicit val self = stageActor.ref
override def onPull(): Unit = {
// serial connections are at the end of the "backpressure chain",
// they do not natively support backpressure (as does TCP for example)
- // therefore nothing is done here as
+ // therefore nothing is done here
}
override def onDownstreamFinish(): Unit = {
@@ -86,7 +85,7 @@ private[stream] class SerialConnectionLogic(
setHandler(in, IgnoreTerminateInput)
setHandler(out, IgnoreTerminateOutput)
- /** Initial behaviour, before a serial connection is established. */
+ /** Initial behavior, before a serial connection is established. */
private def connecting(event: (ActorRef, Any)): Unit = {
val sender = event._1
val message = event._2
@@ -114,7 +113,7 @@ private[stream] class SerialConnectionLogic(
pull(in) // start pulling input
case other =>
- val ex = new StreamSerialException(s"Stage actor received unkown message [$other]")
+ val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
failStage(ex)
connectionPromise.failure(ex)
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
index 2efe3e9..bff1d26 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
@@ -2,11 +2,12 @@ package com.github.jodersky.flow
package stream
package impl
-import akka.actor._
-import akka.util._
-import akka.stream._
-import akka.stream.stage._
-import scala.concurrent._
+import scala.concurrent.{Future, Promise}
+
+import akka.actor.ActorRef
+import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
+import akka.util.ByteString
/**
* Graph stage that establishes and thereby materializes a serial connection.