aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-01-08 21:16:25 +0100
committerJakob Odersky <jakob@odersky.com>2017-01-21 17:22:10 -0800
commit23959966760174477a6b0fcbf9dd1e8ef37c643b (patch)
tree9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /core
parent6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff)
downloadakka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip
Rename project to akka-serial
Diffstat (limited to 'core')
-rw-r--r--core/build.sbt5
-rw-r--r--core/src/main/scala/akka/serial/Serial.scala132
-rw-r--r--core/src/main/scala/akka/serial/SerialExt.scala9
-rw-r--r--core/src/main/scala/akka/serial/SerialManager.scala48
-rw-r--r--core/src/main/scala/akka/serial/SerialOperator.scala84
-rw-r--r--core/src/main/scala/akka/serial/Watcher.scala143
-rw-r--r--core/src/test/resources/reference.conf3
-rw-r--r--core/src/test/scala/akka/serial/SerialManagerSpec.scala40
-rw-r--r--core/src/test/scala/akka/serial/SerialOperatorSpec.scala50
9 files changed, 514 insertions, 0 deletions
diff --git a/core/build.sbt b/core/build.sbt
new file mode 100644
index 0000000..78673ce
--- /dev/null
+++ b/core/build.sbt
@@ -0,0 +1,5 @@
+import akkaserial.Dependencies
+
+libraryDependencies += Dependencies.akkaActor
+libraryDependencies += Dependencies.akkaTestKit % "test"
+libraryDependencies += Dependencies.scalatest % "test"
diff --git a/core/src/main/scala/akka/serial/Serial.scala b/core/src/main/scala/akka/serial/Serial.scala
new file mode 100644
index 0000000..a8e564a
--- /dev/null
+++ b/core/src/main/scala/akka/serial/Serial.scala
@@ -0,0 +1,132 @@
+package akka.serial
+
+import akka.actor.ExtensionKey
+import akka.util.ByteString
+
+/** Defines messages used by akka-serial's serial IO layer. */
+object Serial extends ExtensionKey[SerialExt] {
+
+ /** Base trait for any akka-serial-related messages. */
+ sealed trait Message
+
+ /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to akka-serial's API. */
+ trait Command extends Message
+
+ /** A message extending this trait is to be viewed as an event, an in-bound message issued by akka-serial to the client. */
+ trait Event extends Message
+
+ /** A command has failed. */
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
+
+ /**
+ * Open a new serial port.
+ *
+ * Send this command to the serial manager to request the opening of a serial port. The manager will
+ * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port.
+ * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages.
+ *
+ * In case the port is successfully opened, the operator will respond with an `Opened` message.
+ * In case the port cannot be opened, the manager will respond with a `CommandFailed` message.
+ *
+ * @param port name of serial port to open
+ * @param settings settings of serial port to open
+ * @param bufferSize maximum read and write buffer sizes
+ */
+ case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command
+
+ /**
+ * A port has been successfully opened.
+ *
+ * Event sent by a port operator, indicating that a serial port was successfully opened. The sender
+ * of this message is the operator associated to the given serial port.
+ *
+ * @param port name of opened serial port
+ */
+ case class Opened(port: String) extends Event
+
+ /**
+ * Data has been received.
+ *
+ * Event sent by an operator, indicating that data was received on the operator's serial port.
+ *
+ * @param data data received on the port
+ */
+ case class Received(data: ByteString) extends Event
+
+ /**
+ * Write data to a serial port.
+ *
+ * Send this command to an operator to write the given data to its associated serial port.
+ * An acknowledgment may be set, in which case it is sent back to the sender on a successful write.
+ * Note that a successful write does not guarantee the actual transmission of data through the serial port,
+ * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to
+ * be transmitted.
+ *
+ * @param data data to be written to port
+ * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment
+ * is a function 'number of bytes written => event')
+ */
+ case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command
+
+ /**
+ * Special type of acknowledgment that is not sent back.
+ */
+ case object NoAck extends Function1[Int, Event] {
+ def apply(length: Int) = sys.error("cannot apply NoAck")
+ }
+
+ /**
+ * Request closing of port.
+ *
+ * Send this command to an operator to close its associated port. The operator will respond
+ * with a `Closed` message upon closing the serial port.
+ */
+ case object Close extends Command
+
+ /**
+ * A port has been closed.
+ *
+ * Event sent from operator, indicating that its port has been closed.
+ */
+ case object Closed extends Event
+
+ /**
+ * Watch a directory for new ports.
+ *
+ * Send this command to the manager to get notifications when a new port (i.e. file) is created in
+ * the given directory.
+ * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message.
+ *
+ * Note: the sender is also notified of currently existing ports.
+ *
+ * @param directory the directory to watch
+ * @param skipInitial don't get notified of already existing ports
+ *
+ * @see Unwatch
+ * @see Connected
+ */
+ case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command
+
+ /**
+ * Stop receiving notifications about a previously watched directory.
+ *
+ * @param directory the directory to unwatch
+ */
+ case class Unwatch(directory: String = "/dev") extends Command
+
+ /**
+ * A new port (i.e. file) has been detected.
+ *
+ * @param port the absolute file name of the connected port
+ */
+ case class Connected(port: String) extends Event
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ def debug(value: Boolean) = sync.UnsafeSerial.debug(value)
+
+}
diff --git a/core/src/main/scala/akka/serial/SerialExt.scala b/core/src/main/scala/akka/serial/SerialExt.scala
new file mode 100644
index 0000000..2fcec7f
--- /dev/null
+++ b/core/src/main/scala/akka/serial/SerialExt.scala
@@ -0,0 +1,9 @@
+package akka.serial
+
+import akka.actor.{ ExtendedActorSystem, Props }
+import akka.io.IO
+
+/** Provides the serial IO manager. */
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
+}
diff --git a/core/src/main/scala/akka/serial/SerialManager.scala b/core/src/main/scala/akka/serial/SerialManager.scala
new file mode 100644
index 0000000..4833165
--- /dev/null
+++ b/core/src/main/scala/akka/serial/SerialManager.scala
@@ -0,0 +1,48 @@
+package akka.serial
+
+import akka.actor.{ Actor, ActorLogging, OneForOneStrategy }
+import akka.actor.SupervisorStrategy.{ Escalate, Stop }
+import scala.util.{ Failure, Success, Try }
+import sync.SerialConnection
+
+/**
+ * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to
+ * a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
+ * @see SerialOperator
+ */
+private[serial] class SerialManager extends Actor {
+ import SerialManager._
+ import context._
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case _: Exception if sender == watcher => Escalate
+ case _: Exception => Stop
+ }
+
+ private val watcher = actorOf(Watcher(self), "watcher")
+
+ def receive = {
+
+ case open @ Serial.Open(port, settings, bufferSize) => Try {
+ SerialConnection.open(port, settings)
+ } match {
+ case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
+ case Failure(err) => sender ! Serial.CommandFailed(open, err)
+ }
+
+ case w: Serial.Watch => watcher.forward(w)
+
+ case u: Serial.Unwatch => watcher.forward(u)
+
+ }
+
+}
+
+private[serial] object SerialManager {
+
+ private def escapePortString(port: String) = port map {
+ case '/' => '-'
+ case c => c
+ }
+
+}
diff --git a/core/src/main/scala/akka/serial/SerialOperator.scala b/core/src/main/scala/akka/serial/SerialOperator.scala
new file mode 100644
index 0000000..cb5b46d
--- /dev/null
+++ b/core/src/main/scala/akka/serial/SerialOperator.scala
@@ -0,0 +1,84 @@
+package akka.serial
+
+import akka.actor.{ Actor, ActorRef, Props, Terminated }
+import akka.util.ByteString
+import java.nio.ByteBuffer
+
+import sync.SerialConnection
+
+/**
+ * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
+ * @see SerialManager
+ */
+private[serial] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor {
+ import SerialOperator._
+ import context._
+
+ case class ReaderDied(ex: Throwable)
+ object Reader extends Thread {
+ val buffer = ByteBuffer.allocateDirect(bufferSize)
+
+ def loop() = {
+ var stop = false
+ while (!connection.isClosed && !stop) {
+ try {
+ buffer.clear()
+ connection.read(buffer)
+ val data = ByteString.fromByteBuffer(buffer)
+ client.tell(Serial.Received(data), self)
+ } catch {
+ // don't do anything if port is interrupted
+ case ex: PortInterruptedException => {}
+
+ //stop and tell operator on other exception
+ case ex: Exception =>
+ stop = true
+ self.tell(ReaderDied(ex), Actor.noSender)
+ }
+ }
+ }
+
+ override def run() {
+ this.setName(s"serial-reader(${connection.port})")
+ loop()
+ }
+
+ }
+
+ val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
+
+ override def preStart() = {
+ context watch client
+ client ! Serial.Opened(connection.port)
+ Reader.start()
+ }
+
+ override def receive: Receive = {
+
+ case Serial.Write(data, ack) =>
+ writeBuffer.clear()
+ data.copyToBuffer(writeBuffer)
+ val sent = connection.write(writeBuffer)
+ if (ack != Serial.NoAck) sender ! ack(sent)
+
+ case Serial.Close =>
+ client ! Serial.Closed
+ context stop self
+
+ case Terminated(`client`) =>
+ context stop self
+
+ // go down with reader thread
+ case ReaderDied(ex) => throw ex
+
+ }
+
+ override def postStop() = {
+ connection.close()
+ }
+
+}
+
+private[serial] object SerialOperator {
+ def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
+}
diff --git a/core/src/main/scala/akka/serial/Watcher.scala b/core/src/main/scala/akka/serial/Watcher.scala
new file mode 100644
index 0000000..faaf39a
--- /dev/null
+++ b/core/src/main/scala/akka/serial/Watcher.scala
@@ -0,0 +1,143 @@
+package akka.serial
+
+import akka.actor.{ Actor, ActorRef, Props, Terminated }
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
+import java.nio.file.StandardWatchEventKinds._
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
+import scala.util.{ Failure, Success, Try }
+
+private[serial] class Watcher(from: Option[ActorRef]) extends Actor {
+
+ case class WatcherDied(reason: Throwable)
+ object WatcherThread extends Thread {
+ import Watcher.NewFile
+
+ private val service = FileSystems.getDefault().newWatchService()
+
+ def register(directory: Path) = directory.register(service, ENTRY_CREATE)
+
+ override def run(): Unit = {
+ this.setName("serial-port-watcher")
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ self.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender)
+ }
+ }
+ }
+
+ def close() = service.close // causes the service to throw a ClosedWatchServiceException
+ }
+
+
+ // directory -> subscribers
+ private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
+
+ // directory -> watchkey
+ private val keys: Map[String, WatchKey] = Map.empty
+
+ def subscribe(directory: String, client: ActorRef): WatchKey = {
+ val normal = Paths.get(directory).toAbsolutePath
+ val index = normal.toString
+ val key = keys.getOrElseUpdate(index, WatcherThread.register(normal))
+ clients addBinding (index, client)
+ key
+ }
+
+ def unsubscribe(directory: String, client: ActorRef): Unit = {
+ val index = Paths.get(directory).toAbsolutePath.toString
+
+ clients removeBinding (index, sender)
+
+ if (clients.get(index).isEmpty && keys.get(index).isDefined) {
+ keys(index).cancel()
+ keys -= index
+ }
+ }
+
+ def reply(msg: Any, sender: ActorRef) = {
+ val origin = from match {
+ case Some(ref) => ref
+ case None => self
+ }
+ sender.tell(msg, origin)
+ }
+
+ override def preStart() = {
+ WatcherThread.setDaemon(true)
+ WatcherThread.start()
+ }
+
+ override def receive = {
+
+ case w @ Serial.Watch(directory, skipInitial) =>
+ val normalPath = Paths.get(directory).toAbsolutePath
+ val normal = normalPath.toString
+
+ Try {
+ subscribe(directory, sender)
+ } match {
+ case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
+ case Success(key) =>
+ context watch sender
+ if (!skipInitial) {
+ Files.newDirectoryStream(normalPath) foreach { path =>
+ if (!Files.isDirectory(path)) {
+ reply(Serial.Connected(path.toString), sender)
+ }
+ }
+ }
+ }
+
+ case u @ Serial.Unwatch(directory) =>
+ val normal = Paths.get(directory).toAbsolutePath.toString
+
+ clients.removeBinding(normal, sender)
+
+ if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
+ keys(normal).cancel()
+ keys -= normal
+ }
+
+ case Terminated(client) =>
+ for ((directory, c) <- clients if c == client) {
+ unsubscribe(directory, client)
+ }
+
+ case Watcher.NewFile(directory, file) =>
+ val normal = directory.toAbsolutePath
+ val absFile = normal resolve file
+ clients.getOrElse(normal.toString, Set.empty) foreach { client =>
+ reply(Serial.Connected(absFile.toString), client)
+ }
+
+ case WatcherDied(err) => throw err // go down with watcher thread
+
+ }
+
+ override def postStop() = {
+ WatcherThread.close()
+ }
+
+}
+
+private[serial] object Watcher {
+ private case class NewFile(directory: Path, file: Path)
+
+ def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
+
+}
diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf
new file mode 100644
index 0000000..bdf80e2
--- /dev/null
+++ b/core/src/test/resources/reference.conf
@@ -0,0 +1,3 @@
+# Don't fill test output with log messages
+akka.stdout-loglevel = "OFF"
+akka.loglevel = "OFF" \ No newline at end of file
diff --git a/core/src/test/scala/akka/serial/SerialManagerSpec.scala b/core/src/test/scala/akka/serial/SerialManagerSpec.scala
new file mode 100644
index 0000000..eab5013
--- /dev/null
+++ b/core/src/test/scala/akka/serial/SerialManagerSpec.scala
@@ -0,0 +1,40 @@
+package akka.serial
+
+import akka.actor.ActorSystem
+import akka.io.IO
+import akka.testkit.{ImplicitSender, TestKit}
+import org.scalatest._
+
+class SerialManagerSpec
+ extends TestKit(ActorSystem("serial-manager"))
+ with ImplicitSender
+ with WordSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with PseudoTerminal {
+
+ override def afterAll {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ "Serial manager" should {
+ val manager = IO(Serial)
+
+ "open an existing port" in {
+ withEcho{ case (port, settings) =>
+ manager ! Serial.Open(port, settings)
+ expectMsgType[Serial.Opened]
+ }
+ }
+
+ "fail opening a non-existing port" in {
+ val cmd = Serial.Open("nonexistent", SerialSettings(115200))
+ manager ! cmd
+
+ //manager.context.set
+ assert(expectMsgType[Serial.CommandFailed].command == cmd)
+ }
+
+ }
+
+}
diff --git a/core/src/test/scala/akka/serial/SerialOperatorSpec.scala b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala
new file mode 100644
index 0000000..6907494
--- /dev/null
+++ b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala
@@ -0,0 +1,50 @@
+package akka.serial
+
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit}
+import akka.util.ByteString
+import org.scalatest._
+import sync._
+
+case class Ack(n: Int) extends Serial.Event
+
+class SerialOperatorSpec
+ extends TestKit(ActorSystem("serial-operator"))
+ with ImplicitSender
+ with WordSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with SequentialNestedSuiteExecution
+ with PseudoTerminal {
+
+ override def afterAll {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ def withEchoOp[A](action: ActorRef => A): A = {
+ withEcho { case (port, settings) =>
+ val connection = SerialConnection.open(port, settings)
+ val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor))
+ action(operator)
+ }
+ }
+
+ "Serial operator" should {
+
+ "follow the correct protocol" in withEchoOp { op =>
+ expectMsgType[Serial.Opened]
+
+ val data = ByteString("hello world".getBytes("utf-8"))
+ op ! Serial.Write(data)
+ expectMsg(Serial.Received(data))
+
+ op ! Serial.Close
+ expectMsg(Serial.Closed)
+
+ }
+
+ }
+
+}