From be1e2330ab84cf449fae664fd0c9788e3ae4af1f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Fri, 21 Jun 2013 22:57:40 +0200 Subject: improve thread safety in serial class --- src/main/native/flow.c | 1 + .../com/github/jodersky/flow/SerialOperator.scala | 26 +++++++++++++--------- .../com/github/jodersky/flow/low/Serial.scala | 10 ++++----- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/main/native/flow.c b/src/main/native/flow.c index 43dd750..c8927e9 100644 --- a/src/main/native/flow.c +++ b/src/main/native/flow.c @@ -163,6 +163,7 @@ int serial_close(struct serial_config* serial) { } free(serial); + return 0; } int serial_read(struct serial_config* serial, unsigned char* buffer, size_t size) { diff --git a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index c9f38cd..951e9f9 100644 --- a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -13,34 +13,38 @@ import scala.concurrent._ class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor { import context._ - + object Reader extends Thread { - private var continueReading = true - + private var continueReading = true + override def run() { Thread.currentThread().setName("flow-reader " + serial.port) while (continueReading) { - println("beginning read") - val data = ByteString(serial.read()) - println("return from read") - handler ! Received(data) + try { + println("beginning read") + val data = ByteString(serial.read()) + println("return from read") + handler ! Received(data) + } catch { + case ex: PortInterruptedException => continueReading = false + } } } } - + Reader.start() - + context.watch(handler) def receive = { case c @ Write(data) => { val writer = sender - future{serial.write(data.toArray)}.onComplete { + future { serial.write(data.toArray) }.onComplete { case Success(data) => writer ! Wrote(ByteString(data)) case Failure(t) => writer ! CommandFailed(c, t) } } - + case Close => { sender ! Closed context.stop(self) diff --git a/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/src/main/scala/com/github/jodersky/flow/low/Serial.scala index 916ab84..0a098c5 100644 --- a/src/main/scala/com/github/jodersky/flow/low/Serial.scala +++ b/src/main/scala/com/github/jodersky/flow/low/Serial.scala @@ -21,8 +21,8 @@ class Serial private (val port: String, private val pointer: Long) { def close(): Unit = if (!closed.get()) { closed.set(true) requireSuccess(NativeSerial.interrupt(pointer), port) - if (writing.get()) wait() // if reading, wait for read to finish - if (reading.get()) wait() + if (writing.get()) synchronized{wait()} // if reading, wait for read to finish + if (reading.get()) synchronized{wait()} requireSuccess(NativeSerial.close(pointer), port) } @@ -30,7 +30,7 @@ class Serial private (val port: String, private val pointer: Long) { reading.set(true) val buffer = new Array[Byte](100) val readResult = NativeSerial.read(pointer, buffer) - if (closed.get) notify(); //read was interrupted by close + if (closed.get) synchronized{notify()}; //read was interrupted by close reading.set(false) val n = requireSuccess(readResult, port) buffer take n @@ -41,7 +41,7 @@ class Serial private (val port: String, private val pointer: Long) { def write(data: Array[Byte]): Array[Byte] = if (!closed.get) { writing.set(true) val writeResult = NativeSerial.write(pointer, data) - if (closed.get) notify() + if (closed.get) synchronized{notify()} writing.set(false) val n = requireSuccess(writeResult, port) data take n @@ -54,7 +54,7 @@ class Serial private (val port: String, private val pointer: Long) { object Serial { import NativeSerial._ - def requireSuccess(result: Int, port: String): Int = result match { + private def requireSuccess(result: Int, port: String): Int = result match { case E_IO => throw new IOException(port) case E_ACCESS_DENIED => throw new AccessDeniedException(port) case E_BUSY => throw new PortInUseException(port) -- cgit v1.2.3