 core/src/main/scala/org/apache/spark/io/CompressionCodec.scala                          |  2 +-
 core/src/test/scala/org/apache/spark/SharedSparkContext.scala                           |  4 +++-
 core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala                   |  4 ++--
 core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala               | 17 ++---------------
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              | 14 +++++++-------
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |  9 +++++++++
 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    |  9 ++++++---
 streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala               |  8 --------
 8 files changed, 30 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 20402686a8..075a18b068 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -46,7 +46,7 @@ private[spark] object CompressionCodec {

   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
     val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
       .getConstructor(classOf[SparkConf])
-      ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+    ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
 }
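
The change above is only an indentation fix, but the method it touches shows the pattern the rest of this commit relies on: codecs are instantiated reflectively through a one-argument SparkConf constructor. A minimal standalone sketch of that pattern, assuming the LZF codec shipped in this package and that the CompressionCodec trait is visible to the caller:

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Sketch only: resolve a codec class by name and invoke its single
// SparkConf-argument constructor, as createCodec does above.
val conf = new SparkConf(false)
val codecName = "org.apache.spark.io.LZFCompressionCodec"
val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
  .getConstructor(classOf[SparkConf])
val codec = ctor.newInstance(conf).asInstanceOf[CompressionCodec]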
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 288aa14eeb..c650ef4ed5 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -27,8 +27,10 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>

   def sc: SparkContext = _sc

+  var conf = new SparkConf(false)
+
   override def beforeAll() {
-    _sc = new SparkContext("local", "test")
+    _sc = new SparkContext("local", "test", conf)
     super.beforeAll()
   }
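
The new `conf` field is a customization hook: statements in a subclass's constructor body run before ScalaTest invokes beforeAll(), so a suite can shape the SparkConf the shared context is built from. A hypothetical suite as a sketch:

import org.scalatest.FunSuite
import org.apache.spark.SharedSparkContext

// Hypothetical suite: the constructor body runs before beforeAll(),
// so settings applied to `conf` here reach the shared SparkContext.
class CustomConfSuite extends FunSuite with SharedSparkContext {
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  test("the conf carries the customized setting") {
    assert(conf.get("spark.serializer") === "org.apache.spark.serializer.KryoSerializer")
  }
}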
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 4ecdde0001..71a2c6c498 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -24,10 +24,10 @@ import org.apache.spark.SparkConf
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
   var conf: SparkConf = null
+
   before {
     filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
-    System.setProperty("spark.metrics.conf", filePath)
-    conf = new SparkConf
+    conf = new SparkConf(false).set("spark.metrics.conf", filePath)
   }

   test("MetricsSystem with default config") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 33b0148896..d23e01418b 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.{SparkConf, SharedSparkContext}
 import org.apache.spark.serializer.KryoTest._

 class KryoSerializerSuite extends FunSuite with SharedSparkContext {
-  val conf = new SparkConf(false)
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)

   test("basic types") {
     val ser = new KryoSerializer(conf).newInstance()
@@ -127,8 +128,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }

   test("custom registrator") {
-    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-
     val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
@@ -188,18 +187,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
       .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
     assert(10 + control.sum === result)
   }
-
-  override def beforeAll() {
-    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    super.afterAll()
-    System.clearProperty("spark.kryo.registrator")
-    System.clearProperty("spark.serializer")
-  }
 }

 object KryoTest {
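
With the settings moved onto the inherited `conf`, the suite no longer mutates process-global state, so the beforeAll()/afterAll() cleanup becomes unnecessary. For reference, a sketch of the equivalent application-side configuration, where com.example.MyRegistrator stands in for a real KryoRegistrator implementation:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the application-side equivalent of the test's Kryo settings,
// passed through SparkConf instead of JVM system properties.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyRegistrator")
val sc = new SparkContext(conf)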
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9d2033fd11..286ec285a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -63,8 +63,8 @@ class StreamingContext private (

   /**
    * Create a StreamingContext using an existing SparkContext.
-   * @param sparkContext Existing SparkContext
-   * @param batchDuration The time interval at which streaming data will be divided into batches
+   * @param sparkContext existing SparkContext
+   * @param batchDuration the time interval at which streaming data will be divided into batches
    */
   def this(sparkContext: SparkContext, batchDuration: Duration) = {
     this(sparkContext, null, batchDuration)
@@ -72,8 +72,8 @@ class StreamingContext private (

   /**
    * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
-   * @param conf A standard Spark application configuration
-   * @param batchDuration The time interval at which streaming data will be divided into batches
+   * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+   * @param batchDuration the time interval at which streaming data will be divided into batches
    */
   def this(conf: SparkConf, batchDuration: Duration) = {
     this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
@@ -81,9 +81,9 @@ class StreamingContext private (

   /**
    * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your job, to display on the cluster web UI
-   * @param batchDuration The time interval at which streaming data will be divided into batches
+   * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName a name for your job, to display on the cluster web UI
+   * @param batchDuration the time interval at which streaming data will be divided into batches
    */
   def this(
       master: String,
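
The reworded scaladoc covers all three constructors; the SparkConf-based one is the form the rest of this patch migrates callers toward. A minimal usage sketch, with illustrative names:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: build the streaming context from a SparkConf, 1-second batches.
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingExample")
val ssc = new StreamingContext(conf, Seconds(1))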
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87491..5842a7cd68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.SparkConf

 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -123,6 +124,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(sparkContext.sc, batchDuration))

   /**
+   * Creates a StreamingContext by providing the configuration necessary for a new SparkContext.
+   * @param conf a Spark application configuration
+   * @param batchDuration the time interval at which streaming data will be divided into batches
+   */
+  def this(conf: SparkConf, batchDuration: Duration) =
+    this(new StreamingContext(conf, batchDuration))
+
+  /**
    * Re-creates a StreamingContext from a checkpoint file.
    * @param path Path either to the directory that was specified as the checkpoint directory, or
    *             to the checkpoint file 'graph' or 'graph.bk'.
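
The new constructor is a thin wrapper over the Scala StreamingContext(conf, batchDuration) constructor shown earlier. A usage sketch, written in Scala for consistency with the other examples here:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.JavaStreamingContext

// Sketch: the new SparkConf-based JavaStreamingContext constructor.
val conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingExample")
val jssc = new JavaStreamingContext(conf, new Duration(1000))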
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a1db0995e3..d53d433693 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -25,6 +25,7 @@ import com.google.common.io.Files;
 import kafka.serializer.StringDecoder;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,14 +63,16 @@ public class JavaAPISuite implements Serializable {

   @Before
   public void setUp() {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
-    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    SparkConf conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("test")
+        .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+    ssc = new JavaStreamingContext(conf, new Duration(1000));
     ssc.checkpoint("checkpoint");
   }

   @After
   public void tearDown() {
-    System.clearProperty("spark.streaming.clock");
     ssc.stop();
     ssc = null;
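
The chained style in setUp() works because setMaster, setAppName, and set each return the SparkConf itself. More importantly, the clock choice now travels with the conf instead of living in a JVM-wide property, which is why tearDown() no longer clears anything. A sketch of that isolation:

import org.apache.spark.SparkConf

// Sketch: clock choice is now per-conf rather than JVM-global, so two
// configurations in one process no longer interfere with each other.
val manual = new SparkConf(false)
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val real = new SparkConf(false)
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
assert(manual.get("spark.streaming.clock") != real.get("spark.streaming.clock"))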
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 2a41ec0035..ca230fd056 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -201,10 +201,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
   // It also tests whether batches, whose processing was incomplete due to the
   // failure, are re-processed or not.
   test("recovery with file input stream") {
-    // Disable manual clock as FileInputDStream does not work with manual clock
-    val clockProperty = System.getProperty("spark.streaming.clock")
-    System.clearProperty("spark.streaming.clock")
-
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -301,10 +297,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     )
     // To ensure that all the inputs were received correctly
     assert(expectedOutput.last === output.last)
-
-    // Enable manual clock back again for other tests
-    if (clockProperty != null)
-      System.setProperty("spark.streaming.clock", clockProperty)
   }
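
With the clock carried on the configuration, the save/clear/restore dance around the global property serves no purpose. A test that needs real time can instead build its context from a conf that selects the system clock; a sketch of that assumed pattern, using the conf-based StreamingContext constructor documented above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed pattern: select the system clock on this test's own conf,
// leaving nothing global to clear or restore afterwards.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CheckpointExample")
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
val ssc = new StreamingContext(conf, Seconds(1))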