aboutsummaryrefslogtreecommitdiff
path: root/src/scala/ubiquifs/Slave.scala
blob: 328b73c8288bb395223660f795540b00a74d76cd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package ubiquifs

import java.io.{DataInputStream, DataOutputStream, IOException}
import java.net.{InetAddress, Socket, ServerSocket}
import java.util.concurrent.locks.ReentrantLock

import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.remote.RemoteActor
import scala.actors.remote.RemoteActor._
import scala.actors.remote.Node
import scala.collection.mutable.{ArrayBuffer, Map, Set}

/**
 * UbiquiFS slave thread.
 *
 * On start it opens a server socket on `myPort`, registers itself with the
 * master through a remote actor, and then accepts connections forever. Each
 * connection carries one request (READ or WRITE) for a path; data is kept in
 * in-memory [[Buffer]]s keyed by that path.
 *
 * @param myPort port the slave listens on for data connections
 * @param master master address string, split by `Utils.parseHostPort`
 */
class Slave(myPort: Int, master: String) extends Thread("UbiquiFS slave") {
  /** Size in bytes of each chunk accumulated while receiving a write. */
  val CHUNK_SIZE = 1024 * 1024

  /** Path -> buffer table; every access is guarded by `synchronized` on this Slave. */
  val buffers = Map[String, Buffer]()

  /**
   * Registers with the master and runs the accept loop, handing every
   * accepted connection to its own [[ConnectionHandler]] thread.
   */
  override def run(): Unit = {
    // Create server socket
    val socket = new ServerSocket(myPort)
    try {
      // Register with master
      val (masterHost, masterPort) = Utils.parseHostPort(master)
      val masterActor = select(Node(masterHost, masterPort), 'UbiquiFS)
      val myHost = InetAddress.getLocalHost.getHostName
      val reply = masterActor !? RegisterSlave(myHost, myPort)
      println("Registered with master, reply = " + reply)

      while (true) {
        val conn = socket.accept()
        new ConnectionHandler(conn).start()
      }
    } finally {
      // Release the listening port if registration or accept() fails.
      socket.close()
    }
  }

  /**
   * Serves a single client connection on its own thread.
   *
   * The work lives in `run()` so that it executes on the handler thread after
   * `start()`; code placed directly in the class body would run in the
   * constructor, i.e. on the accept thread.
   *
   * @param conn accepted client socket; always closed when the request ends
   */
  class ConnectionHandler(conn: Socket) extends Thread("ConnectionHandler") {
    override def run(): Unit = {
      try {
        val in = new DataInputStream(conn.getInputStream)
        val out = new DataOutputStream(conn.getOutputStream)
        val header = Header.read(in)
        header.requestType match {
          case RequestType.READ =>
            performRead(header.path, out)
          case RequestType.WRITE =>
            performWrite(header.path, in)
          case other =>
            throw new IOException("Invalid header type " + other)
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        conn.close()
      }
    }
  }

  /**
   * Receives the contents of `path` from `in` until end of stream, appending
   * them to a freshly registered buffer in chunks of up to CHUNK_SIZE bytes.
   *
   * @param path key under which the new buffer is registered
   * @param in   stream to read the data from
   * @throws IllegalArgumentException if a buffer for `path` already exists
   */
  def performWrite(path: String, in: DataInputStream): Unit = {
    val buffer = new Buffer()
    synchronized {
      if (buffers.contains(path))
        throw new IllegalArgumentException("Path " + path + " already exists")
      buffers(path) = buffer
    }
    // NOTE(review): if in.read throws below, the buffer is never marked
    // finished and stays registered, so readers waiting on it block
    // indefinitely -- decide on a failure policy.
    var chunk = new Array[Byte](CHUNK_SIZE)
    var pos = 0
    var done = false
    while (!done) {
      val numRead = in.read(chunk, pos, chunk.length - pos)
      if (numRead == -1) {
        // End of stream: publish the partially filled tail (possibly empty)
        // and mark the buffer as finished.
        buffer.addChunk(java.util.Arrays.copyOfRange(chunk, 0, pos), true)
        done = true
      } else {
        pos += numRead
        if (pos == chunk.length) {
          // Chunk is full: hand it over and start a new one.
          buffer.addChunk(chunk, false)
          chunk = new Array[Byte](CHUNK_SIZE)
          pos = 0
        }
      }
    }
    // TODO: launch a thread to write the data to disk, and when this finishes,
    // remove the hard reference to buffer
  }

  /**
   * Streams every chunk of the buffer registered for `path` to `out`,
   * waiting for further chunks while the buffer is not yet finished.
   *
   * @param path key of the buffer to send
   * @param out  stream the chunks are written to
   * @throws IllegalArgumentException if no buffer exists for `path`
   */
  def performRead(path: String, out: DataOutputStream): Unit = {
    val buffer = synchronized {
      buffers.getOrElse(path,
        throw new IllegalArgumentException("Path " + path + " doesn't exist"))
    }
    for (chunk <- buffer.iterator) {
      out.write(chunk, 0, chunk.length)
    }
  }

  /**
   * Append-only list of byte chunks that can be consumed while it is still
   * being filled. All state is guarded by `mutex`; `chunksAvailable` is
   * signalled whenever a chunk is added.
   */
  class Buffer {
    val chunks = new ArrayBuffer[Array[Byte]]
    var finished = false
    val mutex = new ReentrantLock
    val chunksAvailable = mutex.newCondition()

    /**
     * Appends `chunk` and wakes all waiting iterators.
     *
     * @param chunk  bytes to append
     * @param finish value stored into `finished`; true means no more chunks follow
     */
    def addChunk(chunk: Array[Byte], finish: Boolean): Unit = {
      mutex.lock()
      try {
        chunks += chunk
        finished = finish
        chunksAvailable.signalAll()
      } finally {
        mutex.unlock()
      }
    }

    /**
     * Returns a new iterator over the chunks, starting at the first one.
     * `hasNext` blocks until either another chunk is available or the buffer
     * has been marked finished.
     */
    def iterator: Iterator[Array[Byte]] = new Iterator[Array[Byte]] {
      var index = 0

      def hasNext: Boolean = {
        mutex.lock()
        try {
          while (index >= chunks.size && !finished)
            chunksAvailable.await()
          index < chunks.size
        } finally {
          // Also releases the lock when await() is interrupted.
          mutex.unlock()
        }
      }

      def next(): Array[Byte] = {
        mutex.lock()
        try {
          // hasNext re-acquires the (reentrant) lock and waits for data.
          if (!hasNext)
            throw new NoSuchElementException("End of file")
          val ret = chunks(index)
          index += 1
          ret
        } finally {
          // Unlock on every path, including the NoSuchElementException above.
          mutex.unlock()
        }
      }
    }
  }
}

/** Command-line entry point that launches a single [[Slave]] thread. */
object SlaveMain {
  /**
   * Starts a slave.
   *
   * @param args `args(0)` is parsed as the integer listening port and
   *             `args(1)` is passed to `Slave` as the master address
   */
  def main(args: Array[String]): Unit = {
    // Arguments are evaluated left to right: port first, then master.
    val slave = new Slave(args(0).toInt, args(1))
    slave.start()
  }
}