aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-06-27 13:00:55 +0200
committerJakob Odersky <jodersky@gmail.com>2013-06-27 13:00:55 +0200
commit77bddab03e8905fcd1bd84778d097be745a3e08d (patch)
tree7a6d64aa131368b76aaacda71c3ed1fb23c7b2e7
parentf007a17cca8213193f5a6b51e3bcb61653230ff4 (diff)
downloadakka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.tar.gz
akka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.tar.bz2
akka-serial-77bddab03e8905fcd1bd84778d097be745a3e08d.zip
lots of changes mon internal thread handling and serial operation
-rw-r--r--project/Build.scala2
-rw-r--r--samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala7
-rw-r--r--samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala54
-rw-r--r--src/main/native/flow.c14
-rw-r--r--src/main/native/flow.h2
-rw-r--r--src/main/scala/com/github/jodersky/flow/Serial.scala7
-rw-r--r--src/main/scala/com/github/jodersky/flow/SerialExt.scala2
-rw-r--r--src/main/scala/com/github/jodersky/flow/SerialManager.scala50
-rw-r--r--src/main/scala/com/github/jodersky/flow/SerialOperator.scala84
-rw-r--r--src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala1
10 files changed, 126 insertions, 97 deletions
diff --git a/project/Build.scala b/project/Build.scala
index 4026ba7..fc71e5b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -17,7 +17,7 @@ object FlowBuild extends Build {
NativeDefault.defaultSettings ++
Seq(
libraryDependencies ++= Dependencies.all,
- javahClasses := Seq("com.github.jodersky.flow.low.NativeSerial"),
+ javahClasses := Seq("com.github.jodersky.flow.internal.NativeSerial"),
includeDirectories in Native += jdkHome.value / "include" / "linux",
binaryType in Native := SharedLibrary,
binaryName in Native := "flow",
diff --git a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala
index 71c40bc..985513b 100644
--- a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala
+++ b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/Main.scala
@@ -4,14 +4,13 @@ package example
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Try
-
import com.github.jodersky.flow.Serial
import com.github.jodersky.flow.Serial._
-
import akka.actor.ActorSystem
import akka.actor.Props
import akka.io.IO
import akka.util.ByteString
+import com.github.jodersky.flow.internal.InternalSerial
object Main {
@@ -26,11 +25,11 @@ object Main {
val port = "/dev/ttyACM0"
val baud = 115200
- low.Serial.debug(true)
+ // InternalSerial.debug(true)
implicit val system = ActorSystem("flow")
val serial = system.actorOf(Props(classOf[SerialHandler], port, baud), name = "serial-handler")
-
+
readLine()
serial ! ByteString("hello back".getBytes())
diff --git a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala
index 8a717d1..ddcde8d 100644
--- a/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala
+++ b/samples/rwc/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala
@@ -1,43 +1,55 @@
package com.github.jodersky.flow.example
import com.github.jodersky.flow.Serial._
-import com.github.jodersky.flow.low.{ Serial => LowSerial }
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.util.ByteString
import akka.io.IO
import com.github.jodersky.flow.Serial
-
+import akka.actor.Terminated
class SerialHandler(port: String, baud: Int) extends Actor with ActorLogging {
import context._
-
- println(s"Requesting port open: ${port}, baud: ${baud}")
+
+ log.info(s"Requesting manager to open port: ${port}, baud: ${baud}")
IO(Serial) ! Serial.Open(self, port, baud)
-
def receive = {
- case CommandFailed(_: Open, reason) => {
- println(s"connection failed, reason: ${reason}")
+
+ case OpenFailed(_, reason) => {
+ log.error(s"Connection failed, stopping handler. Reason: ${reason}")
context stop self
}
- case Opened(operator) =>
- println("Port opened.")
- context become {
- case Received(data) => {
- println("received data: " + formatData(data))
- println("as string: " + new String(data.toArray, "UTF-8"))
- }
- case Wrote(data) => println("wrote ACK: " + formatData(data))
- case CommandFailed(_, _) => println("write failed")
- case Closed => context stop self
- case "close" => operator ! Close
- case data: ByteString => operator ! Write(data)
- }
+ case Opened(port) => {
+ log.info(s"Port ${port} is now open.")
+ context watch sender
+ context become opened(sender)
+ }
+ }
+
+ def opened(operator: ActorRef): Receive = {
+ case Terminated(`operator`) => {
+ log.info("operator down, handler exiting")
+ context.stop(self)
+ }
+ case Received(data) => {
+ log.info("Received data: " + formatData(data))
+ log.info("As string: " + new String(data.toArray, "UTF-8"))
+ }
+ case Wrote(data) => log.info("Got ACK for writing data: " + formatData(data))
+ case Closed => {
+ log.info("Operator closed, exiting handler.")
+ context stop self
+ }
+ case "close" => {
+ log.info("Initiating close.")
+ operator ! Close
+ }
+ case data: ByteString => operator ! Write(data, true)
}
-
+
private def formatData(data: ByteString) = data.mkString("[", ",", "]")
} \ No newline at end of file
diff --git a/src/main/native/flow.c b/src/main/native/flow.c
index 8cb7def..174e9c0 100644
--- a/src/main/native/flow.c
+++ b/src/main/native/flow.c
@@ -36,7 +36,7 @@
#include <termios.h>
#include <fcntl.h>
#include <poll.h>
-#include "com_github_jodersky_flow_low_NativeSerial.h"
+#include "com_github_jodersky_flow_internal_NativeSerial.h"
#include "flow.h"
static bool debug = false;
@@ -234,7 +234,7 @@ inline jlong s2j(struct serial_config* pointer) {
return (jlong) pointer;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_open
(JNIEnv *env, jclass clazz, jstring port_name, jint baud, jlongArray jserialp)
{
const char *dev = (*env)->GetStringUTFChars(env, port_name, 0);
@@ -248,13 +248,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open
return r;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_close
(JNIEnv * env, jclass clazz, jlong serial)
{
serial_close(j2s(serial));
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_read
(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
{
@@ -270,7 +270,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read
return n;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_write
(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
{
unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL);
@@ -282,13 +282,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write
return r;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_interrupt
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_interrupt
(JNIEnv * env, jclass clazz, jlong serial)
{
return serial_interrupt(j2s(serial));
}
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_debug
(JNIEnv *env, jclass clazz, jboolean value)
{
serial_debug((bool) value);
diff --git a/src/main/native/flow.h b/src/main/native/flow.h
index 29b8d28..3b6df24 100644
--- a/src/main/native/flow.h
+++ b/src/main/native/flow.h
@@ -45,7 +45,7 @@ int serial_open(const char* port_name, int baud, struct serial_config** serial);
* @return E_IO on error */
int serial_close(struct serial_config* serial);
-/**Starts a blocking read from a previously opened serial port. The read is blocking, however it may be
+/**Starts a read from a previously opened serial port. The read is blocking, however it may be
* interrupted by calling 'serial_interrupt' on the given serial port.
* @param serial pointer to serial configuration from which to read
* @param buffer buffer into which data is read
diff --git a/src/main/scala/com/github/jodersky/flow/Serial.scala b/src/main/scala/com/github/jodersky/flow/Serial.scala
index 7929ef2..9b03ba0 100644
--- a/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -3,17 +3,18 @@ package com.github.jodersky.flow
import akka.io._
import akka.actor.ExtensionKey
import akka.actor.Props
-import low.{ Serial => LowSerial }
import akka.actor.ActorRef
import akka.util.ByteString
+/** Defines messages used by serial IO layer. */
object Serial extends ExtensionKey[SerialExt] {
trait Command
trait Event
case class Open(handler: ActorRef, port: String, baud: Int) extends Command
- case class Opened(operator: ActorRef) extends Event
+ case class Opened(port: String) extends Event
+ case class OpenFailed(port: String, reason: Throwable) extends Event
case class Received(data: ByteString) extends Event
@@ -22,7 +23,5 @@ object Serial extends ExtensionKey[SerialExt] {
case object Close extends Command
case object Closed extends Event
-
- case class CommandFailed(command: Command, reason: Throwable) extends Event
}
diff --git a/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/src/main/scala/com/github/jodersky/flow/SerialExt.scala
index 14da388..45d10d8 100644
--- a/src/main/scala/com/github/jodersky/flow/SerialExt.scala
+++ b/src/main/scala/com/github/jodersky/flow/SerialExt.scala
@@ -5,5 +5,5 @@ import akka.io.IO
import akka.actor.Props
class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
- lazy val manager = system.actorOf(Props[SerialManager], name = "IO-SERIAL")
+ lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
} \ No newline at end of file
diff --git a/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/src/main/scala/com/github/jodersky/flow/SerialManager.scala
index 3e6e543..8b0a39c 100644
--- a/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -1,40 +1,50 @@
package com.github.jodersky.flow
-import akka.actor.Actor
-import Serial._
-import low.{ Serial => LowSerial }
-import scala.util.Success
import scala.util.Failure
-import akka.actor.Props
-import scala.concurrent._
+import scala.util.Success
+import scala.util.Try
+import com.github.jodersky.flow.internal.InternalSerial
+import Serial._
+import akka.actor.Actor
import akka.actor.ActorLogging
+import akka.actor.OneForOneStrategy
+import akka.actor.OneForOneStrategy
+import akka.actor.Props
+import akka.actor.SupervisorStrategy._
+import java.io.IOException
class SerialManager extends Actor with ActorLogging {
import SerialManager._
import context._
+ override val supervisorStrategy =
+ OneForOneStrategy() {
+ case _: IOException => Stop
+ case _: Exception => Escalate
+ }
+
def receive = {
- case command @ Open(handler, port, baud) =>
- future{LowSerial.open(port, baud)}.onComplete(_ match {
- case Success(serial) => {
- log.debug(s"opened low serial port at ${port}, baud ${baud}")
- val operator = context.actorOf(Props(classOf[SerialOperator], serial, handler), name = escapePortString(port))
- handler ! Opened(operator)
- }
- case Failure(t) => {
- log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage())
- handler ! CommandFailed(command, t)
- }
- })
+ case Open(handler, port, baud) => Try { InternalSerial.open(port, baud) } match {
+ case Failure(t) => {
+ log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage())
+ handler ! OpenFailed(port, t)
+ }
+
+ case Success(serial) => {
+ log.debug(s"opened low-level serial port at ${port}, baud ${baud}")
+ context.actorOf(Props(classOf[SerialOperator], handler, serial), name = escapePortString(port))
+ }
+
+ }
}
}
object SerialManager {
-
+
private def escapePortString(port: String) = port collect {
case '/' => '-'
case c => c
}
-
+
} \ No newline at end of file
diff --git a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 8f958b2..5be8cd5 100644
--- a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -1,76 +1,84 @@
package com.github.jodersky.flow
-import com.github.jodersky.flow.internalial.Close;
-import com.github.jodersky.flow.internalial.Closed;
-import com.github.jodersky.flow.internalial.CommandFailed;
-import com.github.jodersky.flow.internalial.Received;
-import com.github.jodersky.flow.internalial.Write;
-import com.github.jodersky.flow.internalial.Wrote;
-
import scala.concurrent.future
import scala.util.Failure
import scala.util.Success
-
-import Serial.Close
-import Serial.Closed
-import Serial.CommandFailed
-import Serial.Received
-import Serial.Write
-import Serial.Wrote
+import Serial._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
-import akka.actor.actorRef2Scala
import akka.util.ByteString
-import low.{Serial => LowSerial}
+import com.github.jodersky.flow.internal.InternalSerial
+import akka.actor.Terminated
+import scala.util.Try
+
+class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging {
+ import context._
-class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor with ActorLogging {
-import context._
+ case class ReadException(ex: Throwable)
object Reader extends Thread {
- private var continueReading = true
- override def run() {
- Thread.currentThread().setName("flow-reader " + serial.port)
- log.debug("started read thread " + Thread.currentThread().getName())
+ def enterReadLoop() = {
+ var continueReading = true
while (continueReading) {
try {
- log.debug("enter blocking read")
val data = ByteString(serial.read())
- log.debug("return from blocking read")
handler ! Received(data)
} catch {
+
+ //port is closing, stop thread gracefully
case ex: PortInterruptedException => {
continueReading = false
- log.debug("interrupted from blocking read")
+ }
+
+ //something else went wrong stop and tell actor
+ case ex: Exception => {
+ continueReading = false
+ self ! ReadException(ex)
}
}
}
- log.debug("exit read thread normally " + Thread.currentThread().getName())
}
+
+ def name = this.getName()
+
+ override def run() {
+ this.setName("flow-reader " + serial.port)
+ log.debug(name + ": started reader thread")
+ enterReadLoop()
+ log.debug(name + ": exiting")
+ }
+
}
- Reader.start()
+ override def preStart() = {
+ context watch handler
+ handler ! Opened(serial.port)
+ Reader.start()
+ }
+
+ override def postStop = {
+ serial.close()
+ }
- context.watch(handler)
+ def receive: Receive = {
- def receive = {
- case c @ Write(data, ack) => {
- val writer = sender
- future { serial.write(data.toArray) }.onComplete {
- case Success(data) => writer ! Wrote(ByteString(data))
- case Failure(t) => writer ! CommandFailed(c, t)
- }
+ case Write(data, ack) => {
+ serial.write(data.toArray) // no future needed as write is non-blocking
+ if (ack) sender ! Wrote(data)
}
case Close => {
sender ! Closed
context.stop(self)
}
- }
- override def postStop = {
- serial.close()
+ case Terminated(`handler`) => context.stop(self)
+
+ //go down with reader thread
+ case ReadException(ex) => throw ex
+
}
} \ No newline at end of file
diff --git a/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
index f1268c3..315f395 100644
--- a/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
+++ b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
@@ -4,6 +4,7 @@ import java.io.IOException
import com.github.jodersky.flow._
import java.util.concurrent.atomic.AtomicBoolean
+/** Wraps NativeSerial in a more object-oriented style, still quite low level. */
class InternalSerial private (val port: String, private val pointer: Long) {
import InternalSerial._