diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-23 20:24:38 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-23 20:31:49 -0700 |
commit | deedb9e7b722d9fb37c89d3ef82b6bb2d470dcbe (patch) | |
tree | f09b629c4f5525ba655d1734d1b19560e0a2128b | |
parent | 59b831b9d1161a1c2d4312bb6711d8694983b5d6 (diff) | |
download | spark-deedb9e7b722d9fb37c89d3ef82b6bb2d470dcbe.tar.gz spark-deedb9e7b722d9fb37c89d3ef82b6bb2d470dcbe.tar.bz2 spark-deedb9e7b722d9fb37c89d3ef82b6bb2d470dcbe.zip |
Fix further issues with tests and broadcast.
The broadcast fix is to store values as MEMORY_ONLY_DESER instead of
MEMORY_ONLY, which will save substantial time on serialization.
-rw-r--r-- | bagel/src/test/scala/bagel/BagelSuite.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 6 | ||||
-rw-r--r-- | core/src/test/scala/spark/BroadcastSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/FailureSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/FileSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/KryoSerializerSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/PartitioningSuite.scala | 1 | ||||
-rw-r--r-- | core/src/test/scala/spark/PipedRDDSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/ThreadingSuite.scala | 1 |
14 files changed, 32 insertions, 16 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index d2189169d2..3da7152a09 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -18,7 +18,10 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { var sc: SparkContext = _ after { - sc.stop() + if (sc != null) { + sc.stop() + sc = null + } } test("halting by voting") { diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 43414d2e41..538e057926 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -65,7 +65,7 @@ class SparkContext( System.setProperty("spark.master.port", "0") } - private val isLocal = (master == "local" || master.startsWith("local[")) && !master.startsWith("localhost") + private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) val env = SparkEnv.createFromSystemProperties( diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 473d080044..016dc00fb0 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -17,7 +17,8 @@ extends Broadcast[T] with Logging with Serializable { def value = value_ MultiTracker.synchronized { - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @@ -133,7 +134,8 @@ extends Broadcast[T] with Logging with Serializable { val receptionSucceeded = receiveBroadcast(uuid) if (receptionSucceeded) { value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) } else { logError("Reading Broadcasted variable " + uuid + " failed") } diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index aa8bb77f41..03986ea756 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -18,7 +18,8 @@ extends Broadcast[T] with Logging with Serializable { def value = value_ HttpBroadcast.synchronized { - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) } if (!isLocal) { @@ -35,7 +36,8 @@ extends Broadcast[T] with Logging with Serializable { logInfo("Started reading broadcast variable " + uuid) val start = System.nanoTime value_ = HttpBroadcast.read[T](uuid) - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) val time = (System.nanoTime - start) / 1e9 logInfo("Reading broadcast variable " + uuid + " took " + time + " s") } diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index 6928253537..c9e1e67d87 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -16,7 +16,8 @@ extends Broadcast[T] with Logging with Serializable { def value = value_ MultiTracker.synchronized { - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @@ -110,7 +111,8 @@ extends Broadcast[T] with Logging with Serializable { val receptionSucceeded = receiveBroadcast(uuid) if (receptionSucceeded) { value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) - SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) + SparkEnv.get.blockManager.putSingle( + uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) } else { logError("Reading Broadcasted variable " + uuid + " failed") } diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala index 1e0b587421..0738a2725b 100644 --- a/core/src/test/scala/spark/BroadcastSuite.scala +++ b/core/src/test/scala/spark/BroadcastSuite.scala @@ -8,8 +8,9 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala index 0aaa16dca4..4405829161 100644 --- a/core/src/test/scala/spark/FailureSuite.scala +++ b/core/src/test/scala/spark/FailureSuite.scala @@ -28,8 +28,9 @@ class FailureSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index 4cb9c7802f..17c7a8de43 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -16,8 +16,9 @@ class FileSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala index e889769b9a..06d446ea24 100644 --- a/core/src/test/scala/spark/KryoSerializerSuite.scala +++ b/core/src/test/scala/spark/KryoSerializerSuite.scala @@ -8,8 +8,7 @@ import com.esotericsoftware.kryo._ import SparkContext._ -class KryoSerializerSuite extends FunSuite{ - +class KryoSerializerSuite extends FunSuite { test("basic types") { val ser = (new KryoSerializer).newInstance() def check[T](t: T) { diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index cf2ffeb9b1..5000fa7307 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -14,6 +14,7 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter { after { if(sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index d010a9be7a..426652dc15 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -9,8 +9,9 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index c467a7f916..ba9b36adb7 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -10,8 +10,9 @@ class RDDSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 5fa494160f..99d13b31ef 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -18,8 +18,9 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { var sc: SparkContext = _ after { - if(sc != null) { + if (sc != null) { sc.stop() + sc = null } } diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala index 90409a54ec..302f731187 100644 --- a/core/src/test/scala/spark/ThreadingSuite.scala +++ b/core/src/test/scala/spark/ThreadingSuite.scala @@ -29,6 +29,7 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter { after { if(sc != null) { sc.stop() + sc = null } } |