diff options
author | tdas <tathagata.das1565@gmail.com> | 2012-11-08 11:35:40 +0000 |
---|---|---|
committer | tdas <tathagata.das1565@gmail.com> | 2012-11-08 11:35:40 +0000 |
commit | 52d21cb682d1c4ca05e6823f8049ccedc3c5530c (patch) | |
tree | 226f7ee936289ef2831caea60e7ebfa1a5b77579 | |
parent | cc2a65f54715ff0990d5873d50eec0dedf64d409 (diff) | |
download | spark-52d21cb682d1c4ca05e6823f8049ccedc3c5530c.tar.gz spark-52d21cb682d1c4ca05e6823f8049ccedc3c5530c.tar.bz2 spark-52d21cb682d1c4ca05e6823f8049ccedc3c5530c.zip |
Removed unnecessary files.
10 files changed, 0 insertions, 1643 deletions
diff --git a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala b/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala deleted file mode 100644 index cde868a0c9..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala +++ /dev/null @@ -1,157 +0,0 @@ -package spark.streaming.util - -import spark.Logging - -import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue} - -import java.net._ -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ -import java.nio.channels.spi._ - -abstract class ConnectionHandler(host: String, port: Int, connect: Boolean) -extends Thread with Logging { - - val selector = SelectorProvider.provider.openSelector() - val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] - - initLogging() - - override def run() { - try { - if (connect) { - connect() - } else { - listen() - } - - var interrupted = false - while(!interrupted) { - - preSelect() - - while(!interestChangeRequests.isEmpty) { - val (key, ops) = interestChangeRequests.dequeue - val lastOps = key.interestOps() - key.interestOps(ops) - - def intToOpStr(op: Int): String = { - val opStrs = new ArrayBuffer[String]() - if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" - if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" - if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" - if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" - if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " - } - - logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") - } - - selector.select() - interrupted = Thread.currentThread.isInterrupted - - val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext) { - val key = selectedKeys.next.asInstanceOf[SelectionKey] - selectedKeys.remove() - if (key.isValid) { - if (key.isAcceptable) { - accept(key) - } else if (key.isConnectable) { - finishConnect(key) - } else if (key.isReadable) { - read(key) - } else if (key.isWritable) { - write(key) - } - } - } - } - } catch { - case e: Exception => { - logError("Error in select loop", e) - } - } - } - - def connect() { - val socketAddress = new InetSocketAddress(host, port) - val channel = SocketChannel.open() - channel.configureBlocking(false) - channel.socket.setReuseAddress(true) - channel.socket.setTcpNoDelay(true) - channel.connect(socketAddress) - channel.register(selector, SelectionKey.OP_CONNECT) - logInfo("Initiating connection to [" + socketAddress + "]") - } - - def listen() { - val channel = ServerSocketChannel.open() - channel.configureBlocking(false) - channel.socket.setReuseAddress(true) - channel.socket.setReceiveBufferSize(256 * 1024) - channel.socket.bind(new InetSocketAddress(port)) - channel.register(selector, SelectionKey.OP_ACCEPT) - logInfo("Listening on port " + port) - } - - def finishConnect(key: SelectionKey) { - try { - val channel = key.channel.asInstanceOf[SocketChannel] - val address = channel.socket.getRemoteSocketAddress - channel.finishConnect() - logInfo("Connected to [" + host + ":" + port + "]") - ready(key) - } catch { - case e: IOException => { - logError("Error finishing connect to " + host + ":" + port) - close(key) - } - } - } - - def accept(key: SelectionKey) { - try { - val serverChannel = key.channel.asInstanceOf[ServerSocketChannel] - val channel = serverChannel.accept() - val address = channel.socket.getRemoteSocketAddress - channel.configureBlocking(false) - logInfo("Accepted connection from [" + address + "]") - ready(channel.register(selector, 0)) - } catch { - case e: IOException => { - logError("Error accepting connection", e) - } - } - } - - def changeInterest(key: SelectionKey, ops: Int) { - logTrace("Added request to change ops to " + ops) - interestChangeRequests += ((key, ops)) - } - - def ready(key: SelectionKey) - - def preSelect() { - } - - def read(key: SelectionKey) { - throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString) - } - - def write(key: SelectionKey) { - throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString) - } - - def close(key: SelectionKey) { - try { - key.channel.close() - key.cancel() - Thread.currentThread.interrupt - } catch { - case e: Exception => logError("Error closing connection", e) - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala deleted file mode 100644 index 3922dfbad6..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala +++ /dev/null @@ -1,67 +0,0 @@ -package spark.streaming.util - -import java.net.{Socket, ServerSocket} -import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream} - -object Receiver { - def main(args: Array[String]) { - val port = args(0).toInt - val lsocket = new ServerSocket(port) - println("Listening on port " + port ) - while(true) { - val socket = lsocket.accept() - (new Thread() { - override def run() { - val buffer = new Array[Byte](100000) - var count = 0 - val time = System.currentTimeMillis - try { - val is = new DataInputStream(new BufferedInputStream(socket.getInputStream)) - var loop = true - var string: String = null - do { - string = is.readUTF() - if (string != null) { - count += 28 - } - } while (string != null) - } catch { - case e: Exception => e.printStackTrace() - } - val timeTaken = System.currentTimeMillis - time - val tput = (count / 1024.0) / (timeTaken / 1000.0) - println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s") - } - }).start() - } - } - -} - -object Sender { - - def main(args: Array[String]) { - try { - val host = args(0) - val port = args(1).toInt - val size = args(2).toInt - - val byteStream = new ByteArrayOutputStream() - val stringDataStream = new DataOutputStream(byteStream) - (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy")) - val bytes = byteStream.toByteArray() - println("Generated array of " + bytes.length + " bytes") - - /*val bytes = new Array[Byte](size)*/ - val socket = new Socket(host, port) - val os = socket.getOutputStream - os.write(bytes) - os.flush - socket.close() - - } catch { - case e: Exception => e.printStackTrace - } - } -} - diff --git a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala deleted file mode 100644 index 94e8f7a849..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala +++ /dev/null @@ -1,92 +0,0 @@ -package spark.streaming.util - -import spark._ - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random -import scala.io.Source - -import java.net.InetSocketAddress - -import org.apache.hadoop.fs._ -import org.apache.hadoop.conf._ -import org.apache.hadoop.io._ -import org.apache.hadoop.mapred._ -import org.apache.hadoop.util._ - -object SentenceFileGenerator { - - def printUsage () { - println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]") - System.exit(0) - } - - def main (args: Array[String]) { - if (args.length < 4) { - printUsage - } - - val master = args(0) - val fs = new Path(args(1)).getFileSystem(new Configuration()) - val targetDirectory = new Path(args(1)).makeQualified(fs) - val numPartitions = args(2).toInt - val sentenceFile = args(3) - val sentencesPerSecond = { - if (args.length > 4) args(4).toInt - else 10 - } - - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n").toArray - source.close () - println("Read " + lines.length + " lines from file " + sentenceFile) - - val sentences = { - val buffer = ArrayBuffer[String]() - val random = new Random() - var i = 0 - while (i < sentencesPerSecond) { - buffer += lines(random.nextInt(lines.length)) - i += 1 - } - buffer.toArray - } - println("Generated " + sentences.length + " sentences") - - val sc = new SparkContext(master, "SentenceFileGenerator") - val sentencesRDD = sc.parallelize(sentences, numPartitions) - - val tempDirectory = new Path(targetDirectory, "_tmp") - - fs.mkdirs(targetDirectory) - fs.mkdirs(tempDirectory) - - var saveTimeMillis = System.currentTimeMillis - try { - while (true) { - val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis) - val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis) - println("Writing to file " + newDir) - sentencesRDD.saveAsTextFile(tmpNewDir.toString) - fs.rename(tmpNewDir, newDir) - saveTimeMillis += 1000 - val sleepTimeMillis = { - val currentTimeMillis = System.currentTimeMillis - if (saveTimeMillis < currentTimeMillis) { - 0 - } else { - saveTimeMillis - currentTimeMillis - } - } - println("Sleeping for " + sleepTimeMillis + " ms") - Thread.sleep(sleepTimeMillis) - } - } catch { - case e: Exception => - } - } -} - - - - diff --git a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala deleted file mode 100644 index 60085f4f88..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala +++ /dev/null @@ -1,23 +0,0 @@ -package spark.streaming.util - -import spark.SparkContext -import SparkContext._ - -object ShuffleTest { - def main(args: Array[String]) { - - if (args.length < 1) { - println ("Usage: ShuffleTest <host>") - System.exit(1) - } - - val sc = new spark.SparkContext(args(0), "ShuffleTest") - val rdd = sc.parallelize(1 to 1000, 500).cache - - def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) } - - time { for (i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } } - System.exit(0) - } -} - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala deleted file mode 100644 index 23e9235c60..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala +++ /dev/null @@ -1,107 +0,0 @@ -package spark.streaming.util - -import scala.util.Random -import scala.io.Source -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ - -import java.net.InetSocketAddress - - -object TestGenerator { - - def printUsage { - println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") - System.exit(0) - } - /* - def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { - val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1 - val random = new Random () - - try { - var lastPrintTime = System.currentTimeMillis() - var count = 0 - while(true) { - streamReceiver ! lines(random.nextInt(lines.length)) - count += 1 - if (System.currentTimeMillis - lastPrintTime >= 1000) { - println (count + " sentences sent last second") - count = 0 - lastPrintTime = System.currentTimeMillis - } - Thread.sleep(sleepBetweenSentences.toLong) - } - } catch { - case e: Exception => - } - }*/ - - def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { - try { - val numSentences = if (sentencesPerSecond <= 0) { - lines.length - } else { - sentencesPerSecond - } - val sentences = lines.take(numSentences).toArray - - var nextSendingTime = System.currentTimeMillis() - val sendAsArray = true - while(true) { - if (sendAsArray) { - println("Sending as array") - streamReceiver !? sentences - } else { - println("Sending individually") - sentences.foreach(sentence => { - streamReceiver !? sentence - }) - } - println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms") - nextSendingTime += 1000 - val sleepTime = nextSendingTime - System.currentTimeMillis - if (sleepTime > 0) { - println ("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } - } - } catch { - case e: Exception => - } - } - - def main(args: Array[String]) { - if (args.length < 3) { - printUsage - } - - val generateRandomly = false - - val streamReceiverIP = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10 - val sentenceInputName = if (args.length > 4) args(4) else "Sentences" - - println("Sending " + sentencesPerSecond + " sentences per second to " + - streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n") - source.close () - - val streamReceiver = select( - Node(streamReceiverIP, streamReceiverPort), - Symbol("NetworkStreamReceiver-" + sentenceInputName)) - if (generateRandomly) { - /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/ - } else { - generateSameSentences(lines, sentencesPerSecond, streamReceiver) - } - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala deleted file mode 100644 index ff840d084f..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala +++ /dev/null @@ -1,119 +0,0 @@ -package spark.streaming.util - -import scala.util.Random -import scala.io.Source -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ - -import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream} -import java.net.Socket - -object TestGenerator2 { - - def printUsage { - println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") - System.exit(0) - } - - def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){ - try { - println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort) - val socket = new Socket(streamReceiverHost, streamReceiverPort) - - println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort ) - val currentTime = System.currentTimeMillis - var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime - Thread.sleep(targetTime - currentTime) - - while(true) { - val startTime = System.currentTimeMillis() - println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") - val socketOutputStream = socket.getOutputStream - val parts = 10 - (0 until parts).foreach(i => { - val partStartTime = System.currentTimeMillis - - val offset = (i * bytes.length / parts).toInt - val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length) - socketOutputStream.write(bytes, offset, len) - socketOutputStream.flush() - val partFinishTime = System.currentTimeMillis - println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms") - val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1) - Thread.sleep(sleepTime) - }) - - socketOutputStream.flush() - /*val socketInputStream = new DataInputStream(socket.getInputStream)*/ - /*val reply = socketInputStream.readUTF()*/ - val finishTime = System.currentTimeMillis() - println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]") - /*println("Received = " + reply)*/ - targetTime = targetTime + intervalTime - val sleepTime = (targetTime - finishTime) + 10 - if (sleepTime > 0) { - println("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } else { - println("############################") - println("###### Skipping sleep ######") - println("############################") - } - } - } catch { - case e: Exception => println(e) - } - println("Stopped sending") - } - - def main(args: Array[String]) { - if (args.length < 4) { - printUsage - } - - val streamReceiverHost = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val intervalTime = args(3).toLong - val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 - - println("Reading the file " + sentenceFile) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n") - source.close() - - val numSentences = if (sentencesPerInterval <= 0) { - lines.length - } else { - sentencesPerInterval - } - - println("Generating sentences") - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take(numSentences).toArray - } else { - (0 until numSentences).map(i => lines(i % lines.length)).toArray - } - - println("Converting to byte array") - val byteStream = new ByteArrayOutputStream() - val stringDataStream = new DataOutputStream(byteStream) - /*stringDataStream.writeInt(sentences.size)*/ - sentences.foreach(stringDataStream.writeUTF) - val bytes = byteStream.toByteArray() - stringDataStream.close() - println("Generated array of " + bytes.length + " bytes") - - /*while(true) { */ - sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime) - /*println("Sleeping for 5 seconds")*/ - /*Thread.sleep(5000)*/ - /*System.gc()*/ - /*}*/ - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala deleted file mode 100644 index 9c39ef3e12..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala +++ /dev/null @@ -1,244 +0,0 @@ -package spark.streaming.util - -import spark.Logging - -import scala.util.Random -import scala.io.Source -import scala.collection.mutable.{ArrayBuffer, Queue} - -import java.net._ -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ - -import it.unimi.dsi.fastutil.io._ - -class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int) -extends Logging { - - class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4) - extends ConnectionHandler(host, port, true) { - - val buffers = new ArrayBuffer[ByteBuffer] - val newBuffers = new Queue[ByteBuffer] - var activeKey: SelectionKey = null - - def send(buffer: ByteBuffer) { - logDebug("Sending: " + buffer) - newBuffers.synchronized { - newBuffers.enqueue(buffer) - } - selector.wakeup() - buffer.synchronized { - buffer.wait() - } - } - - override def ready(key: SelectionKey) { - logDebug("Ready") - activeKey = key - val channel = key.channel.asInstanceOf[SocketChannel] - channel.register(selector, SelectionKey.OP_WRITE) - generator.startSending() - } - - override def preSelect() { - newBuffers.synchronized { - while(!newBuffers.isEmpty) { - val buffer = newBuffers.dequeue - buffers += buffer - logDebug("Added: " + buffer) - changeInterest(activeKey, SelectionKey.OP_WRITE) - } - } - } - - override def write(key: SelectionKey) { - try { - /*while(true) {*/ - val channel = key.channel.asInstanceOf[SocketChannel] - if (buffers.size > 0) { - val buffer = buffers(0) - val newBuffer = buffer.slice() - newBuffer.limit(math.min(newBuffer.remaining, 32768)) - val bytesWritten = channel.write(newBuffer) - buffer.position(buffer.position + bytesWritten) - if (bytesWritten == 0) return - if (buffer.remaining == 0) { - buffers -= buffer - buffer.synchronized { - buffer.notify() - } - } - /*changeInterest(key, SelectionKey.OP_WRITE)*/ - } else { - changeInterest(key, 0) - } - /*}*/ - } catch { - case e: IOException => { - if (e.toString.contains("pipe") || e.toString.contains("reset")) { - logError("Connection broken") - } else { - logError("Connection error", e) - } - close(key) - } - } - } - - override def close(key: SelectionKey) { - buffers.clear() - super.close(key) - } - } - - initLogging() - - val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this) - var sendingThread: Thread = null - var sendCount = 0 - val sendBatches = 5 - - def run() { - logInfo("Connection handler started") - connectionHandler.start() - connectionHandler.join() - if (sendingThread != null && !sendingThread.isInterrupted) { - sendingThread.interrupt - } - logInfo("Connection handler stopped") - } - - def startSending() { - sendingThread = new Thread() { - override def run() { - logInfo("STARTING TO SEND") - sendSentences() - logInfo("SENDING STOPPED AFTER " + sendCount) - connectionHandler.interrupt() - } - } - sendingThread.start() - } - - def stopSending() { - sendingThread.interrupt() - } - - def sendSentences() { - logInfo("Reading the file " + sentenceFile) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n") - source.close() - - val numSentences = if (sentencesPerInterval <= 0) { - lines.length - } else { - sentencesPerInterval - } - - logInfo("Generating sentence buffer") - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take(numSentences).toArray - } else { - (0 until numSentences).map(i => lines(i % lines.length)).toArray - } - - /* - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take((numSentences / sendBatches).toInt).toArray - } else { - (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray - }*/ - - - val serializer = new spark.KryoSerializer().newInstance() - val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024) - serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close() - byteStream.trim() - val sentenceBuffer = ByteBuffer.wrap(byteStream.array) - - logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort ) - val currentTime = System.currentTimeMillis - var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration - Thread.sleep(targetTime - currentTime) - - val totalBytes = sentenceBuffer.limit - - while(true) { - val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches - - val startTime = System.currentTimeMillis() - logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") - - (0 until batchesInCurrentInterval).foreach(i => { - try { - val position = (i * totalBytes / sendBatches).toInt - val limit = if (i == sendBatches - 1) { - totalBytes - } else { - ((i + 1) * totalBytes / sendBatches).toInt - 1 - } - - val partStartTime = System.currentTimeMillis - sentenceBuffer.limit(limit) - connectionHandler.send(sentenceBuffer) - val partFinishTime = System.currentTimeMillis - val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1) - Thread.sleep(sleepTime) - - } catch { - case ie: InterruptedException => return - case e: Exception => e.printStackTrace() - } - }) - sentenceBuffer.rewind() - - val finishTime = System.currentTimeMillis() - /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/ - targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0) - - val sleepTime = (targetTime - finishTime) + 20 - if (sleepTime > 0) { - logInfo("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } else { - logInfo("###### Skipping sleep ######") - } - if (Thread.currentThread.isInterrupted) { - return - } - sendCount += 1 - } - } -} - -object TestGenerator4 { - def printUsage { - println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> [<sentences per second>]") - System.exit(0) - } - - def main(args: Array[String]) { - println("GENERATOR STARTED") - if (args.length < 4) { - printUsage - } - - - val streamReceiverHost = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val intervalDuration = args(3).toLong - val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 - - while(true) { - val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval) - generator.run() - Thread.sleep(2000) - } - println("GENERATOR STOPPED") - } -} diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala deleted file mode 100644 index f584f772bb..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala +++ /dev/null @@ -1,39 +0,0 @@ -package spark.streaming.util - -import spark.streaming._ -import spark.Logging - -import akka.actor._ -import akka.actor.Actor -import akka.actor.Actor._ - -sealed trait TestStreamCoordinatorMessage -case class GetStreamDetails extends TestStreamCoordinatorMessage -case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage -case class TestStarted extends TestStreamCoordinatorMessage - -class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging { - - var index = 0 - - initLogging() - - logInfo("Created") - - def receive = { - case TestStarted => { - sender ! "OK" - } - - case GetStreamDetails => { - val streamDetail = if (index >= streamDetails.length) null else streamDetails(index) - sender ! GotStreamDetails(streamDetail._1, streamDetail._2) - index += 1 - if (streamDetail != null) { - logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" ) - } - } - } - -} - diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala deleted file mode 100644 index 80ad924dd8..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala +++ /dev/null @@ -1,421 +0,0 @@ -package spark.streaming.util - -import spark._ -import spark.storage._ -import spark.util.AkkaUtils -import spark.streaming._ - -import scala.math._ -import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} - -import akka.actor._ -import akka.actor.Actor -import akka.dispatch._ -import akka.pattern.ask -import akka.util.duration._ - -import java.io.DataInputStream -import java.io.BufferedInputStream -import java.net.Socket -import java.net.ServerSocket -import java.util.LinkedHashMap - -import org.apache.hadoop.fs._ -import org.apache.hadoop.conf._ -import org.apache.hadoop.io._ -import org.apache.hadoop.mapred._ -import org.apache.hadoop.util._ - -import spark.Utils - - -class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager) -extends Thread with Logging { - - - class DataHandler( - inputName: String, - longIntervalDuration: Time, - shortIntervalDuration: Time, - blockManager: BlockManager - ) - extends Logging { - - class Block(var id: String, var shortInterval: Interval) { - val data = ArrayBuffer[String]() - var pushed = false - def longInterval = getLongInterval(shortInterval) - def empty() = (data.size == 0) - def += (str: String) = (data += str) - override def toString() = "Block " + id - } - - class Bucket(val longInterval: Interval) { - val blocks = new ArrayBuffer[Block]() - var filled = false - def += (block: Block) = blocks += block - def empty() = (blocks.size == 0) - def ready() = (filled && !blocks.exists(! _.pushed)) - def blockIds() = blocks.map(_.id).toArray - override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" - } - - initLogging() - - val shortIntervalDurationMillis = shortIntervalDuration.toLong - val longIntervalDurationMillis = longIntervalDuration.toLong - - var currentBlock: Block = null - var currentBucket: Bucket = null - - val blocksForPushing = new Queue[Block]() - val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] - - val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } } - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - def start() { - blockUpdatingThread.start() - blockPushingThread.start() - } - - def += (data: String) = addData(data) - - def addData(data: String) { - if (currentBlock == null) { - updateCurrentBlock() - } - currentBlock.synchronized { - currentBlock += data - } - } - - def getShortInterval(time: Time): Interval = { - val intervalBegin = time.floor(shortIntervalDuration) - Interval(intervalBegin, intervalBegin + shortIntervalDuration) - } - - def getLongInterval(shortInterval: Interval): Interval = { - val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) - Interval(intervalBegin, intervalBegin + longIntervalDuration) - } - - def updateCurrentBlock() { - /*logInfo("Updating current block")*/ - val currentTime = Time(System.currentTimeMillis) - val shortInterval = getShortInterval(currentTime) - val longInterval = getLongInterval(shortInterval) - - def createBlock(reuseCurrentBlock: Boolean = false) { - val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size - if (!reuseCurrentBlock) { - val newBlock = new Block(newBlockId, shortInterval) - /*logInfo("Created " + currentBlock)*/ - currentBlock = newBlock - } else { - currentBlock.shortInterval = shortInterval - currentBlock.id = newBlockId - } - } - - def createBucket() { - val newBucket = new Bucket(longInterval) - buckets += ((longInterval, newBucket)) - currentBucket = newBucket - /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/ - } - - if (currentBlock == null || currentBucket == null) { - createBucket() - currentBucket.synchronized { - createBlock() - } - return - } - - currentBlock.synchronized { - var reuseCurrentBlock = false - - if (shortInterval != currentBlock.shortInterval) { - if (!currentBlock.empty) { - blocksForPushing.synchronized { - blocksForPushing += currentBlock - blocksForPushing.notifyAll() - } - } - - currentBucket.synchronized { - if (currentBlock.empty) { - reuseCurrentBlock = true - } else { - currentBucket += currentBlock - } - - if (longInterval != currentBucket.longInterval) { - currentBucket.filled = true - if (currentBucket.ready) { - currentBucket.notifyAll() - } - createBucket() - } - } - - createBlock(reuseCurrentBlock) - } - } - } - - def pushBlock(block: Block) { - try{ - if (blockManager != null) { - logInfo("Pushing block") - val startTime = System.currentTimeMillis - - val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block? - val finishTime = System.currentTimeMillis - logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s") - - blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2) - /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ - /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/ - /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/ - val finishTime1 = System.currentTimeMillis - logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s") - } else { - logWarning(block + " not put as block manager is null") - } - } catch { - case e: Exception => logError("Exception writing " + block + " to blockmanager" , e) - } - } - - def getBucket(longInterval: Interval): Option[Bucket] = { - buckets.get(longInterval) - } - - def clearBucket(longInterval: Interval) { - buckets.remove(longInterval) - } - - def keepUpdatingCurrentBlock() { - logInfo("Thread to update current block started") - while(true) { - updateCurrentBlock() - val currentTimeMillis = System.currentTimeMillis - val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * - shortIntervalDurationMillis - currentTimeMillis + 1 - Thread.sleep(sleepTimeMillis) - } - } - - def keepPushingBlocks() { - var loop = true - logInfo("Thread to push blocks started") - while(loop) { - val block = blocksForPushing.synchronized { - if (blocksForPushing.size == 0) { - blocksForPushing.wait() - } - blocksForPushing.dequeue - } - pushBlock(block) - block.pushed = true - block.data.clear() - - val bucket = buckets(block.longInterval) - bucket.synchronized { - if (bucket.ready) { - bucket.notifyAll() - } - } - } - } - } - - - class ConnectionListener(port: Int, dataHandler: DataHandler) - extends Thread with Logging { - initLogging() - override def run { - try { - val listener = new ServerSocket(port) - logInfo("Listening on port " + port) - while (true) { - new ConnectionHandler(listener.accept(), dataHandler).start(); - } - listener.close() - } catch { - case e: Exception => logError("", e); - } - } - } - - class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging { - initLogging() - override def run { - logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort) - val bytes = new Array[Byte](100 * 1024 * 1024) - try { - - val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024)) - /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/ - var str: String = null - str = inputStream.readUTF - while(str != null) { - dataHandler += str - str = inputStream.readUTF() - } - - /* - var loop = true - while(loop) { - val numRead = inputStream.read(bytes) - if (numRead < 0) { - loop = false - } - inbox += ((LongTime(SystemTime.currentTimeMillis), "test")) - }*/ - - inputStream.close() - } catch { - case e => logError("Error receiving data", e) - } - socket.close() - } - } - - initLogging() - - val masterHost = System.getProperty("spark.master.host") - val masterPort = System.getProperty("spark.master.port").toInt - - val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) - val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") - val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") - - logInfo("Getting stream details from master " + masterHost + ":" + masterPort) - - val timeout = 50 millis - - var started = false - while (!started) { - askActor[String](testStreamCoordinator, TestStarted) match { - case Some(str) => { - started = true - logInfo("TestStreamCoordinator started") - } - case None => { - logInfo("TestStreamCoordinator not started yet") - Thread.sleep(200) - } - } - } - - val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { - case Some(details) => details - case None => throw new Exception("Could not get stream details") - } - logInfo("Stream details received: " + streamDetails) - - val inputName = streamDetails.name - val intervalDurationMillis = streamDetails.duration - val intervalDuration = Time(intervalDurationMillis) - - val dataHandler = new DataHandler( - inputName, - intervalDuration, - Time(TestStreamReceiver3.SHORT_INTERVAL_MILLIS), - blockManager) - - val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler) - - // Send a message to an actor and return an option with its reply, or None if this times out - def askActor[T](actor: ActorRef, message: Any): Option[T] = { - try { - val future = actor.ask(message)(timeout) - return Some(Await.result(future, timeout).asInstanceOf[T]) - } catch { - case e: Exception => - logInfo("Error communicating with " + actor, e) - return None - } - } - - override def run() { - connListener.start() - dataHandler.start() - - var interval = Interval.currentInterval(intervalDuration) - var dataStarted = false - - while(true) { - waitFor(interval.endTime) - logInfo("Woken up at " + System.currentTimeMillis + " for " + interval) - dataHandler.getBucket(interval) match { - case Some(bucket) => { - logInfo("Found " + bucket + " for " + interval) - bucket.synchronized { - if (!bucket.ready) { - logInfo("Waiting for " + bucket) - bucket.wait() - logInfo("Wait over for " + bucket) - } - if (dataStarted || !bucket.empty) { - logInfo("Notifying " + bucket) - notifyScheduler(interval, bucket.blockIds) - dataStarted = true - } - bucket.blocks.clear() - dataHandler.clearBucket(interval) - } - } - case None => { - logInfo("Found none for " + interval) - if (dataStarted) { - logInfo("Notifying none") - notifyScheduler(interval, Array[String]()) - } - } - } - interval = interval.next - } - } - - def waitFor(time: Time) { - val currentTimeMillis = System.currentTimeMillis - val targetTimeMillis = time.milliseconds - if (currentTimeMillis < targetTimeMillis) { - val sleepTime = (targetTimeMillis - currentTimeMillis) - Thread.sleep(sleepTime + 1) - } - } - - def notifyScheduler(interval: Interval, blockIds: Array[String]) { - try { - sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray) - val time = interval.endTime - val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0 - logInfo("Pushing delay for " + time + " is " + delay + " s") - } catch { - case _ => logError("Exception notifying scheduler at interval " + interval) - } - } -} - -object TestStreamReceiver3 { - - val PORT = 9999 - val SHORT_INTERVAL_MILLIS = 100 - - def main(args: Array[String]) { - System.setProperty("spark.master.host", Utils.localHostName) - System.setProperty("spark.master.port", "7078") - val details = Array(("Sentences", 2000L)) - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) - actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") - new TestStreamReceiver3(actorSystem, null).start() - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala deleted file mode 100644 index 31754870dd..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala +++ /dev/null @@ -1,374 +0,0 @@ -package spark.streaming.util - -import spark.streaming._ -import spark._ -import spark.storage._ -import spark.util.AkkaUtils - -import scala.math._ -import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} - -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ -import java.util.concurrent.Executors - -import akka.actor._ -import akka.actor.Actor -import akka.dispatch._ -import akka.pattern.ask -import akka.util.duration._ - -class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager) -extends Thread with Logging { - - class DataHandler( - inputName: String, - longIntervalDuration: Time, - shortIntervalDuration: Time, - blockManager: BlockManager - ) - extends Logging { - - class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) { - var pushed = false - def longInterval = getLongInterval(shortInterval) - override def toString() = "Block " + id - } - - class Bucket(val longInterval: Interval) { - val blocks = new ArrayBuffer[Block]() - var filled = false - def += (block: Block) = blocks += block - def empty() = (blocks.size == 0) - def ready() = (filled && !blocks.exists(! _.pushed)) - def blockIds() = blocks.map(_.id).toArray - override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" - } - - initLogging() - - val syncOnLastShortInterval = true - - val shortIntervalDurationMillis = shortIntervalDuration.milliseconds - val longIntervalDurationMillis = longIntervalDuration.milliseconds - - val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024) - var currentShortInterval = Interval.currentInterval(shortIntervalDuration) - - val blocksForPushing = new Queue[Block]() - val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] - - val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } } - val blockPushingExecutor = Executors.newFixedThreadPool(5) - - - def start() { - buffer.clear() - if (buffer.remaining == 0) { - throw new Exception("Buffer initialization error") - } - bufferProcessingThread.start() - } - - def readDataToBuffer(func: ByteBuffer => Int): Int = { - buffer.synchronized { - if (buffer.remaining == 0) { - logInfo("Received first data for interval " + currentShortInterval) - } - func(buffer) - } - } - - def getLongInterval(shortInterval: Interval): Interval = { - val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) - Interval(intervalBegin, intervalBegin + longIntervalDuration) - } - - def processBuffer() { - - def readInt(buffer: ByteBuffer): Int = { - var offset = 0 - var result = 0 - while (offset < 32) { - val b = buffer.get() - result |= ((b & 0x7F) << offset) - if ((b & 0x80) == 0) { - return result - } - offset += 7 - } - throw new Exception("Malformed zigzag-encoded integer") - } - - val currentLongInterval = getLongInterval(currentShortInterval) - val startTime = System.currentTimeMillis - val newBuffer: ByteBuffer = buffer.synchronized { - buffer.flip() - if (buffer.remaining == 0) { - buffer.clear() - null - } else { - logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms") - val startTime1 = System.currentTimeMillis - var loop = true - var count = 0 - while(loop) { - buffer.mark() - try { - val len = readInt(buffer) - buffer.position(buffer.position + len) - count += 1 - } catch { - case e: Exception => { - buffer.reset() - loop = false - } - } - } - val bytesToCopy = buffer.position - val newBuf = ByteBuffer.allocate(bytesToCopy) - buffer.position(0) - newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer]) - newBuf.flip() - buffer.position(bytesToCopy) - buffer.compact() - newBuf - } - } - - if (newBuffer != null) { - val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval)) - bucket.synchronized { - val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString - val newBlock = new Block(newBlockId, currentShortInterval, newBuffer) - if (syncOnLastShortInterval) { - bucket += newBlock - } - logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.milliseconds) / 1000.0 + " s" ) - blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } }) - } - } - - val newShortInterval = Interval.currentInterval(shortIntervalDuration) - val newLongInterval = getLongInterval(newShortInterval) - - if (newLongInterval != currentLongInterval) { - buckets.get(currentLongInterval) match { - case Some(bucket) => { - bucket.synchronized { - bucket.filled = true - if (bucket.ready) { - bucket.notifyAll() - } - } - } - case None => - } - buckets += ((newLongInterval, new Bucket(newLongInterval))) - } - - currentShortInterval = newShortInterval - } - - def pushBlock(block: Block) { - try{ - if (blockManager != null) { - val startTime = System.currentTimeMillis - logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.milliseconds) + " ms") - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/ - blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2) - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ - val finishTime = System.currentTimeMillis - logInfo(block + " put delay is " + (finishTime - startTime) + " ms") - } else { - logWarning(block + " not put as block manager is null") - } - } catch { - case e: Exception => logError("Exception writing " + block + " to blockmanager" , e) - } - } - - def getBucket(longInterval: Interval): Option[Bucket] = { - buckets.get(longInterval) - } - - def clearBucket(longInterval: Interval) { - buckets.remove(longInterval) - } - - def keepProcessingBuffers() { - logInfo("Thread to process buffers started") - while(true) { - processBuffer() - val currentTimeMillis = System.currentTimeMillis - val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * - shortIntervalDurationMillis - currentTimeMillis + 1 - Thread.sleep(sleepTimeMillis) - } - } - - def pushAndNotifyBlock(block: Block) { - pushBlock(block) - block.pushed = true - val bucket = if (syncOnLastShortInterval) { - buckets(block.longInterval) - } else { - var longInterval = block.longInterval - while(!buckets.contains(longInterval)) { - logWarning("Skipping bucket of " + longInterval + " for " + block) - longInterval = longInterval.next - } - val chosenBucket = buckets(longInterval) - logDebug("Choosing bucket of " + longInterval + " for " + block) - chosenBucket += block - chosenBucket - } - - bucket.synchronized { - if (bucket.ready) { - bucket.notifyAll() - } - } - - } - } - - - class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler) - extends ConnectionHandler(host, port, false) { - - override def ready(key: SelectionKey) { - changeInterest(key, SelectionKey.OP_READ) - } - - override def read(key: SelectionKey) { - try { - val channel = key.channel.asInstanceOf[SocketChannel] - val bytesRead = dataHandler.readDataToBuffer(channel.read) - if (bytesRead < 0) { - close(key) - } - } catch { - case e: IOException => { - logError("Error reading", e) - close(key) - } - } - } - } - - initLogging() - - val masterHost = System.getProperty("spark.master.host", "localhost") - val masterPort = System.getProperty("spark.master.port", "7078").toInt - - val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) - val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") - val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") - - logInfo("Getting stream details from master " + masterHost + ":" + masterPort) - - val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { - case Some(details) => details - case None => throw new Exception("Could not get stream details") - } - logInfo("Stream details received: " + streamDetails) - - val inputName = streamDetails.name - val intervalDurationMillis = streamDetails.duration - val intervalDuration = Milliseconds(intervalDurationMillis) - val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt) - - val dataHandler = new DataHandler(inputName, intervalDuration, shortIntervalDuration, blockManager) - val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler) - - val timeout = 100 millis - - // Send a message to an actor and return an option with its reply, or None if this times out - def askActor[T](actor: ActorRef, message: Any): Option[T] = { - try { - val future = actor.ask(message)(timeout) - return Some(Await.result(future, timeout).asInstanceOf[T]) - } catch { - case e: Exception => - logInfo("Error communicating with " + actor, e) - return None - } - } - - override def run() { - connectionHandler.start() - dataHandler.start() - - var interval = Interval.currentInterval(intervalDuration) - var dataStarted = false - - - while(true) { - waitFor(interval.endTime) - /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/ - dataHandler.getBucket(interval) match { - case Some(bucket) => { - logDebug("Found " + bucket + " for " + interval) - bucket.synchronized { - if (!bucket.ready) { - logDebug("Waiting for " + bucket) - bucket.wait() - logDebug("Wait over for " + bucket) - } - if (dataStarted || !bucket.empty) { - logDebug("Notifying " + bucket) - notifyScheduler(interval, bucket.blockIds) - dataStarted = true - } - bucket.blocks.clear() - dataHandler.clearBucket(interval) - } - } - case None => { - logDebug("Found none for " + interval) - if (dataStarted) { - logDebug("Notifying none") - notifyScheduler(interval, Array[String]()) - } - } - } - interval = interval.next - } - } - - def waitFor(time: Time) { - val currentTimeMillis = System.currentTimeMillis - val targetTimeMillis = time.milliseconds - if (currentTimeMillis < targetTimeMillis) { - val sleepTime = (targetTimeMillis - currentTimeMillis) - Thread.sleep(sleepTime + 1) - } - } - - def notifyScheduler(interval: Interval, blockIds: Array[String]) { - try { - sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray) - val time = interval.endTime - val delay = (System.currentTimeMillis - time.milliseconds) - logInfo("Notification delay for " + time + " is " + delay + " ms") - } catch { - case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e) - } - } -} - - -object TestStreamReceiver4 { - def main(args: Array[String]) { - val details = Array(("Sentences", 2000L)) - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) - actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") - new TestStreamReceiver4(actorSystem, null).start() - } -} |