aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala2
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala6
-rw-r--r--core/src/test/scala/spark/AccumulatorSuite.scala9
-rw-r--r--core/src/test/scala/spark/BroadcastSuite.scala2
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala6
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala2
-rw-r--r--core/src/test/scala/spark/FileServerSuite.scala2
-rw-r--r--core/src/test/scala/spark/FileSuite.scala2
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala2
-rw-r--r--core/src/test/scala/spark/PipedRDDSuite.scala2
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala2
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala2
-rw-r--r--core/src/test/scala/spark/SizeEstimatorSuite.scala5
-rw-r--r--core/src/test/scala/spark/SortingSuite.scala3
-rw-r--r--core/src/test/scala/spark/ThreadingSuite.scala2
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala16
-rw-r--r--repl/src/test/scala/spark/repl/ReplSuite.scala2
17 files changed, 44 insertions, 23 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3da7152a09..ca59f46843 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -22,6 +22,8 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("halting by voting") {
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 46b1574a61..6a006e0697 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -44,11 +44,9 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
actorSystem.shutdown()
- // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
- Thread.sleep(100)
+ // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+ // down, but let's call it anyway in case it gets fixed in a later release
actorSystem.awaitTermination()
- // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
- Thread.sleep(100)
}
}
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b43730468e..d8be99dde7 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
@@ -54,6 +55,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
sc.stop()
sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
}
@@ -85,6 +88,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
} should produce [SparkException]
sc.stop()
sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
}
@@ -111,7 +116,9 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
mapAcc.value should contain (i -> i.toString)
}
sc.stop()
- sc = null
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
}
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 0738a2725b..2d3302f0aa 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic broadcast") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 97cfa9dad1..433d2fdc19 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -27,21 +27,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc = null
}
System.clearProperty("spark.reducer.maxMbInFlight")
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
sc.stop()
+ System.clearProperty("spark.master.port")
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
sc.stop()
+ System.clearProperty("spark.master.port")
sc = new SparkContext("local-cluster[2, 1, 512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
sc.stop()
+ System.clearProperty("spark.master.port")
sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
sc.stop()
+ System.clearProperty("spark.master.port")
sc = null
}
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 4405829161..a3454f25f6 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
// Run a 3-task map job in which task 1 deterministically fails once, and check
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index fd7a7bd589..a25b61dcbd 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -31,6 +31,8 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
if (tmpFile.exists) {
tmpFile.delete()
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("Distributing files locally") {
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 5c1577ed0b..554bea53a9 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("text files") {
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 5000fa7307..3dadc7acec 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index add5221e30..9b84b29227 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic pipe") {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 95e402627c..37a0ff0947 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -16,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("basic operations") {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 068607824b..7f8ec5d48f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -24,6 +24,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("groupByKey") {
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index 7677ac6db5..17f366212b 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -20,8 +20,9 @@ class DummyClass4(val d: DummyClass3) {
val x: Int = 0
}
-class SizeEstimatorSuite extends FunSuite
- with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+class SizeEstimatorSuite
+ extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+
var oldArch: String = _
var oldOops: String = _
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index c87595ecb3..1ad11ff4c3 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -12,7 +12,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
after {
if (sc != null) {
sc.stop()
+ sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("sortByKey") {
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index 302f731187..e9b1837d89 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f61fd45ed3..213a423fef 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -88,14 +88,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
- Thread.sleep(100)
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
- Thread.sleep(100)
assert(store.getSingle("a3") === None, "a3 was in store")
}
@@ -107,14 +105,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
- Thread.sleep(100)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- Thread.sleep(100)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
@@ -128,7 +124,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
@@ -145,7 +140,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
@@ -155,7 +149,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Put in more partitions from RDD 0; they should replace rdd_1_1
store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
@@ -186,7 +179,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- Thread.sleep(100)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -202,7 +194,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- Thread.sleep(100)
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -218,7 +209,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- Thread.sleep(100)
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -234,7 +224,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- Thread.sleep(100)
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -261,7 +250,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a1 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
- Thread.sleep(100)
assert(store.getSingle("a1") == None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -276,7 +264,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -286,7 +273,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will get rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
- Thread.sleep(100)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -304,7 +290,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
- Thread.sleep(100)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -320,7 +305,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
- Thread.sleep(100)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 0b5d439ca4..db78d06d4f 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -30,6 +30,8 @@ class ReplSuite extends FunSuite {
spark.repl.Main.interp = null
if (interp.sparkContext != null)
interp.sparkContext.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
return out.toString
}