From f514d9d013b8059adeb4e326845f0b2b5e93ab02 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Fri, 11 Mar 2016 22:37:57 -0800 Subject: Clean up streaming code Closes #30 --- .../com/github/jodersky/flow/stream/Serial.scala | 12 ++++----- .../flow/stream/impl/SerialConnectionLogic.scala | 31 +++++++++++----------- .../flow/stream/impl/SerialConnectionStage.scala | 11 ++++---- 3 files changed, 26 insertions(+), 28 deletions(-) (limited to 'flow-stream') diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala index 26efcc9..6d250df 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala @@ -1,16 +1,14 @@ package com.github.jodersky.flow package stream -import akka.actor._ +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO import akka.stream.scaladsl.Flow -import akka.stream._ -import akka.stream.stage._ -import akka.dispatch.ExecutionContexts import akka.util.ByteString -import com.github.jodersky.flow.{Serial => CoreSerial, _} -import scala.concurrent._ -import akka.io._ +import com.github.jodersky.flow.{Serial => CoreSerial} import impl._ object Serial extends ExtensionId[Serial] with ExtensionIdProvider { diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala index 35be464..526ea40 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala @@ -2,21 +2,19 @@ package com.github.jodersky.flow package stream package impl -import akka.actor._ -import akka.stream.stage.GraphStageLogic.StageActorRef -import akka.util._ -import akka.stream._ -import akka.stream.stage._ -import scala.concurrent._ -import akka.io._ -import com.github.jodersky.flow.{Serial => CoreSerial} -import akka.stream.impl.ReactiveStreamsCompliance +import scala.concurrent.Promise +import akka.actor.{ActorRef, Terminated} +import akka.stream.{FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString + +import com.github.jodersky.flow.{Serial => CoreSerial, SerialSettings} /** * Graph logic that handles establishing and forwarding serial communication. * The underlying stream is closed when downstream (output) finishes, - * conversely upstream (input) closes are ignored. + * upstream (input) closes are ignored. */ private[stream] class SerialConnectionLogic( shape: FlowShape[ByteString, ByteString], @@ -40,8 +38,9 @@ private[stream] class SerialConnectionLogic( * explicitly specifying a sender. */ implicit private def self = stageActor.ref - /** Input handler for an established connection. - * @param operator ther operator actor of the established connection + /** + * Input handler for an established connection. + * @param operator the operator actor of the established connection */ class ConnectedInHandler(operator: ActorRef) extends InHandler { @@ -60,13 +59,13 @@ private[stream] class SerialConnectionLogic( } class ConnectedOutHandler(operator: ActorRef) extends OutHandler { - // alias stage actor as implicit to it will be used in "!" calls + // implicit alias to stage actor, so it will be used in "!" calls implicit val self = stageActor.ref override def onPull(): Unit = { // serial connections are at the end of the "backpressure chain", // they do not natively support backpressure (as does TCP for example) - // therefore nothing is done here as + // therefore nothing is done here } override def onDownstreamFinish(): Unit = { @@ -86,7 +85,7 @@ private[stream] class SerialConnectionLogic( setHandler(in, IgnoreTerminateInput) setHandler(out, IgnoreTerminateOutput) - /** Initial behaviour, before a serial connection is established. */ + /** Initial behavior, before a serial connection is established. */ private def connecting(event: (ActorRef, Any)): Unit = { val sender = event._1 val message = event._2 @@ -114,7 +113,7 @@ private[stream] class SerialConnectionLogic( pull(in) // start pulling input case other => - val ex = new StreamSerialException(s"Stage actor received unkown message [$other]") + val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") failStage(ex) connectionPromise.failure(ex) diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala index 2efe3e9..bff1d26 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala @@ -2,11 +2,12 @@ package com.github.jodersky.flow package stream package impl -import akka.actor._ -import akka.util._ -import akka.stream._ -import akka.stream.stage._ -import scala.concurrent._ +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString /** * Graph stage that establishes and thereby materializes a serial connection. -- cgit v1.2.3