diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-24 16:49:40 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-24 16:49:40 -0700 |
commit | ed71df46cddc9a4f1363b937c10bfa2a928e564c (patch) | |
tree | 49ed03b146571ea64e8e71df47968892e26f4dde /streaming | |
parent | 1ef6ea25135fd33a7913944628b67f24c87db1f5 (diff) | |
download | spark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.tar.gz spark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.tar.bz2 spark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.zip |
Minor fixes.
Diffstat (limited to 'streaming')
4 files changed, 27 insertions, 27 deletions
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 230d806a89..9bf9251519 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - println("Total delay: %.5f s for job %s (execution: %.5f s)".format( + logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0)) } catch { case e: Exception => diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala index 6b41e4d2c8..5669d7fedf 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala @@ -74,7 +74,8 @@ abstract class NetworkReceiver[T: ClassManifest](streamId: Int) extends Serializ onStart() } catch { case ie: InterruptedException => - logWarning("Receiving thread interrupted") + logInfo("Receiving thread interrupted") + //println("Receiving thread interrupted") case e: Exception => stopOnError(e) } diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala index 8ff7865ca4..b566200273 100644 --- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala @@ -3,11 +3,12 @@ package spark.streaming import spark.streaming.util.{RecurringTimer, SystemClock} import spark.storage.StorageLevel -import java.io.{EOFException, DataInputStream, BufferedInputStream, InputStream} +import java.io._ import java.net.Socket import java.util.concurrent.ArrayBlockingQueue import scala.collection.mutable.ArrayBuffer +import scala.Serializable class SocketInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, @@ -127,8 +128,7 @@ object SocketReceiver { * to '\n' delimited strings and returns an iterator to access the strings. */ def bytesToLines(inputStream: InputStream): Iterator[String] = { - val bufferedInputStream = new BufferedInputStream(inputStream) - val dataInputStream = new DataInputStream(bufferedInputStream) + val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) val iterator = new Iterator[String] { var gotNext = false @@ -138,35 +138,32 @@ object SocketReceiver { private def getNext() { try { nextValue = dataInputStream.readLine() - if (nextValue != null) { - println("[" + nextValue + "]") - } else { - gotNext = false - } - } catch { - case eof: EOFException => + if (nextValue == null) { finished = true + } } gotNext = true } override def hasNext: Boolean = { - if (!gotNext) { - getNext() - } - if (finished) { - dataInputStream.close() + if (!finished) { + if (!gotNext) { + getNext() + if (finished) { + dataInputStream.close() + } + } } !finished } override def next(): String = { - if (!gotNext) { - getNext() - } if (finished) { throw new NoSuchElementException("End of stream") } + if (!gotNext) { + getNext() + } gotNext = false nextValue } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index dd872059ea..a3f213ebd0 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -6,6 +6,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import spark.storage.StorageLevel +import spark.Logging class InputStreamsSuite extends TestSuiteBase { @@ -39,9 +40,10 @@ class InputStreamsSuite extends TestSuiteBase { Thread.sleep(5000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - - ssc.stop() + logInfo("Stopping server") server.stop() + logInfo("Stopping context") + ssc.stop() assert(outputBuffer.size === expectedOutput.size) for (i <- 0 until outputBuffer.size) { @@ -52,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase { } -class TestServer(port: Int) { +class TestServer(port: Int) extends Logging { val queue = new ArrayBlockingQueue[String](100) @@ -62,9 +64,9 @@ class TestServer(port: Int) { override def run() { try { while(true) { - println("Accepting connections on port " + port) + logInfo("Accepting connections on port " + port) val clientSocket = serverSocket.accept() - println("New connection") + logInfo("New connection") try { clientSocket.setTcpNoDelay(true) val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream)) @@ -74,13 +76,13 @@ class TestServer(port: Int) { if (msg != null) { outputStream.write(msg) outputStream.flush() - println("Message '" + msg + "' sent") + logInfo("Message '" + msg + "' sent") } } } catch { case e: SocketException => println(e) } finally { - println("Connection closed") + logInfo("Connection closed") if (!clientSocket.isClosed) clientSocket.close() } } |