aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala32
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala366
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/DriverSuite.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/FailureSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala108
-rw-r--r--core/src/test/scala/org/apache/spark/JavaAPISuite.java83
-rw-r--r--core/src/test/scala/org/apache/spark/JobCancellationSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/SharedSparkContext.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/SparkConfSuite.scala110
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala132
-rw-r--r--core/src/test/scala/org/apache/spark/UnpersistSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala271
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala34
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala48
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala)18
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala66
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala)3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala23
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala)34
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala)35
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala227
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala101
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala73
-rw-r--r--core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala74
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala37
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala)31
39 files changed, 1434 insertions, 701 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 4434f3b87c..c443c5266e 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,6 +27,21 @@ import org.apache.spark.SparkContext._
class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
+
+ implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
+ def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+ new mutable.HashSet[A]()
+ }
+ }
+
test ("basic accumulation"){
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -51,7 +66,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("add value to collection accumulators") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -68,22 +82,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
}
- implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
- def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
- t1 ++= t2
- t1
- }
- def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
- t1 += t2
- t1
- }
- def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
- new mutable.HashSet[Any]()
- }
- }
-
test ("value not readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -125,7 +124,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
}
test ("localValue readable in tasks") {
- import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f26c44d3e7..ec13b329b2 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.reflect.ClassTag
import org.scalatest.FunSuite
import java.io.File
import org.apache.spark.rdd._
@@ -25,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
- initLogging()
-
var checkpointDir: File = _
val partitioner = new HashPartitioner(2)
@@ -56,17 +55,15 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("RDDs with one-to-one dependencies") {
- testCheckpointing(_.map(x => x.toString))
- testCheckpointing(_.flatMap(x => 1 to x))
- testCheckpointing(_.filter(_ % 2 == 0))
- testCheckpointing(_.sample(false, 0.5, 0))
- testCheckpointing(_.glom())
- testCheckpointing(_.mapPartitions(_.map(_.toString)))
- testCheckpointing(r => new MapPartitionsWithContextRDD(r,
- (context: TaskContext, iter: Iterator[Int]) => iter.map(_.toString), false ))
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
- testCheckpointing(_.pipe(Seq("cat")))
+ testRDD(_.map(x => x.toString))
+ testRDD(_.flatMap(x => 1 to x))
+ testRDD(_.filter(_ % 2 == 0))
+ testRDD(_.sample(false, 0.5, 0))
+ testRDD(_.glom())
+ testRDD(_.mapPartitions(_.map(_.toString)))
+ testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+ testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+ testRDD(_.pipe(Seq("cat")))
}
test("ParallelCollection") {
@@ -98,7 +95,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("ShuffledRDD") {
- testCheckpointing(rdd => {
+ testRDD(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
@@ -106,25 +103,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("UnionRDD") {
def otherRDD = sc.makeRDD(1 to 10, 1)
-
- // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed.
- // Current implementation of UnionRDD has transient reference to parent RDDs,
- // so only the partitions will reduce in serialized size, not the RDD.
- testCheckpointing(_.union(otherRDD), false, true)
- testParentCheckpointing(_.union(otherRDD), false, true)
+ testRDD(_.union(otherRDD))
+ testRDDPartitions(_.union(otherRDD))
}
test("CartesianRDD") {
def otherRDD = sc.makeRDD(1 to 10, 1)
- testCheckpointing(new CartesianRDD(sc, _, otherRDD))
-
- // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
- // so only the RDD will reduce in serialized size, not the partitions.
- testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+ testRDD(new CartesianRDD(sc, _, otherRDD))
+ testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
// Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
- // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+ // the parent RDD has been checkpointed and parent partitions have been changed.
// Note that this test is very specific to the current implementation of CartesianRDD.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
ones.checkpoint() // checkpoint that MappedRDD
@@ -135,23 +124,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val splitAfterCheckpoint =
serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
assert(
- (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
- (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
- "CartesianRDD.parents not updated after parent RDD checkpointed"
+ (splitAfterCheckpoint.s1.getClass != splitBeforeCheckpoint.s1.getClass) &&
+ (splitAfterCheckpoint.s2.getClass != splitBeforeCheckpoint.s2.getClass),
+ "CartesianRDD.s1 and CartesianRDD.s2 not updated after parent RDD is checkpointed"
)
}
test("CoalescedRDD") {
- testCheckpointing(_.coalesce(2))
-
- // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
- // so only the RDD will reduce in serialized size, not the partitions.
- testParentCheckpointing(_.coalesce(2), true, false)
+ testRDD(_.coalesce(2))
+ testRDDPartitions(_.coalesce(2))
- // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
- // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
- // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
+ // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents)
+ // after the parent RDD has been checkpointed and parent partitions have been changed.
+ // Note that this test is very specific to the current implementation of
+ // CoalescedRDDPartitions.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
ones.checkpoint() // checkpoint that MappedRDD
val coalesced = new CoalescedRDD(ones, 2)
@@ -161,33 +147,78 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val splitAfterCheckpoint =
serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
assert(
- splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
- "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
+ splitAfterCheckpoint.parents.head.getClass != splitBeforeCheckpoint.parents.head.getClass,
+ "CoalescedRDDPartition.parents not updated after parent RDD is checkpointed"
)
}
test("CoGroupedRDD") {
- val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
- testCheckpointing(rdd => {
+ val longLineageRDD1 = generateFatPairRDD()
+ testRDD(rdd => {
CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
- }, false, true)
+ })
- val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
- testParentCheckpointing(rdd => {
+ val longLineageRDD2 = generateFatPairRDD()
+ testRDDPartitions(rdd => {
CheckpointSuite.cogroup(
longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
- }, false, true)
+ })
}
test("ZippedRDD") {
- testCheckpointing(
- rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
- // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
- // so only the RDD will reduce in serialized size, not the partitions.
- testParentCheckpointing(
- rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+ testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+ testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+
+ // Test that the ZippedPartition updates parent partitions
+ // after the parent RDD has been checkpointed and parent partitions have been changed.
+ // Note that this test is very specific to the current implementation of ZippedRDD.
+ val rdd = generateFatRDD()
+ val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x))
+ zippedRDD.rdd1.checkpoint()
+ zippedRDD.rdd2.checkpoint()
+ val partitionBeforeCheckpoint =
+ serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+ zippedRDD.count()
+ val partitionAfterCheckpoint =
+ serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+ assert(
+ partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass &&
+ partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass,
+ "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed"
+ )
+ }
+
+ test("PartitionerAwareUnionRDD") {
+ testRDD(rdd => {
+ new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+ generateFatPairRDD(),
+ rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+ ))
+ })
+
+ testRDDPartitions(rdd => {
+ new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+ generateFatPairRDD(),
+ rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+ ))
+ })
+
+ // Test that the PartitionerAwareUnionRDD updates parent partitions
+ // (PartitionerAwareUnionRDD.parents) after the parent RDD has been checkpointed and parent
+ // partitions have been changed. Note that this test is very specific to the current
+ // implementation of PartitionerAwareUnionRDD.
+ val pairRDD = generateFatPairRDD()
+ pairRDD.checkpoint()
+ val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD))
+ val partitionBeforeCheckpoint = serializeDeserialize(
+ unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+ pairRDD.count()
+ val partitionAfterCheckpoint = serializeDeserialize(
+ unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+ assert(
+ partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+ "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
+ )
}
test("CheckpointRDD with zero partitions") {
@@ -201,29 +232,32 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
/**
- * Test checkpointing of the final RDD generated by the given operation. By default,
- * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
- * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or
- * not, but this is not done by default as usually the partitions do not refer to any RDD and
- * therefore never store the lineage.
+ * Test checkpointing of the RDD generated by the given operation. It tests whether the
+ * serialized size of the RDD is reduce after checkpointing or not. This function should be called
+ * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
*/
- def testCheckpointing[U: ClassManifest](
- op: (RDD[Int]) => RDD[U],
- testRDDSize: Boolean = true,
- testRDDPartitionSize: Boolean = false
- ) {
+ def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
// Generate the final RDD using given RDD operation
- val baseRDD = generateLongLineageRDD()
+ val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
val parentRDD = operatedRDD.dependencies.headOption.orNull
val rddType = operatedRDD.getClass.getSimpleName
val numPartitions = operatedRDD.partitions.length
+ // Force initialization of all the data structures in RDDs
+ // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+ initializeRdd(operatedRDD)
+
+ val partitionsBeforeCheckpoint = operatedRDD.partitions
+
// Find serialized sizes before and after the checkpoint
- val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+ logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+ val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
val result = operatedRDD.collect()
- val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+ operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+ val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+ logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the checkpoint file has been created
assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
@@ -231,6 +265,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
+ // Test whether the partitions have been changed from its earlier partitions
+ assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList)
+
// Test whether the partitions have been changed to the new Hadoop partitions
assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
@@ -240,122 +277,72 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
- // Test whether serialized size of the RDD has reduced. If the RDD
- // does not have any dependency to another RDD (e.g., ParallelCollection,
- // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
- if (testRDDSize) {
- logInfo("Size of " + rddType +
- "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
- assert(
- rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
- "Size of " + rddType + " did not reduce after checkpointing " +
- "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
- )
- }
+ // Test whether serialized size of the RDD has reduced.
+ logInfo("Size of " + rddType +
+ " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+ assert(
+ rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+ "Size of " + rddType + " did not reduce after checkpointing " +
+ " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+ )
- // Test whether serialized size of the partitions has reduced. If the partitions
- // do not have any non-transient reference to another RDD or another RDD's partitions, it
- // does not refer to a lineage and therefore may not reduce in size after checkpointing.
- // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
- // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
- // replaced with the HadooPartitions of the checkpointed RDD.
- if (testRDDPartitionSize) {
- logInfo("Size of " + rddType + " partitions "
- + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
- assert(
- splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
- "Size of " + rddType + " partitions did not reduce after checkpointing " +
- "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
- )
- }
}
/**
* Test whether checkpointing of the parent of the generated RDD also
* truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
* RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
- * this RDD will remember the partitions and therefore potentially the whole lineage.
+ * the generated RDD will remember the partitions and therefore potentially the whole lineage.
+ * This function should be called only those RDD whose partitions refer to parent RDD's
+ * partitions (i.e., do not call it on simple RDD like MappedRDD).
+ *
*/
- def testParentCheckpointing[U: ClassManifest](
- op: (RDD[Int]) => RDD[U],
- testRDDSize: Boolean,
- testRDDPartitionSize: Boolean
- ) {
+ def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
// Generate the final RDD using given RDD operation
- val baseRDD = generateLongLineageRDD()
+ val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
- val parentRDD = operatedRDD.dependencies.head.rdd
+ val parentRDDs = operatedRDD.dependencies.map(_.rdd)
val rddType = operatedRDD.getClass.getSimpleName
- val parentRDDType = parentRDD.getClass.getSimpleName
- // Get the partitions and dependencies of the parent in case they're lazily computed
- parentRDD.dependencies
- parentRDD.partitions
+ // Force initialization of all the data structures in RDDs
+ // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+ initializeRdd(operatedRDD)
// Find serialized sizes before and after the checkpoint
- val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
- parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one
- val result = operatedRDD.collect()
- val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+ logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+ val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+ parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one
+ val result = operatedRDD.collect() // force checkpointing
+ operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+ val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+ logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
- // Test whether serialized size of the RDD has reduced because of its parent being
- // checkpointed. If this RDD or its parent RDD do not have any dependency
- // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
- // not reduce in size after checkpointing.
- if (testRDDSize) {
- assert(
- rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
- "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
- "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
- )
- }
-
- // Test whether serialized size of the partitions has reduced because of its parent being
- // checkpointed. If the partitions do not have any non-transient reference to another RDD
- // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce
- // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
- // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
- // partitions must have changed after checkpointing.
- if (testRDDPartitionSize) {
- assert(
- splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
- "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
- "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
- )
- }
-
+ // Test whether serialized size of the partitions has reduced
+ logInfo("Size of partitions of " + rddType +
+ " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]")
+ assert(
+ partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint,
+ "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" +
+ " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]"
+ )
}
/**
- * Generate an RDD with a long lineage of one-to-one dependencies.
+ * Generate an RDD such that both the RDD and its partitions have large size.
*/
- def generateLongLineageRDD(): RDD[Int] = {
- var rdd = sc.makeRDD(1 to 100, 4)
- for (i <- 1 to 50) {
- rdd = rdd.map(x => x + 1)
- }
- rdd
+ def generateFatRDD(): RDD[Int] = {
+ new FatRDD(sc.makeRDD(1 to 100, 4)).map(x => x)
}
/**
- * Generate an RDD with a long lineage specifically for CoGroupedRDD.
- * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage
- * and narrow dependency with this RDD. This method generate such an RDD by a sequence
- * of cogroups and mapValues which creates a long lineage of narrow dependencies.
+ * Generate an pair RDD (with partitioner) such that both the RDD and its partitions
+ * have large size.
*/
- def generateLongLineageRDDForCoGroupedRDD() = {
- val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
-
- def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
-
- var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
- for(i <- 1 to 10) {
- cogrouped = cogrouped.mapValues(add).cogroup(ones)
- }
- cogrouped.mapValues(add)
+ def generateFatPairRDD() = {
+ new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
}
/**
@@ -363,8 +350,26 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
*/
def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
- (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
- Utils.serialize(rdd.partitions).length)
+ val rddSize = Utils.serialize(rdd).size
+ val rddCpDataSize = Utils.serialize(rdd.checkpointData).size
+ val rddPartitionSize = Utils.serialize(rdd.partitions).size
+ val rddDependenciesSize = Utils.serialize(rdd.dependencies).size
+
+ // Print detailed size, helps in debugging
+ logInfo("Serialized sizes of " + rdd +
+ ": RDD = " + rddSize +
+ ", RDD checkpoint data = " + rddCpDataSize +
+ ", RDD partitions = " + rddPartitionSize +
+ ", RDD dependencies = " + rddDependenciesSize
+ )
+ // this makes sure that serializing the RDD's checkpoint data does not
+ // serialize the whole RDD as well
+ assert(
+ rddSize > rddCpDataSize,
+ "RDD's checkpoint data (" + rddCpDataSize + ") is equal or larger than the " +
+ "whole RDD with checkpoint data (" + rddSize + ")"
+ )
+ (rddSize - rddCpDataSize, rddPartitionSize)
}
/**
@@ -376,8 +381,49 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val bytes = Utils.serialize(obj)
Utils.deserialize[T](bytes)
}
+
+ /**
+ * Recursively force the initialization of the all members of an RDD and it parents.
+ */
+ def initializeRdd(rdd: RDD[_]) {
+ rdd.partitions // forces the
+ rdd.dependencies.map(_.rdd).foreach(initializeRdd(_))
+ }
}
+/** RDD partition that has large serialized size. */
+class FatPartition(val partition: Partition) extends Partition {
+ val bigData = new Array[Byte](10000)
+ def index: Int = partition.index
+}
+
+/** RDD that has large serialized size. */
+class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+ val bigData = new Array[Byte](100000)
+
+ protected def getPartitions: Array[Partition] = {
+ parent.partitions.map(p => new FatPartition(p))
+ }
+
+ def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+ parent.compute(split.asInstanceOf[FatPartition].partition, context)
+ }
+}
+
+/** Pair RDD that has large serialized size. */
+class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) {
+ val bigData = new Array[Byte](100000)
+
+ protected def getPartitions: Array[Partition] = {
+ parent.partitions.map(p => new FatPartition(p))
+ }
+
+ @transient override val partitioner = Some(_partitioner)
+
+ def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
+ parent.compute(split.asInstanceOf[FatPartition].partition, context).map(x => (x, x))
+ }
+}
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 480bac84f3..d9cb7fead5 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("more than 4 times"))
+ assert(thrown.getMessage.contains("failed 4 times"))
}
test("caching") {
@@ -303,12 +303,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
}
+
}
object DistributedSuite {
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..fb89537258 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -30,13 +30,15 @@ import org.apache.spark.util.Utils
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
- assert(System.getenv("SPARK_HOME") != null)
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(30 seconds) {
- Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
- new File(System.getenv("SPARK_HOME")))
+ failAfter(60 seconds) {
+ Utils.executeAndGetOutput(
+ Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+ new File(sparkHome),
+ Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index af448fcb37..befdc1589f 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -42,7 +42,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
- sc = new SparkContext("local[1,1]", "test")
+ sc = new SparkContext("local[1,2]", "test")
val results = sc.makeRDD(1 to 3, 3).map { x =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
@@ -62,7 +62,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
// Run a map-reduce job in which a reduce task deterministically fails once.
test("failure in a two-stage job") {
- sc = new SparkContext("local[1,1]", "test")
+ sc = new SparkContext("local[1,2]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
case (k, v) =>
FailureSuiteState.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index c210dd5c3b..a2eb9a4e84 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -17,33 +17,49 @@
package org.apache.spark
+import java.io._
+import java.util.jar.{JarEntry, JarOutputStream}
+
+import SparkContext._
import com.google.common.io.Files
import org.scalatest.FunSuite
-import java.io.{File, PrintWriter, FileReader, BufferedReader}
-import SparkContext._
class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
- @transient var testJarFile: File = _
-
- override def beforeEach() {
- super.beforeEach()
- // Create a sample text file
- val tmpdir = new File(Files.createTempDir(), "test")
- tmpdir.mkdir()
- tmpFile = new File(tmpdir, "FileServerSuite.txt")
- val pw = new PrintWriter(tmpFile)
+ @transient var tmpJarUrl: String = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ val tmpDir = new File(Files.createTempDir(), "test")
+ tmpDir.mkdir()
+
+ val textFile = new File(tmpDir, "FileServerSuite.txt")
+ val pw = new PrintWriter(textFile)
pw.println("100")
pw.close()
- }
+
+ val jarFile = new File(tmpDir, "test.jar")
+ val jarStream = new FileOutputStream(jarFile)
+ val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
- override def afterEach() {
- super.afterEach()
- // Clean up downloaded file
- if (tmpFile.exists) {
- tmpFile.delete()
+ val jarEntry = new JarEntry(textFile.getName)
+ jar.putNextEntry(jarEntry)
+
+ val in = new FileInputStream(textFile)
+ val buffer = new Array[Byte](10240)
+ var nRead = 0
+ while (nRead <= 0) {
+ nRead = in.read(buffer, 0, buffer.length)
+ jar.write(buffer, 0, nRead)
}
+
+ in.close()
+ jar.close()
+ jarStream.close()
+
+ tmpFile = textFile
+ tmpJarUrl = jarFile.toURI.toURL.toString
}
test("Distributing files locally") {
@@ -77,18 +93,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
- val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
- sc.addJar(sampleJarFile)
- val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
- val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader()
- .loadClass("org.uncommons.maths.Maths")
- .getDeclaredMethod("factorial", classOf[Int])
- val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- a + b
- }.collect()
- assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ sc.addJar(tmpJarUrl)
+ val testData = Array((1, 1))
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
+ throw new SparkException("jar not added")
+ }
+ }
}
test("Distributing files on a standalone cluster") {
@@ -107,33 +118,24 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
- val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
- sc.addJar(sampleJarFile)
- val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
- val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader()
- .loadClass("org.uncommons.maths.Maths")
- .getDeclaredMethod("factorial", classOf[Int])
- val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- a + b
- }.collect()
- assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ sc.addJar(tmpJarUrl)
+ val testData = Array((1,1))
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
+ throw new SparkException("jar not added")
+ }
+ }
}
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
- val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
- sc.addJar(sampleJarFile.replace("file", "local"))
- val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
- val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader()
- .loadClass("org.uncommons.maths.Maths")
- .getDeclaredMethod("factorial", classOf[Int])
- val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
- a + b
- }.collect()
- assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ sc.addJar(tmpJarUrl.replace("file", "local"))
+ val testData = Array((1,1))
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
+ throw new SparkException("jar not added")
+ }
+ }
}
+
}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 352036f182..23ec6c3b31 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -365,6 +365,20 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void javaDoubleRDDHistoGram() {
+ JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+ // Test using generated buckets
+ Tuple2<double[], long[]> results = rdd.histogram(2);
+ double[] expected_buckets = {1.0, 2.5, 4.0};
+ long[] expected_counts = {2, 2};
+ Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
+ Assert.assertArrayEquals(expected_counts, results._2);
+ // Test with provided buckets
+ long[] histogram = rdd.histogram(expected_buckets);
+ Assert.assertArrayEquals(expected_counts, histogram);
+ }
+
+ @Test
public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
@@ -837,7 +851,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndComputation() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
@@ -849,7 +863,7 @@ public class JavaAPISuite implements Serializable {
public void checkpointAndRestore() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+ sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
@@ -883,4 +897,69 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+
+ List[] parts = rdd1.collectPartitions(new int[] {0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[] {0})[0]);
+
+ parts = rdd2.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)),
+ parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)),
+ parts[1]);
+ }
+
+ @Test
+ public void countApproxDistinct() {
+ List<Integer> arrayData = new ArrayList<Integer>();
+ int size = 100;
+ for (int i = 0; i < 100000; i++) {
+ arrayData.add(i % size);
+ }
+ JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
+ Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+ }
+
+ @Test
+ public void countApproxDistinctByKey() {
+ double relativeSD = 0.001;
+
+ List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+ for (int i = 10; i < 100; i++)
+ for (int j = 0; j < i; j++)
+ arrayData.add(new Tuple2<Integer, Integer>(i, j));
+
+ JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
+ List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+ for (Tuple2<Integer, Object> resItem : res) {
+ double count = (double)resItem._1();
+ Long resCount = (Long)resItem._2();
+ Double error = Math.abs((resCount - count) / count);
+ Assert.assertTrue(error < relativeSD);
+ }
+
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index d8a0e983b2..1121e06e2e 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -114,7 +114,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}
-
+/*
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// sem2: make sure the first stage is not finished until cancel is issued
@@ -148,7 +148,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
intercept[SparkException] { f1.get() }
intercept[SparkException] { f2.get() }
}
-
+ */
def testCount() {
// Cancel before launching any tasks
{
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b7eb268bd5..afc1beff98 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,9 +23,10 @@ import akka.actor._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
-
+ private val conf = new SparkConf
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -48,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -74,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -96,18 +97,20 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
- val masterTracker = new MapOutputTrackerMaster()
+ val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
- val slaveTracker = new MapOutputTracker()
- slaveTracker.trackerActor = slaveSystem.actorFor(
- "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
+ val slaveTracker = new MapOutputTracker(conf)
+ val selection = slaveSystem.actorSelection(
+ s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 288aa14eeb..c650ef4ed5 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -27,8 +27,10 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
def sc: SparkContext = _sc
+ var conf = new SparkConf(false)
+
override def beforeAll() {
- _sc = new SparkContext("local", "test")
+ _sc = new SparkContext("local", "test", conf)
super.beforeAll()
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
new file mode 100644
index 0000000000..ef5936dd2f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -0,0 +1,110 @@
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+class SparkConfSuite extends FunSuite with LocalSparkContext {
+ // This test uses the spark.conf in core/src/test/resources, which has a few test properties
+ test("loading from spark.conf") {
+ val conf = new SparkConf()
+ assert(conf.get("spark.test.intTestProperty") === "1")
+ assert(conf.get("spark.test.stringTestProperty") === "hi")
+ // NOTE: we don't use list properties yet, but when we do, we'll have to deal with this syntax
+ assert(conf.get("spark.test.listTestProperty") === "[a, b]")
+ }
+
+ // This test uses the spark.conf in core/src/test/resources, which has a few test properties
+ test("system properties override spark.conf") {
+ try {
+ System.setProperty("spark.test.intTestProperty", "2")
+ val conf = new SparkConf()
+ assert(conf.get("spark.test.intTestProperty") === "2")
+ assert(conf.get("spark.test.stringTestProperty") === "hi")
+ } finally {
+ System.clearProperty("spark.test.intTestProperty")
+ }
+ }
+
+ test("initializing without loading defaults") {
+ try {
+ System.setProperty("spark.test.intTestProperty", "2")
+ val conf = new SparkConf(false)
+ assert(!conf.contains("spark.test.intTestProperty"))
+ assert(!conf.contains("spark.test.stringTestProperty"))
+ } finally {
+ System.clearProperty("spark.test.intTestProperty")
+ }
+ }
+
+ test("named set methods") {
+ val conf = new SparkConf(false)
+
+ conf.setMaster("local[3]")
+ conf.setAppName("My app")
+ conf.setSparkHome("/path")
+ conf.setJars(Seq("a.jar", "b.jar"))
+ conf.setExecutorEnv("VAR1", "value1")
+ conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))
+
+ assert(conf.get("spark.master") === "local[3]")
+ assert(conf.get("spark.app.name") === "My app")
+ assert(conf.get("spark.home") === "/path")
+ assert(conf.get("spark.jars") === "a.jar,b.jar")
+ assert(conf.get("spark.executorEnv.VAR1") === "value1")
+ assert(conf.get("spark.executorEnv.VAR2") === "value2")
+ assert(conf.get("spark.executorEnv.VAR3") === "value3")
+
+ // Test the Java-friendly versions of these too
+ conf.setJars(Array("c.jar", "d.jar"))
+ conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
+ assert(conf.get("spark.jars") === "c.jar,d.jar")
+ assert(conf.get("spark.executorEnv.VAR4") === "value4")
+ assert(conf.get("spark.executorEnv.VAR5") === "value5")
+ }
+
+ test("basic get and set") {
+ val conf = new SparkConf(false)
+ assert(conf.getAll.toSet === Set())
+ conf.set("k1", "v1")
+ conf.setAll(Seq(("k2", "v2"), ("k3", "v3")))
+ assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3")))
+ conf.set("k1", "v4")
+ conf.setAll(Seq(("k2", "v5"), ("k3", "v6")))
+ assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6")))
+ assert(conf.contains("k1"), "conf did not contain k1")
+ assert(!conf.contains("k4"), "conf contained k4")
+ assert(conf.get("k1") === "v4")
+ intercept[Exception] { conf.get("k4") }
+ assert(conf.get("k4", "not found") === "not found")
+ assert(conf.getOption("k1") === Some("v4"))
+ assert(conf.getOption("k4") === None)
+ }
+
+ test("creating SparkContext without master and app name") {
+ val conf = new SparkConf(false)
+ intercept[SparkException] { sc = new SparkContext(conf) }
+ }
+
+ test("creating SparkContext without master") {
+ val conf = new SparkConf(false).setAppName("My app")
+ intercept[SparkException] { sc = new SparkContext(conf) }
+ }
+
+ test("creating SparkContext without app name") {
+ val conf = new SparkConf(false).setMaster("local")
+ intercept[SparkException] { sc = new SparkContext(conf) }
+ }
+
+ test("creating SparkContext with both master and app name") {
+ val conf = new SparkConf(false).setMaster("local").setAppName("My app")
+ sc = new SparkContext(conf)
+ assert(sc.master === "local")
+ assert(sc.appName === "My app")
+ }
+
+ test("SparkContext property overriding") {
+ val conf = new SparkConf(false).setMaster("local").setAppName("My app")
+ sc = new SparkContext("local[2]", "My other app", conf)
+ assert(sc.master === "local[2]")
+ assert(sc.appName === "My other app")
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
new file mode 100644
index 0000000000..f28d5c7b13
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.{FunSuite, PrivateMethodTester}
+
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
+import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalBackend
+
+class SparkContextSchedulerCreationSuite
+ extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
+
+ def createTaskScheduler(master: String): TaskSchedulerImpl = {
+ // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
+ // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
+ sc = new SparkContext("local", "test")
+ val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
+ val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+ sched.asInstanceOf[TaskSchedulerImpl]
+ }
+
+ test("bad-master") {
+ val e = intercept[SparkException] {
+ createTaskScheduler("localhost:1234")
+ }
+ assert(e.getMessage.contains("Could not parse Master URL"))
+ }
+
+ test("local") {
+ val sched = createTaskScheduler("local")
+ sched.backend match {
+ case s: LocalBackend => assert(s.totalCores === 1)
+ case _ => fail()
+ }
+ }
+
+ test("local-n") {
+ val sched = createTaskScheduler("local[5]")
+ assert(sched.maxTaskFailures === 1)
+ sched.backend match {
+ case s: LocalBackend => assert(s.totalCores === 5)
+ case _ => fail()
+ }
+ }
+
+ test("local-n-failures") {
+ val sched = createTaskScheduler("local[4, 2]")
+ assert(sched.maxTaskFailures === 2)
+ sched.backend match {
+ case s: LocalBackend => assert(s.totalCores === 4)
+ case _ => fail()
+ }
+ }
+
+ test("simr") {
+ createTaskScheduler("simr://uri").backend match {
+ case s: SimrSchedulerBackend => // OK
+ case _ => fail()
+ }
+ }
+
+ test("local-cluster") {
+ createTaskScheduler("local-cluster[3, 14, 512]").backend match {
+ case s: SparkDeploySchedulerBackend => // OK
+ case _ => fail()
+ }
+ }
+
+ def testYarn(master: String, expectedClassName: String) {
+ try {
+ val sched = createTaskScheduler(master)
+ assert(sched.getClass === Class.forName(expectedClassName))
+ } catch {
+ case e: SparkException =>
+ assert(e.getMessage.contains("YARN mode not available"))
+ logWarning("YARN not available, could not test actual YARN scheduler creation")
+ case e: Throwable => fail(e)
+ }
+ }
+
+ test("yarn-standalone") {
+ testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+ }
+
+ test("yarn-client") {
+ testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+ }
+
+ def testMesos(master: String, expectedClass: Class[_]) {
+ try {
+ val sched = createTaskScheduler(master)
+ assert(sched.backend.getClass === expectedClass)
+ } catch {
+ case e: UnsatisfiedLinkError =>
+ assert(e.getMessage.contains("no mesos in"))
+ logWarning("Mesos not available, could not test actual Mesos scheduler creation")
+ case e: Throwable => fail(e)
+ }
+ }
+
+ test("mesos fine-grained") {
+ System.setProperty("spark.mesos.coarse", "false")
+ testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+ }
+
+ test("mesos coarse-grained") {
+ System.setProperty("spark.mesos.coarse", "true")
+ testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+ }
+
+ test("mesos with zookeeper") {
+ System.setProperty("spark.mesos.coarse", "false")
+ testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index 46a2da1724..768ca3850e 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 0b38e239f9..331fa3a642 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -70,10 +70,11 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc() : ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
- new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
- new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
+ new ApplicationInfo(
+ 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
}
def createWorkerInfo() : WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 8f0954122b..be93074b7b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -1,14 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.deploy.worker
import java.io.File
+
import org.scalatest.FunSuite
+
import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env("SPARK_HOME")
- val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ab81bfbe55..8d7546085f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
class CompressionCodecSuite extends FunSuite {
+ val conf = new SparkConf(false)
def testCodec(codec: CompressionCodec) {
// Write 1000 integers to the output stream, compressed.
@@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite {
}
test("default compression codec") {
- val codec = CompressionCodec.createCodec()
+ val codec = CompressionCodec.createCodec(conf)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("lzf compression codec") {
- val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("snappy compression codec") {
- val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 7181333adf..71a2c6c498 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -19,17 +19,19 @@ package org.apache.spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.SparkConf
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
+ var conf: SparkConf = null
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
- System.setProperty("spark.metrics.conf", filePath)
+ conf = new SparkConf(false).set("spark.metrics.conf", filePath)
}
test("MetricsSystem with default config") {
- val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val metricsSystem = MetricsSystem.createMetricsSystem("default", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
@@ -39,7 +41,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
}
test("MetricsSystem with sources add") {
- val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index da032b17d9..0d4c10db8e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.rdd
import java.util.concurrent.Semaphore
+import scala.concurrent.{Await, TimeoutException}
+import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts
sem.acquire(2)
}
}
+
+ /**
+ * Awaiting FutureAction results
+ */
+ test("FutureAction result, infinite wait") {
+ val f = sc.parallelize(1 to 100, 4)
+ .countAsync()
+ assert(Await.result(f, Duration.Inf) === 100)
+ }
+
+ test("FutureAction result, finite wait") {
+ val f = sc.parallelize(1 to 100, 4)
+ .countAsync()
+ assert(Await.result(f, Duration(30, "seconds")) === 100)
+ }
+
+ test("FutureAction result, timeout") {
+ val f = sc.parallelize(1 to 100, 4)
+ .mapPartitions(itr => { Thread.sleep(20); itr })
+ .countAsync()
+ intercept[TimeoutException] {
+ Await.result(f, Duration(20, "milliseconds"))
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
new file mode 100644
index 0000000000..7f50a5a47c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.math.abs
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark._
+
+class DoubleRDDSuite extends FunSuite with SharedSparkContext {
+ // Verify tests on the histogram functionality. We test with both evenly
+ // and non-evenly spaced buckets as the bucket lookup function changes.
+ test("WorksOnEmpty") {
+ // Make sure that it works on an empty input
+ val rdd: RDD[Double] = sc.parallelize(Seq())
+ val buckets = Array(0.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksWithOutOfRangeWithOneBucket") {
+ // Verify that if all of the elements are out of range the counts are zero
+ val rdd = sc.parallelize(Seq(10.01, -0.01))
+ val buckets = Array(0.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksInRangeWithOneBucket") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+ val buckets = Array(0.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(4)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksInRangeWithOneBucketExactMatch") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+ val buckets = Array(1.0, 4.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(4)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksWithOutOfRangeWithTwoBuckets") {
+ // Verify that out of range works with two buckets
+ val rdd = sc.parallelize(Seq(10.01, -0.01))
+ val buckets = Array(0.0, 5.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(0, 0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
+ // Verify that out of range works with two un even buckets
+ val rdd = sc.parallelize(Seq(10.01, -0.01))
+ val buckets = Array(0.0, 4.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(0, 0)
+ assert(histogramResults === expectedHistogramResults)
+ }
+
+ test("WorksInRangeWithTwoBuckets") {
+ // Make sure that it works with two equally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
+ val buckets = Array(0.0, 5.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(3, 2)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksInRangeWithTwoBucketsAndNaN") {
+ // Make sure that it works with two equally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
+ val buckets = Array(0.0, 5.0, 10.0)
+ val histogramResults = rdd.histogram(buckets)
+ val histogramResults2 = rdd.histogram(buckets, true)
+ val expectedHistogramResults = Array(3, 2)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramResults2 === expectedHistogramResults)
+ }
+
+ test("WorksInRangeWithTwoUnevenBuckets") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
+ val buckets = Array(0.0, 5.0, 11.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(3, 2)
+ assert(histogramResults === expectedHistogramResults)
+ }
+
+ test("WorksMixedRangeWithTwoUnevenBuckets") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
+ val buckets = Array(0.0, 5.0, 11.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(4, 3)
+ assert(histogramResults === expectedHistogramResults)
+ }
+
+ test("WorksMixedRangeWithFourUnevenBuckets") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1))
+ val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(4, 2, 1, 3)
+ assert(histogramResults === expectedHistogramResults)
+ }
+
+ test("WorksMixedRangeWithUnevenBucketsAndNaN") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, Double.NaN))
+ val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(4, 2, 1, 3)
+ assert(histogramResults === expectedHistogramResults)
+ }
+ // Make sure this works with a NaN end bucket
+ test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, Double.NaN))
+ val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(4, 2, 1, 2, 3)
+ assert(histogramResults === expectedHistogramResults)
+ }
+ // Make sure this works with a NaN end bucket and an inifity
+ test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") {
+ // Make sure that it works with two unequally spaced buckets and elements in each
+ val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+ 200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN))
+ val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(4, 2, 1, 2, 4)
+ assert(histogramResults === expectedHistogramResults)
+ }
+
+ test("WorksWithOutOfRangeWithInfiniteBuckets") {
+ // Verify that out of range works with two buckets
+ val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
+ val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0)
+ val histogramResults = rdd.histogram(buckets)
+ val expectedHistogramResults = Array(1, 1)
+ assert(histogramResults === expectedHistogramResults)
+ }
+ // Test the failure mode with an invalid bucket array
+ test("ThrowsExceptionOnInvalidBucketArray") {
+ val rdd = sc.parallelize(Seq(1.0))
+ // Empty array
+ intercept[IllegalArgumentException] {
+ val buckets = Array.empty[Double]
+ val result = rdd.histogram(buckets)
+ }
+ // Single element array
+ intercept[IllegalArgumentException] {
+ val buckets = Array(1.0)
+ val result = rdd.histogram(buckets)
+ }
+ }
+
+ // Test automatic histogram function
+ test("WorksWithoutBucketsBasic") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+ val (histogramBuckets, histogramResults) = rdd.histogram(1)
+ val expectedHistogramResults = Array(4)
+ val expectedHistogramBuckets = Array(1.0, 4.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+ // Test automatic histogram function with a single element
+ test("WorksWithoutBucketsBasicSingleElement") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1))
+ val (histogramBuckets, histogramResults) = rdd.histogram(1)
+ val expectedHistogramResults = Array(1)
+ val expectedHistogramBuckets = Array(1.0, 1.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+ // Test automatic histogram function with a single element
+ test("WorksWithoutBucketsBasicNoRange") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 1, 1, 1))
+ val (histogramBuckets, histogramResults) = rdd.histogram(1)
+ val expectedHistogramResults = Array(4)
+ val expectedHistogramBuckets = Array(1.0, 1.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+
+ test("WorksWithoutBucketsBasicTwo") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+ val (histogramBuckets, histogramResults) = rdd.histogram(2)
+ val expectedHistogramResults = Array(2, 2)
+ val expectedHistogramBuckets = Array(1.0, 2.5, 4.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+
+ test("WorksWithoutBucketsWithMoreRequestedThanElements") {
+ // Verify the basic case of one bucket and all elements in that bucket works
+ val rdd = sc.parallelize(Seq(1, 2))
+ val (histogramBuckets, histogramResults) = rdd.histogram(10)
+ val expectedHistogramResults =
+ Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1)
+ val expectedHistogramBuckets =
+ Array(1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0)
+ assert(histogramResults === expectedHistogramResults)
+ assert(histogramBuckets === expectedHistogramBuckets)
+ }
+
+ // Test the failure mode with an invalid RDD
+ test("ThrowsExceptionOnInvalidRDDs") {
+ // infinity
+ intercept[UnsupportedOperationException] {
+ val rdd = sc.parallelize(Seq(1, 1.0/0.0))
+ val result = rdd.histogram(1)
+ }
+ // NaN
+ intercept[UnsupportedOperationException] {
+ val rdd = sc.parallelize(Seq(1, Double.NaN))
+ val result = rdd.histogram(1)
+ }
+ // Empty
+ intercept[UnsupportedOperationException] {
+ val rdd: RDD[Double] = sc.parallelize(Seq())
+ val result = rdd.histogram(1)
+ }
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382ed0..5da538a1dd 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
+import scala.util.Random
import org.scalatest.FunSuite
@@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
}
+ test("countApproxDistinctByKey") {
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+ /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
+ * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+ * relatively tight error bounds to check correctness of functionality rather than checking
+ * whether the approximation conforms with the requested bound.
+ */
+ val relativeSD = 0.001
+
+ // For each value i, there are i tuples with first element equal to i.
+ // Therefore, the expected count for key i would be i.
+ val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+ val rdd1 = sc.parallelize(stacked)
+ val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
+ counted1.foreach{
+ case(k, count) => assert(error(count, k) < relativeSD)
+ }
+
+ val rnd = new Random()
+
+ // The expected count for key num would be num
+ val randStacked = (1 to 100).flatMap { i =>
+ val num = rnd.nextInt % 500
+ (1 to num).map(j => (num, j))
+ }
+ val rdd2 = sc.parallelize(randStacked)
+ val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
+ counted2.foreach{
+ case(k, count) => assert(error(count, k) < relativeSD)
+ }
+ }
+
test("join") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 354ab8ae5d..559ea051d3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ test("countApproxDistinct") {
+
+ def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+ val size = 100
+ val uniformDistro = for (i <- 1 to 100000) yield i % size
+ val simpleRdd = sc.makeRDD(uniformDistro)
+ assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+ assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+ assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+ assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+ }
+
test("SparkContext.union") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
@@ -71,6 +84,33 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
}
+ test("partitioner aware union") {
+ import SparkContext._
+ def makeRDDWithPartitioner(seq: Seq[Int]) = {
+ sc.makeRDD(seq, 1)
+ .map(x => (x, null))
+ .partitionBy(new HashPartitioner(2))
+ .mapPartitions(_.map(_._1), true)
+ }
+
+ val nums1 = makeRDDWithPartitioner(1 to 4)
+ val nums2 = makeRDDWithPartitioner(5 to 8)
+ assert(nums1.partitioner == nums2.partitioner)
+ assert(new PartitionerAwareUnionRDD(sc, Seq(nums1)).collect().toSet === Set(1, 2, 3, 4))
+
+ val union = new PartitionerAwareUnionRDD(sc, Seq(nums1, nums2))
+ assert(union.collect().toSet === Set(1, 2, 3, 4, 5, 6, 7, 8))
+ val nums1Parts = nums1.collectPartitions()
+ val nums2Parts = nums2.collectPartitions()
+ val unionParts = union.collectPartitions()
+ assert(nums1Parts.length === 2)
+ assert(nums2Parts.length === 2)
+ assert(unionParts.length === 2)
+ assert((nums1Parts(0) ++ nums2Parts(0)).toList === unionParts(0).toList)
+ assert((nums1Parts(1) ++ nums2Parts(1)).toList === unionParts(1).toList)
+ assert(union.partitioner === nums1.partitioner)
+ }
+
test("aggregate") {
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
@@ -244,8 +284,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
- assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")
// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
@@ -257,9 +297,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
- (minLocality2*100.).toInt + "%")
+ (minLocality2*100.0).toInt + "%")
}
test("zipped RDDs") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 95d3553d91..7bf2020fe3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
@@ -31,9 +29,9 @@ class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
- clusterScheduler: ClusterScheduler,
+ clusterScheduler: TaskSchedulerImpl,
taskSet: TaskSet)
- extends ClusterTaskSetManager(clusterScheduler, taskSet) {
+ extends TaskSetManager(clusterScheduler, taskSet, 0) {
parent = null
weight = 1
@@ -106,7 +104,7 @@ class FakeTaskSetManager(
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
}
@@ -133,7 +131,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("FIFO Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -160,7 +158,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("Fair Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -169,7 +167,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()
assert(rootPool.getSchedulableByName("default") != null)
@@ -217,7 +215,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
test("Nested Pool Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
+ val clusterScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a4d41ebbff..2aa259daf3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,21 +17,14 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.{Map, HashMap}
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.LocalSparkContext
-import org.apache.spark.MapOutputTrackerMaster
-import org.apache.spark.SparkContext
-import org.apache.spark.Partition
-import org.apache.spark.TaskContext
-import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import org.apache.spark.{FetchFailed, Success, TaskEndReason}
+import scala.Tuple2
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.scalatest.{BeforeAndAfter, FunSuite}
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
@@ -46,7 +39,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
* and capturing the resulting TaskSets from the mock TaskScheduler.
*/
class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
+ val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
@@ -74,7 +67,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
- val blockManagerMaster = new BlockManagerMaster(null) {
+ val blockManagerMaster = new BlockManagerMaster(null, conf) {
override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
@@ -99,7 +92,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSets.clear()
cacheLocations.clear()
results.clear()
- mapOutputTracker = new MapOutputTrackerMaster()
+ mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
@@ -206,6 +199,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
submit(rdd, Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("local job") {
@@ -219,6 +213,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("run trivial job w/ dependency") {
@@ -227,6 +222,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("cache location preferences w/ dependency") {
@@ -239,12 +235,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
complete(taskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("trivial job failure") {
submit(makeRdd(1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted: some failure")
+ assertDataStructuresEmpty
}
test("run trivial shuffle") {
@@ -260,6 +258,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("run trivial shuffle with fetch failure") {
@@ -285,6 +284,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
+ assertDataStructuresEmpty
}
test("ignore late map task completions") {
@@ -313,6 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
+ assertDataStructuresEmpty
}
test("run trivial shuffle with out-of-band failure and retry") {
@@ -329,15 +330,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
- // have hostC complete the resubmitted task
- complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
- Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
- complete(taskSets(2), Seq((Success, 42)))
- assert(results === Map(0 -> 42))
- }
-
- test("recursive shuffle failures") {
+ // have hostC complete the resubmitted task
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+ assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+ Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+ complete(taskSets(2), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+ }
+
+ test("recursive shuffle failures") {
val shuffleOneRdd = makeRdd(2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
@@ -363,6 +365,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
test("cached post-shuffle") {
@@ -394,6 +397,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
}
/**
@@ -413,4 +417,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345, 0)
+ private def assertDataStructuresEmpty = {
+ assert(scheduler.pendingTasks.isEmpty)
+ assert(scheduler.activeJobs.isEmpty)
+ assert(scheduler.failed.isEmpty)
+ assert(scheduler.idToActiveJob.isEmpty)
+ assert(scheduler.jobIdToStageIds.isEmpty)
+ assert(scheduler.stageIdToJobIds.isEmpty)
+ assert(scheduler.stageIdToStage.isEmpty)
+ assert(scheduler.stageToInfos.isEmpty)
+ assert(scheduler.resultStageToJob.isEmpty)
+ assert(scheduler.running.isEmpty)
+ assert(scheduler.shuffleToMapStage.isEmpty)
+ assert(scheduler.waiting.isEmpty)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 0f01515179..0b90c4e74c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
override def runTask(context: TaskContext): Int = 0
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 984881861c..5cc48ee00a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+ val WAIT_TIMEOUT_MILLIS = 10000
test("inner method") {
sc = new SparkContext("local", "joblogger")
@@ -92,7 +93,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
rdd.reduceByKey(_+_).collect()
- val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
+ val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
joblogger.getJobIDtoPrintWriter.size should be (1)
@@ -114,13 +117,15 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
- override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
}
sc.addSparkListener(joblogger)
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
rdd.reduceByKey(_+_).collect()
-
+
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
joblogger.onJobStartCount should be (1)
joblogger.onJobEndCount should be (1)
joblogger.onTaskEndCount should be (8)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1fd76420ea..1a16e438c4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -19,23 +19,26 @@ package org.apache.spark.scheduler
import scala.collection.mutable.{Buffer, HashSet}
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.SparkContext._
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
- with BeforeAndAfterAll {
+ with BeforeAndAfter with BeforeAndAfterAll {
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
+ before {
+ sc = new SparkContext("local", "SparkListenerSuite")
+ }
+
override def afterAll {
System.clearProperty("spark.akka.frameSize")
}
test("basic creation of StageInfo") {
- sc = new SparkContext("local", "DAGSchedulerSuite")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -56,7 +59,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
test("StageInfo with fewer tasks than partitions") {
- sc = new SparkContext("local", "DAGSchedulerSuite")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -72,7 +74,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
test("local metrics") {
- sc = new SparkContext("local", "DAGSchedulerSuite")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
@@ -135,17 +136,13 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
test("onTaskGettingResult() called when result fetched remotely") {
- // Need to use local cluster mode here, because results are not ever returned through the
- // block manager when using the LocalScheduler.
- sc = new SparkContext("local-cluster[1,1,512]", "test")
-
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
// Make a task whose result is larger than the akka frame size
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
@@ -157,10 +154,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
test("onTaskGettingResult() not called when result sent directly") {
- // Need to use local cluster mode here, because results are not ever returned through the
- // block manager when using the LocalScheduler.
- sc = new SparkContext("local-cluster[1,1,512]", "test")
-
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
@@ -181,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
class SaveStageInfo extends SparkListener {
val stageInfos = Buffer[StageInfo]()
- override def onStageCompleted(stage: StageCompleted) {
+ override def onStageCompleted(stage: SparkListenerStageCompleted) {
stageInfos += stage.stage
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ee150a3107..4b52d9651e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.TaskResultBlockId
/**
@@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
override def enqueueSuccessfulTask(
- taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
if (!removedResult) {
// Only remove the result once, since we'd like to test the case where the task eventually
// succeeds.
@@ -44,13 +43,13 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSched
case IndirectTaskResult(blockId) =>
sparkEnv.blockManager.master.removeBlock(blockId)
case directResult: DirectTaskResult[_] =>
- taskSetManager.abort("Internal error: expect only indirect results")
+ taskSetManager.abort("Internal error: expect only indirect results")
}
serializedData.rewind()
removedResult = true
}
super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
- }
+ }
}
/**
@@ -65,24 +64,20 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
System.setProperty("spark.akka.frameSize", "1")
}
- before {
- // Use local-cluster mode because results are returned differently when running with the
- // LocalScheduler.
- sc = new SparkContext("local-cluster[1,1,512]", "test")
- }
-
override def afterAll {
System.clearProperty("spark.akka.frameSize")
}
test("handling results smaller than Akka frame size") {
+ sc = new SparkContext("local", "test")
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
- test("handling results larger than Akka frame size") {
+ test("handling results larger than Akka frame size") {
+ sc = new SparkContext("local", "test")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
@@ -92,10 +87,13 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
}
test("task retried if result missing from block manager") {
+ // Set the maximum number of task failures to > 0, so that the task set isn't aborted
+ // after the result is missing.
+ sc = new SparkContext("local[1,2]", "test")
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
- val scheduler: ClusterScheduler = sc.taskScheduler match {
- case clusterScheduler: ClusterScheduler =>
+ val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
+ case clusterScheduler: TaskSchedulerImpl =>
clusterScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
@@ -103,7 +101,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b97f2b19b5..1eec6726f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
@@ -23,7 +23,6 @@ import scala.collection.mutable
import org.scalatest.FunSuite
import org.apache.spark._
-import org.apache.spark.scheduler._
import org.apache.spark.executor.TaskMetrics
import java.nio.ByteBuffer
import org.apache.spark.util.{Utils, FakeClock}
@@ -56,10 +55,10 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
* A mock ClusterScheduler implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
* a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
+ * to work, and these are required for locality in TaskSetManager.
*/
class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
- extends ClusterScheduler(sc)
+ extends TaskSchedulerImpl(sc)
{
val startedTasks = new ArrayBuffer[Long]
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
@@ -79,16 +78,19 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
}
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+ private val conf = new SparkConf
+
+ val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
+ val MAX_TASK_FAILURES = 4
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
- val manager = new ClusterTaskSetManager(sched, taskSet)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
// Offer a host with no CPUs
assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
@@ -114,7 +116,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(3)
- val manager = new ClusterTaskSetManager(sched, taskSet)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
// First three offers should all find tasks
for (i <- 0 until 3) {
@@ -151,7 +153,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
Seq() // Last task has no locality prefs
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -197,7 +199,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
Seq(TaskLocation("host2"))
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -234,7 +236,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
Seq(TaskLocation("host3"))
)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -262,7 +264,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -279,17 +281,17 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
- val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
- (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+ (1 to manager.maxTaskFailures).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
assert(offerResult != None,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
- if (index < manager.MAX_TASK_FAILURES) {
+ if (index < MAX_TASK_FAILURES) {
assert(!sched.taskSetsFailed.contains(taskSet.id))
} else {
assert(sched.taskSetsFailed.contains(taskSet.id))
@@ -313,6 +315,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
- new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
deleted file mode 100644
index 1e676c1719..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.local
-
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark._
-
-
-class Lock() {
- var finished = false
- def jobWait() = {
- synchronized {
- while(!finished) {
- this.wait()
- }
- }
- }
-
- def jobFinished() = {
- synchronized {
- finished = true
- this.notifyAll()
- }
- }
-}
-
-object TaskThreadInfo {
- val threadToLock = HashMap[Int, Lock]()
- val threadToRunning = HashMap[Int, Boolean]()
- val threadToStarted = HashMap[Int, CountDownLatch]()
-}
-
-/*
- * 1. each thread contains one job.
- * 2. each job contains one stage.
- * 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
- * it will get cpu core resource, and will wait to finished after user manually
- * release "Lock" and then cluster will contain another free cpu cores.
- * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
- * thus it will be scheduled later when cluster has free cpu cores.
- */
-class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach {
-
- override def afterEach() {
- super.afterEach()
- System.clearProperty("spark.scheduler.mode")
- }
-
- def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
- TaskThreadInfo.threadToRunning(threadIndex) = false
- val nums = sc.parallelize(threadIndex to threadIndex, 1)
- TaskThreadInfo.threadToLock(threadIndex) = new Lock()
- TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
- new Thread {
- if (poolName != null) {
- sc.setLocalProperty("spark.scheduler.pool", poolName)
- }
- override def run() {
- val ans = nums.map(number => {
- TaskThreadInfo.threadToRunning(number) = true
- TaskThreadInfo.threadToStarted(number).countDown()
- TaskThreadInfo.threadToLock(number).jobWait()
- TaskThreadInfo.threadToRunning(number) = false
- number
- }).collect()
- assert(ans.toList === List(threadIndex))
- sem.release()
- }
- }.start()
- }
-
- test("Local FIFO scheduler end-to-end test") {
- System.setProperty("spark.scheduler.mode", "FIFO")
- sc = new SparkContext("local[4]", "test")
- val sem = new Semaphore(0)
-
- createThread(1,null,sc,sem)
- TaskThreadInfo.threadToStarted(1).await()
- createThread(2,null,sc,sem)
- TaskThreadInfo.threadToStarted(2).await()
- createThread(3,null,sc,sem)
- TaskThreadInfo.threadToStarted(3).await()
- createThread(4,null,sc,sem)
- TaskThreadInfo.threadToStarted(4).await()
- // thread 5 and 6 (stage pending)must meet following two points
- // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
- // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
- // 2. priority of stage in thread 5 should be prior to priority of stage in thread 6
- // So I just use "sleep" 1s here for each thread.
- // TODO: any better solution?
- createThread(5,null,sc,sem)
- Thread.sleep(1000)
- createThread(6,null,sc,sem)
- Thread.sleep(1000)
-
- assert(TaskThreadInfo.threadToRunning(1) === true)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === true)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === false)
- assert(TaskThreadInfo.threadToRunning(6) === false)
-
- TaskThreadInfo.threadToLock(1).jobFinished()
- TaskThreadInfo.threadToStarted(5).await()
-
- assert(TaskThreadInfo.threadToRunning(1) === false)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === true)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === true)
- assert(TaskThreadInfo.threadToRunning(6) === false)
-
- TaskThreadInfo.threadToLock(3).jobFinished()
- TaskThreadInfo.threadToStarted(6).await()
-
- assert(TaskThreadInfo.threadToRunning(1) === false)
- assert(TaskThreadInfo.threadToRunning(2) === true)
- assert(TaskThreadInfo.threadToRunning(3) === false)
- assert(TaskThreadInfo.threadToRunning(4) === true)
- assert(TaskThreadInfo.threadToRunning(5) === true)
- assert(TaskThreadInfo.threadToRunning(6) === true)
-
- TaskThreadInfo.threadToLock(2).jobFinished()
- TaskThreadInfo.threadToLock(4).jobFinished()
- TaskThreadInfo.threadToLock(5).jobFinished()
- TaskThreadInfo.threadToLock(6).jobFinished()
- sem.acquire(6)
- }
-
- test("Local fair scheduler end-to-end test") {
- System.setProperty("spark.scheduler.mode", "FAIR")
- val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.scheduler.allocation.file", xmlPath)
-
- sc = new SparkContext("local[8]", "LocalSchedulerSuite")
- val sem = new Semaphore(0)
-
- createThread(10,"1",sc,sem)
- TaskThreadInfo.threadToStarted(10).await()
- createThread(20,"2",sc,sem)
- TaskThreadInfo.threadToStarted(20).await()
- createThread(30,"3",sc,sem)
- TaskThreadInfo.threadToStarted(30).await()
-
- assert(TaskThreadInfo.threadToRunning(10) === true)
- assert(TaskThreadInfo.threadToRunning(20) === true)
- assert(TaskThreadInfo.threadToRunning(30) === true)
-
- createThread(11,"1",sc,sem)
- TaskThreadInfo.threadToStarted(11).await()
- createThread(21,"2",sc,sem)
- TaskThreadInfo.threadToStarted(21).await()
- createThread(31,"3",sc,sem)
- TaskThreadInfo.threadToStarted(31).await()
-
- assert(TaskThreadInfo.threadToRunning(11) === true)
- assert(TaskThreadInfo.threadToRunning(21) === true)
- assert(TaskThreadInfo.threadToRunning(31) === true)
-
- createThread(12,"1",sc,sem)
- TaskThreadInfo.threadToStarted(12).await()
- createThread(22,"2",sc,sem)
- TaskThreadInfo.threadToStarted(22).await()
- createThread(32,"3",sc,sem)
-
- assert(TaskThreadInfo.threadToRunning(12) === true)
- assert(TaskThreadInfo.threadToRunning(22) === true)
- assert(TaskThreadInfo.threadToRunning(32) === false)
-
- TaskThreadInfo.threadToLock(10).jobFinished()
- TaskThreadInfo.threadToStarted(32).await()
-
- assert(TaskThreadInfo.threadToRunning(32) === true)
-
- //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
- // queue so that cluster will assign free cpu core to stage 23 after stage 11 finished.
- //2. priority of 23 and 33 will be meaningless as using fair scheduler here.
- createThread(23,"2",sc,sem)
- createThread(33,"3",sc,sem)
- Thread.sleep(1000)
-
- TaskThreadInfo.threadToLock(11).jobFinished()
- TaskThreadInfo.threadToStarted(23).await()
-
- assert(TaskThreadInfo.threadToRunning(23) === true)
- assert(TaskThreadInfo.threadToRunning(33) === false)
-
- TaskThreadInfo.threadToLock(12).jobFinished()
- TaskThreadInfo.threadToStarted(33).await()
-
- assert(TaskThreadInfo.threadToRunning(33) === true)
-
- TaskThreadInfo.threadToLock(20).jobFinished()
- TaskThreadInfo.threadToLock(21).jobFinished()
- TaskThreadInfo.threadToLock(22).jobFinished()
- TaskThreadInfo.threadToLock(23).jobFinished()
- TaskThreadInfo.threadToLock(30).jobFinished()
- TaskThreadInfo.threadToLock(31).jobFinished()
- TaskThreadInfo.threadToLock(32).jobFinished()
- TaskThreadInfo.threadToLock(33).jobFinished()
-
- sem.acquire(11)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51171..3898583275 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -22,12 +22,15 @@ import scala.collection.mutable
import com.esotericsoftware.kryo.Kryo
import org.scalatest.FunSuite
-import org.apache.spark.SharedSparkContext
+import org.apache.spark.{SparkConf, SharedSparkContext}
import org.apache.spark.serializer.KryoTest._
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+
test("basic types") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -57,7 +60,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("pairs") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -81,7 +84,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("Scala data structures") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -104,7 +107,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("ranges") {
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
@@ -125,9 +128,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("custom registrator") {
- System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-
- val ser = (new KryoSerializer).newInstance()
+ val ser = new KryoSerializer(conf).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
@@ -172,6 +173,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
}
+ test("kryo with SerializableHyperLogLog") {
+ assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+ }
+
test("kryo with reduce") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
@@ -186,18 +191,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
.fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
assert(10 + control.sum === result)
}
-
- override def beforeAll() {
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
- super.beforeAll()
- }
-
- override def afterAll() {
- super.afterAll()
- System.clearProperty("spark.kryo.registrator")
- System.clearProperty("spark.serializer")
- }
}
object KryoTest {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index cb76275e39..b647e8a672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite {
fail()
} catch {
case e: IllegalStateException => // OK
- case _ => fail()
+ case _: Throwable => fail()
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 484a654108..f60ce270c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,41 +31,42 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.{SparkConf, SparkContext}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
- var oldOops: String = null
- var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- System.setProperty("spark.kryoserializer.buffer.mb", "1")
- val serializer = new KryoSerializer
+ conf.set("spark.kryoserializer.buffer.mb", "1")
+ val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
this.actorSystem = actorSystem
- System.setProperty("spark.driver.port", boundPort.toString)
- System.setProperty("spark.hostPort", "localhost:" + boundPort)
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
- oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
+ conf.set("os.arch", "amd64")
+ conf.set("spark.test.useCompressedOops", "true")
+ conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
// Set some value ...
- System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
+ conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
}
after {
@@ -86,16 +87,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master = null
if (oldArch != null) {
- System.setProperty("os.arch", oldArch)
+ conf.set("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}
- if (oldOops != null) {
- System.setProperty("spark.test.useCompressedOops", oldOops)
- } else {
- System.clearProperty("spark.test.useCompressedOops")
- }
+ System.clearProperty("spark.test.useCompressedOops")
}
test("StorageLevel object caching") {
@@ -133,7 +130,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -163,8 +160,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -179,7 +176,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -227,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing rdd") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -261,7 +258,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -277,7 +274,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -296,7 +293,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
@@ -333,7 +330,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -352,7 +349,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -371,7 +368,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -390,7 +387,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -413,7 +410,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -426,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -441,7 +438,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -456,7 +453,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -471,7 +468,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -486,7 +483,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -511,7 +508,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -535,7 +532,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -581,7 +578,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -591,53 +588,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
- System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "true")
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "false")
+ store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
"shuffle_0_0_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "true")
+ store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
"broadcast_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "false")
+ store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "true")
+ store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "false")
+ store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
@@ -651,7 +648,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 0b9056344c..829f389460 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -1,14 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.storage
-import java.io.{FileWriter, File}
+import java.io.{File, FileWriter}
import scala.collection.mutable
import com.google.common.io.Files
+import org.apache.spark.SparkConf
import org.scalatest.{BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
-
+ private val testConf = new SparkConf(false)
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
val rootDir1 = Files.createTempDir()
@@ -16,7 +34,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
val rootDirs = rootDir0.getName + "," + rootDir1.getName
println("Created root dirs: " + rootDirs)
+ // This suite focuses primarily on consolidation features,
+ // so we coerce consolidation if not already enabled.
+ testConf.set("spark.shuffle.consolidateFiles", "true")
+
val shuffleBlockManager = new ShuffleBlockManager(null) {
+ override def conf = testConf.clone
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 8f0ec6683b..3764f4d1a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -34,7 +34,6 @@ class UISuite extends FunSuite {
}
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
-
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
new file mode 100644
index 0000000000..67a57a0e7f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.scalatest.FunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.scheduler.SparkListenerTaskStart
+import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+ test("test executor id to summary") {
+ val sc = new SparkContext("local", "test")
+ val listener = new JobProgressListener(sc)
+ val taskMetrics = new TaskMetrics()
+ val shuffleReadMetrics = new ShuffleReadMetrics()
+
+ // nothing in it
+ assert(listener.stageIdToExecutorSummaries.size == 0)
+
+ // finish this task, should get updated shuffleRead
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 1000)
+
+ // finish a task with unknown executor-id, nothing should happen
+ taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.size == 1)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 2000)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
+ .shuffleRead == 1000)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 4e40dcbdee..11ebdc352b 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkContext
class DummyClass1 {}
@@ -63,54 +64,53 @@ class SizeEstimatorSuite
}
test("simple classes") {
- assert(SizeEstimator.estimate(new DummyClass1) === 16)
- assert(SizeEstimator.estimate(new DummyClass2) === 16)
- assert(SizeEstimator.estimate(new DummyClass3) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
+ expectResult(16)(SizeEstimator.estimate(new DummyClass1))
+ expectResult(16)(SizeEstimator.estimate(new DummyClass2))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass3))
+ expectResult(24)(SizeEstimator.estimate(new DummyClass4(null)))
+ expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}
test("primitive arrays") {
- assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
- assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
- assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
+ expectResult(32)(SizeEstimator.estimate(new Array[Byte](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Char](10)))
+ expectResult(40)(SizeEstimator.estimate(new Array[Short](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Int](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Long](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[Float](10)))
+ expectResult(96)(SizeEstimator.estimate(new Array[Double](10)))
+ expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000)))
+ expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000)))
}
test("object arrays") {
// Arrays containing nulls should just have one pointer per element
- assert(SizeEstimator.estimate(new Array[String](10)) === 56)
- assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
-
+ expectResult(56)(SizeEstimator.estimate(new Array[String](10)))
+ expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
- assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
+ expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
+ expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
+ expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
// Past size 100, our samples 100 elements, but we should still get the right size.
- assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
+ expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
- assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
- assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
+ expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
+ expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
@@ -128,11 +128,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-
+ expectResult(40)(SizeEstimator.estimate(DummyString("")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}
@@ -141,14 +140,13 @@ class SizeEstimatorSuite
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
-
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- assert(SizeEstimator.estimate(DummyString("")) === 56)
- assert(SizeEstimator.estimate(DummyString("a")) === 64)
- assert(SizeEstimator.estimate(DummyString("ab")) === 64)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
+ expectResult(56)(SizeEstimator.estimate(DummyString("")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("a")))
+ expectResult(64)(SizeEstimator.estimate(DummyString("ab")))
+ expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index ca3f684668..e9b62ea70d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -1,9 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
-
-class OpenHashMapSuite extends FunSuite {
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.util.SizeEstimator
+
+class OpenHashMapSuite extends FunSuite with ShouldMatchers {
+
+ test("size for specialized, primitive value (int)") {
+ val capacity = 1024
+ val map = new OpenHashMap[String, Int](capacity)
+ val actualSize = SizeEstimator.estimate(map)
+ // 64 bit for pointers, 32 bit for ints, and 1 bit for the bitset.
+ val expectedSize = capacity * (64 + 32 + 1) / 8
+ // Make sure we are not allocating a significant amount of memory beyond our expected.
+ actualSize should be <= (expectedSize * 1.1).toLong
+ }
test("initialization") {
val goodMap1 = new OpenHashMap[String, Int](1)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
index 4e11e8a628..1b24f8f287 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
@@ -1,9 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.util.collection
import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.util.SizeEstimator
+
+class OpenHashSetSuite extends FunSuite with ShouldMatchers {
-class OpenHashSetSuite extends FunSuite {
+ test("size for specialized, primitive int") {
+ val loadFactor = 0.7
+ val set = new OpenHashSet[Int](64, loadFactor)
+ for (i <- 0 until 1024) {
+ set.add(i)
+ }
+ assert(set.size === 1024)
+ assert(set.capacity > 1024)
+ val actualSize = SizeEstimator.estimate(set)
+ // 32 bits for the ints + 1 bit for the bitset
+ val expectedSize = set.capacity * (32 + 1) / 8
+ // Make sure we are not allocating a significant amount of memory beyond our expected.
+ actualSize should be <= (expectedSize * 1.1).toLong
+ }
test("primitive int") {
val set = new OpenHashSet[Int]
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
index dfd6aed2c4..3b60decee9 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
@@ -1,9 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.util.SizeEstimator
+
+class PrimitiveKeyOpenHashMapSuite extends FunSuite with ShouldMatchers {
-class PrimitiveKeyOpenHashSetSuite extends FunSuite {
+ test("size for specialized, primitive key, value (int, int)") {
+ val capacity = 1024
+ val map = new PrimitiveKeyOpenHashMap[Int, Int](capacity)
+ val actualSize = SizeEstimator.estimate(map)
+ // 32 bit for keys, 32 bit for values, and 1 bit for the bitset.
+ val expectedSize = capacity * (32 + 32 + 1) / 8
+ // Make sure we are not allocating a significant amount of memory beyond our expected.
+ actualSize should be <= (expectedSize * 1.1).toLong
+ }
test("initialization") {
val goodMap1 = new PrimitiveKeyOpenHashMap[Int, Int](1)