summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-10-04 15:23:56 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-10-04 15:23:56 +0000
commit76faa29bb78029a79a327cbe5b6708b1c7b408de (patch)
treef3f4e3a7b790d9b61e2e74ab3d1470fd2c0a5025
parentf4f4e7138732bec78765b4a470f7aea3468fe8ba (diff)
downloadscala-76faa29bb78029a79a327cbe5b6708b1c7b408de.tar.gz
scala-76faa29bb78029a79a327cbe5b6708b1c7b408de.tar.bz2
scala-76faa29bb78029a79a327cbe5b6708b1c7b408de.zip
Small clean-ups.
-rw-r--r--src/actors/scala/actors/Actor.scala10
-rw-r--r--src/actors/scala/actors/Reactor.scala5
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala68
4 files changed, 37 insertions, 48 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 157f2631b8..873835d177 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -573,9 +573,13 @@ object RemoteActor {
}
}
-abstract class Node
-case class TcpNode(address: String, port: Int) extends Node
-case class JxtaNode(group: String) extends Node
+
+/**
+ This class represents a machine node on a TCP network.
+
+ @author Philipp Haller
+ */
+case class Node(address: String, port: Int)
/**
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 4b6c63180e..d8cb0d0bb5 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -1,7 +1,6 @@
package scala.actors
/**
-
This class provides (together with <code>Channel</code>) an
implementation of event-based actors (aka reactors).
@@ -101,7 +100,9 @@ private[actors] class StartTask(a: Reactor) extends Reaction {
a.exit("normal")
}
catch {
- case _: InterruptedException => a.exitLinked()
+ case _: InterruptedException => {
+ a.exitLinked()
+ }
case d: SuspendActorException => {
// do nothing (continuation is already saved)
}
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 90af3d563c..19d6f81f6a 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -36,7 +36,7 @@ class NetKernel(service: Service) {
actors.get(receiver) match {
case Some(a) => {
val msg = service.serializer.deserialize(data)
- val senderProxy = new Reactor {
+ val senderProxy = new ActorThread {
def act() = { a ! msg }
override def !(msg: Any): Unit = {
msg match {
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index c8c987e75a..0237cc0e96 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -29,28 +29,24 @@ object TcpService {
class TcpService(port: Int) extends Thread with Service {
val serializer: JavaSerializer = new JavaSerializer(this)
- private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port)
- def node: TcpNode = internalNode
+ private val internalNode = new Node(InetAddress.getLocalHost().getHostAddress(), port)
+ def node: Node = internalNode
def send(node: Node, data: Array[byte]): unit = synchronized {
// retrieve worker thread (if any) that already has connection
- node match {
- case tnode: TcpNode =>
- getConnection(tnode) match {
- case None =>
- // we are not connected, yet
- val newWorker = connect(tnode)
- newWorker transmit data
- case Some(worker) => worker transmit data
- }
- case any => error("no TcpNode!")
+ getConnection(node) match {
+ case None => {
+ // we are not connected, yet
+ val newWorker = connect(node)
+ newWorker transmit data
+ }
+ case Some(worker) => worker transmit data
}
}
override def run(): Unit =
try {
val socket = new ServerSocket(port)
-
while (true) {
val nextClient = socket.accept()
val worker = new TcpServiceWorker(this, nextClient)
@@ -65,31 +61,21 @@ class TcpService(port: Int) extends Thread with Service {
// connection management
private val connections =
- new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker]
+ new scala.collection.mutable.HashMap[Node, TcpServiceWorker]
- private[actors] def addConnection(node: TcpNode, worker: TcpServiceWorker) = synchronized {
+ private[actors] def addConnection(node: Node, worker: TcpServiceWorker) = synchronized {
connections += node -> worker
}
- def getConnection(n: TcpNode) = synchronized {
+ def getConnection(n: Node) = synchronized {
connections.get(n)
}
def isConnected(n: Node): Boolean = synchronized {
- n match {
- case tnode: TcpNode => !connections.get(tnode).isEmpty
- case _ => false
- }
- }
-
- def connect(n: Node): Unit = synchronized {
- n match {
- case tnode: TcpNode =>
- connect(tnode)
- }
+ !connections.get(n).isEmpty
}
- def connect(n: TcpNode): TcpServiceWorker = synchronized {
+ def connect(n: Node): TcpServiceWorker = synchronized {
val sock = new Socket(n.address, n.port)
val worker = new TcpServiceWorker(this, sock)
worker.sendNode(n)
@@ -99,16 +85,14 @@ class TcpService(port: Int) extends Thread with Service {
}
def disconnectNode(n: Node) = synchronized {
- n match {
- case node: TcpNode =>
- connections.get(node) match {
- case None => // do nothing
- case Some(worker) => {
- connections -= node
- worker.halt
- }
- }
- case any => error("no TcpNode.")
+ connections.get(n) match {
+ case None => {
+ // do nothing
+ }
+ case Some(worker) => {
+ connections -= n
+ worker.halt
+ }
}
}
@@ -123,7 +107,7 @@ class TcpService(port: Int) extends Thread with Service {
case se: SecurityException => false
}
- def nodeDown(mnode: TcpNode): Unit = synchronized {
+ def nodeDown(mnode: Node): Unit = synchronized {
connections -= mnode
}
}
@@ -139,9 +123,9 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val reader = new BufferedReader(new InputStreamReader(in))
val writer = new PrintWriter(new OutputStreamWriter(out))
- var connectedNode: TcpNode = _
+ var connectedNode: Node = _
- def sendNode(n: TcpNode) = {
+ def sendNode(n: Node) = {
connectedNode = n
parent.serializer.writeObject(dataout, parent.node)
}
@@ -150,7 +134,7 @@ class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
//val node = parent.serializer.deserialize(reader)
val node = parent.serializer.readObject(datain)
node match {
- case n: TcpNode => {
+ case n: Node => {
connectedNode = n
parent.addConnection(n, this)
}