author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-23 20:24:38 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-23 20:31:49 -0700
commit    deedb9e7b722d9fb37c89d3ef82b6bb2d470dcbe
tree      f09b629c4f5525ba655d1734d1b19560e0a2128b
parent    59b831b9d1161a1c2d4312bb6711d8694983b5d6
Fix further issues with tests and broadcast.
The broadcast fix is to store values as MEMORY_ONLY_DESER instead of MEMORY_ONLY, which will save substantial time on serialization.
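
All the test-suite changes below apply one teardown idiom, shown here as a minimal self-contained sketch (not part of the commit; ExampleSuite and its test body are hypothetical). Stopping only a non-null context avoids a NullPointerException in suites whose test never created one, and nulling the field keeps a later run from stopping a stale context twice:

    import org.scalatest.{BeforeAndAfter, FunSuite}
    import spark.SparkContext

    class ExampleSuite extends FunSuite with BeforeAndAfter {
      var sc: SparkContext = _

      after {
        // Only stop a context that was actually created, then drop the
        // reference so the next test starts from a clean slate.
        if (sc != null) {
          sc.stop()
          sc = null
        }
      }

      test("parallelize and count") {
        sc = new SparkContext("local", "test")
        assert(sc.parallelize(1 to 4).count() === 4)
      }
    }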
-rw-r--r--  bagel/src/test/scala/bagel/BagelSuite.scala                     |  5
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                    |  2
-rw-r--r--  core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala   |  6
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala         |  6
-rw-r--r--  core/src/main/scala/spark/broadcast/TreeBroadcast.scala         |  6
-rw-r--r--  core/src/test/scala/spark/BroadcastSuite.scala                  |  3
-rw-r--r--  core/src/test/scala/spark/FailureSuite.scala                    |  3
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala                       |  3
-rw-r--r--  core/src/test/scala/spark/KryoSerializerSuite.scala             |  3
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala               |  1
-rw-r--r--  core/src/test/scala/spark/PipedRDDSuite.scala                   |  3
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                        |  3
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala                    |  3
-rw-r--r--  core/src/test/scala/spark/ThreadingSuite.scala                  |  1
14 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index d2189169d2..3da7152a09 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -18,7 +18,10 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    sc.stop()
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
   }
 
   test("halting by voting") {
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 43414d2e41..538e057926 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -65,7 +65,7 @@ class SparkContext(
     System.setProperty("spark.master.port", "0")
   }
 
-  private val isLocal = (master == "local" || master.startsWith("local[")) && !master.startsWith("localhost")
+  private val isLocal = (master == "local" || master.startsWith("local["))
 
   // Create the Spark execution environment (cache, map output tracker, etc)
   val env = SparkEnv.createFromSystemProperties(
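
An aside on the isLocal change: the dropped !master.startsWith("localhost") clause was unreachable, since no string can both equal "local" (or start with "local[") and start with "localhost", yet it suggested the check did more than it does. A minimal sketch of the resulting predicate, where isLocalMaster is a hypothetical standalone helper, not a name from this commit:

    // Hypothetical helper: "local" and "local[N]" master URLs select local
    // mode; anything else (e.g. "localhost:7077") is treated as a cluster.
    def isLocalMaster(master: String): Boolean =
      master == "local" || master.startsWith("local[")

    assert(isLocalMaster("local"))
    assert(isLocalMaster("local[4]"))
    assert(!isLocalMaster("localhost:7077"))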
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 473d080044..016dc00fb0 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -17,7 +17,8 @@ extends Broadcast[T] with Logging with Serializable {
   def value = value_
 
   MultiTracker.synchronized {
-    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+    SparkEnv.get.blockManager.putSingle(
+      uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
   }
 
   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -133,7 +134,8 @@ extends Broadcast[T] with Logging with Serializable {
     val receptionSucceeded = receiveBroadcast(uuid)
     if (receptionSucceeded) {
       value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-      SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+      SparkEnv.get.blockManager.putSingle(
+        uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
     } else {
       logError("Reading Broadcasted variable " + uuid + " failed")
     }
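
Why this change matters (context, not part of the commit): per the commit message, MEMORY_ONLY serialized the value on put and deserialized it on every read, while MEMORY_ONLY_DESER keeps the live object in memory. A minimal sketch of the before/after call, assuming a running SparkEnv; cacheBroadcast, key, and value are hypothetical placeholders, while putSingle's four-argument form matches the calls in the diffs:

    // Hypothetical wrapper around the call the diffs change.
    def cacheBroadcast[T](key: String, value: T) {
      val bm = SparkEnv.get.blockManager
      // Before: serialized on put, deserialized again on every get.
      //   bm.putSingle(key, value, StorageLevel.MEMORY_ONLY, false)
      // After: cached as a live object, so reads skip deserialization.
      bm.putSingle(key, value, StorageLevel.MEMORY_ONLY_DESER, false)
    }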
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index aa8bb77f41..03986ea756 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -18,7 +18,8 @@ extends Broadcast[T] with Logging with Serializable {
   def value = value_
 
   HttpBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+    SparkEnv.get.blockManager.putSingle(
+      uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
   }
 
   if (!isLocal) {
@@ -35,7 +36,8 @@ extends Broadcast[T] with Logging with Serializable {
     logInfo("Started reading broadcast variable " + uuid)
     val start = System.nanoTime
     value_ = HttpBroadcast.read[T](uuid)
-    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+    SparkEnv.get.blockManager.putSingle(
+      uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
     val time = (System.nanoTime - start) / 1e9
     logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
   }
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 6928253537..c9e1e67d87 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -16,7 +16,8 @@ extends Broadcast[T] with Logging with Serializable {
   def value = value_
 
   MultiTracker.synchronized {
-    SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+    SparkEnv.get.blockManager.putSingle(
+      uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
   }
 
   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -110,7 +111,8 @@ extends Broadcast[T] with Logging with Serializable {
     val receptionSucceeded = receiveBroadcast(uuid)
     if (receptionSucceeded) {
       value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-      SparkEnv.get.blockManager.putSingle(uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+      SparkEnv.get.blockManager.putSingle(
+        uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
     } else {
       logError("Reading Broadcasted variable " + uuid + " failed")
     }
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 1e0b587421..0738a2725b 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -8,8 +8,9 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 0aaa16dca4..4405829161 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -28,8 +28,9 @@ class FailureSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 4cb9c7802f..17c7a8de43 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -16,8 +16,9 @@ class FileSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index e889769b9a..06d446ea24 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -8,8 +8,7 @@ import com.esotericsoftware.kryo._
 
 import SparkContext._
 
-class KryoSerializerSuite extends FunSuite{
-
+class KryoSerializerSuite extends FunSuite {
   test("basic types") {
     val ser = (new KryoSerializer).newInstance()
     def check[T](t: T) {
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index cf2ffeb9b1..5000fa7307 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -14,6 +14,7 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
   after {
     if(sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index d010a9be7a..426652dc15 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -9,8 +9,9 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index c467a7f916..ba9b36adb7 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -10,8 +10,9 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 5fa494160f..99d13b31ef 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -18,8 +18,9 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
   var sc: SparkContext = _
 
   after {
-    if(sc != null) {
+    if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index 90409a54ec..302f731187 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -29,6 +29,7 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter {
   after {
     if(sc != null) {
       sc.stop()
+      sc = null
     }
   }
 