From 642029e7f43322f84abe4f7f36bb0b1b95d8101d Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sat, 28 Dec 2013 17:13:15 -0500
Subject: Various fixes to configuration code

- Got rid of global SparkContext.globalConf
- Pass SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing

This still fails several tests in core, repl and streaming, likely due
to properties not being set or cleared correctly (some of the tests run
fine in isolation).
---
 .../scala/org/apache/spark/io/CompressionCodecSuite.scala  |  8 +++++---
 .../scheduler/cluster/ClusterTaskSetManagerSuite.scala     |  2 +-
 .../org/apache/spark/serializer/KryoSerializerSuite.scala  | 14 ++++++++------
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala |  8 ++++----
 .../scala/org/apache/spark/util/SizeEstimatorSuite.scala   |  2 --
 5 files changed, 18 insertions(+), 16 deletions(-)

(limited to 'core/src/test')

diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ab81bfbe55..8d7546085f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
 
 
 class CompressionCodecSuite extends FunSuite {
+  val conf = new SparkConf(false)
 
   def testCodec(codec: CompressionCodec) {
     // Write 1000 integers to the output stream, compressed.
@@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite {
   }
 
   test("default compression codec") {
-    val codec = CompressionCodec.createCodec()
+    val codec = CompressionCodec.createCodec(conf)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
   test("lzf compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+    val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
   test("snappy compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+    val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 2bb827c022..3711382f2e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -82,7 +82,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
 class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
   private val conf = new SparkConf
-  val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
+  val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51171..33b0148896 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -22,12 +22,14 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo.Kryo
 import org.scalatest.FunSuite
 
-import org.apache.spark.SharedSparkContext
+import org.apache.spark.{SparkConf, SharedSparkContext}
 import org.apache.spark.serializer.KryoTest._
 
 class KryoSerializerSuite extends FunSuite with SharedSparkContext {
+  val conf = new SparkConf(false)
+
   test("basic types") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -57,7 +59,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("pairs") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("Scala data structures") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -104,7 +106,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("ranges") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
@@ -127,7 +129,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
 
   test("custom registrator") {
     System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 4ef5538951..a0fc3445be 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.{SparkConf, SparkContext}
 
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
-  private val conf = new SparkConf
+  private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
   var actorSystem: ActorSystem = null
@@ -45,7 +45,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")
-  val serializer = new KryoSerializer
+  val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -167,7 +167,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   test("master + 2 managers interaction") {
     store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
-    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
+    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf)
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -654,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
-    store = new BlockManager("", actorSystem, master, new JavaSerializer, 1200, conf)
+    store = new BlockManager("", actorSystem, master, new JavaSerializer(conf), 1200, conf)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index a5facd5bbd..11ebdc352b 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -140,8 +140,6 @@ class SizeEstimatorSuite
   test("64-bit arch with no compressed oops") {
     val arch = System.setProperty("os.arch", "amd64")
     val oops = System.setProperty("spark.test.useCompressedOops", "false")
-    SparkContext.globalConf.set("os.arch", "amd64")
-    SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
--
cgit v1.2.3
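For reference, the pattern exercised by the test changes above is that a SparkConf is constructed explicitly (with load-defaults disabled in tests) and handed to serializers and compression codecs, instead of those components reading global state or system properties. Below is a minimal sketch of that usage, based only on the calls visible in this patch; the object name and main method are illustrative and not part of the patch itself.

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.KryoSerializer

object ConfPassingSketch {
  def main(args: Array[String]): Unit = {
    // Build a conf that does not load defaults from system properties,
    // mirroring `new SparkConf(false)` in the updated suites.
    val conf = new SparkConf(false)
    conf.set("spark.kryoserializer.buffer.mb", "1")

    // Serializers now receive the conf explicitly.
    val ser = new KryoSerializer(conf).newInstance()
    val n = ser.deserialize[Int](ser.serialize(42))

    // Codecs are created from the conf; a codec class name can be passed as a
    // second argument, as the lzf/snappy tests above do.
    val codec = CompressionCodec.createCodec(conf)

    println(s"round trip: $n, codec: ${codec.getClass.getSimpleName}")
  }
}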