diff options
author | Jakob Odersky <jakob@odersky.com> | 2017-01-08 21:16:25 +0100 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2017-01-21 17:22:10 -0800 |
commit | 23959966760174477a6b0fcbf9dd1e8ef37c643b (patch) | |
tree | 9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /core | |
parent | 6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff) | |
download | akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2 akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip |
Rename project to akka-serial
Diffstat (limited to 'core')
-rw-r--r-- | core/build.sbt | 5 | ||||
-rw-r--r-- | core/src/main/scala/akka/serial/Serial.scala | 132 | ||||
-rw-r--r-- | core/src/main/scala/akka/serial/SerialExt.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/akka/serial/SerialManager.scala | 48 | ||||
-rw-r--r-- | core/src/main/scala/akka/serial/SerialOperator.scala | 84 | ||||
-rw-r--r-- | core/src/main/scala/akka/serial/Watcher.scala | 143 | ||||
-rw-r--r-- | core/src/test/resources/reference.conf | 3 | ||||
-rw-r--r-- | core/src/test/scala/akka/serial/SerialManagerSpec.scala | 40 | ||||
-rw-r--r-- | core/src/test/scala/akka/serial/SerialOperatorSpec.scala | 50 |
9 files changed, 514 insertions, 0 deletions
diff --git a/core/build.sbt b/core/build.sbt new file mode 100644 index 0000000..78673ce --- /dev/null +++ b/core/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaTestKit % "test" +libraryDependencies += Dependencies.scalatest % "test" diff --git a/core/src/main/scala/akka/serial/Serial.scala b/core/src/main/scala/akka/serial/Serial.scala new file mode 100644 index 0000000..a8e564a --- /dev/null +++ b/core/src/main/scala/akka/serial/Serial.scala @@ -0,0 +1,132 @@ +package akka.serial + +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by akka-serial's serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + /** Base trait for any akka-serial-related messages. */ + sealed trait Message + + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to akka-serial's API. */ + trait Command extends Message + + /** A message extending this trait is to be viewed as an event, an in-bound message issued by akka-serial to the client. */ + trait Event extends Message + + /** A command has failed. */ + case class CommandFailed(command: Command, reason: Throwable) extends Event + + /** + * Open a new serial port. + * + * Send this command to the serial manager to request the opening of a serial port. The manager will + * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. + * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. + * + * In case the port is successfully opened, the operator will respond with an `Opened` message. + * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. + * + * @param port name of serial port to open + * @param settings settings of serial port to open + * @param bufferSize maximum read and write buffer sizes + */ + case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command + + /** + * A port has been successfully opened. + * + * Event sent by a port operator, indicating that a serial port was successfully opened. The sender + * of this message is the operator associated to the given serial port. + * + * @param port name of opened serial port + */ + case class Opened(port: String) extends Event + + /** + * Data has been received. + * + * Event sent by an operator, indicating that data was received on the operator's serial port. + * + * @param data data received on the port + */ + case class Received(data: ByteString) extends Event + + /** + * Write data to a serial port. + * + * Send this command to an operator to write the given data to its associated serial port. + * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. + * Note that a successful write does not guarantee the actual transmission of data through the serial port, + * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to + * be transmitted. + * + * @param data data to be written to port + * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment + * is a function 'number of bytes written => event') + */ + case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command + + /** + * Special type of acknowledgment that is not sent back. + */ + case object NoAck extends Function1[Int, Event] { + def apply(length: Int) = sys.error("cannot apply NoAck") + } + + /** + * Request closing of port. + * + * Send this command to an operator to close its associated port. The operator will respond + * with a `Closed` message upon closing the serial port. + */ + case object Close extends Command + + /** + * A port has been closed. + * + * Event sent from operator, indicating that its port has been closed. + */ + case object Closed extends Event + + /** + * Watch a directory for new ports. + * + * Send this command to the manager to get notifications when a new port (i.e. file) is created in + * the given directory. + * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. + * + * Note: the sender is also notified of currently existing ports. + * + * @param directory the directory to watch + * @param skipInitial don't get notified of already existing ports + * + * @see Unwatch + * @see Connected + */ + case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command + + /** + * Stop receiving notifications about a previously watched directory. + * + * @param directory the directory to unwatch + */ + case class Unwatch(directory: String = "/dev") extends Command + + /** + * A new port (i.e. file) has been detected. + * + * @param port the absolute file name of the connected port + */ + case class Connected(port: String) extends Event + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + def debug(value: Boolean) = sync.UnsafeSerial.debug(value) + +} diff --git a/core/src/main/scala/akka/serial/SerialExt.scala b/core/src/main/scala/akka/serial/SerialExt.scala new file mode 100644 index 0000000..2fcec7f --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialExt.scala @@ -0,0 +1,9 @@ +package akka.serial + +import akka.actor.{ ExtendedActorSystem, Props } +import akka.io.IO + +/** Provides the serial IO manager. */ +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +} diff --git a/core/src/main/scala/akka/serial/SerialManager.scala b/core/src/main/scala/akka/serial/SerialManager.scala new file mode 100644 index 0000000..4833165 --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialManager.scala @@ -0,0 +1,48 @@ +package akka.serial + +import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } +import akka.actor.SupervisorStrategy.{ Escalate, Stop } +import scala.util.{ Failure, Success, Try } +import sync.SerialConnection + +/** + * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to + * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. + * @see SerialOperator + */ +private[serial] class SerialManager extends Actor { + import SerialManager._ + import context._ + + override val supervisorStrategy = OneForOneStrategy() { + case _: Exception if sender == watcher => Escalate + case _: Exception => Stop + } + + private val watcher = actorOf(Watcher(self), "watcher") + + def receive = { + + case open @ Serial.Open(port, settings, bufferSize) => Try { + SerialConnection.open(port, settings) + } match { + case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) + case Failure(err) => sender ! Serial.CommandFailed(open, err) + } + + case w: Serial.Watch => watcher.forward(w) + + case u: Serial.Unwatch => watcher.forward(u) + + } + +} + +private[serial] object SerialManager { + + private def escapePortString(port: String) = port map { + case '/' => '-' + case c => c + } + +} diff --git a/core/src/main/scala/akka/serial/SerialOperator.scala b/core/src/main/scala/akka/serial/SerialOperator.scala new file mode 100644 index 0000000..cb5b46d --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialOperator.scala @@ -0,0 +1,84 @@ +package akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import akka.util.ByteString +import java.nio.ByteBuffer + +import sync.SerialConnection + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +private[serial] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { + import SerialOperator._ + import context._ + + case class ReaderDied(ex: Throwable) + object Reader extends Thread { + val buffer = ByteBuffer.allocateDirect(bufferSize) + + def loop() = { + var stop = false + while (!connection.isClosed && !stop) { + try { + buffer.clear() + connection.read(buffer) + val data = ByteString.fromByteBuffer(buffer) + client.tell(Serial.Received(data), self) + } catch { + // don't do anything if port is interrupted + case ex: PortInterruptedException => {} + + //stop and tell operator on other exception + case ex: Exception => + stop = true + self.tell(ReaderDied(ex), Actor.noSender) + } + } + } + + override def run() { + this.setName(s"serial-reader(${connection.port})") + loop() + } + + } + + val writeBuffer = ByteBuffer.allocateDirect(bufferSize) + + override def preStart() = { + context watch client + client ! Serial.Opened(connection.port) + Reader.start() + } + + override def receive: Receive = { + + case Serial.Write(data, ack) => + writeBuffer.clear() + data.copyToBuffer(writeBuffer) + val sent = connection.write(writeBuffer) + if (ack != Serial.NoAck) sender ! ack(sent) + + case Serial.Close => + client ! Serial.Closed + context stop self + + case Terminated(`client`) => + context stop self + + // go down with reader thread + case ReaderDied(ex) => throw ex + + } + + override def postStop() = { + connection.close() + } + +} + +private[serial] object SerialOperator { + def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) +} diff --git a/core/src/main/scala/akka/serial/Watcher.scala b/core/src/main/scala/akka/serial/Watcher.scala new file mode 100644 index 0000000..faaf39a --- /dev/null +++ b/core/src/main/scala/akka/serial/Watcher.scala @@ -0,0 +1,143 @@ +package akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } +import scala.util.{ Failure, Success, Try } + +private[serial] class Watcher(from: Option[ActorRef]) extends Actor { + + case class WatcherDied(reason: Throwable) + object WatcherThread extends Thread { + import Watcher.NewFile + + private val service = FileSystems.getDefault().newWatchService() + + def register(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + this.setName("serial-port-watcher") + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + self.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) + } + } + } + + def close() = service.close // causes the service to throw a ClosedWatchServiceException + } + + + // directory -> subscribers + private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] + + // directory -> watchkey + private val keys: Map[String, WatchKey] = Map.empty + + def subscribe(directory: String, client: ActorRef): WatchKey = { + val normal = Paths.get(directory).toAbsolutePath + val index = normal.toString + val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) + clients addBinding (index, client) + key + } + + def unsubscribe(directory: String, client: ActorRef): Unit = { + val index = Paths.get(directory).toAbsolutePath.toString + + clients removeBinding (index, sender) + + if (clients.get(index).isEmpty && keys.get(index).isDefined) { + keys(index).cancel() + keys -= index + } + } + + def reply(msg: Any, sender: ActorRef) = { + val origin = from match { + case Some(ref) => ref + case None => self + } + sender.tell(msg, origin) + } + + override def preStart() = { + WatcherThread.setDaemon(true) + WatcherThread.start() + } + + override def receive = { + + case w @ Serial.Watch(directory, skipInitial) => + val normalPath = Paths.get(directory).toAbsolutePath + val normal = normalPath.toString + + Try { + subscribe(directory, sender) + } match { + case Failure(err) => reply(Serial.CommandFailed(w, err), sender) + case Success(key) => + context watch sender + if (!skipInitial) { + Files.newDirectoryStream(normalPath) foreach { path => + if (!Files.isDirectory(path)) { + reply(Serial.Connected(path.toString), sender) + } + } + } + } + + case u @ Serial.Unwatch(directory) => + val normal = Paths.get(directory).toAbsolutePath.toString + + clients.removeBinding(normal, sender) + + if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { + keys(normal).cancel() + keys -= normal + } + + case Terminated(client) => + for ((directory, c) <- clients if c == client) { + unsubscribe(directory, client) + } + + case Watcher.NewFile(directory, file) => + val normal = directory.toAbsolutePath + val absFile = normal resolve file + clients.getOrElse(normal.toString, Set.empty) foreach { client => + reply(Serial.Connected(absFile.toString), client) + } + + case WatcherDied(err) => throw err // go down with watcher thread + + } + + override def postStop() = { + WatcherThread.close() + } + +} + +private[serial] object Watcher { + private case class NewFile(directory: Path, file: Path) + + def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) + +} diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf new file mode 100644 index 0000000..bdf80e2 --- /dev/null +++ b/core/src/test/resources/reference.conf @@ -0,0 +1,3 @@ +# Don't fill test output with log messages +akka.stdout-loglevel = "OFF" +akka.loglevel = "OFF"
\ No newline at end of file diff --git a/core/src/test/scala/akka/serial/SerialManagerSpec.scala b/core/src/test/scala/akka/serial/SerialManagerSpec.scala new file mode 100644 index 0000000..eab5013 --- /dev/null +++ b/core/src/test/scala/akka/serial/SerialManagerSpec.scala @@ -0,0 +1,40 @@ +package akka.serial + +import akka.actor.ActorSystem +import akka.io.IO +import akka.testkit.{ImplicitSender, TestKit} +import org.scalatest._ + +class SerialManagerSpec + extends TestKit(ActorSystem("serial-manager")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "Serial manager" should { + val manager = IO(Serial) + + "open an existing port" in { + withEcho{ case (port, settings) => + manager ! Serial.Open(port, settings) + expectMsgType[Serial.Opened] + } + } + + "fail opening a non-existing port" in { + val cmd = Serial.Open("nonexistent", SerialSettings(115200)) + manager ! cmd + + //manager.context.set + assert(expectMsgType[Serial.CommandFailed].command == cmd) + } + + } + +} diff --git a/core/src/test/scala/akka/serial/SerialOperatorSpec.scala b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala new file mode 100644 index 0000000..6907494 --- /dev/null +++ b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala @@ -0,0 +1,50 @@ +package akka.serial + +import scala.concurrent.duration._ + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit} +import akka.util.ByteString +import org.scalatest._ +import sync._ + +case class Ack(n: Int) extends Serial.Event + +class SerialOperatorSpec + extends TestKit(ActorSystem("serial-operator")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with SequentialNestedSuiteExecution + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + def withEchoOp[A](action: ActorRef => A): A = { + withEcho { case (port, settings) => + val connection = SerialConnection.open(port, settings) + val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor)) + action(operator) + } + } + + "Serial operator" should { + + "follow the correct protocol" in withEchoOp { op => + expectMsgType[Serial.Opened] + + val data = ByteString("hello world".getBytes("utf-8")) + op ! Serial.Write(data) + expectMsg(Serial.Received(data)) + + op ! Serial.Close + expectMsg(Serial.Closed) + + } + + } + +} |