author     tdas <tathagata.das1565@gmail.com>  2012-11-08 11:35:40 +0000
committer  tdas <tathagata.das1565@gmail.com>  2012-11-08 11:35:40 +0000
commit     52d21cb682d1c4ca05e6823f8049ccedc3c5530c (patch)
tree       226f7ee936289ef2831caea60e7ebfa1a5b77579 /streaming
parent     cc2a65f54715ff0990d5873d50eec0dedf64d409 (diff)
Removed unnecessary files (experimental test generators, stream receivers, and related utilities under spark/streaming/util).
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala     157
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala     67
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala  92
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala            23
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator.scala         107
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala        119
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala        244
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala  39
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala   421
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala   374
10 files changed, 0 insertions, 1643 deletions
diff --git a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala b/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala
deleted file mode 100644
index cde868a0c9..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-package spark.streaming.util
-
-import spark.Logging
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue}
-
-import java.net._
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-import java.nio.channels.spi._
-
-abstract class ConnectionHandler(host: String, port: Int, connect: Boolean)
-extends Thread with Logging {
-
- val selector = SelectorProvider.provider.openSelector()
- val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
-
- initLogging()
-
- override def run() {
- try {
- if (connect) {
- connect()
- } else {
- listen()
- }
-
- var interrupted = false
- while(!interrupted) {
-
- preSelect()
-
- while(!interestChangeRequests.isEmpty) {
- val (key, ops) = interestChangeRequests.dequeue
- val lastOps = key.interestOps()
- key.interestOps(ops)
-
- def intToOpStr(op: Int): String = {
- val opStrs = new ArrayBuffer[String]()
- if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
- if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
- if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
- if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
- if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
- }
-
- logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
- }
-
- selector.select()
- interrupted = Thread.currentThread.isInterrupted
-
- val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext) {
- val key = selectedKeys.next.asInstanceOf[SelectionKey]
- selectedKeys.remove()
- if (key.isValid) {
- if (key.isAcceptable) {
- accept(key)
- } else if (key.isConnectable) {
- finishConnect(key)
- } else if (key.isReadable) {
- read(key)
- } else if (key.isWritable) {
- write(key)
- }
- }
- }
- }
- } catch {
- case e: Exception => {
- logError("Error in select loop", e)
- }
- }
- }
-
- def connect() {
- val socketAddress = new InetSocketAddress(host, port)
- val channel = SocketChannel.open()
- channel.configureBlocking(false)
- channel.socket.setReuseAddress(true)
- channel.socket.setTcpNoDelay(true)
- channel.connect(socketAddress)
- channel.register(selector, SelectionKey.OP_CONNECT)
- logInfo("Initiating connection to [" + socketAddress + "]")
- }
-
- def listen() {
- val channel = ServerSocketChannel.open()
- channel.configureBlocking(false)
- channel.socket.setReuseAddress(true)
- channel.socket.setReceiveBufferSize(256 * 1024)
- channel.socket.bind(new InetSocketAddress(port))
- channel.register(selector, SelectionKey.OP_ACCEPT)
- logInfo("Listening on port " + port)
- }
-
- def finishConnect(key: SelectionKey) {
- try {
- val channel = key.channel.asInstanceOf[SocketChannel]
- val address = channel.socket.getRemoteSocketAddress
- channel.finishConnect()
- logInfo("Connected to [" + host + ":" + port + "]")
- ready(key)
- } catch {
- case e: IOException => {
- logError("Error finishing connect to " + host + ":" + port)
- close(key)
- }
- }
- }
-
- def accept(key: SelectionKey) {
- try {
- val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
- val channel = serverChannel.accept()
- val address = channel.socket.getRemoteSocketAddress
- channel.configureBlocking(false)
- logInfo("Accepted connection from [" + address + "]")
- ready(channel.register(selector, 0))
- } catch {
- case e: IOException => {
- logError("Error accepting connection", e)
- }
- }
- }
-
- def changeInterest(key: SelectionKey, ops: Int) {
- logTrace("Added request to change ops to " + ops)
- interestChangeRequests += ((key, ops))
- }
-
- def ready(key: SelectionKey)
-
- def preSelect() {
- }
-
- def read(key: SelectionKey) {
- throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString)
- }
-
- def write(key: SelectionKey) {
- throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString)
- }
-
- def close(key: SelectionKey) {
- try {
- key.channel.close()
- key.cancel()
- Thread.currentThread.interrupt
- } catch {
- case e: Exception => logError("Error closing connection", e)
- }
- }
-}
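
The removed ConnectionHandler.scala wrapped a standard java.nio selector loop: channels are registered non-blocking, the thread blocks in select(), and each ready key is dispatched to accept/connect/read/write. A minimal, self-contained sketch of that same pattern, independent of the deleted class (the object name and port are illustrative):

    import java.net.InetSocketAddress
    import java.nio.ByteBuffer
    import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

    // Minimal non-blocking accept/read loop, the same shape as the removed handler.
    object SelectorLoopSketch {
      def main(args: Array[String]) {
        val selector = Selector.open()
        val server = ServerSocketChannel.open()
        server.configureBlocking(false)
        server.socket.bind(new InetSocketAddress(9999))
        server.register(selector, SelectionKey.OP_ACCEPT)
        val buffer = ByteBuffer.allocate(32 * 1024)
        while (true) {
          selector.select()                          // block until some channel is ready
          val keys = selector.selectedKeys.iterator
          while (keys.hasNext) {
            val key = keys.next()
            keys.remove()
            if (key.isValid && key.isAcceptable) {
              val channel = server.accept()          // new connection: watch it for reads
              channel.configureBlocking(false)
              channel.register(selector, SelectionKey.OP_READ)
            } else if (key.isValid && key.isReadable) {
              buffer.clear()
              val n = key.channel.asInstanceOf[SocketChannel].read(buffer)
              if (n < 0) { key.channel.close(); key.cancel() }   // peer closed
            }
          }
        }
      }
    }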
diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
deleted file mode 100644
index 3922dfbad6..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package spark.streaming.util
-
-import java.net.{Socket, ServerSocket}
-import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream}
-
-object Receiver {
- def main(args: Array[String]) {
- val port = args(0).toInt
- val lsocket = new ServerSocket(port)
- println("Listening on port " + port )
- while(true) {
- val socket = lsocket.accept()
- (new Thread() {
- override def run() {
- val buffer = new Array[Byte](100000)
- var count = 0
- val time = System.currentTimeMillis
- try {
- val is = new DataInputStream(new BufferedInputStream(socket.getInputStream))
- var loop = true
- var string: String = null
- do {
- string = is.readUTF()
- if (string != null) {
- count += 28
- }
- } while (string != null)
- } catch {
- case e: Exception => e.printStackTrace()
- }
- val timeTaken = System.currentTimeMillis - time
- val tput = (count / 1024.0) / (timeTaken / 1000.0)
- println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s")
- }
- }).start()
- }
- }
-
-}
-
-object Sender {
-
- def main(args: Array[String]) {
- try {
- val host = args(0)
- val port = args(1).toInt
- val size = args(2).toInt
-
- val byteStream = new ByteArrayOutputStream()
- val stringDataStream = new DataOutputStream(byteStream)
- (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy"))
- val bytes = byteStream.toByteArray()
- println("Generated array of " + bytes.length + " bytes")
-
- /*val bytes = new Array[Byte](size)*/
- val socket = new Socket(host, port)
- val os = socket.getOutputStream
- os.write(bytes)
- os.flush
- socket.close()
-
- } catch {
- case e: Exception => e.printStackTrace
- }
- }
-}
-
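Both halves of the removed sender/receiver pair framed data with DataOutputStream.writeUTF, which prefixes each string with a 2-byte length before the modified-UTF-8 payload; that is why the receiver above counts 28 bytes per 26-character sentence. A tiny round-trip sketch of that framing (hypothetical object name):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object UtfFramingSketch {
      def main(args: Array[String]) {
        val bytes = new ByteArrayOutputStream()
        val out = new DataOutputStream(bytes)
        out.writeUTF("abcdedfghijklmnopqrstuvwxy")   // 26 ASCII chars + 2-byte length = 28 bytes
        val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
        println(in.readUTF() + " took " + bytes.size + " bytes on the wire")
      }
    }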
diff --git a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala
deleted file mode 100644
index 94e8f7a849..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package spark.streaming.util
-
-import spark._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-import scala.io.Source
-
-import java.net.InetSocketAddress
-
-import org.apache.hadoop.fs._
-import org.apache.hadoop.conf._
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util._
-
-object SentenceFileGenerator {
-
- def printUsage () {
- println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
-
- def main (args: Array[String]) {
- if (args.length < 4) {
- printUsage
- }
-
- val master = args(0)
- val fs = new Path(args(1)).getFileSystem(new Configuration())
- val targetDirectory = new Path(args(1)).makeQualified(fs)
- val numPartitions = args(2).toInt
- val sentenceFile = args(3)
- val sentencesPerSecond = {
- if (args.length > 4) args(4).toInt
- else 10
- }
-
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n").toArray
- source.close ()
- println("Read " + lines.length + " lines from file " + sentenceFile)
-
- val sentences = {
- val buffer = ArrayBuffer[String]()
- val random = new Random()
- var i = 0
- while (i < sentencesPerSecond) {
- buffer += lines(random.nextInt(lines.length))
- i += 1
- }
- buffer.toArray
- }
- println("Generated " + sentences.length + " sentences")
-
- val sc = new SparkContext(master, "SentenceFileGenerator")
- val sentencesRDD = sc.parallelize(sentences, numPartitions)
-
- val tempDirectory = new Path(targetDirectory, "_tmp")
-
- fs.mkdirs(targetDirectory)
- fs.mkdirs(tempDirectory)
-
- var saveTimeMillis = System.currentTimeMillis
- try {
- while (true) {
- val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis)
- val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis)
- println("Writing to file " + newDir)
- sentencesRDD.saveAsTextFile(tmpNewDir.toString)
- fs.rename(tmpNewDir, newDir)
- saveTimeMillis += 1000
- val sleepTimeMillis = {
- val currentTimeMillis = System.currentTimeMillis
- if (saveTimeMillis < currentTimeMillis) {
- 0
- } else {
- saveTimeMillis - currentTimeMillis
- }
- }
- println("Sleeping for " + sleepTimeMillis + " ms")
- Thread.sleep(sleepTimeMillis)
- }
- } catch {
- case e: Exception =>
- }
- }
-}
-
-
-
-
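The removed SentenceFileGenerator published each batch atomically: it wrote into a _tmp subdirectory and then renamed the finished directory into place, so a file-watching stream never observes partially written output. A minimal sketch of that write-then-rename pattern with java.nio.file (paths and names are hypothetical):

    import java.nio.file.{Files, Paths, StandardCopyOption}

    object AtomicPublishSketch {
      def main(args: Array[String]) {
        val target = Paths.get("/tmp/sentences")     // illustrative target directory
        val tmp = target.resolve("_tmp")
        Files.createDirectories(tmp)
        val staged = tmp.resolve("Sentences-" + System.currentTimeMillis)
        Files.write(staged, "the quick brown fox\n".getBytes("UTF-8"))
        // Rename within one filesystem is atomic, so readers see all or nothing.
        Files.move(staged, target.resolve(staged.getFileName),
          StandardCopyOption.ATOMIC_MOVE)
      }
    }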
diff --git a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala
deleted file mode 100644
index 60085f4f88..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package spark.streaming.util
-
-import spark.SparkContext
-import SparkContext._
-
-object ShuffleTest {
- def main(args: Array[String]) {
-
- if (args.length < 1) {
- println ("Usage: ShuffleTest <host>")
- System.exit(1)
- }
-
- val sc = new spark.SparkContext(args(0), "ShuffleTest")
- val rdd = sc.parallelize(1 to 1000, 500).cache
-
- def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) }
-
- time { for (i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } }
- System.exit(0)
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala
deleted file mode 100644
index 23e9235c60..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package spark.streaming.util
-
-import scala.util.Random
-import scala.io.Source
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-
-import java.net.InetSocketAddress
-
-
-object TestGenerator {
-
- def printUsage {
- println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
- /*
- def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
- val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1
- val random = new Random ()
-
- try {
- var lastPrintTime = System.currentTimeMillis()
- var count = 0
- while(true) {
- streamReceiver ! lines(random.nextInt(lines.length))
- count += 1
- if (System.currentTimeMillis - lastPrintTime >= 1000) {
- println (count + " sentences sent last second")
- count = 0
- lastPrintTime = System.currentTimeMillis
- }
- Thread.sleep(sleepBetweenSentences.toLong)
- }
- } catch {
- case e: Exception =>
- }
- }*/
-
- def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
- try {
- val numSentences = if (sentencesPerSecond <= 0) {
- lines.length
- } else {
- sentencesPerSecond
- }
- val sentences = lines.take(numSentences).toArray
-
- var nextSendingTime = System.currentTimeMillis()
- val sendAsArray = true
- while(true) {
- if (sendAsArray) {
- println("Sending as array")
- streamReceiver !? sentences
- } else {
- println("Sending individually")
- sentences.foreach(sentence => {
- streamReceiver !? sentence
- })
- }
- println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms")
- nextSendingTime += 1000
- val sleepTime = nextSendingTime - System.currentTimeMillis
- if (sleepTime > 0) {
- println ("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- }
- }
- } catch {
- case e: Exception =>
- }
- }
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- printUsage
- }
-
- val generateRandomly = false
-
- val streamReceiverIP = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10
- val sentenceInputName = if (args.length > 4) args(4) else "Sentences"
-
- println("Sending " + sentencesPerSecond + " sentences per second to " +
- streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close ()
-
- val streamReceiver = select(
- Node(streamReceiverIP, streamReceiverPort),
- Symbol("NetworkStreamReceiver-" + sentenceInputName))
- if (generateRandomly) {
- /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/
- } else {
- generateSameSentences(lines, sentencesPerSecond, streamReceiver)
- }
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala
deleted file mode 100644
index ff840d084f..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-package spark.streaming.util
-
-import scala.util.Random
-import scala.io.Source
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-
-import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream}
-import java.net.Socket
-
-object TestGenerator2 {
-
- def printUsage {
- println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
-
- def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){
- try {
- println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort)
- val socket = new Socket(streamReceiverHost, streamReceiverPort)
-
- println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort )
- val currentTime = System.currentTimeMillis
- var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime
- Thread.sleep(targetTime - currentTime)
-
- while(true) {
- val startTime = System.currentTimeMillis()
- println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
- val socketOutputStream = socket.getOutputStream
- val parts = 10
- (0 until parts).foreach(i => {
- val partStartTime = System.currentTimeMillis
-
- val offset = (i * bytes.length / parts).toInt
- val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length)
- socketOutputStream.write(bytes, offset, len)
- socketOutputStream.flush()
- val partFinishTime = System.currentTimeMillis
- println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms")
- val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1)
- Thread.sleep(sleepTime)
- })
-
- socketOutputStream.flush()
- /*val socketInputStream = new DataInputStream(socket.getInputStream)*/
- /*val reply = socketInputStream.readUTF()*/
- val finishTime = System.currentTimeMillis()
- println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]")
- /*println("Received = " + reply)*/
- targetTime = targetTime + intervalTime
- val sleepTime = (targetTime - finishTime) + 10
- if (sleepTime > 0) {
- println("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- } else {
- println("############################")
- println("###### Skipping sleep ######")
- println("############################")
- }
- }
- } catch {
- case e: Exception => println(e)
- }
- println("Stopped sending")
- }
-
- def main(args: Array[String]) {
- if (args.length < 4) {
- printUsage
- }
-
- val streamReceiverHost = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val intervalTime = args(3).toLong
- val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
-
- println("Reading the file " + sentenceFile)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close()
-
- val numSentences = if (sentencesPerInterval <= 0) {
- lines.length
- } else {
- sentencesPerInterval
- }
-
- println("Generating sentences")
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take(numSentences).toArray
- } else {
- (0 until numSentences).map(i => lines(i % lines.length)).toArray
- }
-
- println("Converting to byte array")
- val byteStream = new ByteArrayOutputStream()
- val stringDataStream = new DataOutputStream(byteStream)
- /*stringDataStream.writeInt(sentences.size)*/
- sentences.foreach(stringDataStream.writeUTF)
- val bytes = byteStream.toByteArray()
- stringDataStream.close()
- println("Generated array of " + bytes.length + " bytes")
-
- /*while(true) { */
- sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime)
- /*println("Sleeping for 5 seconds")*/
- /*Thread.sleep(5000)*/
- /*System.gc()*/
- /*}*/
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala
deleted file mode 100644
index 9c39ef3e12..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-package spark.streaming.util
-
-import spark.Logging
-
-import scala.util.Random
-import scala.io.Source
-import scala.collection.mutable.{ArrayBuffer, Queue}
-
-import java.net._
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-
-import it.unimi.dsi.fastutil.io._
-
-class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int)
-extends Logging {
-
- class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4)
- extends ConnectionHandler(host, port, true) {
-
- val buffers = new ArrayBuffer[ByteBuffer]
- val newBuffers = new Queue[ByteBuffer]
- var activeKey: SelectionKey = null
-
- def send(buffer: ByteBuffer) {
- logDebug("Sending: " + buffer)
- newBuffers.synchronized {
- newBuffers.enqueue(buffer)
- }
- selector.wakeup()
- buffer.synchronized {
- buffer.wait()
- }
- }
-
- override def ready(key: SelectionKey) {
- logDebug("Ready")
- activeKey = key
- val channel = key.channel.asInstanceOf[SocketChannel]
- channel.register(selector, SelectionKey.OP_WRITE)
- generator.startSending()
- }
-
- override def preSelect() {
- newBuffers.synchronized {
- while(!newBuffers.isEmpty) {
- val buffer = newBuffers.dequeue
- buffers += buffer
- logDebug("Added: " + buffer)
- changeInterest(activeKey, SelectionKey.OP_WRITE)
- }
- }
- }
-
- override def write(key: SelectionKey) {
- try {
- /*while(true) {*/
- val channel = key.channel.asInstanceOf[SocketChannel]
- if (buffers.size > 0) {
- val buffer = buffers(0)
- val newBuffer = buffer.slice()
- newBuffer.limit(math.min(newBuffer.remaining, 32768))
- val bytesWritten = channel.write(newBuffer)
- buffer.position(buffer.position + bytesWritten)
- if (bytesWritten == 0) return
- if (buffer.remaining == 0) {
- buffers -= buffer
- buffer.synchronized {
- buffer.notify()
- }
- }
- /*changeInterest(key, SelectionKey.OP_WRITE)*/
- } else {
- changeInterest(key, 0)
- }
- /*}*/
- } catch {
- case e: IOException => {
- if (e.toString.contains("pipe") || e.toString.contains("reset")) {
- logError("Connection broken")
- } else {
- logError("Connection error", e)
- }
- close(key)
- }
- }
- }
-
- override def close(key: SelectionKey) {
- buffers.clear()
- super.close(key)
- }
- }
-
- initLogging()
-
- val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this)
- var sendingThread: Thread = null
- var sendCount = 0
- val sendBatches = 5
-
- def run() {
- logInfo("Connection handler started")
- connectionHandler.start()
- connectionHandler.join()
- if (sendingThread != null && !sendingThread.isInterrupted) {
- sendingThread.interrupt
- }
- logInfo("Connection handler stopped")
- }
-
- def startSending() {
- sendingThread = new Thread() {
- override def run() {
- logInfo("STARTING TO SEND")
- sendSentences()
- logInfo("SENDING STOPPED AFTER " + sendCount)
- connectionHandler.interrupt()
- }
- }
- sendingThread.start()
- }
-
- def stopSending() {
- sendingThread.interrupt()
- }
-
- def sendSentences() {
- logInfo("Reading the file " + sentenceFile)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close()
-
- val numSentences = if (sentencesPerInterval <= 0) {
- lines.length
- } else {
- sentencesPerInterval
- }
-
- logInfo("Generating sentence buffer")
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take(numSentences).toArray
- } else {
- (0 until numSentences).map(i => lines(i % lines.length)).toArray
- }
-
- /*
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take((numSentences / sendBatches).toInt).toArray
- } else {
- (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray
- }*/
-
-
- val serializer = new spark.KryoSerializer().newInstance()
- val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024)
- serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close()
- byteStream.trim()
- val sentenceBuffer = ByteBuffer.wrap(byteStream.array)
-
- logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort )
- val currentTime = System.currentTimeMillis
- var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration
- Thread.sleep(targetTime - currentTime)
-
- val totalBytes = sentenceBuffer.limit
-
- while(true) {
- val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches
-
- val startTime = System.currentTimeMillis()
- logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
-
- (0 until batchesInCurrentInterval).foreach(i => {
- try {
- val position = (i * totalBytes / sendBatches).toInt
- val limit = if (i == sendBatches - 1) {
- totalBytes
- } else {
- ((i + 1) * totalBytes / sendBatches).toInt - 1
- }
-
- val partStartTime = System.currentTimeMillis
- sentenceBuffer.limit(limit)
- connectionHandler.send(sentenceBuffer)
- val partFinishTime = System.currentTimeMillis
- val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1)
- Thread.sleep(sleepTime)
-
- } catch {
- case ie: InterruptedException => return
- case e: Exception => e.printStackTrace()
- }
- })
- sentenceBuffer.rewind()
-
- val finishTime = System.currentTimeMillis()
- /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/
- targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0)
-
- val sleepTime = (targetTime - finishTime) + 20
- if (sleepTime > 0) {
- logInfo("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- } else {
- logInfo("###### Skipping sleep ######")
- }
- if (Thread.currentThread.isInterrupted) {
- return
- }
- sendCount += 1
- }
- }
-}
-
-object TestGenerator4 {
- def printUsage {
- println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> [<sentences per second>]")
- System.exit(0)
- }
-
- def main(args: Array[String]) {
- println("GENERATOR STARTED")
- if (args.length < 4) {
- printUsage
- }
-
-
- val streamReceiverHost = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val intervalDuration = args(3).toLong
- val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
-
- while(true) {
- val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval)
- generator.run()
- Thread.sleep(2000)
- }
- println("GENERATOR STOPPED")
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala
deleted file mode 100644
index f584f772bb..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package spark.streaming.util
-
-import spark.streaming._
-import spark.Logging
-
-import akka.actor._
-import akka.actor.Actor
-import akka.actor.Actor._
-
-sealed trait TestStreamCoordinatorMessage
-case class GetStreamDetails extends TestStreamCoordinatorMessage
-case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage
-case class TestStarted extends TestStreamCoordinatorMessage
-
-class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging {
-
- var index = 0
-
- initLogging()
-
- logInfo("Created")
-
- def receive = {
- case TestStarted => {
- sender ! "OK"
- }
-
- case GetStreamDetails => {
- val streamDetail = if (index >= streamDetails.length) null else streamDetails(index)
- sender ! GotStreamDetails(streamDetail._1, streamDetail._2)
- index += 1
- if (streamDetail != null) {
- logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" )
- }
- }
- }
-
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
deleted file mode 100644
index 80ad924dd8..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-package spark.streaming.util
-
-import spark._
-import spark.storage._
-import spark.util.AkkaUtils
-import spark.streaming._
-
-import scala.math._
-import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
-
-import akka.actor._
-import akka.actor.Actor
-import akka.dispatch._
-import akka.pattern.ask
-import akka.util.duration._
-
-import java.io.DataInputStream
-import java.io.BufferedInputStream
-import java.net.Socket
-import java.net.ServerSocket
-import java.util.LinkedHashMap
-
-import org.apache.hadoop.fs._
-import org.apache.hadoop.conf._
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util._
-
-import spark.Utils
-
-
-class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager)
-extends Thread with Logging {
-
-
- class DataHandler(
- inputName: String,
- longIntervalDuration: Time,
- shortIntervalDuration: Time,
- blockManager: BlockManager
- )
- extends Logging {
-
- class Block(var id: String, var shortInterval: Interval) {
- val data = ArrayBuffer[String]()
- var pushed = false
- def longInterval = getLongInterval(shortInterval)
- def empty() = (data.size == 0)
- def += (str: String) = (data += str)
- override def toString() = "Block " + id
- }
-
- class Bucket(val longInterval: Interval) {
- val blocks = new ArrayBuffer[Block]()
- var filled = false
- def += (block: Block) = blocks += block
- def empty() = (blocks.size == 0)
- def ready() = (filled && !blocks.exists(! _.pushed))
- def blockIds() = blocks.map(_.id).toArray
- override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
- }
-
- initLogging()
-
- val shortIntervalDurationMillis = shortIntervalDuration.toLong
- val longIntervalDurationMillis = longIntervalDuration.toLong
-
- var currentBlock: Block = null
- var currentBucket: Bucket = null
-
- val blocksForPushing = new Queue[Block]()
- val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
-
- val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } }
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- def start() {
- blockUpdatingThread.start()
- blockPushingThread.start()
- }
-
- def += (data: String) = addData(data)
-
- def addData(data: String) {
- if (currentBlock == null) {
- updateCurrentBlock()
- }
- currentBlock.synchronized {
- currentBlock += data
- }
- }
-
- def getShortInterval(time: Time): Interval = {
- val intervalBegin = time.floor(shortIntervalDuration)
- Interval(intervalBegin, intervalBegin + shortIntervalDuration)
- }
-
- def getLongInterval(shortInterval: Interval): Interval = {
- val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
- Interval(intervalBegin, intervalBegin + longIntervalDuration)
- }
-
- def updateCurrentBlock() {
- /*logInfo("Updating current block")*/
- val currentTime = Time(System.currentTimeMillis)
- val shortInterval = getShortInterval(currentTime)
- val longInterval = getLongInterval(shortInterval)
-
- def createBlock(reuseCurrentBlock: Boolean = false) {
- val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size
- if (!reuseCurrentBlock) {
- val newBlock = new Block(newBlockId, shortInterval)
- /*logInfo("Created " + currentBlock)*/
- currentBlock = newBlock
- } else {
- currentBlock.shortInterval = shortInterval
- currentBlock.id = newBlockId
- }
- }
-
- def createBucket() {
- val newBucket = new Bucket(longInterval)
- buckets += ((longInterval, newBucket))
- currentBucket = newBucket
- /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/
- }
-
- if (currentBlock == null || currentBucket == null) {
- createBucket()
- currentBucket.synchronized {
- createBlock()
- }
- return
- }
-
- currentBlock.synchronized {
- var reuseCurrentBlock = false
-
- if (shortInterval != currentBlock.shortInterval) {
- if (!currentBlock.empty) {
- blocksForPushing.synchronized {
- blocksForPushing += currentBlock
- blocksForPushing.notifyAll()
- }
- }
-
- currentBucket.synchronized {
- if (currentBlock.empty) {
- reuseCurrentBlock = true
- } else {
- currentBucket += currentBlock
- }
-
- if (longInterval != currentBucket.longInterval) {
- currentBucket.filled = true
- if (currentBucket.ready) {
- currentBucket.notifyAll()
- }
- createBucket()
- }
- }
-
- createBlock(reuseCurrentBlock)
- }
- }
- }
-
- def pushBlock(block: Block) {
- try{
- if (blockManager != null) {
- logInfo("Pushing block")
- val startTime = System.currentTimeMillis
-
- val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block?
- val finishTime = System.currentTimeMillis
- logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s")
-
- blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2)
- /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
- /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/
- /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/
- val finishTime1 = System.currentTimeMillis
- logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s")
- } else {
- logWarning(block + " not put as block manager is null")
- }
- } catch {
- case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
- }
- }
-
- def getBucket(longInterval: Interval): Option[Bucket] = {
- buckets.get(longInterval)
- }
-
- def clearBucket(longInterval: Interval) {
- buckets.remove(longInterval)
- }
-
- def keepUpdatingCurrentBlock() {
- logInfo("Thread to update current block started")
- while(true) {
- updateCurrentBlock()
- val currentTimeMillis = System.currentTimeMillis
- val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
- shortIntervalDurationMillis - currentTimeMillis + 1
- Thread.sleep(sleepTimeMillis)
- }
- }
-
- def keepPushingBlocks() {
- var loop = true
- logInfo("Thread to push blocks started")
- while(loop) {
- val block = blocksForPushing.synchronized {
- if (blocksForPushing.size == 0) {
- blocksForPushing.wait()
- }
- blocksForPushing.dequeue
- }
- pushBlock(block)
- block.pushed = true
- block.data.clear()
-
- val bucket = buckets(block.longInterval)
- bucket.synchronized {
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
- }
- }
- }
-
-
- class ConnectionListener(port: Int, dataHandler: DataHandler)
- extends Thread with Logging {
- initLogging()
- override def run {
- try {
- val listener = new ServerSocket(port)
- logInfo("Listening on port " + port)
- while (true) {
- new ConnectionHandler(listener.accept(), dataHandler).start();
- }
- listener.close()
- } catch {
- case e: Exception => logError("", e);
- }
- }
- }
-
- class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging {
- initLogging()
- override def run {
- logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort)
- val bytes = new Array[Byte](100 * 1024 * 1024)
- try {
-
- val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024))
- /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/
- var str: String = null
- str = inputStream.readUTF
- while(str != null) {
- dataHandler += str
- str = inputStream.readUTF()
- }
-
- /*
- var loop = true
- while(loop) {
- val numRead = inputStream.read(bytes)
- if (numRead < 0) {
- loop = false
- }
- inbox += ((LongTime(SystemTime.currentTimeMillis), "test"))
- }*/
-
- inputStream.close()
- } catch {
- case e => logError("Error receiving data", e)
- }
- socket.close()
- }
- }
-
- initLogging()
-
- val masterHost = System.getProperty("spark.master.host")
- val masterPort = System.getProperty("spark.master.port").toInt
-
- val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort)
- val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
- val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
-
- logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
-
- val timeout = 50 millis
-
- var started = false
- while (!started) {
- askActor[String](testStreamCoordinator, TestStarted) match {
- case Some(str) => {
- started = true
- logInfo("TestStreamCoordinator started")
- }
- case None => {
- logInfo("TestStreamCoordinator not started yet")
- Thread.sleep(200)
- }
- }
- }
-
- val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
- case Some(details) => details
- case None => throw new Exception("Could not get stream details")
- }
- logInfo("Stream details received: " + streamDetails)
-
- val inputName = streamDetails.name
- val intervalDurationMillis = streamDetails.duration
- val intervalDuration = Time(intervalDurationMillis)
-
- val dataHandler = new DataHandler(
- inputName,
- intervalDuration,
- Time(TestStreamReceiver3.SHORT_INTERVAL_MILLIS),
- blockManager)
-
- val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler)
-
- // Send a message to an actor and return an option with its reply, or None if this times out
- def askActor[T](actor: ActorRef, message: Any): Option[T] = {
- try {
- val future = actor.ask(message)(timeout)
- return Some(Await.result(future, timeout).asInstanceOf[T])
- } catch {
- case e: Exception =>
- logInfo("Error communicating with " + actor, e)
- return None
- }
- }
-
- override def run() {
- connListener.start()
- dataHandler.start()
-
- var interval = Interval.currentInterval(intervalDuration)
- var dataStarted = false
-
- while(true) {
- waitFor(interval.endTime)
- logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)
- dataHandler.getBucket(interval) match {
- case Some(bucket) => {
- logInfo("Found " + bucket + " for " + interval)
- bucket.synchronized {
- if (!bucket.ready) {
- logInfo("Waiting for " + bucket)
- bucket.wait()
- logInfo("Wait over for " + bucket)
- }
- if (dataStarted || !bucket.empty) {
- logInfo("Notifying " + bucket)
- notifyScheduler(interval, bucket.blockIds)
- dataStarted = true
- }
- bucket.blocks.clear()
- dataHandler.clearBucket(interval)
- }
- }
- case None => {
- logInfo("Found none for " + interval)
- if (dataStarted) {
- logInfo("Notifying none")
- notifyScheduler(interval, Array[String]())
- }
- }
- }
- interval = interval.next
- }
- }
-
- def waitFor(time: Time) {
- val currentTimeMillis = System.currentTimeMillis
- val targetTimeMillis = time.milliseconds
- if (currentTimeMillis < targetTimeMillis) {
- val sleepTime = (targetTimeMillis - currentTimeMillis)
- Thread.sleep(sleepTime + 1)
- }
- }
-
- def notifyScheduler(interval: Interval, blockIds: Array[String]) {
- try {
- sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
- val time = interval.endTime
- val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0
- logInfo("Pushing delay for " + time + " is " + delay + " s")
- } catch {
- case _ => logError("Exception notifying scheduler at interval " + interval)
- }
- }
-}
-
-object TestStreamReceiver3 {
-
- val PORT = 9999
- val SHORT_INTERVAL_MILLIS = 100
-
- def main(args: Array[String]) {
- System.setProperty("spark.master.host", Utils.localHostName)
- System.setProperty("spark.master.port", "7078")
- val details = Array(("Sentences", 2000L))
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
- actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
- new TestStreamReceiver3(actorSystem, null).start()
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala
deleted file mode 100644
index 31754870dd..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala
+++ /dev/null
@@ -1,374 +0,0 @@
-package spark.streaming.util
-
-import spark.streaming._
-import spark._
-import spark.storage._
-import spark.util.AkkaUtils
-
-import scala.math._
-import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
-
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-import java.util.concurrent.Executors
-
-import akka.actor._
-import akka.actor.Actor
-import akka.dispatch._
-import akka.pattern.ask
-import akka.util.duration._
-
-class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager)
-extends Thread with Logging {
-
- class DataHandler(
- inputName: String,
- longIntervalDuration: Time,
- shortIntervalDuration: Time,
- blockManager: BlockManager
- )
- extends Logging {
-
- class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) {
- var pushed = false
- def longInterval = getLongInterval(shortInterval)
- override def toString() = "Block " + id
- }
-
- class Bucket(val longInterval: Interval) {
- val blocks = new ArrayBuffer[Block]()
- var filled = false
- def += (block: Block) = blocks += block
- def empty() = (blocks.size == 0)
- def ready() = (filled && !blocks.exists(! _.pushed))
- def blockIds() = blocks.map(_.id).toArray
- override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
- }
-
- initLogging()
-
- val syncOnLastShortInterval = true
-
- val shortIntervalDurationMillis = shortIntervalDuration.milliseconds
- val longIntervalDurationMillis = longIntervalDuration.milliseconds
-
- val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024)
- var currentShortInterval = Interval.currentInterval(shortIntervalDuration)
-
- val blocksForPushing = new Queue[Block]()
- val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
-
- val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } }
- val blockPushingExecutor = Executors.newFixedThreadPool(5)
-
-
- def start() {
- buffer.clear()
- if (buffer.remaining == 0) {
- throw new Exception("Buffer initialization error")
- }
- bufferProcessingThread.start()
- }
-
- def readDataToBuffer(func: ByteBuffer => Int): Int = {
- buffer.synchronized {
- if (buffer.remaining == 0) {
- logInfo("Received first data for interval " + currentShortInterval)
- }
- func(buffer)
- }
- }
-
- def getLongInterval(shortInterval: Interval): Interval = {
- val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
- Interval(intervalBegin, intervalBegin + longIntervalDuration)
- }
-
- def processBuffer() {
-
- def readInt(buffer: ByteBuffer): Int = {
- var offset = 0
- var result = 0
- while (offset < 32) {
- val b = buffer.get()
- result |= ((b & 0x7F) << offset)
- if ((b & 0x80) == 0) {
- return result
- }
- offset += 7
- }
- throw new Exception("Malformed zigzag-encoded integer")
- }
-
- val currentLongInterval = getLongInterval(currentShortInterval)
- val startTime = System.currentTimeMillis
- val newBuffer: ByteBuffer = buffer.synchronized {
- buffer.flip()
- if (buffer.remaining == 0) {
- buffer.clear()
- null
- } else {
- logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms")
- val startTime1 = System.currentTimeMillis
- var loop = true
- var count = 0
- while(loop) {
- buffer.mark()
- try {
- val len = readInt(buffer)
- buffer.position(buffer.position + len)
- count += 1
- } catch {
- case e: Exception => {
- buffer.reset()
- loop = false
- }
- }
- }
- val bytesToCopy = buffer.position
- val newBuf = ByteBuffer.allocate(bytesToCopy)
- buffer.position(0)
- newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer])
- newBuf.flip()
- buffer.position(bytesToCopy)
- buffer.compact()
- newBuf
- }
- }
-
- if (newBuffer != null) {
- val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval))
- bucket.synchronized {
- val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString
- val newBlock = new Block(newBlockId, currentShortInterval, newBuffer)
- if (syncOnLastShortInterval) {
- bucket += newBlock
- }
- logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.milliseconds) / 1000.0 + " s" )
- blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } })
- }
- }
-
- val newShortInterval = Interval.currentInterval(shortIntervalDuration)
- val newLongInterval = getLongInterval(newShortInterval)
-
- if (newLongInterval != currentLongInterval) {
- buckets.get(currentLongInterval) match {
- case Some(bucket) => {
- bucket.synchronized {
- bucket.filled = true
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
- }
- case None =>
- }
- buckets += ((newLongInterval, new Bucket(newLongInterval)))
- }
-
- currentShortInterval = newShortInterval
- }
-
- def pushBlock(block: Block) {
- try{
- if (blockManager != null) {
- val startTime = System.currentTimeMillis
- logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.milliseconds) + " ms")
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/
- blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2)
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
- val finishTime = System.currentTimeMillis
- logInfo(block + " put delay is " + (finishTime - startTime) + " ms")
- } else {
- logWarning(block + " not put as block manager is null")
- }
- } catch {
- case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
- }
- }
-
- def getBucket(longInterval: Interval): Option[Bucket] = {
- buckets.get(longInterval)
- }
-
- def clearBucket(longInterval: Interval) {
- buckets.remove(longInterval)
- }
-
- def keepProcessingBuffers() {
- logInfo("Thread to process buffers started")
- while(true) {
- processBuffer()
- val currentTimeMillis = System.currentTimeMillis
- val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
- shortIntervalDurationMillis - currentTimeMillis + 1
- Thread.sleep(sleepTimeMillis)
- }
- }
-
- def pushAndNotifyBlock(block: Block) {
- pushBlock(block)
- block.pushed = true
- val bucket = if (syncOnLastShortInterval) {
- buckets(block.longInterval)
- } else {
- var longInterval = block.longInterval
- while(!buckets.contains(longInterval)) {
- logWarning("Skipping bucket of " + longInterval + " for " + block)
- longInterval = longInterval.next
- }
- val chosenBucket = buckets(longInterval)
- logDebug("Choosing bucket of " + longInterval + " for " + block)
- chosenBucket += block
- chosenBucket
- }
-
- bucket.synchronized {
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
-
- }
- }
-
-
- class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler)
- extends ConnectionHandler(host, port, false) {
-
- override def ready(key: SelectionKey) {
- changeInterest(key, SelectionKey.OP_READ)
- }
-
- override def read(key: SelectionKey) {
- try {
- val channel = key.channel.asInstanceOf[SocketChannel]
- val bytesRead = dataHandler.readDataToBuffer(channel.read)
- if (bytesRead < 0) {
- close(key)
- }
- } catch {
- case e: IOException => {
- logError("Error reading", e)
- close(key)
- }
- }
- }
- }
-
- initLogging()
-
- val masterHost = System.getProperty("spark.master.host", "localhost")
- val masterPort = System.getProperty("spark.master.port", "7078").toInt
-
- val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort)
- val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
- val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
-
- logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
-
- val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
- case Some(details) => details
- case None => throw new Exception("Could not get stream details")
- }
- logInfo("Stream details received: " + streamDetails)
-
- val inputName = streamDetails.name
- val intervalDurationMillis = streamDetails.duration
- val intervalDuration = Milliseconds(intervalDurationMillis)
- val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt)
-
- val dataHandler = new DataHandler(inputName, intervalDuration, shortIntervalDuration, blockManager)
- val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler)
-
- val timeout = 100 millis
-
- // Send a message to an actor and return an option with its reply, or None if this times out
- def askActor[T](actor: ActorRef, message: Any): Option[T] = {
- try {
- val future = actor.ask(message)(timeout)
- return Some(Await.result(future, timeout).asInstanceOf[T])
- } catch {
- case e: Exception =>
- logInfo("Error communicating with " + actor, e)
- return None
- }
- }
-
- override def run() {
- connectionHandler.start()
- dataHandler.start()
-
- var interval = Interval.currentInterval(intervalDuration)
- var dataStarted = false
-
-
- while(true) {
- waitFor(interval.endTime)
- /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/
- dataHandler.getBucket(interval) match {
- case Some(bucket) => {
- logDebug("Found " + bucket + " for " + interval)
- bucket.synchronized {
- if (!bucket.ready) {
- logDebug("Waiting for " + bucket)
- bucket.wait()
- logDebug("Wait over for " + bucket)
- }
- if (dataStarted || !bucket.empty) {
- logDebug("Notifying " + bucket)
- notifyScheduler(interval, bucket.blockIds)
- dataStarted = true
- }
- bucket.blocks.clear()
- dataHandler.clearBucket(interval)
- }
- }
- case None => {
- logDebug("Found none for " + interval)
- if (dataStarted) {
- logDebug("Notifying none")
- notifyScheduler(interval, Array[String]())
- }
- }
- }
- interval = interval.next
- }
- }
-
- def waitFor(time: Time) {
- val currentTimeMillis = System.currentTimeMillis
- val targetTimeMillis = time.milliseconds
- if (currentTimeMillis < targetTimeMillis) {
- val sleepTime = (targetTimeMillis - currentTimeMillis)
- Thread.sleep(sleepTime + 1)
- }
- }
-
- def notifyScheduler(interval: Interval, blockIds: Array[String]) {
- try {
- sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
- val time = interval.endTime
- val delay = (System.currentTimeMillis - time.milliseconds)
- logInfo("Notification delay for " + time + " is " + delay + " ms")
- } catch {
- case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e)
- }
- }
-}
-
-
-object TestStreamReceiver4 {
- def main(args: Array[String]) {
- val details = Array(("Sentences", 2000L))
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
- actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
- new TestStreamReceiver4(actorSystem, null).start()
- }
-}
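
TestStreamReceiver4's processBuffer walked the receive buffer using a variable-length integer length prefix per record (the readInt helper above; its error message says "zigzag", but the decoding it performs is a plain unsigned varint with 7 payload bits per byte). A self-contained encode/decode sketch of that framing, with illustrative names:

    import java.nio.ByteBuffer

    object VarintFramingSketch {
      // Low 7 bits per byte; the high bit is set on every byte except the last.
      def writeVarint(buf: ByteBuffer, value: Int) {
        var v = value
        while ((v & ~0x7F) != 0) {
          buf.put(((v & 0x7F) | 0x80).toByte)
          v >>>= 7
        }
        buf.put(v.toByte)
      }

      // Same logic as the removed readInt: accumulate 7 bits at a time
      // until a byte with a clear high bit ends the integer.
      def readVarint(buf: ByteBuffer): Int = {
        var offset = 0
        var result = 0
        while (offset < 32) {
          val b = buf.get()
          result |= ((b & 0x7F) << offset)
          if ((b & 0x80) == 0) return result
          offset += 7
        }
        throw new Exception("Malformed varint")
      }

      def main(args: Array[String]) {
        val buf = ByteBuffer.allocate(16)
        writeVarint(buf, 300)
        buf.flip()
        println(readVarint(buf))                     // prints 300
      }
    }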