diff options
author | Jakob Odersky <jodersky@gmail.com> | 2013-06-27 13:00:55 +0200 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2013-06-27 13:00:55 +0200 |
commit | 77bddab03e8905fcd1bd84778d097be745a3e08d (patch) | |
tree | 7a6d64aa131368b76aaacda71c3ed1fb23c7b2e7 | |
parent | f007a17cca8213193f5a6b51e3bcb61653230ff4 (diff) | |
download | akka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.tar.gz akka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.tar.bz2 akka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.zip |
lots of changes mon internal thread handling and serial operation
10 files changed, 126 insertions, 97 deletions
diff --git a/project/Build.scala b/project/Build.scala index 4026ba7..fc71e5b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -17,7 +17,7 @@ object FlowBuild extends Build { NativeDefault.defaultSettings ++ Seq( libraryDependencies ++= Dependencies.all, - javahClasses := Seq("com.github.jodersky.flow.low.NativeSerial"), + javahClasses := Seq("com.github.jodersky.flow.internal.NativeSerial"), includeDirectories in Native += jdkHome.value / "include" / "linux", binaryType in Native := SharedLibrary, binaryName in Native := "flow", diff --git a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala index 71c40bc..985513b 100644 --- a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala +++ b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala @@ -4,14 +4,13 @@ package example import scala.concurrent.ExecutionContext.Implicits.global import scala.util.Success import scala.util.Try - import com.github.jodersky.flow.Serial import com.github.jodersky.flow.Serial._ - import akka.actor.ActorSystem import akka.actor.Props import akka.io.IO import akka.util.ByteString +import com.github.jodersky.flow.internal.InternalSerial object Main { @@ -26,11 +25,11 @@ object Main { val port = "/dev/ttyACM0" val baud = 115200 - low.Serial.debug(true) + // InternalSerial.debug(true) implicit val system = ActorSystem("flow") val serial = system.actorOf(Props(classOf[SerialHandler], port, baud), name = "serial-handler") - + readLine() serial ! ByteString("hello back".getBytes()) diff --git a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala index 8a717d1..ddcde8d 100644 --- a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala +++ b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala @@ -1,43 +1,55 @@ package com.github.jodersky.flow.example import com.github.jodersky.flow.Serial._ -import com.github.jodersky.flow.low.{ Serial => LowSerial } import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.util.ByteString import akka.io.IO import com.github.jodersky.flow.Serial - +import akka.actor.Terminated class SerialHandler(port: String, baud: Int) extends Actor with ActorLogging { import context._ - - println(s"Requesting port open: ${port}, baud: ${baud}") + + log.info(s"Requesting manager to open port: ${port}, baud: ${baud}") IO(Serial) ! Serial.Open(self, port, baud) - def receive = { - case CommandFailed(_: Open, reason) => { - println(s"connection failed, reason: ${reason}") + + case OpenFailed(_, reason) => { + log.error(s"Connection failed, stopping handler. Reason: ${reason}") context stop self } - case Opened(operator) => - println("Port opened.") - context become { - case Received(data) => { - println("received data: " + formatData(data)) - println("as string: " + new String(data.toArray, "UTF-8")) - } - case Wrote(data) => println("wrote ACK: " + formatData(data)) - case CommandFailed(_, _) => println("write failed") - case Closed => context stop self - case "close" => operator ! Close - case data: ByteString => operator ! Write(data) - } + case Opened(port) => { + log.info(s"Port ${port} is now open.") + context watch sender + context become opened(sender) + } + } + + def opened(operator: ActorRef): Receive = { + case Terminated(`operator`) => { + log.info("operator down, handler exiting") + context.stop(self) + } + case Received(data) => { + log.info("Received data: " + formatData(data)) + log.info("As string: " + new String(data.toArray, "UTF-8")) + } + case Wrote(data) => log.info("Got ACK for writing data: " + formatData(data)) + case Closed => { + log.info("Operator closed, exiting handler.") + context stop self + } + case "close" => { + log.info("Initiating close.") + operator ! Close + } + case data: ByteString => operator ! Write(data, true) } - + private def formatData(data: ByteString) = data.mkString("[", ",", "]") }
\ No newline at end of file diff --git a/src/main/native/flow.c b/src/main/native/flow.c index 8cb7def..174e9c0 100644 --- a/src/main/native/flow.c +++ b/src/main/native/flow.c @@ -36,7 +36,7 @@ #include <termios.h> #include <fcntl.h> #include <poll.h> -#include "com_github_jodersky_flow_low_NativeSerial.h" +#include "com_github_jodersky_flow_internal_NativeSerial.h" #include "flow.h" static bool debug = false; @@ -234,7 +234,7 @@ inline jlong s2j(struct serial_config* pointer) { return (jlong) pointer; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_open (JNIEnv *env, jclass clazz, jstring port_name, jint baud, jlongArray jserialp) { const char *dev = (*env)->GetStringUTFChars(env, port_name, 0); @@ -248,13 +248,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open return r; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_close (JNIEnv * env, jclass clazz, jlong serial) { serial_close(j2s(serial)); } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_read (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer) { @@ -270,7 +270,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read return n; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_write (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer) { unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL); @@ -282,13 +282,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write return r; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_interrupt +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_interrupt (JNIEnv * env, jclass clazz, jlong serial) { return serial_interrupt(j2s(serial)); } -JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug +JNIEXPORT void JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_debug (JNIEnv *env, jclass clazz, jboolean value) { serial_debug((bool) value); diff --git a/src/main/native/flow.h b/src/main/native/flow.h index 29b8d28..3b6df24 100644 --- a/src/main/native/flow.h +++ b/src/main/native/flow.h @@ -45,7 +45,7 @@ int serial_open(const char* port_name, int baud, struct serial_config** serial); * @return E_IO on error */ int serial_close(struct serial_config* serial); -/**Starts a blocking read from a previously opened serial port. The read is blocking, however it may be +/**Starts a read from a previously opened serial port. The read is blocking, however it may be * interrupted by calling 'serial_interrupt' on the given serial port. * @param serial pointer to serial configuration from which to read * @param buffer buffer into which data is read diff --git a/src/main/scala/com/github/jodersky/flow/Serial.scala b/src/main/scala/com/github/jodersky/flow/Serial.scala index 7929ef2..9b03ba0 100644 --- a/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -3,17 +3,18 @@ package com.github.jodersky.flow import akka.io._ import akka.actor.ExtensionKey import akka.actor.Props -import low.{ Serial => LowSerial } import akka.actor.ActorRef import akka.util.ByteString +/** Defines messages used by serial IO layer. */ object Serial extends ExtensionKey[SerialExt] { trait Command trait Event case class Open(handler: ActorRef, port: String, baud: Int) extends Command - case class Opened(operator: ActorRef) extends Event + case class Opened(port: String) extends Event + case class OpenFailed(port: String, reason: Throwable) extends Event case class Received(data: ByteString) extends Event @@ -22,7 +23,5 @@ object Serial extends ExtensionKey[SerialExt] { case object Close extends Command case object Closed extends Event - - case class CommandFailed(command: Command, reason: Throwable) extends Event } diff --git a/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/src/main/scala/com/github/jodersky/flow/SerialExt.scala index 14da388..45d10d8 100644 --- a/src/main/scala/com/github/jodersky/flow/SerialExt.scala +++ b/src/main/scala/com/github/jodersky/flow/SerialExt.scala @@ -5,5 +5,5 @@ import akka.io.IO import akka.actor.Props class SerialExt(system: ExtendedActorSystem) extends IO.Extension { - lazy val manager = system.actorOf(Props[SerialManager], name = "IO-SERIAL") + lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") }
\ No newline at end of file diff --git a/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/src/main/scala/com/github/jodersky/flow/SerialManager.scala index 3e6e543..8b0a39c 100644 --- a/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ b/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -1,40 +1,50 @@ package com.github.jodersky.flow -import akka.actor.Actor -import Serial._ -import low.{ Serial => LowSerial } -import scala.util.Success import scala.util.Failure -import akka.actor.Props -import scala.concurrent._ +import scala.util.Success +import scala.util.Try +import com.github.jodersky.flow.internal.InternalSerial +import Serial._ +import akka.actor.Actor import akka.actor.ActorLogging +import akka.actor.OneForOneStrategy +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.SupervisorStrategy._ +import java.io.IOException class SerialManager extends Actor with ActorLogging { import SerialManager._ import context._ + override val supervisorStrategy = + OneForOneStrategy() { + case _: IOException => Stop + case _: Exception => Escalate + } + def receive = { - case command @ Open(handler, port, baud) => - future{LowSerial.open(port, baud)}.onComplete(_ match { - case Success(serial) => { - log.debug(s"opened low serial port at ${port}, baud ${baud}") - val operator = context.actorOf(Props(classOf[SerialOperator], serial, handler), name = escapePortString(port)) - handler ! Opened(operator) - } - case Failure(t) => { - log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage()) - handler ! CommandFailed(command, t) - } - }) + case Open(handler, port, baud) => Try { InternalSerial.open(port, baud) } match { + case Failure(t) => { + log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage()) + handler ! OpenFailed(port, t) + } + + case Success(serial) => { + log.debug(s"opened low-level serial port at ${port}, baud ${baud}") + context.actorOf(Props(classOf[SerialOperator], handler, serial), name = escapePortString(port)) + } + + } } } object SerialManager { - + private def escapePortString(port: String) = port collect { case '/' => '-' case c => c } - + }
\ No newline at end of file diff --git a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 8f958b2..5be8cd5 100644 --- a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -1,76 +1,84 @@ package com.github.jodersky.flow -import com.github.jodersky.flow.internalial.Close; -import com.github.jodersky.flow.internalial.Closed; -import com.github.jodersky.flow.internalial.CommandFailed; -import com.github.jodersky.flow.internalial.Received; -import com.github.jodersky.flow.internalial.Write; -import com.github.jodersky.flow.internalial.Wrote; - import scala.concurrent.future import scala.util.Failure import scala.util.Success - -import Serial.Close -import Serial.Closed -import Serial.CommandFailed -import Serial.Received -import Serial.Write -import Serial.Wrote +import Serial._ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef -import akka.actor.actorRef2Scala import akka.util.ByteString -import low.{Serial => LowSerial} +import com.github.jodersky.flow.internal.InternalSerial +import akka.actor.Terminated +import scala.util.Try + +class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging { + import context._ -class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor with ActorLogging { -import context._ + case class ReadException(ex: Throwable) object Reader extends Thread { - private var continueReading = true - override def run() { - Thread.currentThread().setName("flow-reader " + serial.port) - log.debug("started read thread " + Thread.currentThread().getName()) + def enterReadLoop() = { + var continueReading = true while (continueReading) { try { - log.debug("enter blocking read") val data = ByteString(serial.read()) - log.debug("return from blocking read") handler ! Received(data) } catch { + + //port is closing, stop thread gracefully case ex: PortInterruptedException => { continueReading = false - log.debug("interrupted from blocking read") + } + + //something else went wrong stop and tell actor + case ex: Exception => { + continueReading = false + self ! ReadException(ex) } } } - log.debug("exit read thread normally " + Thread.currentThread().getName()) } + + def name = this.getName() + + override def run() { + this.setName("flow-reader " + serial.port) + log.debug(name + ": started reader thread") + enterReadLoop() + log.debug(name + ": exiting") + } + } - Reader.start() + override def preStart() = { + context watch handler + handler ! Opened(serial.port) + Reader.start() + } + + override def postStop = { + serial.close() + } - context.watch(handler) + def receive: Receive = { - def receive = { - case c @ Write(data, ack) => { - val writer = sender - future { serial.write(data.toArray) }.onComplete { - case Success(data) => writer ! Wrote(ByteString(data)) - case Failure(t) => writer ! CommandFailed(c, t) - } + case Write(data, ack) => { + serial.write(data.toArray) // no future needed as write is non-blocking + if (ack) sender ! Wrote(data) } case Close => { sender ! Closed context.stop(self) } - } - override def postStop = { - serial.close() + case Terminated(`handler`) => context.stop(self) + + //go down with reader thread + case ReadException(ex) => throw ex + } }
\ No newline at end of file diff --git a/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala index f1268c3..315f395 100644 --- a/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala +++ b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala @@ -4,6 +4,7 @@ import java.io.IOException import com.github.jodersky.flow._ import java.util.concurrent.atomic.AtomicBoolean +/** Wraps NativeSerial in a more object-oriented style, still quite low level. */ class InternalSerial private (val port: String, private val pointer: Long) { import InternalSerial._ |