aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-04-18 02:34:57 -0700
committerJakob Odersky <jakob@odersky.com>2016-04-18 02:38:54 -0700
commit3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d (patch)
treed6fda5890153c17ffaebbb6cc5c7d061be00d70a
parent36721ebc573e1bb5f3e3ab74ed2feb0f6bf0b6e7 (diff)
downloadakka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.tar.gz
akka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.tar.bz2
akka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.zip
Implement stream interface for watchers
-rw-r--r--CHANGELOG.md6
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala10
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamWatcherException.scala4
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala65
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherStage.scala38
5 files changed, 121 insertions, 2 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f534a30..d92fa74 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
-# Version 2.5.1
-- Upgrade Akka to 2.4.3
+# Version 2.6.0
+- Implement streaming API for port watchers
+- Upgrade Akka to 2.4.4
+- Build for Scala 2.12.0-M4
# Version 2.5.0
- Rename main 'flow' project to 'flow-core'
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
index 6d250df..b7f0557 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
@@ -1,6 +1,7 @@
package com.github.jodersky.flow
package stream
+import akka.stream.scaladsl.Source
import scala.concurrent.Future
import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
@@ -18,6 +19,8 @@ object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
*/
case class Connection(port: String, settings: SerialSettings)
+ case class Watch(ports: Set[String])
+
def apply()(implicit system: ActorSystem): Serial = super.apply(system)
override def lookup() = Serial
@@ -54,4 +57,11 @@ class Serial(system: ExtendedActorSystem) extends Extension {
)
)
+ def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph(
+ new WatcherStage(
+ IO(CoreSerial)(system),
+ ports
+ )
+ )
+
}
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamWatcherException.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamWatcherException.scala
new file mode 100644
index 0000000..b9fa468
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamWatcherException.scala
@@ -0,0 +1,4 @@
+package com.github.jodersky.flow
+package stream
+
+class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala
new file mode 100644
index 0000000..11f695b
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala
@@ -0,0 +1,65 @@
+package com.github.jodersky.flow
+package stream
+package impl
+
+import scala.concurrent.Promise
+
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.SourceShape
+import akka.stream.stage.GraphStageLogic
+import com.github.jodersky.flow.{Serial => CoreSerial}
+
+private[stream] class WatcherLogic(
+ shape: SourceShape[String],
+ ioManager: ActorRef,
+ ports: Set[String],
+ watchPromise: Promise[Serial.Watch])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+
+ implicit private def self = stageActor.ref
+
+ override def preStart(): Unit = {
+ getStageActor(receive)
+ stageActor watch ioManager
+ for (dir <- WatcherLogic.getDirs(ports)) {
+ ioManager ! CoreSerial.Watch(dir, skipInitial = false)
+ }
+ }
+
+ setHandler(shape.out, IgnoreTerminateOutput)
+
+ private def receive(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`ioManager`) =>
+ val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.")
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.Connected(port) =>
+ if (ports contains port) {
+ if (isAvailable(shape.out)) {
+ push(shape.out, port)
+ }
+ }
+
+ case other =>
+ failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]"))
+
+ }
+ }
+
+}
+
+private[stream] object WatcherLogic {
+ def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/"))
+}
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherStage.scala
new file mode 100644
index 0000000..e236f5d
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherStage.scala
@@ -0,0 +1,38 @@
+package com.github.jodersky.flow
+package stream
+package impl
+
+import scala.concurrent.{Future, Promise}
+
+import akka.actor.ActorRef
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic}
+
+
+private[stream] class WatcherStage(
+ ioManager: ActorRef,
+ ports: Set[String]
+) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] {
+
+ val out = Outlet[String]("Watcher.out")
+
+ val shape = new SourceShape(out)
+
+ override def createLogicAndMaterializedValue(attributes: Attributes):
+ (GraphStageLogic, Future[Serial.Watch]) = {
+
+ val promise = Promise[Serial.Watch]
+
+ val logic = new WatcherLogic(
+ shape,
+ ioManager,
+ ports,
+ promise
+ )
+
+ (logic, promise.future)
+ }
+
+ override def toString = s"Watcher($ports)"
+
+}