aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 17:13:15 -0500
commit642029e7f43322f84abe4f7f36bb0b1b95d8101d (patch)
treecef080193815b279b99a8b35f2401873a3ea3eb1 /core/src/test
parent2573add94cf920a88f74d80d8ea94218d812704d (diff)
downloadspark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.gz
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.tar.bz2
spark-642029e7f43322f84abe4f7f36bb0b1b95d8101d.zip
Various fixes to configuration code
- Got rid of global SparkContext.globalConf - Pass SparkConf to serializers and compression codecs - Made SparkConf public instead of private[spark] - Improved API of SparkContext and SparkConf - Switched executor environment vars to be passed through SparkConf - Fixed some places that were still using system properties - Fixed some tests, though others are still failing This still fails several tests in core, repl and streaming, likely due to properties not being set or cleared correctly (some of the tests run fine in isolation).
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala2
5 files changed, 18 insertions, 16 deletions
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ab81bfbe55..8d7546085f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
class CompressionCodecSuite extends FunSuite {
+ val conf = new SparkConf(false)
def testCodec(codec: CompressionCodec) {
// Write 1000 integers to the output stream, compressed.
@@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite {
}
test("default compression codec") {
- val codec = CompressionCodec.createCodec()
+ val codec = CompressionCodec.createCodec(conf)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("lzf compression codec") {
- val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("snappy compression codec") {
- val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 2bb827c022..3711382f2e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -82,7 +82,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
+ val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51171..33b0148896 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -22,12 +22,14 @@ import scala.collection.mutable
import com.esotericsoftware.kryo.Kryo
import org.scalatest.FunSuite
-import org.apache.spark.SharedSparkContext
+import org.apache.spark.{SparkConf, SharedSparkContext}
import org.apache.spark.serializer.KryoTest._
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
+ val conf = new SparkConf(false)
+
test("basic types") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -57,7 +59,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("pairs") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("Scala data structures") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -104,7 +106,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("ranges") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
@@ -127,7 +129,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 4ef5538951..a0fc3445be 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.{SparkConf, SparkContext}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
- private val conf = new SparkConf
+ private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
@@ -45,7 +45,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer.mb", "1")
- val serializer = new KryoSerializer
+ val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -167,7 +167,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("master + 2 managers interaction") {
store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -654,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200, conf)
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index a5facd5bbd..11ebdc352b 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -140,8 +140,6 @@ class SizeEstimatorSuite
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
- SparkContext.globalConf.set("os.arch", "amd64")
- SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()