summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/remote/TcpService.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-09-29 16:50:14 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-09-29 16:50:14 +0000
commit499d7f10e23549ef30a61d13fc7f4203145f14f1 (patch)
treea69dde160c1496d07ea2d1d0edc25ab2cee1a26c /src/actors/scala/actors/remote/TcpService.scala
parent8abd909119cb9a3011d6c141138f2e2a5af5e304 (diff)
downloadscala-499d7f10e23549ef30a61d13fc7f4203145f14f1.tar.gz
scala-499d7f10e23549ef30a61d13fc7f4203145f14f1.tar.bz2
scala-499d7f10e23549ef30a61d13fc7f4203145f14f1.zip
Checked in new actors lib.
Diffstat (limited to 'src/actors/scala/actors/remote/TcpService.scala')
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala191
1 files changed, 191 insertions, 0 deletions
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
new file mode 100644
index 0000000000..c8c987e75a
--- /dev/null
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -0,0 +1,191 @@
+package scala.actors.remote
+
+import java.io.{DataInputStream,DataOutputStream,BufferedReader,PrintWriter,
+ IOException,InputStreamReader,OutputStreamWriter}
+import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException}
+
+object TcpService {
+ val random = new java.util.Random(System.currentTimeMillis())
+
+ def generatePort: int = {
+ var portnum = 0
+ try {
+ portnum = 8000 + random.nextInt(500)
+ val socket = new ServerSocket(portnum)
+ socket.close()
+ }
+ catch {
+ case ioe: IOException =>
+ // this happens when trying to open a socket twice at the same port
+ // try again
+ generatePort
+ case se: SecurityException =>
+ // do nothing
+ }
+ portnum
+ }
+}
+
+class TcpService(port: Int) extends Thread with Service {
+ val serializer: JavaSerializer = new JavaSerializer(this)
+
+ private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port)
+ def node: TcpNode = internalNode
+
+ def send(node: Node, data: Array[byte]): unit = synchronized {
+ // retrieve worker thread (if any) that already has connection
+ node match {
+ case tnode: TcpNode =>
+ getConnection(tnode) match {
+ case None =>
+ // we are not connected, yet
+ val newWorker = connect(tnode)
+ newWorker transmit data
+ case Some(worker) => worker transmit data
+ }
+ case any => error("no TcpNode!")
+ }
+ }
+
+ override def run(): Unit =
+ try {
+ val socket = new ServerSocket(port)
+
+ while (true) {
+ val nextClient = socket.accept()
+ val worker = new TcpServiceWorker(this, nextClient)
+ worker.readNode
+ worker.start()
+ }
+ } catch {
+ case ioe: IOException => // do nothing
+ case sec: SecurityException => // do nothing
+ }
+
+ // connection management
+
+ private val connections =
+ new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker]
+
+ private[actors] def addConnection(node: TcpNode, worker: TcpServiceWorker) = synchronized {
+ connections += node -> worker
+ }
+
+ def getConnection(n: TcpNode) = synchronized {
+ connections.get(n)
+ }
+
+ def isConnected(n: Node): Boolean = synchronized {
+ n match {
+ case tnode: TcpNode => !connections.get(tnode).isEmpty
+ case _ => false
+ }
+ }
+
+ def connect(n: Node): Unit = synchronized {
+ n match {
+ case tnode: TcpNode =>
+ connect(tnode)
+ }
+ }
+
+ def connect(n: TcpNode): TcpServiceWorker = synchronized {
+ val sock = new Socket(n.address, n.port)
+ val worker = new TcpServiceWorker(this, sock)
+ worker.sendNode(n)
+ worker.start()
+ addConnection(n, worker)
+ worker
+ }
+
+ def disconnectNode(n: Node) = synchronized {
+ n match {
+ case node: TcpNode =>
+ connections.get(node) match {
+ case None => // do nothing
+ case Some(worker) => {
+ connections -= node
+ worker.halt
+ }
+ }
+ case any => error("no TcpNode.")
+ }
+ }
+
+ def isReachable(node: Node): boolean =
+ if (isConnected(node)) true
+ else try {
+ connect(node)
+ return true
+ } catch {
+ case uhe: UnknownHostException => false
+ case ioe: IOException => false
+ case se: SecurityException => false
+ }
+
+ def nodeDown(mnode: TcpNode): Unit = synchronized {
+ connections -= mnode
+ }
+}
+
+
+class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
+ val in = so.getInputStream()
+ val out = so.getOutputStream()
+
+ val datain = new DataInputStream(in)
+ val dataout = new DataOutputStream(out)
+
+ val reader = new BufferedReader(new InputStreamReader(in))
+ val writer = new PrintWriter(new OutputStreamWriter(out))
+
+ var connectedNode: TcpNode = _
+
+ def sendNode(n: TcpNode) = {
+ connectedNode = n
+ parent.serializer.writeObject(dataout, parent.node)
+ }
+
+ def readNode = {
+ //val node = parent.serializer.deserialize(reader)
+ val node = parent.serializer.readObject(datain)
+ node match {
+ case n: TcpNode => {
+ connectedNode = n
+ parent.addConnection(n, this)
+ }
+ }
+ }
+
+ def transmit(data: Array[byte]): Unit = synchronized {
+ dataout.writeInt(data.length)
+ dataout.write(data)
+ dataout.flush()
+ }
+
+ var running = true
+
+ def halt = synchronized {
+ so.close()
+ running = false
+ }
+
+ override def run(): Unit = {
+ try {
+ while (running) {
+ if (in.available() > 0) {
+ //val msg = parent.serializer.deserialize(reader);
+ val msg = parent.serializer.readObject(datain);
+ parent.kernel.processMsg(connectedNode, msg)
+ }
+ }
+ }
+ catch {
+ case ioe: IOException =>
+ parent nodeDown connectedNode
+ case e: Exception =>
+ // catch-all
+ parent nodeDown connectedNode
+ }
+ }
+}