author    Matei Zaharia <matei@databricks.com>    2013-12-28 23:17:58 -0500
committer Matei Zaharia <matei@databricks.com>    2013-12-28 23:17:58 -0500
commit    20631348d198ba52059f278c1b415c3a80a95b81 (patch)
tree      02212ec0ad07a97b19595155129cb52ae610d582 /streaming
parent    0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7 (diff)
Fix other failing tests
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                  14
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala     9
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                         9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                    8
4 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9d2033fd11..286ec285a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -63,8 +63,8 @@ class StreamingContext private (
/**
* Create a StreamingContext using an existing SparkContext.
- * @param sparkContext Existing SparkContext
- * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkContext existing SparkContext
+ * @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
@@ -72,8 +72,8 @@ class StreamingContext private (
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
- * @param conf A standard Spark application configuration
- * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ * @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
@@ -81,9 +81,9 @@ class StreamingContext private (
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your job, to display on the cluster web UI
- * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName a name for your job, to display on the cluster web UI
+ * @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(
master: String,
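
For orientation, the constructors documented above can be exercised as follows. This is a minimal sketch, not part of the commit; the local[4] master, the app name "StreamingExample", and the one-second batch interval are illustrative assumptions.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Configuration-based form: the SparkConf carries the master URL and app
    // name, and Seconds(1) is the interval at which incoming data is divided
    // into batches.
    val conf = new SparkConf().setMaster("local[4]").setAppName("StreamingExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // The other documented forms:
    //   new StreamingContext(sc, Seconds(1))                             // reuse an existing SparkContext
    //   new StreamingContext("local[4]", "StreamingExample", Seconds(1)) // master URL and app name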
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87491..5842a7cd68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
+import org.apache.spark.SparkConf
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -123,6 +124,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(sparkContext.sc, batchDuration))
/**
+ * Creates a StreamingContext by providing the configuration necessary for a new SparkContext.
+ * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ * @param batchDuration the time interval at which streaming data will be divided into batches
+ */
+ def this(conf: SparkConf, batchDuration: Duration) =
+ this(new StreamingContext(conf, batchDuration))
+
+ /**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
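
The constructor added above gives Java-API users the same SparkConf-based entry point that the Scala StreamingContext already has. A minimal sketch of calling it (master, app name, and the one-second batch interval are illustrative assumptions):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    // Pass a SparkConf directly instead of master/appName strings
    // or a pre-built JavaSparkContext.
    val conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingExample")
    val jssc = new JavaStreamingContext(conf, new Duration(1000))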
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a1db0995e3..d53d433693 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -25,6 +25,7 @@ import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
@@ -62,14 +63,16 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
- System.clearProperty("spark.streaming.clock");
ssc.stop();
ssc = null;
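
The setUp/tearDown change above reflects a broader design choice: spark.streaming.clock now travels with the SparkConf rather than living in JVM-global system properties, so tests no longer need to clear it after each run. A sketch of the same pattern from Scala, using the clock class named in the test:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Duration, StreamingContext}

    // The manual-clock setting is scoped to this conf (and the context built
    // from it), so no System.clearProperty cleanup is needed afterwards.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("test")
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    val ssc = new StreamingContext(conf, new Duration(1000))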
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 2a41ec0035..ca230fd056 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -201,10 +201,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
- // Disable manual clock as FileInputDStream does not work with manual clock
- val clockProperty = System.getProperty("spark.streaming.clock")
- System.clearProperty("spark.streaming.clock")
-
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -301,10 +297,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
-
- // Enable manual clock back again for other tests
- if (clockProperty != null)
- System.setProperty("spark.streaming.clock", clockProperty)
}