diff options
8 files changed, 30 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 20402686a8..075a18b068 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -46,7 +46,7 @@ private[spark] object CompressionCodec { def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader) .getConstructor(classOf[SparkConf]) - ctor.newInstance(conf).asInstanceOf[CompressionCodec] + ctor.newInstance(conf).asInstanceOf[CompressionCodec] } } diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala index 288aa14eeb..c650ef4ed5 100644 --- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -27,8 +27,10 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => def sc: SparkContext = _sc + var conf = new SparkConf(false) + override def beforeAll() { - _sc = new SparkContext("local", "test") + _sc = new SparkContext("local", "test", conf) super.beforeAll() } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 4ecdde0001..71a2c6c498 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -24,10 +24,10 @@ import org.apache.spark.SparkConf class MetricsSystemSuite extends FunSuite with BeforeAndAfter { var filePath: String = _ var conf: SparkConf = null + before { filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile() - System.setProperty("spark.metrics.conf", filePath) - conf = new SparkConf + conf = new SparkConf(false).set("spark.metrics.conf", filePath) } test("MetricsSystem with default config") { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 33b0148896..d23e01418b 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -26,7 +26,8 @@ import org.apache.spark.{SparkConf, SharedSparkContext} import org.apache.spark.serializer.KryoTest._ class KryoSerializerSuite extends FunSuite with SharedSparkContext { - val conf = new SparkConf(false) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) test("basic types") { val ser = new KryoSerializer(conf).newInstance() @@ -127,8 +128,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("custom registrator") { - System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName) - val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) @@ -188,18 +187,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x assert(10 + control.sum === result) } - - override def beforeAll() { - System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName) - super.beforeAll() - } - - override def afterAll() { - super.afterAll() - System.clearProperty("spark.kryo.registrator") - System.clearProperty("spark.serializer") - } } object KryoTest { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9d2033fd11..286ec285a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -63,8 +63,8 @@ class StreamingContext private ( /** * Create a StreamingContext using an existing SparkContext. - * @param sparkContext Existing SparkContext - * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkContext existing SparkContext + * @param batchDuration the time interval at which streaming data will be divided into batches */ def this(sparkContext: SparkContext, batchDuration: Duration) = { this(sparkContext, null, batchDuration) @@ -72,8 +72,8 @@ class StreamingContext private ( /** * Create a StreamingContext by providing the configuration necessary for a new SparkContext. - * @param conf A standard Spark application configuration - * @param batchDuration The time interval at which streaming data will be divided into batches + * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters + * @param batchDuration the time interval at which streaming data will be divided into batches */ def this(conf: SparkConf, batchDuration: Duration) = { this(StreamingContext.createNewSparkContext(conf), null, batchDuration) @@ -81,9 +81,9 @@ class StreamingContext private ( /** * Create a StreamingContext by providing the details necessary for creating a new SparkContext. - * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param appName A name for your job, to display on the cluster web UI - * @param batchDuration The time interval at which streaming data will be divided into batches + * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param appName a name for your job, to display on the cluster web UI + * @param batchDuration the time interval at which streaming data will be divided into batches */ def this( master: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 80dcf87491..5842a7cd68 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ +import org.apache.spark.SparkConf /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -123,6 +124,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(sparkContext.sc, batchDuration)) /** + * Creates a StreamingContext using an existing SparkContext. + * @param conf A Spark application configuration + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(conf: SparkConf, batchDuration: Duration) = + this(new StreamingContext(conf, batchDuration)) + + /** * Re-creates a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index a1db0995e3..d53d433693 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -25,6 +25,7 @@ import com.google.common.io.Files; import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; @@ -62,14 +63,16 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } @After public void tearDown() { - System.clearProperty("spark.streaming.clock"); ssc.stop(); ssc = null; diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 2a41ec0035..ca230fd056 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -201,10 +201,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // It also tests whether batches, whose processing was incomplete due to the // failure, are re-processed or not. test("recovery with file input stream") { - // Disable manual clock as FileInputDStream does not work with manual clock - val clockProperty = System.getProperty("spark.streaming.clock") - System.clearProperty("spark.streaming.clock") - // Set up the streaming context and input streams val testDir = Files.createTempDir() var ssc = new StreamingContext(master, framework, Seconds(1)) @@ -301,10 +297,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) // To ensure that all the inputs were received correctly assert(expectedOutput.last === output.last) - - // Enable manual clock back again for other tests - if (clockProperty != null) - System.setProperty("spark.streaming.clock", clockProperty) } |