Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala (renamed from core/src/test/scala/spark/AccumulatorSuite.scala) | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/BroadcastSuite.scala (renamed from core/src/test/scala/spark/BroadcastSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala (renamed from core/src/test/scala/spark/CheckpointSuite.scala) | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala (renamed from core/src/test/scala/spark/DistributedSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala (renamed from core/src/test/scala/spark/DriverSuite.scala) | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/FailureSuite.scala (renamed from core/src/test/scala/spark/FailureSuite.scala) | 42
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala (renamed from core/src/test/scala/spark/FileServerSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala (renamed from core/src/test/scala/spark/FileSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java (renamed from core/src/test/scala/spark/JavaAPISuite.java) | 54
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala (renamed from core/src/test/scala/spark/LocalSparkContext.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala (renamed from core/src/test/scala/spark/MapOutputTrackerSuite.scala) | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala | 28
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala (renamed from core/src/test/scala/spark/PartitioningSuite.scala) | 13
-rw-r--r--  core/src/test/scala/org/apache/spark/PipedRDDSuite.scala (renamed from core/src/test/scala/spark/PipedRDDSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/SharedSparkContext.scala (renamed from core/src/test/scala/spark/SharedSparkContext.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala (renamed from core/src/test/scala/spark/ShuffleNettySuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala (renamed from core/src/test/scala/spark/ShuffleSuite.scala) | 105
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 60
-rw-r--r--  core/src/test/scala/org/apache/spark/ThreadingSuite.scala (renamed from core/src/test/scala/spark/ThreadingSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/UnpersistSuite.scala (renamed from core/src/test/scala/spark/UnpersistSuite.scala) | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala (renamed from core/src/test/scala/spark/ZippedPartitionsSuite.scala) | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala | 62
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala | 89
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 54
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala (renamed from core/src/test/scala/spark/rdd/JdbcRDDSuite.scala) | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala (renamed from core/src/test/scala/spark/PairRDDFunctionsSuite.scala) | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala (renamed from core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala (renamed from core/src/test/scala/spark/RDDSuite.scala) | 71
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala (renamed from core/src/test/scala/spark/SortingSuite.scala) | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala) | 57
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala (renamed from core/src/test/scala/spark/scheduler/JobLoggerSuite.scala) | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala (renamed from core/src/test/scala/spark/scheduler/SparkListenerSuite.scala) | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala (renamed from core/src/test/scala/spark/scheduler/TaskContextSuite.scala) | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala) | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 273
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala) | 56
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala (renamed from core/src/test/scala/spark/KryoSerializerSuite.scala) | 71
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala (renamed from core/src/test/scala/spark/storage/BlockManagerSuite.scala) | 29
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala (renamed from core/src/test/scala/spark/ui/UISuite.scala) | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala (renamed from core/src/test/scala/spark/ClosureCleanerSuite.scala) | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/util/DistributionSuite.scala (renamed from core/src/test/scala/spark/util/DistributionSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FakeClock.scala | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala (renamed from core/src/test/scala/spark/util/NextIteratorSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala (renamed from core/src/test/scala/spark/util/RateLimitedOutputStreamSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala (renamed from core/src/test/scala/spark/SizeEstimatorSuite.scala) | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (renamed from core/src/test/scala/spark/UtilsSuite.scala) | 18
47 files changed, 1089 insertions(+), 230 deletions(-)
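
The bulk of this diff is a mechanical move of the test sources from the spark package to org.apache.spark, with imports updated to match. A minimal sketch of the pattern repeated across the renamed files below; the suite name is hypothetical, only the package and import changes mirror the hunks:

// Hypothetical suite illustrating the rename pattern only; the real files are listed in the diffstat above.
// New location: core/src/test/scala/org/apache/spark/ExampleSuite.scala
// Old location: core/src/test/scala/spark/ExampleSuite.scala (was "package spark")
package org.apache.spark

import org.scalatest.FunSuite

import org.apache.spark.SparkContext._   // was: import spark.SparkContext._

class ExampleSuite extends FunSuite with LocalSparkContext {
  test("basic job runs against the relocated package") {
    sc = new SparkContext("local", "test")
    assert(sc.parallelize(1 to 3).map(_ * 2).collect().toList === List(2, 4, 6))
  }
}
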
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 0af175f316..4434f3b87c 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
@@ -23,7 +23,7 @@ import collection.mutable
import java.util.Random
import scala.math.exp
import scala.math.signum
-import spark.SparkContext._
+import org.apache.spark.SparkContext._
class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
index 785721ece8..b3a53d928b 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index a84c89e3c9..d9103aebb7 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
import java.io.File
-import spark.rdd._
-import spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
import storage.StorageLevel
+import org.apache.spark.util.Utils
class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
initLogging()
@@ -99,7 +100,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testCheckpointing(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index e11efe459c..7a856d4081 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import network.ConnectionManagerId
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index ed16b9d8ef..01a72d8401 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import java.io.File
@@ -26,6 +26,7 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.time.SpanSugar._
+import org.apache.spark.util.Utils
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
@@ -34,7 +35,7 @@ class DriverSuite extends FunSuite with Timeouts {
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
failAfter(30 seconds) {
- Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
+ Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
}
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 6c847b8fef..af448fcb37 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-
-import scala.collection.mutable.ArrayBuffer
import SparkContext._
+import org.apache.spark.util.NonSerializable
// Common state shared by FailureSuite-launched tasks. We use a global object
// for this because any local variables used in the task closures will rightfully
@@ -40,7 +38,7 @@ object FailureSuiteState {
}
class FailureSuite extends FunSuite with LocalSparkContext {
-
+
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
@@ -66,7 +64,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
test("failure in a two-stage job") {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
- case (k, v) =>
+ case (k, v) =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
if (k == 1 && FailureSuiteState.tasksFailed == 0) {
@@ -87,12 +85,40 @@ class FailureSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
- val thrown = intercept[spark.SparkException] {
+ val thrown = intercept[SparkException] {
results.collect()
}
- assert(thrown.getClass === classOf[spark.SparkException])
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+
+ FailureSuiteState.clear()
+ }
+
+ test("failure because task closure is not serializable") {
+ sc = new SparkContext("local[1,1]", "test")
+ val a = new NonSerializable
+
+ // Non-serializable closure in the final result stage
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => a).count()
+ }
+ assert(thrown.getClass === classOf[SparkException])
assert(thrown.getMessage.contains("NotSerializableException"))
+ // Non-serializable closure in an earlier stage
+ val thrown1 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
+ }
+ assert(thrown1.getClass === classOf[SparkException])
+ assert(thrown1.getMessage.contains("NotSerializableException"))
+
+ // Non-serializable closure in foreach function
+ val thrown2 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).foreach(x => println(a))
+ }
+ assert(thrown2.getClass === classOf[SparkException])
+ assert(thrown2.getMessage.contains("NotSerializableException"))
+
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 242ae971f8..35d1d41af1 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import com.google.common.io.Files
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1e2c257c4b..7b82a4cdd9 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import java.io.{FileWriter, PrintWriter, File}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index aaf03e683b..8a869c9005 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package spark;
+package org.apache.spark;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import com.google.common.base.Optional;
import scala.Tuple2;
import com.google.common.base.Charsets;
@@ -37,15 +38,15 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import spark.api.java.JavaDoubleRDD;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.*;
-import spark.partial.BoundedDouble;
-import spark.partial.PartialResult;
-import spark.storage.StorageLevel;
-import spark.util.StatCounter;
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.StatCounter;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -198,6 +199,35 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(
+ new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+ throws Exception {
+ return !tup._2()._2().isPresent();
+ }
+ }).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
public void foldReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
@@ -326,7 +356,9 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
Assert.assertEquals(6.22222, rdd.variance(), 0.01);
+ Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
+ Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
Double first = rdd.first();
List<Double> take = rdd.take(5);
@@ -716,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
- JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index ddc212d290..6ec124da9c 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterEach
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index ce6cec0451..6013320eaa 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
import akka.actor._
-import spark.scheduler.MapStatus
-import spark.storage.BlockManagerId
-import spark.util.AkkaUtils
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
@@ -112,22 +112,22 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
masterTracker.registerShuffle(10, 1)
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
// failure should be cached
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000..5a18dd13ff
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
@@ -0,0 +1,28 @@
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+ class TestPartition(i: Int) extends Partition {
+ def index = i
+ }
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(1),
+ new TestPartition(2),
+ new TestPartition(3))
+ }
+ def compute(split: Partition, context: TaskContext) = {Iterator()}
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 2)
+ assert(prunedRDD.partitions.length == 1)
+ }
+}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index b1e0b2b4d0..7d938917f2 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
-import org.scalatest.FunSuite
-import scala.collection.mutable.ArrayBuffer
-import SparkContext._
-import spark.util.StatCounter
import scala.math.abs
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.util.StatCounter
+import org.apache.spark.rdd.RDD
class PartitioningSuite extends FunSuite with SharedSparkContext {
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 35c04710a3..2e851d892d 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
import SparkContext._
diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 70c24515be..97cbca09bf 100644
--- a/core/src/test/scala/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterAll
diff --git a/core/src/test/scala/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 6bad6c1d13..e121b162ad 100644
--- a/core/src/test/scala/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.BeforeAndAfterAll
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 3a56c26bef..db717865db 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package spark
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
+package org.apache.spark
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-import com.google.common.io.Files
+import org.apache.spark.SparkContext._
+import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.rdd.{RDD, SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
+import org.apache.spark.util.MutablePair
+import org.apache.spark.serializer.KryoSerializer
-import spark.rdd.ShuffledRDD
-import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
- test("groupByKey with compression") {
+ test("groupByKey without compression") {
try {
- System.setProperty("spark.shuffle.compress", "true")
+ System.setProperty("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
@@ -45,7 +40,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
} finally {
- System.setProperty("spark.blockManager.compress", "false")
+ System.setProperty("spark.shuffle.compress", "true")
}
}
@@ -55,12 +50,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
- classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 10)
@@ -77,11 +72,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
sc = new SparkContext("local-cluster[2,1,512]", "test")
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(3)).setSerializer(classOf[KryoSerializer].getName)
assert(c.count === 10)
}
@@ -96,7 +92,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD(b, new HashPartitioner(10), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ .setSerializer(classOf[KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
@@ -121,7 +118,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD(b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
@@ -135,6 +132,72 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
}
+
+ test("shuffle using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
+ .collect()
+
+ data.foreach { pair => results should contain (pair) }
+ }
+
+ test("sorting using mutable pairs") {
+ // This is not in SortingSuite because of the local cluster setup.
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
+ .sortByKey().collect()
+ results(0) should be (p(1, 11))
+ results(1) should be (p(2, 22))
+ results(2) should be (p(3, 33))
+ results(3) should be (p(100, 100))
+ }
+
+ test("cogroup using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+
+ assert(results(1)(0).length === 3)
+ assert(results(1)(0).contains(1))
+ assert(results(1)(0).contains(2))
+ assert(results(1)(0).contains(3))
+ assert(results(1)(1).length === 2)
+ assert(results(1)(1).contains("11"))
+ assert(results(1)(1).contains("12"))
+ assert(results(2)(0).length === 1)
+ assert(results(2)(0).contains(1))
+ assert(results(2)(1).length === 1)
+ assert(results(2)(1).contains("22"))
+ assert(results(3)(0).length === 0)
+ assert(results(3)(1).contains("3"))
+ }
+
+ test("subtract mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
+ results should have length (1)
+ // subtracted RDD returns results as Tuple2
+ results(0) should be ((3, 33))
+ }
}
object ShuffleSuite {
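
Beyond the package rename, the ShuffleSuite hunks above record an API change: ShuffledRDD now takes explicit key, value, and pair type parameters, and the serializer is configured through setSerializer rather than a constructor argument. A short sketch of the new construction, assuming an existing RDD of (Int, Int) pairs; the helper name is hypothetical:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.KryoSerializer

// Hypothetical helper: repartition `pairs` into 10 hash partitions, shipping data with Kryo.
def shuffleWithKryo(pairs: RDD[(Int, Int)]): RDD[(Int, Int)] = {
  // Type parameters are the key, the value, and the pair type the shuffle produces.
  new ShuffledRDD[Int, Int, (Int, Int)](pairs, new HashPartitioner(10))
    .setSerializer(classOf[KryoSerializer].getName)   // previously passed as a constructor argument
}
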
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
new file mode 100644
index 0000000000..939fe51801
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext._
+
+class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
+ test("getPersistentRDDs only returns RDDs that are marked as cached") {
+ sc = new SparkContext("local", "test")
+ assert(sc.getPersistentRDDs.isEmpty === true)
+
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ assert(sc.getPersistentRDDs.isEmpty === true)
+
+ rdd.cache()
+ assert(sc.getPersistentRDDs.size === 1)
+ assert(sc.getPersistentRDDs.values.head === rdd)
+ }
+
+ test("getPersistentRDDs returns an immutable map") {
+ sc = new SparkContext("local", "test")
+ val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+
+ val myRdds = sc.getPersistentRDDs
+ assert(myRdds.size === 1)
+ assert(myRdds.values.head === rdd1)
+
+ val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
+
+ // getPersistentRDDs should have 2 RDDs, but myRdds should not change
+ assert(sc.getPersistentRDDs.size === 2)
+ assert(myRdds.size === 1)
+ }
+
+ test("getRDDStorageInfo only reports on RDDs that actually persist data") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+
+ assert(sc.getRDDStorageInfo.size === 0)
+
+ rdd.collect()
+ assert(sc.getRDDStorageInfo.size === 1)
+ }
+}
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index f2acd0bd3c..69383ddfb8 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index 93977d16f4..46a2da1724 100644
--- a/core/src/test/scala/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
+import org.apache.spark.SparkContext._
class UnpersistSuite extends FunSuite with LocalSparkContext {
test("unpersist RDD") {
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala
index 5e6d7b09d8..618b9c113b 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import scala.collection.immutable.NumericRange
@@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)
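
This hunk, together with the JavaAPISuite change earlier, reflects the new zipPartitions signature: the other RDDs are passed first and the per-partition function goes in a second, curried parameter list. A small sketch under that assumption; the helper name is illustrative:

import org.apache.spark.rdd.RDD

// Count elements per zipped partition: other RDD first, then the function in its own parameter list.
def partitionSizes(a: RDD[Int], b: RDD[String]): RDD[Int] = {
  a.zipPartitions(b) { (as: Iterator[Int], bs: Iterator[String]) =>
    Iterator(as.size + bs.size)
  }
}
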
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
new file mode 100644
index 0000000000..fd6f69041a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import org.scalatest.FunSuite
+
+
+class CompressionCodecSuite extends FunSuite {
+
+ def testCodec(codec: CompressionCodec) {
+ // Write 1000 integers to the output stream, compressed.
+ val outputStream = new ByteArrayOutputStream()
+ val out = codec.compressedOutputStream(outputStream)
+ for (i <- 1 until 1000) {
+ out.write(i % 256)
+ }
+ out.close()
+
+ // Read the 1000 integers back.
+ val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
+ val in = codec.compressedInputStream(inputStream)
+ for (i <- 1 until 1000) {
+ assert(in.read() === i % 256)
+ }
+ in.close()
+ }
+
+ test("default compression codec") {
+ val codec = CompressionCodec.createCodec()
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("lzf compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ assert(codec.getClass === classOf[LZFCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("snappy compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
new file mode 100644
index 0000000000..58c94a162d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
+ }
+
+ test("MetricsConfig with default properties") {
+ val conf = new MetricsConfig(Option("dummy-file"))
+ conf.initialize()
+
+ assert(conf.properties.size() === 5)
+ assert(conf.properties.getProperty("test-for-dummy") === null)
+
+ val property = conf.getInstance("random")
+ assert(property.size() === 3)
+ assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(property.getProperty("sink.servlet.sample") === "false")
+ }
+
+ test("MetricsConfig with properties set") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 6)
+ assert(masterProp.getProperty("sink.console.period") === "20")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+ assert(masterProp.getProperty("sink.servlet.sample") === "false")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 6)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(workerProp.getProperty("sink.servlet.sample") === "false")
+ }
+
+ test("MetricsConfig with subProperties") {
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
+
+ val propCategories = conf.propertyCategories
+ assert(propCategories.size === 3)
+
+ val masterProp = conf.getInstance("master")
+ val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+ assert(sourceProps.size === 1)
+ assert(sourceProps("jvm").getProperty("class") === "org.apache.spark.metrics.source.JvmSource")
+
+ val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+ assert(sinkProps.size === 2)
+ assert(sinkProps.contains("console"))
+ assert(sinkProps.contains("servlet"))
+
+ val consoleProps = sinkProps("console")
+ assert(consoleProps.size() === 2)
+
+ val servletProps = sinkProps("servlet")
+ assert(servletProps.size() === 3)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
new file mode 100644
index 0000000000..7181333adf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.deploy.master.MasterSource
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+ var filePath: String = _
+
+ before {
+ filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+ System.setProperty("spark.metrics.conf", filePath)
+ }
+
+ test("MetricsSystem with default config") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 0)
+ assert(!metricsSystem.getServletHandlers.isEmpty)
+ }
+
+ test("MetricsSystem with sources add") {
+ val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val sources = metricsSystem.sources
+ val sinks = metricsSystem.sinks
+
+ assert(sources.length === 0)
+ assert(sinks.length === 1)
+ assert(!metricsSystem.getServletHandlers.isEmpty)
+
+ val source = new MasterSource(null)
+ metricsSystem.registerSource(source)
+ assert(sources.length === 1)
+ }
+}
diff --git a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index dc8ca941c1..3d39a31252 100644
--- a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark
import org.scalatest.{ BeforeAndAfter, FunSuite }
-import spark.SparkContext._
-import spark.rdd.JdbcRDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.JdbcRDD
import java.sql._
class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index b102eaf4e6..31f97fc139 100644
--- a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
import com.google.common.io.Files
+import org.apache.spark.SparkContext._
+import org.apache.spark.{Partitioner, SharedSparkContext}
-import spark.rdd.ShuffledRDD
-import spark.SparkContext._
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("groupByKey") {
diff --git a/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index d1276d541f..a80afdee7e 100644
--- a/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.rdd
+package org.apache.spark.rdd
import scala.collection.immutable.NumericRange
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index cbddf4e523..adc971050e 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -15,14 +15,17 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.rdd
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import scala.collection.parallel.mutable
+import org.apache.spark._
+import org.apache.spark.rdd.CoalescedRDDPartition
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -170,9 +173,69 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
- assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
+ test("cogrouped RDDs with locality") {
+ val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
+ val coal3 = data3.coalesce(3)
+ val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
+
+ // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
+ val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
+ val coalesced1 = data.coalesce(3)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
+
+ val splits = coalesced1.glom().collect().map(_.toList).toList
+ assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
+
+ assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = data.coalesce(20)
+ val listOfLists = coalesced4.glom().collect().map(_.toList).toList
+ val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
+ assert( sortedList === (1 to 9).
+ map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
+ }
+
+ test("cogrouped RDDs with locality, large scale (10K partitions)") {
+ // large scale experiment
+ import collection.mutable
+ val rnd = scala.util.Random
+ val partitions = 10000
+ val numMachines = 50
+ val machines = mutable.ListBuffer[String]()
+ (1 to numMachines).foreach(machines += "m"+_)
+
+ val blocks = (1 to partitions).map(i =>
+ { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
+
+ val data2 = sc.makeRDD(blocks)
+ val coalesced2 = data2.coalesce(numMachines*2)
+
+ // test that you get over 90% locality in each group
+ val minLocality = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+
+ // test that the groups are load balanced with 100 +/- 20 elements in each
+ val maxImbalance = coalesced2.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
+ assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
+
+ val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
+ val coalesced3 = data3.coalesce(numMachines*2)
+ val minLocality2 = coalesced3.partitions
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
+ (minLocality2*100.).toInt + "%")
+ }
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index b933c4aab8..2f7bd370fc 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.rdd
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
-import SparkContext._
+
+import org.apache.spark.{Logging, SharedSparkContext}
+import org.apache.spark.SparkContext._
class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f802b66cf1..94f66c94c6 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -15,29 +15,26 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler
import scala.collection.mutable.{Map, HashMap}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import spark.LocalSparkContext
+import org.apache.spark.LocalSparkContext
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.Partition
+import org.apache.spark.TaskContext
+import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
+import org.apache.spark.{FetchFailed, Success, TaskEndReason}
+import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
-import spark.storage.BlockManager
-import spark.storage.BlockManagerId
-import spark.storage.BlockManagerMaster
-import spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import spark.FetchFailedException
-import spark.MapOutputTracker
-import spark.RDD
-import spark.SparkContext
-import spark.SparkException
-import spark.Partition
-import spark.TaskContext
-import spark.TaskEndReason
-
-import spark.{FetchFailed, Success}
+import org.apache.spark.scheduler.cluster.Pool
+import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
@@ -56,11 +53,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
- taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration)
+ taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
taskSets += taskSet
}
override def setListener(listener: TaskSchedulerListener) = {}
@@ -300,10 +299,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val reduceRdd = makeRdd(2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
- val oldGeneration = mapOutputTracker.getGeneration
+ val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA"))
- val newGeneration = mapOutputTracker.getGeneration
- assert(newGeneration > oldGeneration)
+ val newEpoch = mapOutputTracker.getEpoch
+ assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
@@ -312,8 +311,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
- // should work because it's a new generation
- taskSet.tasks(1).generation = newGeneration
+ // should work because it's a new epoch
+ taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
@@ -402,12 +401,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(results === Map(0 -> 42))
}
- /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note, converts taskSet's locations to host only. */
- private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
- assert(locations.size === taskSet.tasks.size)
- for ((expectLocs, taskLocs) <-
- taskSet.tasks.map(_.preferredLocations).zip(locations)) {
- assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
+ /**
+ * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
+ * Note that this checks only the host and not the executor ID.
+ */
+ private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
+ assert(hosts.size === taskSet.tasks.size)
+ for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
+ assert(taskLocs.map(_.host) === expectedLocs)
}
}
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 0f855c38da..cece60dda7 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler
import java.util.Properties
import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable
+
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import scala.collection.mutable
-import spark._
-import spark.SparkContext._
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
@@ -57,7 +61,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
- joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+ joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 392d67d67b..aac7c207cb 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler
import org.scalatest.FunSuite
-import spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkContext, LocalSparkContext}
import scala.collection.mutable
import org.scalatest.matchers.ShouldMatchers
-import spark.SparkContext._
+import org.apache.spark.SparkContext._
/**
*
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 95a6eee2fc..e31a116a75 100644
--- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import spark.TaskContext
-import spark.RDD
-import spark.SparkContext
-import spark.Partition
-import spark.LocalSparkContext
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.Partition
+import org.apache.spark.LocalSparkContext
class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 8f81d0b6ee..92ad9f09b2 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -15,25 +15,25 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler.cluster
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster._
+import org.apache.spark._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
-class DummyTaskSetManager(
+class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
clusterScheduler: ClusterScheduler,
taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler,taskSet) {
+ extends ClusterTaskSetManager(clusterScheduler, taskSet) {
parent = null
weight = 1
@@ -72,10 +72,16 @@ class DummyTaskSetManager(
override def executorLost(executorId: String, host: String): Unit = {
}
- override def slaveOffer(execId: String, host: String, avaiableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription] =
+ {
if (tasksFinished + runningTasks < numTasks) {
increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", null))
+ return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
}
return None
}
@@ -98,17 +104,10 @@ class DummyTaskSetManager(
}
}
-class DummyTask(stageId: Int) extends Task[Int](stageId)
-{
- def run(attemptId: Long): Int = {
- return 0
- }
-}
-
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
- new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ new FakeTaskSetManager(priority, stage, numTasks, cs, taskSet)
}
def resourceOffer(rootPool: Pool): Int = {
@@ -118,7 +117,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
- taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+ taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -135,7 +134,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
@@ -162,7 +161,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
@@ -219,7 +218,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
new file mode 100644
index 0000000000..ff70a2cdf0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.scheduler._
+import org.apache.spark.executor.TaskMetrics
+import java.nio.ByteBuffer
+import org.apache.spark.util.{Utils, FakeClock}
+
+/**
+ * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * feedback received from the TaskSetManagers. Note that it's important to initialize this with
+ * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
+ * to work, and these are required for locality in ClusterTaskSetManager.
+ */
+class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+ extends ClusterScheduler(sc)
+{
+ val startedTasks = new ArrayBuffer[Long]
+ val endedTasks = new mutable.HashMap[Long, TaskEndReason]
+ val finishedManagers = new ArrayBuffer[TaskSetManager]
+
+ val executors = new mutable.HashMap[String, String] ++ liveExecutors
+
+ listener = new TaskSchedulerListener {
+ def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ startedTasks += taskInfo.index
+ }
+
+ def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: mutable.Map[Long, Any],
+ taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics)
+ {
+ endedTasks(taskInfo.index) = reason
+ }
+
+ def executorGained(execId: String, host: String) {}
+
+ def executorLost(execId: String) {}
+
+ def taskSetFailed(taskSet: TaskSet, reason: String) {}
+ }
+
+ def removeExecutor(execId: String): Unit = executors -= execId
+
+ override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+
+ override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
+
+ override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+}
+
+class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+ import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+
+ val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+
+ test("TaskSet with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val manager = new ClusterTaskSetManager(sched, taskSet)
+
+ // Offer a host with no CPUs
+ assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
+
+ // Offer a host with process-local as the constraint; this should work because the TaskSet
+ // above won't have any locality preferences
+ val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ assert(sched.startedTasks.contains(0))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+
+ // Tell it the task has finished
+ manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("multiple offers with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(3)
+ val manager = new ClusterTaskSetManager(sched, taskSet)
+
+ // First three offers should all find tasks
+ for (i <- 0 until 3) {
+ val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ }
+ assert(sched.startedTasks.toSet === Set(0, 1, 2))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Finish the first two tasks
+ manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+ manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.endedTasks(1) === Success)
+ assert(!sched.finishedManagers.contains(manager))
+
+ // Finish the last task
+ manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
+ assert(sched.endedTasks(2) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("basic delay scheduling") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(4,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host2", "exec2")),
+ Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
+ Seq() // Last task has no locality prefs
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1, exec1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+
+ // Offer host1, exec1 again, at ANY level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at ANY level: task 1 should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ }
+
+ test("delay scheduling with fallback") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc,
+ ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = createTaskSet(5,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3")),
+ Seq(TaskLocation("host2"))
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1 again: second task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1 again: third task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // Offer host2: fifth task (also on host2) should get chosen
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+
+ // Now that we've launched a local task, we should no longer launch the task for host3
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // After another delay, we can go ahead and launch that task non-locally
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+ }
+
+ test("delay scheduling with failed hosts") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(3,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3"))
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: third task should be chosen immediately because host3 is not up
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // After this, nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ // Now mark host2 as dead
+ sched.removeExecutor("exec2")
+ manager.executorLost("exec2", "host2")
+
+ // Task 1 should immediately be launched on host1 because its original host is gone
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Now that all tasks have launched, nothing new should be launched anywhere else
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+ }
+
+ /**
+ * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+ * locations for each task (given as varargs) if this sequence is not empty.
+ */
+ def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+ if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+ throw new IllegalArgumentException("Wrong number of task locations")
+ }
+ val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+ new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ }
+ new TaskSet(tasks, 0, 0, 0, null)
+ }
+
+ def createTaskResult(id: Int): ByteBuffer = {
+ ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
+ }
+}
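Condensed, the new suite follows one pattern throughout; a hypothetical sketch of a test body using the names defined above (SparkContext setup omitted, createTaskSet being the helper at the end of the suite):

    // A scheduler with one live executor, a single-task set preferring that executor,
    // and a manager driven by FakeClock so locality waits can be advanced without sleeping.
    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
    val taskSet = createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
    val manager = new ClusterTaskSetManager(sched, taskSet, new FakeClock)

    // The preferred executor is offered and the task launches at any locality level...
    assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).get.index === 0)
    // ...after which further offers find nothing left to launch.
    assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY) === None)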
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
new file mode 100644
index 0000000000..2f12aaed18
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.scheduler.{TaskLocation, Task}
+
+class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId) {
+ override def run(attemptId: Long): Int = 0
+
+ override def preferredLocations: Seq[TaskLocation] = prefLocs
+}
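FakeTask replaces the DummyTask removed from ClusterSchedulerSuite and additionally carries preferred locations, so a test can assemble a TaskSet directly; a small hypothetical example mirroring the suites above:

    // Two no-op tasks in stage 0: one pinned to host1, one with no locality preference.
    val tasks = Array[Task[_]](new FakeTask(0, Seq(TaskLocation("host1"))), new FakeTask(0))
    val taskSet = new TaskSet(tasks, 0, 0, 0, null)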
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
index 14bb58731b..111340a65c 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package spark.scheduler
+package org.apache.spark.scheduler.local
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster._
+import org.apache.spark._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ConcurrentMap, HashMap}
import java.util.concurrent.Semaphore
@@ -57,23 +57,23 @@ object TaskThreadInfo {
* 1. each thread contains one job.
* 2. each job contains one stage.
* 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
- * it will get cpu core resource, and will wait to finished after user manually
- * release "Lock" and then cluster will contain another free cpu cores.
- * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
+ * 4. each task(launched) must be launched in order (using threadToStarted) to make sure
+ * it will get a cpu core resource, and will wait to finish until the user manually
+ * releases "Lock" and the cluster has another free cpu core.
+ * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
* thus it will be scheduled later when cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
+
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+ sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
}.start()
}
-
+
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
@@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// thread 5 and 6 (stage pending)must meet following two points
- // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
- // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
+ // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be added to the taskSetManager
+ // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
// 2. priority of stage in thread 5 should be prior to priority of stage in thread 6
// So I just use "sleep" 1s here for each thread.
// TODO: any better solution?
@@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
-
+
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
-
+
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
@@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
-
+
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
@@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
-
+
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
-
+
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
-
+
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
@@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
-
+
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
-
+
assert(TaskThreadInfo.threadToRunning(32) === true)
- //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
+ //1. Similar to the scenario above, sleep 1s so the stages of threads 23 and 33 are added to the taskSetManager
// queue so that cluster will assign free cpu core to stage 23 after stage 11 finished.
//2. priority of 23 and 33 will be meaningless as using fair scheduler here.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
-
+
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
@@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
-
+
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
@@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
-
- sem.acquire(11)
+
+ sem.acquire(11)
}
}
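The suite exercises two knobs: the cluster-wide scheduling mode, set as a system property before the SparkContext is created, and the per-thread pool assignment via the renamed setLocalProperty. A hypothetical sketch; the "FAIR" value is assumed here by analogy with the FIFO test above:

    System.setProperty("spark.cluster.schedulingmode", "FAIR")  // assumed value for the fair-scheduler test
    val sc = new SparkContext("local[4]", "test")
    // Jobs submitted from the current thread are routed to pool "1".
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "1")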
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c3323dcbb3..0164dda0ba 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.serializer
import scala.collection.mutable
-import scala.collection.immutable
-import org.scalatest.FunSuite
-import com.esotericsoftware.kryo._
+import com.esotericsoftware.kryo.Kryo
-import SparkContext._
+import org.scalatest.FunSuite
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.serializer.KryoTest._
-class KryoSerializerSuite extends FunSuite {
+class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
@@ -53,6 +53,7 @@ class KryoSerializerSuite extends FunSuite {
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
check(Array[Int]())
+ check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}
test("pairs") {
@@ -103,7 +104,6 @@ class KryoSerializerSuite extends FunSuite {
}
test("custom registrator") {
- import spark.test._
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
val ser = (new KryoSerializer).newInstance()
@@ -123,14 +123,65 @@ class KryoSerializerSuite extends FunSuite {
val hashMap = new java.util.HashMap[String, String]
hashMap.put("foo", "bar")
check(hashMap)
-
+
+ System.clearProperty("spark.kryo.registrator")
+ }
+
+ test("kryo with collect") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
+ assert(control === result.toSeq)
+ }
+
+ test("kryo with parallelize") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
+ assert(control === result.toSeq)
+ }
+
+ test("kryo with parallelize for specialized tuples") {
+ assert(sc.parallelize(Array((1, 11), (2, 22), (3, 33))).count === 3)
+ }
+
+ test("kryo with parallelize for primitive arrays") {
+ assert(sc.parallelize(Array(1, 2, 3)).count === 3)
+ }
+
+ test("kryo with collect for specialized tuples") {
+ assert(sc.parallelize(Array((1, 11), (2, 22), (3, 33))).collect().head === (1, 11))
+ }
+
+ test("kryo with reduce") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+ .reduce((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+ assert(control.sum === result)
+ }
+
+ // TODO: this still doesn't work
+ ignore("kryo with fold") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+ .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+ assert(10 + control.sum === result)
+ }
+
+ override def beforeAll() {
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ super.afterAll()
System.clearProperty("spark.kryo.registrator")
+ System.clearProperty("spark.serializer")
}
}
-package test {
+object KryoTest {
case class CaseClass(i: Int, s: String) {}
-
+
class ClassWithNoArgConstructor {
var x: Int = 0
override def equals(other: Any) = other match {
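The suite now configures Kryo once in beforeAll/afterAll rather than per test; a hypothetical standalone equivalent of that setup and teardown, assuming KryoTest's MyRegistrator is in scope:

    // Select Kryo as the serializer, register the test classes, and clean up the
    // properties afterwards so later suites are unaffected.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
    try {
      val sc = new SparkContext("local", "kryo-example")
      assert(sc.parallelize(Array(1, 2, 3)).count == 3)
      sc.stop()
    } finally {
      System.clearProperty("spark.serializer")
      System.clearProperty("spark.kryo.registrator")
    }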
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index b719d65342..038a9acb85 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.storage
+package org.apache.spark.storage
import java.nio.ByteBuffer
@@ -29,11 +29,8 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._
-import spark.JavaSerializer
-import spark.KryoSerializer
-import spark.SizeEstimator
-import spark.util.AkkaUtils
-import spark.util.ByteBufferInputStream
+import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
@@ -56,7 +53,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
System.setProperty("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new spark.storage.BlockManagerMasterActor(true))))
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
@@ -65,7 +62,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
// Set some value ...
- System.setProperty("spark.hostPort", spark.Utils.localHostName() + ":" + 1111)
+ System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
}
after {
@@ -105,10 +102,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(level2 === level1, "level2 is not same as level1")
assert(level2.eq(level1), "level2 is not the same object as level1")
assert(level3 != level1, "level3 is same as level1")
- val bytes1 = spark.Utils.serialize(level1)
- val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
- val bytes2 = spark.Utils.serialize(level2)
- val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ val bytes1 = Utils.serialize(level1)
+ val level1_ = Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = Utils.serialize(level2)
+ val level2_ = Utils.deserialize[StorageLevel](bytes2)
assert(level1_ === level1, "Deserialized level1 not same as original level1")
assert(level1_.eq(level1), "Deserialized level1 not the same object as original level2")
assert(level2_ === level2, "Deserialized level2 not same as original level2")
@@ -122,10 +119,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")
- val bytes1 = spark.Utils.serialize(id1)
- val id1_ = spark.Utils.deserialize[BlockManagerId](bytes1)
- val bytes2 = spark.Utils.serialize(id2)
- val id2_ = spark.Utils.deserialize[BlockManagerId](bytes2)
+ val bytes1 = Utils.serialize(id1)
+ val id1_ = Utils.deserialize[BlockManagerId](bytes1)
+ val bytes2 = Utils.serialize(id2)
+ val id2_ = Utils.deserialize[BlockManagerId](bytes2)
assert(id1_ === id1, "Deserialized id1 is not same as original id1")
assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
assert(id2_ === id2, "Deserialized id2 is not same as original id2")
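Both blocks above reduce to a serialize/deserialize round trip through the relocated Utils; a hypothetical sketch for StorageLevel:

    // Round-trip a StorageLevel through Java serialization and compare by value.
    val level = StorageLevel.MEMORY_ONLY
    val restored = Utils.deserialize[StorageLevel](Utils.serialize(level))
    assert(restored == level)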
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 56c1fed6ad..3321fb5eb7 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.ui
+package org.apache.spark.ui
import scala.util.{Failure, Success, Try}
import java.net.ServerSocket
@@ -24,14 +24,15 @@ import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
- val startPort = 33333
+ val startPort = 3030
val server = new Server(startPort)
server.start()
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
- assert(boundPort1 === startPort + 1)
- assert(boundPort2 === startPort + 2)
+ // Allow some wiggle room in case ports on the machine are under contention
+ assert(boundPort1 > startPort && boundPort1 < startPort + 10)
+ assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}
test("jetty binds to port 0 correctly") {
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 7d2831e19c..0ed366fb70 100644
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.util
import java.io.NotSerializableException
import org.scalatest.FunSuite
-import spark.LocalSparkContext._
-import SparkContext._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.LocalSparkContext._
class ClosureCleanerSuite extends FunSuite {
test("closures inside an object") {
diff --git a/core/src/test/scala/spark/util/DistributionSuite.scala b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
index 6578b55e82..63642461e4 100644
--- a/core/src/test/scala/spark/util/DistributionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.util
+package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
new file mode 100644
index 0000000000..0a45917b08
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+class FakeClock extends Clock {
+ private var time = 0L
+
+ def advance(millis: Long): Unit = time += millis
+
+ def getTime(): Long = time
+}
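FakeClock stands in for the scheduler's Clock so locality timeouts can be advanced manually instead of sleeping; a small hypothetical usage, as in ClusterTaskSetManagerSuite above:

    val clock = new FakeClock
    assert(clock.getTime() == 0L)      // nothing has elapsed yet
    clock.advance(3000L)               // pretend the locality wait has passed
    assert(clock.getTime() == 3000L)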
diff --git a/core/src/test/scala/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
index fdbd43d941..45867463a5 100644
--- a/core/src/test/scala/spark/util/NextIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.util
+package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
diff --git a/core/src/test/scala/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
index 4c0044202f..a9dd0b1a5b 100644
--- a/core/src/test/scala/spark/util/RateLimitedOutputStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.util
+package org.apache.spark.util
import org.scalatest.FunSuite
import java.io.ByteArrayOutputStream
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 1ef812dfbd..4e40dcbdee 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 31c3b25c50..e2859caf58 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark
+package org.apache.spark.util
import com.google.common.base.Charsets
import com.google.common.io.Files
@@ -26,14 +26,14 @@ import scala.util.Random
class UtilsSuite extends FunSuite {
- test("memoryBytesToString") {
- assert(Utils.memoryBytesToString(10) === "10.0 B")
- assert(Utils.memoryBytesToString(1500) === "1500.0 B")
- assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
- assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
- assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
- assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
- assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
+ test("bytesToString") {
+ assert(Utils.bytesToString(10) === "10.0 B")
+ assert(Utils.bytesToString(1500) === "1500.0 B")
+ assert(Utils.bytesToString(2000000) === "1953.1 KB")
+ assert(Utils.bytesToString(2097152) === "2.0 MB")
+ assert(Utils.bytesToString(2306867) === "2.2 MB")
+ assert(Utils.bytesToString(5368709120L) === "5.0 GB")
+ assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
}
test("copyStream") {