aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-11-21 12:34:46 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-11-21 12:34:46 +0530
commit95d8dbce91f49467050250d5cf3671aaaa648d76 (patch)
tree06e2583c63cdf39d6d15d36a3189c2e6db0148ba /streaming
parent199e9cf02dfaa372c1f067bca54556e1f6ce787d (diff)
parent2fead510f74b962b293de4d724136c24a9825271 (diff)
downloadspark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.gz
spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.bz2
spark-95d8dbce91f49467050250d5cf3671aaaa648d76.zip
Merge branch 'master' of github.com:apache/incubator-spark into scala-2.10-temp
Conflicts: core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala2
5 files changed, 12 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index bb9febad38..9271914eb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
fs.delete(file, false)
fs.rename(writeFile, file)
- val finishTime = System.currentTimeMillis();
+ val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
return
@@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def stop() {
synchronized {
- if (stopped) return ;
+ if (stopped) {
+ return
+ }
stopped = true
}
executor.shutdown()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index ca0c905932..80dcf87491 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap, List => JList}
@@ -36,10 +36,9 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -315,7 +314,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
- ssc.fileStream[K, V, F](directory);
+ ssc.fileStream[K, V, F](directory)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
index b72ab90e60..60d79175f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -138,8 +138,8 @@ class FlumeReceiver(
protected override def onStart() {
val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this));
- val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
server.start()
logInfo("Flume receiver started")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a559db468a..7dc82decef 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort));
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver);
+ classOf[AvroSourceProtocol], transceiver)
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 126915abc9..2f34e812a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -252,7 +252,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
Thread.sleep(500) // Give some time for the forgetting old RDDs to complete
} catch {
- case e: Exception => e.printStackTrace(); throw e;
+ case e: Exception => {e.printStackTrace(); throw e}
} finally {
ssc.stop()
}