From a7aeed67da977fc885f7e89ae5f52aa7dadbac24 Mon Sep 17 00:00:00 2001 From: michelou Date: Tue, 2 Jun 2009 10:31:36 +0000 Subject: fixed (finally!) issue with JavaSerializer --- .../scala/actors/remote/JavaSerializer.scala | 13 ++++++++-- src/actors/scala/actors/remote/Serializer.scala | 6 ++--- src/actors/scala/actors/remote/TcpService.scala | 30 +++++++++------------- 3 files changed, 26 insertions(+), 23 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/remote/JavaSerializer.scala b/src/actors/scala/actors/remote/JavaSerializer.scala index fa71dbc8cf..42b7000f9d 100644 --- a/src/actors/scala/actors/remote/JavaSerializer.scala +++ b/src/actors/scala/actors/remote/JavaSerializer.scala @@ -17,14 +17,23 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, /** * @author Guy Oliver */ -class CustomObjectInputStream(os: InputStream, cl: ClassLoader) extends ObjectInputStream(os) { - override def resolveClass(cd: ObjectStreamClass): Class[T] forSome { type T } = +private[remote] class CustomObjectInputStream(in: InputStream, cl: ClassLoader) +extends ObjectInputStream(in) { + override def resolveClass(cd: ObjectStreamClass): Class[_] = try { cl.loadClass(cd.getName()) } catch { case cnf: ClassNotFoundException => super.resolveClass(cd) } + override def resolveProxyClass(interfaces: Array[String]): Class[_] = + try { + val ifaces = interfaces map { iface => cl.loadClass(iface) } + java.lang.reflect.Proxy.getProxyClass(cl, ifaces: _*) + } catch { + case e: ClassNotFoundException => + super.resolveProxyClass(interfaces) + } } /** diff --git a/src/actors/scala/actors/remote/Serializer.scala b/src/actors/scala/actors/remote/Serializer.scala index 4da91dc3c0..0d5ee1bf34 100644 --- a/src/actors/scala/actors/remote/Serializer.scala +++ b/src/actors/scala/actors/remote/Serializer.scala @@ -21,12 +21,12 @@ abstract class Serializer(val service: Service) { def deserialize(a: Array[Byte]): AnyRef @throws(classOf[IOException]) - def readBytes(inputStream: DataInputStream): Array[Byte] = { + private def readBytes(inputStream: DataInputStream): Array[Byte] = { try { val length = inputStream.readInt() val bytes = new Array[Byte](length) inputStream.readFully(bytes, 0, length) - return bytes + bytes } catch { case npe: NullPointerException => @@ -41,7 +41,7 @@ abstract class Serializer(val service: Service) { } @throws(classOf[IOException]) - def writeBytes(outputStream: DataOutputStream, bytes: Array[Byte]) { + private def writeBytes(outputStream: DataOutputStream, bytes: Array[Byte]) { val length = bytes.length; // original length outputStream.writeInt(length) diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala index b0b3b1392a..e41ab94446 100644 --- a/src/actors/scala/actors/remote/TcpService.scala +++ b/src/actors/scala/actors/remote/TcpService.scala @@ -138,7 +138,7 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { try { val socket = new ServerSocket(port) while (!shouldTerminate) { - Debug.info(this+": waiting for new connection...") + Debug.info(this+": waiting for new connection on port "+port+"...") val nextClient = socket.accept() if (!shouldTerminate) { val worker = new TcpServiceWorker(this, nextClient) @@ -182,8 +182,8 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { } def connect(n: Node): TcpServiceWorker = synchronized { - val sock = new Socket(n.address, n.port) - val worker = new TcpServiceWorker(this, sock) + val socket = new Socket(n.address, n.port) + val worker = new TcpServiceWorker(this, socket) worker.sendNode(n) worker.start() addConnection(n, worker) @@ -192,13 +192,11 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { def disconnectNode(n: Node) = synchronized { connections.get(n) match { - case None => { + case None => // do nothing - } - case Some(worker) => { + case Some(worker) => connections -= n worker.halt - } } } @@ -219,27 +217,23 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service { } -class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { - val in = so.getInputStream() - val out = so.getOutputStream() - - val datain = new DataInputStream(in) - val dataout = new DataOutputStream(out) +private[actors] class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { + val datain = new DataInputStream(so.getInputStream) + val dataout = new DataOutputStream(so.getOutputStream) var connectedNode: Node = _ - def sendNode(n: Node) = { + def sendNode(n: Node) { connectedNode = n parent.serializer.writeObject(dataout, parent.node) } - def readNode = { + def readNode { val node = parent.serializer.readObject(datain) node match { - case n: Node => { + case n: Node => connectedNode = n parent.addConnection(n, this) - } } } @@ -272,6 +266,6 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { Debug.info(this+": caught "+e) parent nodeDown connectedNode } - Debug.info(this+": terminated") + Debug.info(this+": service terminated at "+parent.node) } } -- cgit v1.2.3