aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-12 01:12:08 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-12 01:12:08 -0800
commitc5921e5c6184ddc99c12c0b1f2646b6bd74a9e98 (patch)
treedd9b353cd99973534f0a09521ecd5fb205690f80
parent18f4889d96b61b59569ec05f64900da1477404d0 (diff)
downloadspark-c5921e5c6184ddc99c12c0b1f2646b6bd74a9e98.tar.gz
spark-c5921e5c6184ddc99c12c0b1f2646b6bd74a9e98.tar.bz2
spark-c5921e5c6184ddc99c12c0b1f2646b6bd74a9e98.zip
Fixed bugs.
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala2
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala2
3 files changed, 3 insertions, 3 deletions
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fcc159e85a..73e7ce6e96 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.storage.StorageLevel
class MQTTStreamSuite extends TestSuiteBase {
- test("MQTT input stream") {
+ test("mqtt input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val brokerUrl = "abc"
val topic = "def"
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index a0a8fe617b..ccc38784ef 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -23,7 +23,7 @@ import twitter4j.auth.{NullAuthorization, Authorization}
class TwitterStreamSuite extends TestSuiteBase {
- test("kafka input stream") {
+ test("twitter input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val filters = Seq("filter1", "filter2")
val authorization: Authorization = NullAuthorization.getInstance()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b28ff5d9d8..24d57548c3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -63,7 +63,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
def stop() = synchronized {
- if (eventActor != null) {
+ if (networkInputTracker != null) {
jobGenerator.stop()
networkInputTracker.stop()
executor.shutdown()