aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-10-24 16:49:40 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-10-24 16:49:40 -0700
commited71df46cddc9a4f1363b937c10bfa2a928e564c (patch)
tree49ed03b146571ea64e8e71df47968892e26f4dde
parent1ef6ea25135fd33a7913944628b67f24c87db1f5 (diff)
downloadspark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.tar.gz
spark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.tar.bz2
spark-ed71df46cddc9a4f1363b937c10bfa2a928e564c.zip
Minor fixes.
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala3
-rw-r--r--streaming/src/main/scala/spark/streaming/SocketInputDStream.scala33
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala16
4 files changed, 27 insertions, 27 deletions
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 230d806a89..9bf9251519 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- println("Total delay: %.5f s for job %s (execution: %.5f s)".format(
+ logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
index 6b41e4d2c8..5669d7fedf 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
@@ -74,7 +74,8 @@ abstract class NetworkReceiver[T: ClassManifest](streamId: Int) extends Serializ
onStart()
} catch {
case ie: InterruptedException =>
- logWarning("Receiving thread interrupted")
+ logInfo("Receiving thread interrupted")
+ //println("Receiving thread interrupted")
case e: Exception =>
stopOnError(e)
}
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index 8ff7865ca4..b566200273 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -3,11 +3,12 @@ package spark.streaming
import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
-import java.io.{EOFException, DataInputStream, BufferedInputStream, InputStream}
+import java.io._
import java.net.Socket
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer
+import scala.Serializable
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
@@ -127,8 +128,7 @@ object SocketReceiver {
* to '\n' delimited strings and returns an iterator to access the strings.
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
- val bufferedInputStream = new BufferedInputStream(inputStream)
- val dataInputStream = new DataInputStream(bufferedInputStream)
+ val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
val iterator = new Iterator[String] {
var gotNext = false
@@ -138,35 +138,32 @@ object SocketReceiver {
private def getNext() {
try {
nextValue = dataInputStream.readLine()
- if (nextValue != null) {
- println("[" + nextValue + "]")
- } else {
- gotNext = false
- }
- } catch {
- case eof: EOFException =>
+ if (nextValue == null) {
finished = true
+ }
}
gotNext = true
}
override def hasNext: Boolean = {
- if (!gotNext) {
- getNext()
- }
- if (finished) {
- dataInputStream.close()
+ if (!finished) {
+ if (!gotNext) {
+ getNext()
+ if (finished) {
+ dataInputStream.close()
+ }
+ }
}
!finished
}
override def next(): String = {
- if (!gotNext) {
- getNext()
- }
if (finished) {
throw new NoSuchElementException("End of stream")
}
+ if (!gotNext) {
+ getNext()
+ }
gotNext = false
nextValue
}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index dd872059ea..a3f213ebd0 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -6,6 +6,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import spark.storage.StorageLevel
+import spark.Logging
class InputStreamsSuite extends TestSuiteBase {
@@ -39,9 +40,10 @@ class InputStreamsSuite extends TestSuiteBase {
Thread.sleep(5000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-
- ssc.stop()
+ logInfo("Stopping server")
server.stop()
+ logInfo("Stopping context")
+ ssc.stop()
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
@@ -52,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase {
}
-class TestServer(port: Int) {
+class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
@@ -62,9 +64,9 @@ class TestServer(port: Int) {
override def run() {
try {
while(true) {
- println("Accepting connections on port " + port)
+ logInfo("Accepting connections on port " + port)
val clientSocket = serverSocket.accept()
- println("New connection")
+ logInfo("New connection")
try {
clientSocket.setTcpNoDelay(true)
val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
@@ -74,13 +76,13 @@ class TestServer(port: Int) {
if (msg != null) {
outputStream.write(msg)
outputStream.flush()
- println("Message '" + msg + "' sent")
+ logInfo("Message '" + msg + "' sent")
}
}
} catch {
case e: SocketException => println(e)
} finally {
- println("Connection closed")
+ logInfo("Connection closed")
if (!clientSocket.isClosed) clientSocket.close()
}
}