diff options
Diffstat (limited to 'core/src/test/scala/org/apache')
39 files changed, 1434 insertions, 701 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 4434f3b87c..c443c5266e 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -27,6 +27,21 @@ import org.apache.spark.SparkContext._ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext { + + implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] { + def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = { + t1 ++= t2 + t1 + } + def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = { + t1 += t2 + t1 + } + def zero(t: mutable.Set[A]) : mutable.Set[A] = { + new mutable.HashSet[A]() + } + } + test ("basic accumulation"){ sc = new SparkContext("local", "test") val acc : Accumulator[Int] = sc.accumulator(0) @@ -51,7 +66,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } test ("add value to collection accumulators") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") @@ -68,22 +82,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } } - implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] { - def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = { - t1 ++= t2 - t1 - } - def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = { - t1 += t2 - t1 - } - def zero(t: mutable.Set[Any]) : mutable.Set[Any] = { - new mutable.HashSet[Any]() - } - } - test ("value not readable in tasks") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") @@ -125,7 +124,6 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte } test ("localValue readable in tasks") { - import SetAccum._ val maxI = 1000 for (nThreads <- List(1, 10)) { //test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index f26c44d3e7..ec13b329b2 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.reflect.ClassTag import org.scalatest.FunSuite import java.io.File import org.apache.spark.rdd._ @@ -25,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { - initLogging() - var checkpointDir: File = _ val partitioner = new HashPartitioner(2) @@ -56,17 +55,15 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("RDDs with one-to-one dependencies") { - testCheckpointing(_.map(x => x.toString)) - testCheckpointing(_.flatMap(x => 1 to x)) - testCheckpointing(_.filter(_ % 2 == 0)) - testCheckpointing(_.sample(false, 0.5, 0)) - testCheckpointing(_.glom()) - testCheckpointing(_.mapPartitions(_.map(_.toString))) - testCheckpointing(r => new MapPartitionsWithContextRDD(r, - (context: TaskContext, iter: Iterator[Int]) => iter.map(_.toString), false )) - testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) - testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) - testCheckpointing(_.pipe(Seq("cat"))) + testRDD(_.map(x => x.toString)) + testRDD(_.flatMap(x => 1 to x)) + testRDD(_.filter(_ % 2 == 0)) + testRDD(_.sample(false, 0.5, 0)) + testRDD(_.glom()) + testRDD(_.mapPartitions(_.map(_.toString))) + testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) + testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) + testRDD(_.pipe(Seq("cat"))) } test("ParallelCollection") { @@ -98,7 +95,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("ShuffledRDD") { - testCheckpointing(rdd => { + testRDD(rdd => { // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner) }) @@ -106,25 +103,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("UnionRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - - // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed. - // Current implementation of UnionRDD has transient reference to parent RDDs, - // so only the partitions will reduce in serialized size, not the RDD. - testCheckpointing(_.union(otherRDD), false, true) - testParentCheckpointing(_.union(otherRDD), false, true) + testRDD(_.union(otherRDD)) + testRDDPartitions(_.union(otherRDD)) } test("CartesianRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - testCheckpointing(new CartesianRDD(sc, _, otherRDD)) - - // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false) + testRDD(new CartesianRDD(sc, _, otherRDD)) + testRDDPartitions(new CartesianRDD(sc, _, otherRDD)) // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after - // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. + // the parent RDD has been checkpointed and parent partitions have been changed. // Note that this test is very specific to the current implementation of CartesianRDD. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD @@ -135,23 +124,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val splitAfterCheckpoint = serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) assert( - (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) && - (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2), - "CartesianRDD.parents not updated after parent RDD checkpointed" + (splitAfterCheckpoint.s1.getClass != splitBeforeCheckpoint.s1.getClass) && + (splitAfterCheckpoint.s2.getClass != splitBeforeCheckpoint.s2.getClass), + "CartesianRDD.s1 and CartesianRDD.s2 not updated after parent RDD is checkpointed" ) } test("CoalescedRDD") { - testCheckpointing(_.coalesce(2)) - - // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing(_.coalesce(2), true, false) + testRDD(_.coalesce(2)) + testRDDPartitions(_.coalesce(2)) - // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after - // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. - // Note that this test is very specific to the current implementation of CoalescedRDDPartitions + // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) + // after the parent RDD has been checkpointed and parent partitions have been changed. + // Note that this test is very specific to the current implementation of + // CoalescedRDDPartitions. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val coalesced = new CoalescedRDD(ones, 2) @@ -161,33 +147,78 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val splitAfterCheckpoint = serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) assert( - splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head, - "CoalescedRDDPartition.parents not updated after parent RDD checkpointed" + splitAfterCheckpoint.parents.head.getClass != splitBeforeCheckpoint.parents.head.getClass, + "CoalescedRDDPartition.parents not updated after parent RDD is checkpointed" ) } test("CoGroupedRDD") { - val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD() - testCheckpointing(rdd => { + val longLineageRDD1 = generateFatPairRDD() + testRDD(rdd => { CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner) - }, false, true) + }) - val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD() - testParentCheckpointing(rdd => { + val longLineageRDD2 = generateFatPairRDD() + testRDDPartitions(rdd => { CheckpointSuite.cogroup( longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner) - }, false, true) + }) } test("ZippedRDD") { - testCheckpointing( - rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) - - // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed - // Current implementation of ZippedRDDPartitions has transient references to parent RDDs, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing( - rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) + testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) + testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) + + // Test that the ZippedPartition updates parent partitions + // after the parent RDD has been checkpointed and parent partitions have been changed. + // Note that this test is very specific to the current implementation of ZippedRDD. + val rdd = generateFatRDD() + val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x)) + zippedRDD.rdd1.checkpoint() + zippedRDD.rdd2.checkpoint() + val partitionBeforeCheckpoint = + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + zippedRDD.count() + val partitionAfterCheckpoint = + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + assert( + partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass && + partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass, + "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed" + ) + } + + test("PartitionerAwareUnionRDD") { + testRDD(rdd => { + new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( + generateFatPairRDD(), + rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) + )) + }) + + testRDDPartitions(rdd => { + new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( + generateFatPairRDD(), + rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) + )) + }) + + // Test that the PartitionerAwareUnionRDD updates parent partitions + // (PartitionerAwareUnionRDD.parents) after the parent RDD has been checkpointed and parent + // partitions have been changed. Note that this test is very specific to the current + // implementation of PartitionerAwareUnionRDD. + val pairRDD = generateFatPairRDD() + pairRDD.checkpoint() + val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD)) + val partitionBeforeCheckpoint = serializeDeserialize( + unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) + pairRDD.count() + val partitionAfterCheckpoint = serializeDeserialize( + unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) + assert( + partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass, + "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed" + ) } test("CheckpointRDD with zero partitions") { @@ -201,29 +232,32 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } /** - * Test checkpointing of the final RDD generated by the given operation. By default, - * this method tests whether the size of serialized RDD has reduced after checkpointing or not. - * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or - * not, but this is not done by default as usually the partitions do not refer to any RDD and - * therefore never store the lineage. + * Test checkpointing of the RDD generated by the given operation. It tests whether the + * serialized size of the RDD is reduce after checkpointing or not. This function should be called + * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.). */ - def testCheckpointing[U: ClassManifest]( - op: (RDD[Int]) => RDD[U], - testRDDSize: Boolean = true, - testRDDPartitionSize: Boolean = false - ) { + def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) { // Generate the final RDD using given RDD operation - val baseRDD = generateLongLineageRDD() + val baseRDD = generateFatRDD() val operatedRDD = op(baseRDD) val parentRDD = operatedRDD.dependencies.headOption.orNull val rddType = operatedRDD.getClass.getSimpleName val numPartitions = operatedRDD.partitions.length + // Force initialization of all the data structures in RDDs + // Without this, serializing the RDD will give a wrong estimate of the size of the RDD + initializeRdd(operatedRDD) + + val partitionsBeforeCheckpoint = operatedRDD.partitions + // Find serialized sizes before and after the checkpoint - val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) + val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) operatedRDD.checkpoint() val result = operatedRDD.collect() - val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables + val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) // Test whether the checkpoint file has been created assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result) @@ -231,6 +265,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { // Test whether dependencies have been changed from its earlier parent RDD assert(operatedRDD.dependencies.head.rdd != parentRDD) + // Test whether the partitions have been changed from its earlier partitions + assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList) + // Test whether the partitions have been changed to the new Hadoop partitions assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList) @@ -240,122 +277,72 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { // Test whether the data in the checkpointed RDD is same as original assert(operatedRDD.collect() === result) - // Test whether serialized size of the RDD has reduced. If the RDD - // does not have any dependency to another RDD (e.g., ParallelCollection, - // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing. - if (testRDDSize) { - logInfo("Size of " + rddType + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]") - assert( - rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, - "Size of " + rddType + " did not reduce after checkpointing " + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" - ) - } + // Test whether serialized size of the RDD has reduced. + logInfo("Size of " + rddType + + " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]") + assert( + rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, + "Size of " + rddType + " did not reduce after checkpointing " + + " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" + ) - // Test whether serialized size of the partitions has reduced. If the partitions - // do not have any non-transient reference to another RDD or another RDD's partitions, it - // does not refer to a lineage and therefore may not reduce in size after checkpointing. - // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions - // must be forgotten after checkpointing (to remove all reference to parent RDDs) and - // replaced with the HadooPartitions of the checkpointed RDD. - if (testRDDPartitionSize) { - logInfo("Size of " + rddType + " partitions " - + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]") - assert( - splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " partitions did not reduce after checkpointing " + - "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" - ) - } } /** * Test whether checkpointing of the parent of the generated RDD also * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed, - * this RDD will remember the partitions and therefore potentially the whole lineage. + * the generated RDD will remember the partitions and therefore potentially the whole lineage. + * This function should be called only those RDD whose partitions refer to parent RDD's + * partitions (i.e., do not call it on simple RDD like MappedRDD). + * */ - def testParentCheckpointing[U: ClassManifest]( - op: (RDD[Int]) => RDD[U], - testRDDSize: Boolean, - testRDDPartitionSize: Boolean - ) { + def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) { // Generate the final RDD using given RDD operation - val baseRDD = generateLongLineageRDD() + val baseRDD = generateFatRDD() val operatedRDD = op(baseRDD) - val parentRDD = operatedRDD.dependencies.head.rdd + val parentRDDs = operatedRDD.dependencies.map(_.rdd) val rddType = operatedRDD.getClass.getSimpleName - val parentRDDType = parentRDD.getClass.getSimpleName - // Get the partitions and dependencies of the parent in case they're lazily computed - parentRDD.dependencies - parentRDD.partitions + // Force initialization of all the data structures in RDDs + // Without this, serializing the RDD will give a wrong estimate of the size of the RDD + initializeRdd(operatedRDD) // Find serialized sizes before and after the checkpoint - val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) - parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one - val result = operatedRDD.collect() - val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) + val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) + parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one + val result = operatedRDD.collect() // force checkpointing + operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables + val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) // Test whether the data in the checkpointed RDD is same as original assert(operatedRDD.collect() === result) - // Test whether serialized size of the RDD has reduced because of its parent being - // checkpointed. If this RDD or its parent RDD do not have any dependency - // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may - // not reduce in size after checkpointing. - if (testRDDSize) { - assert( - rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, - "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" - ) - } - - // Test whether serialized size of the partitions has reduced because of its parent being - // checkpointed. If the partitions do not have any non-transient reference to another RDD - // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce - // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent - // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's - // partitions must have changed after checkpointing. - if (testRDDPartitionSize) { - assert( - splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType + - "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" - ) - } - + // Test whether serialized size of the partitions has reduced + logInfo("Size of partitions of " + rddType + + " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]") + assert( + partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint, + "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" + + " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]" + ) } /** - * Generate an RDD with a long lineage of one-to-one dependencies. + * Generate an RDD such that both the RDD and its partitions have large size. */ - def generateLongLineageRDD(): RDD[Int] = { - var rdd = sc.makeRDD(1 to 100, 4) - for (i <- 1 to 50) { - rdd = rdd.map(x => x + 1) - } - rdd + def generateFatRDD(): RDD[Int] = { + new FatRDD(sc.makeRDD(1 to 100, 4)).map(x => x) } /** - * Generate an RDD with a long lineage specifically for CoGroupedRDD. - * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage - * and narrow dependency with this RDD. This method generate such an RDD by a sequence - * of cogroups and mapValues which creates a long lineage of narrow dependencies. + * Generate an pair RDD (with partitioner) such that both the RDD and its partitions + * have large size. */ - def generateLongLineageRDDForCoGroupedRDD() = { - val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _) - - def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) - - var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones) - for(i <- 1 to 10) { - cogrouped = cogrouped.mapValues(add).cogroup(ones) - } - cogrouped.mapValues(add) + def generateFatPairRDD() = { + new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x) } /** @@ -363,8 +350,26 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint. */ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { - (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length, - Utils.serialize(rdd.partitions).length) + val rddSize = Utils.serialize(rdd).size + val rddCpDataSize = Utils.serialize(rdd.checkpointData).size + val rddPartitionSize = Utils.serialize(rdd.partitions).size + val rddDependenciesSize = Utils.serialize(rdd.dependencies).size + + // Print detailed size, helps in debugging + logInfo("Serialized sizes of " + rdd + + ": RDD = " + rddSize + + ", RDD checkpoint data = " + rddCpDataSize + + ", RDD partitions = " + rddPartitionSize + + ", RDD dependencies = " + rddDependenciesSize + ) + // this makes sure that serializing the RDD's checkpoint data does not + // serialize the whole RDD as well + assert( + rddSize > rddCpDataSize, + "RDD's checkpoint data (" + rddCpDataSize + ") is equal or larger than the " + + "whole RDD with checkpoint data (" + rddSize + ")" + ) + (rddSize - rddCpDataSize, rddPartitionSize) } /** @@ -376,8 +381,49 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val bytes = Utils.serialize(obj) Utils.deserialize[T](bytes) } + + /** + * Recursively force the initialization of the all members of an RDD and it parents. + */ + def initializeRdd(rdd: RDD[_]) { + rdd.partitions // forces the + rdd.dependencies.map(_.rdd).foreach(initializeRdd(_)) + } } +/** RDD partition that has large serialized size. */ +class FatPartition(val partition: Partition) extends Partition { + val bigData = new Array[Byte](10000) + def index: Int = partition.index +} + +/** RDD that has large serialized size. */ +class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) { + val bigData = new Array[Byte](100000) + + protected def getPartitions: Array[Partition] = { + parent.partitions.map(p => new FatPartition(p)) + } + + def compute(split: Partition, context: TaskContext): Iterator[Int] = { + parent.compute(split.asInstanceOf[FatPartition].partition, context) + } +} + +/** Pair RDD that has large serialized size. */ +class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) { + val bigData = new Array[Byte](100000) + + protected def getPartitions: Array[Partition] = { + parent.partitions.map(p => new FatPartition(p)) + } + + @transient override val partitioner = Some(_partitioner) + + def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = { + parent.compute(split.asInstanceOf[FatPartition].partition, context).map(x => (x, x)) + } +} object CheckpointSuite { // This is a custom cogroup function that does not use mapValues like diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 480bac84f3..d9cb7fead5 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -122,7 +122,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.parallelize(1 to 10, 10).foreach(x => println(x / 0)) } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("more than 4 times")) + assert(thrown.getMessage.contains("failed 4 times")) } test("caching") { @@ -303,12 +303,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter Thread.sleep(200) } } catch { - case _ => { Thread.sleep(10) } + case _: Throwable => { Thread.sleep(10) } // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } } } + } object DistributedSuite { diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 01a72d8401..fb89537258 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -30,13 +30,15 @@ import org.apache.spark.util.Utils class DriverSuite extends FunSuite with Timeouts { test("driver should exit after finishing") { - assert(System.getenv("SPARK_HOME") != null) + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => - failAfter(30 seconds) { - Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master), - new File(System.getenv("SPARK_HOME"))) + failAfter(60 seconds) { + Utils.executeAndGetOutput( + Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), + new File(sparkHome), + Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) } } } diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index af448fcb37..befdc1589f 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -42,7 +42,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { // Run a 3-task map job in which task 1 deterministically fails once, and check // whether the job completes successfully and we ran 4 tasks in total. test("failure in a single-stage job") { - sc = new SparkContext("local[1,1]", "test") + sc = new SparkContext("local[1,2]", "test") val results = sc.makeRDD(1 to 3, 3).map { x => FailureSuiteState.synchronized { FailureSuiteState.tasksRun += 1 @@ -62,7 +62,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { // Run a map-reduce job in which a reduce task deterministically fails once. test("failure in a two-stage job") { - sc = new SparkContext("local[1,1]", "test") + sc = new SparkContext("local[1,2]", "test") val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map { case (k, v) => FailureSuiteState.synchronized { diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index c210dd5c3b..a2eb9a4e84 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -17,33 +17,49 @@ package org.apache.spark +import java.io._ +import java.util.jar.{JarEntry, JarOutputStream} + +import SparkContext._ import com.google.common.io.Files import org.scalatest.FunSuite -import java.io.{File, PrintWriter, FileReader, BufferedReader} -import SparkContext._ class FileServerSuite extends FunSuite with LocalSparkContext { @transient var tmpFile: File = _ - @transient var testJarFile: File = _ - - override def beforeEach() { - super.beforeEach() - // Create a sample text file - val tmpdir = new File(Files.createTempDir(), "test") - tmpdir.mkdir() - tmpFile = new File(tmpdir, "FileServerSuite.txt") - val pw = new PrintWriter(tmpFile) + @transient var tmpJarUrl: String = _ + + override def beforeAll() { + super.beforeAll() + val tmpDir = new File(Files.createTempDir(), "test") + tmpDir.mkdir() + + val textFile = new File(tmpDir, "FileServerSuite.txt") + val pw = new PrintWriter(textFile) pw.println("100") pw.close() - } + + val jarFile = new File(tmpDir, "test.jar") + val jarStream = new FileOutputStream(jarFile) + val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - override def afterEach() { - super.afterEach() - // Clean up downloaded file - if (tmpFile.exists) { - tmpFile.delete() + val jarEntry = new JarEntry(textFile.getName) + jar.putNextEntry(jarEntry) + + val in = new FileInputStream(textFile) + val buffer = new Array[Byte](10240) + var nRead = 0 + while (nRead <= 0) { + nRead = in.read(buffer, 0, buffer.length) + jar.write(buffer, 0, nRead) } + + in.close() + jar.close() + jarStream.close() + + tmpFile = textFile + tmpJarUrl = jarFile.toURI.toURL.toString } test("Distributing files locally") { @@ -77,18 +93,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("Dynamically adding JARS locally") { sc = new SparkContext("local[4]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl) + val testData = Array((1, 1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } test("Distributing files on a standalone cluster") { @@ -107,33 +118,24 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl) + val testData = Array((1,1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } test ("Dynamically adding JARS on a standalone cluster using local: URL") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile.replace("file", "local")) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl.replace("file", "local")) + val testData = Array((1,1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } + } diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 352036f182..23ec6c3b31 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -365,6 +365,20 @@ public class JavaAPISuite implements Serializable { } @Test + public void javaDoubleRDDHistoGram() { + JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); + // Test using generated buckets + Tuple2<double[], long[]> results = rdd.histogram(2); + double[] expected_buckets = {1.0, 2.5, 4.0}; + long[] expected_counts = {2, 2}; + Assert.assertArrayEquals(expected_buckets, results._1, 0.1); + Assert.assertArrayEquals(expected_counts, results._2); + // Test with provided buckets + long[] histogram = rdd.histogram(expected_buckets); + Assert.assertArrayEquals(expected_counts, histogram); + } + + @Test public void map() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() { @@ -837,7 +851,7 @@ public class JavaAPISuite implements Serializable { public void checkpointAndComputation() { File tempDir = Files.createTempDir(); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath(), true); + sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertEquals(false, rdd.isCheckpointed()); rdd.checkpoint(); rdd.count(); // Forces the DAG to cause a checkpoint @@ -849,7 +863,7 @@ public class JavaAPISuite implements Serializable { public void checkpointAndRestore() { File tempDir = Files.createTempDir(); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath(), true); + sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertEquals(false, rdd.isCheckpointed()); rdd.checkpoint(); rdd.count(); // Forces the DAG to cause a checkpoint @@ -883,4 +897,69 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); } + + @Test + public void collectPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i % 2); + } + }); + + List[] parts = rdd1.collectPartitions(new int[] {0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[] {1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); + + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(2, 0)), + rdd2.collectPartitions(new int[] {0})[0]); + + parts = rdd2.collectPartitions(new int[] {1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1), + new Tuple2<Integer, Integer>(4, 0)), + parts[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1), + new Tuple2<Integer, Integer>(6, 0), + new Tuple2<Integer, Integer>(7, 1)), + parts[1]); + } + + @Test + public void countApproxDistinct() { + List<Integer> arrayData = new ArrayList<Integer>(); + int size = 100; + for (int i = 0; i < 100000; i++) { + arrayData.add(i % size); + } + JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01); + } + + @Test + public void countApproxDistinctByKey() { + double relativeSD = 0.001; + + List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>(); + for (int i = 10; i < 100; i++) + for (int j = 0; j < i; j++) + arrayData.add(new Tuple2<Integer, Integer>(i, j)); + + JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData); + List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect(); + for (Tuple2<Integer, Object> resItem : res) { + double count = (double)resItem._1(); + Long resCount = (Long)resItem._2(); + Double error = Math.abs((resCount - count) / count); + Assert.assertTrue(error < relativeSD); + } + + } } diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index d8a0e983b2..1121e06e2e 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -114,7 +114,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf // Once A is cancelled, job B should finish fairly quickly. assert(jobB.get() === 100) } - +/* test("two jobs sharing the same stage") { // sem1: make sure cancel is issued after some tasks are launched // sem2: make sure the first stage is not finished until cancel is issued @@ -148,7 +148,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf intercept[SparkException] { f1.get() } intercept[SparkException] { f2.get() } } - + */ def testCount() { // Cancel before launching any tasks { diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index b7eb268bd5..afc1beff98 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -23,9 +23,10 @@ import akka.actor._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AkkaUtils +import scala.concurrent.Await class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { - + private val conf = new SparkConf test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(1L) === 1) @@ -48,14 +49,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master start and stop") { val actorSystem = ActorSystem("test") - val tracker = new MapOutputTrackerMaster() + val tracker = new MapOutputTrackerMaster(conf) tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.stop() } test("master register and fetch") { val actorSystem = ActorSystem("test") - val tracker = new MapOutputTrackerMaster() + val tracker = new MapOutputTrackerMaster(conf) tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) @@ -74,7 +75,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master register and unregister and fetch") { val actorSystem = ActorSystem("test") - val tracker = new MapOutputTrackerMaster() + val tracker = new MapOutputTrackerMaster(conf) tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) @@ -96,18 +97,20 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch") { val hostname = "localhost" - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) - val masterTracker = new MapOutputTrackerMaster() + val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") - val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0) - val slaveTracker = new MapOutputTracker() - slaveTracker.trackerActor = slaveSystem.actorFor( - "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker") + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf) + val slaveTracker = new MapOutputTracker(conf) + val selection = slaveSystem.actorSelection( + s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + val timeout = AkkaUtils.lookupTimeout(conf) + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) masterTracker.registerShuffle(10, 1) masterTracker.incrementEpoch() diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala index 288aa14eeb..c650ef4ed5 100644 --- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -27,8 +27,10 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => def sc: SparkContext = _sc + var conf = new SparkConf(false) + override def beforeAll() { - _sc = new SparkContext("local", "test") + _sc = new SparkContext("local", "test", conf) super.beforeAll() } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala new file mode 100644 index 0000000000..ef5936dd2f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -0,0 +1,110 @@ +package org.apache.spark + +import org.scalatest.FunSuite + +class SparkConfSuite extends FunSuite with LocalSparkContext { + // This test uses the spark.conf in core/src/test/resources, which has a few test properties + test("loading from spark.conf") { + val conf = new SparkConf() + assert(conf.get("spark.test.intTestProperty") === "1") + assert(conf.get("spark.test.stringTestProperty") === "hi") + // NOTE: we don't use list properties yet, but when we do, we'll have to deal with this syntax + assert(conf.get("spark.test.listTestProperty") === "[a, b]") + } + + // This test uses the spark.conf in core/src/test/resources, which has a few test properties + test("system properties override spark.conf") { + try { + System.setProperty("spark.test.intTestProperty", "2") + val conf = new SparkConf() + assert(conf.get("spark.test.intTestProperty") === "2") + assert(conf.get("spark.test.stringTestProperty") === "hi") + } finally { + System.clearProperty("spark.test.intTestProperty") + } + } + + test("initializing without loading defaults") { + try { + System.setProperty("spark.test.intTestProperty", "2") + val conf = new SparkConf(false) + assert(!conf.contains("spark.test.intTestProperty")) + assert(!conf.contains("spark.test.stringTestProperty")) + } finally { + System.clearProperty("spark.test.intTestProperty") + } + } + + test("named set methods") { + val conf = new SparkConf(false) + + conf.setMaster("local[3]") + conf.setAppName("My app") + conf.setSparkHome("/path") + conf.setJars(Seq("a.jar", "b.jar")) + conf.setExecutorEnv("VAR1", "value1") + conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3"))) + + assert(conf.get("spark.master") === "local[3]") + assert(conf.get("spark.app.name") === "My app") + assert(conf.get("spark.home") === "/path") + assert(conf.get("spark.jars") === "a.jar,b.jar") + assert(conf.get("spark.executorEnv.VAR1") === "value1") + assert(conf.get("spark.executorEnv.VAR2") === "value2") + assert(conf.get("spark.executorEnv.VAR3") === "value3") + + // Test the Java-friendly versions of these too + conf.setJars(Array("c.jar", "d.jar")) + conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5"))) + assert(conf.get("spark.jars") === "c.jar,d.jar") + assert(conf.get("spark.executorEnv.VAR4") === "value4") + assert(conf.get("spark.executorEnv.VAR5") === "value5") + } + + test("basic get and set") { + val conf = new SparkConf(false) + assert(conf.getAll.toSet === Set()) + conf.set("k1", "v1") + conf.setAll(Seq(("k2", "v2"), ("k3", "v3"))) + assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3"))) + conf.set("k1", "v4") + conf.setAll(Seq(("k2", "v5"), ("k3", "v6"))) + assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6"))) + assert(conf.contains("k1"), "conf did not contain k1") + assert(!conf.contains("k4"), "conf contained k4") + assert(conf.get("k1") === "v4") + intercept[Exception] { conf.get("k4") } + assert(conf.get("k4", "not found") === "not found") + assert(conf.getOption("k1") === Some("v4")) + assert(conf.getOption("k4") === None) + } + + test("creating SparkContext without master and app name") { + val conf = new SparkConf(false) + intercept[SparkException] { sc = new SparkContext(conf) } + } + + test("creating SparkContext without master") { + val conf = new SparkConf(false).setAppName("My app") + intercept[SparkException] { sc = new SparkContext(conf) } + } + + test("creating SparkContext without app name") { + val conf = new SparkConf(false).setMaster("local") + intercept[SparkException] { sc = new SparkContext(conf) } + } + + test("creating SparkContext with both master and app name") { + val conf = new SparkConf(false).setMaster("local").setAppName("My app") + sc = new SparkContext(conf) + assert(sc.master === "local") + assert(sc.appName === "My app") + } + + test("SparkContext property overriding") { + val conf = new SparkConf(false).setMaster("local").setAppName("My app") + sc = new SparkContext("local[2]", "My other app", conf) + assert(sc.master === "local[2]") + assert(sc.appName === "My other app") + } +} diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala new file mode 100644 index 0000000000..f28d5c7b13 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.scalatest.{FunSuite, PrivateMethodTester} + +import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler} +import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.local.LocalBackend + +class SparkContextSchedulerCreationSuite + extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging { + + def createTaskScheduler(master: String): TaskSchedulerImpl = { + // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the + // real schedulers, so we don't want to create a full SparkContext with the desired scheduler. + sc = new SparkContext("local", "test") + val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler) + val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test") + sched.asInstanceOf[TaskSchedulerImpl] + } + + test("bad-master") { + val e = intercept[SparkException] { + createTaskScheduler("localhost:1234") + } + assert(e.getMessage.contains("Could not parse Master URL")) + } + + test("local") { + val sched = createTaskScheduler("local") + sched.backend match { + case s: LocalBackend => assert(s.totalCores === 1) + case _ => fail() + } + } + + test("local-n") { + val sched = createTaskScheduler("local[5]") + assert(sched.maxTaskFailures === 1) + sched.backend match { + case s: LocalBackend => assert(s.totalCores === 5) + case _ => fail() + } + } + + test("local-n-failures") { + val sched = createTaskScheduler("local[4, 2]") + assert(sched.maxTaskFailures === 2) + sched.backend match { + case s: LocalBackend => assert(s.totalCores === 4) + case _ => fail() + } + } + + test("simr") { + createTaskScheduler("simr://uri").backend match { + case s: SimrSchedulerBackend => // OK + case _ => fail() + } + } + + test("local-cluster") { + createTaskScheduler("local-cluster[3, 14, 512]").backend match { + case s: SparkDeploySchedulerBackend => // OK + case _ => fail() + } + } + + def testYarn(master: String, expectedClassName: String) { + try { + val sched = createTaskScheduler(master) + assert(sched.getClass === Class.forName(expectedClassName)) + } catch { + case e: SparkException => + assert(e.getMessage.contains("YARN mode not available")) + logWarning("YARN not available, could not test actual YARN scheduler creation") + case e: Throwable => fail(e) + } + } + + test("yarn-standalone") { + testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler") + } + + test("yarn-client") { + testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") + } + + def testMesos(master: String, expectedClass: Class[_]) { + try { + val sched = createTaskScheduler(master) + assert(sched.backend.getClass === expectedClass) + } catch { + case e: UnsatisfiedLinkError => + assert(e.getMessage.contains("no mesos in")) + logWarning("Mesos not available, could not test actual Mesos scheduler creation") + case e: Throwable => fail(e) + } + } + + test("mesos fine-grained") { + System.setProperty("spark.mesos.coarse", "false") + testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend]) + } + + test("mesos coarse-grained") { + System.setProperty("spark.mesos.coarse", "true") + testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend]) + } + + test("mesos with zookeeper") { + System.setProperty("spark.mesos.coarse", "false") + testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend]) + } +} diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala index 46a2da1724..768ca3850e 100644 --- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala +++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala @@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext { Thread.sleep(200) } } catch { - case _ => { Thread.sleep(10) } + case _: Throwable => { Thread.sleep(10) } // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 0b38e239f9..331fa3a642 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -70,10 +70,11 @@ class JsonProtocolSuite extends FunSuite { def createAppDesc() : ApplicationDescription = { val cmd = new Command("mainClass", List("arg1", "arg2"), Map()) - new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl") + new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl") } def createAppInfo() : ApplicationInfo = { - new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr") + new ApplicationInfo( + 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue) } def createWorkerInfo() : WorkerInfo = { new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 8f0954122b..be93074b7b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -1,14 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker import java.io.File + import org.scalatest.FunSuite + import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription} class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) - val sparkHome = sys.env("SPARK_HOME") - val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()), + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get + val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index ab81bfbe55..8d7546085f 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -20,9 +20,11 @@ package org.apache.spark.io import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import org.scalatest.FunSuite +import org.apache.spark.SparkConf class CompressionCodecSuite extends FunSuite { + val conf = new SparkConf(false) def testCodec(codec: CompressionCodec) { // Write 1000 integers to the output stream, compressed. @@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite { } test("default compression codec") { - val codec = CompressionCodec.createCodec() + val codec = CompressionCodec.createCodec(conf) assert(codec.getClass === classOf[LZFCompressionCodec]) testCodec(codec) } test("lzf compression codec") { - val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName) + val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName) assert(codec.getClass === classOf[LZFCompressionCodec]) testCodec(codec) } test("snappy compression codec") { - val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName) + val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName) assert(codec.getClass === classOf[SnappyCompressionCodec]) testCodec(codec) } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 7181333adf..71a2c6c498 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -19,17 +19,19 @@ package org.apache.spark.metrics import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.deploy.master.MasterSource +import org.apache.spark.SparkConf class MetricsSystemSuite extends FunSuite with BeforeAndAfter { var filePath: String = _ + var conf: SparkConf = null before { filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile() - System.setProperty("spark.metrics.conf", filePath) + conf = new SparkConf(false).set("spark.metrics.conf", filePath) } test("MetricsSystem with default config") { - val metricsSystem = MetricsSystem.createMetricsSystem("default") + val metricsSystem = MetricsSystem.createMetricsSystem("default", conf) val sources = metricsSystem.sources val sinks = metricsSystem.sinks @@ -39,7 +41,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter { } test("MetricsSystem with sources add") { - val metricsSystem = MetricsSystem.createMetricsSystem("test") + val metricsSystem = MetricsSystem.createMetricsSystem("test", conf) val sources = metricsSystem.sources val sinks = metricsSystem.sinks diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index da032b17d9..0d4c10db8e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.rdd import java.util.concurrent.Semaphore +import scala.concurrent.{Await, TimeoutException} +import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global import org.scalatest.{BeforeAndAfterAll, FunSuite} @@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts sem.acquire(2) } } + + /** + * Awaiting FutureAction results + */ + test("FutureAction result, infinite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration.Inf) === 100) + } + + test("FutureAction result, finite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration(30, "seconds")) === 100) + } + + test("FutureAction result, timeout") { + val f = sc.parallelize(1 to 100, 4) + .mapPartitions(itr => { Thread.sleep(20); itr }) + .countAsync() + intercept[TimeoutException] { + Await.result(f, Duration(20, "milliseconds")) + } + } } diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala new file mode 100644 index 0000000000..7f50a5a47c --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.math.abs +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ +import org.apache.spark._ + +class DoubleRDDSuite extends FunSuite with SharedSparkContext { + // Verify tests on the histogram functionality. We test with both evenly + // and non-evenly spaced buckets as the bucket lookup function changes. + test("WorksOnEmpty") { + // Make sure that it works on an empty input + val rdd: RDD[Double] = sc.parallelize(Seq()) + val buckets = Array(0.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(0) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksWithOutOfRangeWithOneBucket") { + // Verify that if all of the elements are out of range the counts are zero + val rdd = sc.parallelize(Seq(10.01, -0.01)) + val buckets = Array(0.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(0) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksInRangeWithOneBucket") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 2, 3, 4)) + val buckets = Array(0.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(4) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksInRangeWithOneBucketExactMatch") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 2, 3, 4)) + val buckets = Array(1.0, 4.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(4) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksWithOutOfRangeWithTwoBuckets") { + // Verify that out of range works with two buckets + val rdd = sc.parallelize(Seq(10.01, -0.01)) + val buckets = Array(0.0, 5.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(0, 0) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksWithOutOfRangeWithTwoUnEvenBuckets") { + // Verify that out of range works with two un even buckets + val rdd = sc.parallelize(Seq(10.01, -0.01)) + val buckets = Array(0.0, 4.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(0, 0) + assert(histogramResults === expectedHistogramResults) + } + + test("WorksInRangeWithTwoBuckets") { + // Make sure that it works with two equally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6)) + val buckets = Array(0.0, 5.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(3, 2) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksInRangeWithTwoBucketsAndNaN") { + // Make sure that it works with two equally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN)) + val buckets = Array(0.0, 5.0, 10.0) + val histogramResults = rdd.histogram(buckets) + val histogramResults2 = rdd.histogram(buckets, true) + val expectedHistogramResults = Array(3, 2) + assert(histogramResults === expectedHistogramResults) + assert(histogramResults2 === expectedHistogramResults) + } + + test("WorksInRangeWithTwoUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6)) + val buckets = Array(0.0, 5.0, 11.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(3, 2) + assert(histogramResults === expectedHistogramResults) + } + + test("WorksMixedRangeWithTwoUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01)) + val buckets = Array(0.0, 5.0, 11.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(4, 3) + assert(histogramResults === expectedHistogramResults) + } + + test("WorksMixedRangeWithFourUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1)) + val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(4, 2, 1, 3) + assert(histogramResults === expectedHistogramResults) + } + + test("WorksMixedRangeWithUnevenBucketsAndNaN") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, Double.NaN)) + val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(4, 2, 1, 3) + assert(histogramResults === expectedHistogramResults) + } + // Make sure this works with a NaN end bucket + test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, Double.NaN)) + val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(4, 2, 1, 2, 3) + assert(histogramResults === expectedHistogramResults) + } + // Make sure this works with a NaN end bucket and an inifity + test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, + 200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN)) + val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(4, 2, 1, 2, 4) + assert(histogramResults === expectedHistogramResults) + } + + test("WorksWithOutOfRangeWithInfiniteBuckets") { + // Verify that out of range works with two buckets + val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN)) + val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0) + val histogramResults = rdd.histogram(buckets) + val expectedHistogramResults = Array(1, 1) + assert(histogramResults === expectedHistogramResults) + } + // Test the failure mode with an invalid bucket array + test("ThrowsExceptionOnInvalidBucketArray") { + val rdd = sc.parallelize(Seq(1.0)) + // Empty array + intercept[IllegalArgumentException] { + val buckets = Array.empty[Double] + val result = rdd.histogram(buckets) + } + // Single element array + intercept[IllegalArgumentException] { + val buckets = Array(1.0) + val result = rdd.histogram(buckets) + } + } + + // Test automatic histogram function + test("WorksWithoutBucketsBasic") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 2, 3, 4)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults = Array(4) + val expectedHistogramBuckets = Array(1.0, 4.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + // Test automatic histogram function with a single element + test("WorksWithoutBucketsBasicSingleElement") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults = Array(1) + val expectedHistogramBuckets = Array(1.0, 1.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + // Test automatic histogram function with a single element + test("WorksWithoutBucketsBasicNoRange") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 1, 1, 1)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults = Array(4) + val expectedHistogramBuckets = Array(1.0, 1.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + + test("WorksWithoutBucketsBasicTwo") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 2, 3, 4)) + val (histogramBuckets, histogramResults) = rdd.histogram(2) + val expectedHistogramResults = Array(2, 2) + val expectedHistogramBuckets = Array(1.0, 2.5, 4.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + + test("WorksWithoutBucketsWithMoreRequestedThanElements") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd = sc.parallelize(Seq(1, 2)) + val (histogramBuckets, histogramResults) = rdd.histogram(10) + val expectedHistogramResults = + Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1) + val expectedHistogramBuckets = + Array(1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + + // Test the failure mode with an invalid RDD + test("ThrowsExceptionOnInvalidRDDs") { + // infinity + intercept[UnsupportedOperationException] { + val rdd = sc.parallelize(Seq(1, 1.0/0.0)) + val result = rdd.histogram(1) + } + // NaN + intercept[UnsupportedOperationException] { + val rdd = sc.parallelize(Seq(1, Double.NaN)) + val result = rdd.histogram(1) + } + // Empty + intercept[UnsupportedOperationException] { + val rdd: RDD[Double] = sc.parallelize(Seq()) + val result = rdd.histogram(1) + } + } + +} diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 57d3382ed0..5da538a1dd 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashSet +import scala.util.Random import org.scalatest.FunSuite @@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(deps.size === 2) // ShuffledRDD, ParallelCollection. } + test("countApproxDistinctByKey") { + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble + + /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is + * only a statistical bound, the tests can fail for large values of relativeSD. We will be using + * relatively tight error bounds to check correctness of functionality rather than checking + * whether the approximation conforms with the requested bound. + */ + val relativeSD = 0.001 + + // For each value i, there are i tuples with first element equal to i. + // Therefore, the expected count for key i would be i. + val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) + val rdd1 = sc.parallelize(stacked) + val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() + counted1.foreach{ + case(k, count) => assert(error(count, k) < relativeSD) + } + + val rnd = new Random() + + // The expected count for key num would be num + val randStacked = (1 to 100).flatMap { i => + val num = rnd.nextInt % 500 + (1 to num).map(j => (num, j)) + } + val rdd2 = sc.parallelize(randStacked) + val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() + counted2.foreach{ + case(k, count) => assert(error(count, k) < relativeSD) + } + } + test("join") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 354ab8ae5d..559ea051d3 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("countApproxDistinct") { + + def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble + + val size = 100 + val uniformDistro = for (i <- 1 to 100000) yield i % size + val simpleRdd = sc.makeRDD(uniformDistro) + assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2) + assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05) + assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01) + assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001) + } + test("SparkContext.union") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(sc.union(nums).collect().toList === List(1, 2, 3, 4)) @@ -71,6 +84,33 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) } + test("partitioner aware union") { + import SparkContext._ + def makeRDDWithPartitioner(seq: Seq[Int]) = { + sc.makeRDD(seq, 1) + .map(x => (x, null)) + .partitionBy(new HashPartitioner(2)) + .mapPartitions(_.map(_._1), true) + } + + val nums1 = makeRDDWithPartitioner(1 to 4) + val nums2 = makeRDDWithPartitioner(5 to 8) + assert(nums1.partitioner == nums2.partitioner) + assert(new PartitionerAwareUnionRDD(sc, Seq(nums1)).collect().toSet === Set(1, 2, 3, 4)) + + val union = new PartitionerAwareUnionRDD(sc, Seq(nums1, nums2)) + assert(union.collect().toSet === Set(1, 2, 3, 4, 5, 6, 7, 8)) + val nums1Parts = nums1.collectPartitions() + val nums2Parts = nums2.collectPartitions() + val unionParts = union.collectPartitions() + assert(nums1Parts.length === 2) + assert(nums2Parts.length === 2) + assert(unionParts.length === 2) + assert((nums1Parts(0) ++ nums2Parts(0)).toList === unionParts(0).toList) + assert((nums1Parts(1) ++ nums2Parts(1)).toList === unionParts(1).toList) + assert(union.partitioner === nums1.partitioner) + } + test("aggregate") { val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3))) type StringMap = HashMap[String, Int] @@ -244,8 +284,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { // test that you get over 90% locality in each group val minLocality = coalesced2.partitions .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) - .foldLeft(1.)((perc, loc) => math.min(perc,loc)) - assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%") + .foldLeft(1.0)((perc, loc) => math.min(perc,loc)) + assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%") // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance = coalesced2.partitions @@ -257,9 +297,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { val coalesced3 = data3.coalesce(numMachines*2) val minLocality2 = coalesced3.partitions .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction) - .foldLeft(1.)((perc, loc) => math.min(perc,loc)) + .foldLeft(1.0)((perc, loc) => math.min(perc,loc)) assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " + - (minLocality2*100.).toInt + "%") + (minLocality2*100.0).toInt + "%") } test("zipped RDDs") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala index 95d3553d91..7bf2020fe3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.apache.spark._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ import scala.collection.mutable.ArrayBuffer import java.util.Properties @@ -31,9 +29,9 @@ class FakeTaskSetManager( initPriority: Int, initStageId: Int, initNumTasks: Int, - clusterScheduler: ClusterScheduler, + clusterScheduler: TaskSchedulerImpl, taskSet: TaskSet) - extends ClusterTaskSetManager(clusterScheduler, taskSet) { + extends TaskSetManager(clusterScheduler, taskSet, 0) { parent = null weight = 1 @@ -106,7 +104,7 @@ class FakeTaskSetManager( class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging { - def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = { + def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = { new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet) } @@ -133,7 +131,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging test("FIFO Scheduler Test") { sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + val clusterScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -160,7 +158,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging test("Fair Scheduler Test") { sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + val clusterScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -169,7 +167,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() System.setProperty("spark.scheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) schedulableBuilder.buildPools() assert(rootPool.getSchedulableByName("default") != null) @@ -217,7 +215,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging test("Nested Pool Test") { sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + val clusterScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a4d41ebbff..2aa259daf3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,21 +17,14 @@ package org.apache.spark.scheduler -import scala.collection.mutable.{Map, HashMap} - -import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter - -import org.apache.spark.LocalSparkContext -import org.apache.spark.MapOutputTrackerMaster -import org.apache.spark.SparkContext -import org.apache.spark.Partition -import org.apache.spark.TaskContext -import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} -import org.apache.spark.{FetchFailed, Success, TaskEndReason} +import scala.Tuple2 +import scala.collection.mutable.{HashMap, Map} + +import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} +import org.scalatest.{BeforeAndAfter, FunSuite} /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler @@ -46,7 +39,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} * and capturing the resulting TaskSets from the mock TaskScheduler. */ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { - + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() val taskScheduler = new TaskScheduler() { @@ -74,7 +67,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null) { + val blockManagerMaster = new BlockManagerMaster(null, conf) { override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { blockIds.map { _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). @@ -99,7 +92,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont taskSets.clear() cacheLocations.clear() results.clear() - mapOutputTracker = new MapOutputTrackerMaster() + mapOutputTracker = new MapOutputTrackerMaster(conf) scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) { override def runLocally(job: ActiveJob) { // don't bother with the thread while unit testing @@ -206,6 +199,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(rdd, Array(0)) complete(taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("local job") { @@ -219,6 +213,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val jobId = scheduler.nextJobId.getAndIncrement() runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener)) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("run trivial job w/ dependency") { @@ -227,6 +222,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(finalRdd, Array(0)) complete(taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("cache location preferences w/ dependency") { @@ -239,12 +235,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) complete(taskSet, Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("trivial job failure") { submit(makeRdd(1, Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job aborted: some failure") + assertDataStructuresEmpty } test("run trivial shuffle") { @@ -260,6 +258,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("run trivial shuffle with fetch failure") { @@ -285,6 +284,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty } test("ignore late map task completions") { @@ -313,6 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty } test("run trivial shuffle with out-of-band failure and retry") { @@ -329,15 +330,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) - // have hostC complete the resubmitted task - complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) - assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) - complete(taskSets(2), Seq((Success, 42))) - assert(results === Map(0 -> 42)) - } - - test("recursive shuffle failures") { + // have hostC complete the resubmitted task + complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + complete(taskSets(2), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assertDataStructuresEmpty + } + + test("recursive shuffle failures") { val shuffleOneRdd = makeRdd(2, Nil) val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) @@ -363,6 +365,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) complete(taskSets(5), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } test("cached post-shuffle") { @@ -394,6 +397,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1)))) complete(taskSets(4), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty } /** @@ -413,4 +417,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345, 0) + private def assertDataStructuresEmpty = { + assert(scheduler.pendingTasks.isEmpty) + assert(scheduler.activeJobs.isEmpty) + assert(scheduler.failed.isEmpty) + assert(scheduler.idToActiveJob.isEmpty) + assert(scheduler.jobIdToStageIds.isEmpty) + assert(scheduler.stageIdToJobIds.isEmpty) + assert(scheduler.stageIdToStage.isEmpty) + assert(scheduler.stageToInfos.isEmpty) + assert(scheduler.resultStageToJob.isEmpty) + assert(scheduler.running.isEmpty) + assert(scheduler.shuffleToMapStage.isEmpty) + assert(scheduler.waiting.isEmpty) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 0f01515179..0b90c4e74c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.apache.spark.TaskContext -import org.apache.spark.scheduler.{TaskLocation, Task} class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) { override def runTask(context: TaskContext): Int = 0 diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 984881861c..5cc48ee00a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + val WAIT_TIMEOUT_MILLIS = 10000 test("inner method") { sc = new SparkContext("local", "joblogger") @@ -92,7 +93,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) } rdd.reduceByKey(_+_).collect() - val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER) + assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + + val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER) joblogger.getLogDir should be ("/tmp/spark-%s".format(user)) joblogger.getJobIDtoPrintWriter.size should be (1) @@ -114,13 +117,15 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1 override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1 override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1 - override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1 + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1 override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1 } sc.addSparkListener(joblogger) val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) } rdd.reduceByKey(_+_).collect() - + + assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + joblogger.onJobStartCount should be (1) joblogger.onJobEndCount should be (1) joblogger.onTaskEndCount should be (8) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 1fd76420ea..1a16e438c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -19,23 +19,26 @@ package org.apache.spark.scheduler import scala.collection.mutable.{Buffer, HashSet} -import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.matchers.ShouldMatchers import org.apache.spark.{LocalSparkContext, SparkContext} import org.apache.spark.SparkContext._ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers - with BeforeAndAfterAll { + with BeforeAndAfter with BeforeAndAfterAll { /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 + before { + sc = new SparkContext("local", "SparkListenerSuite") + } + override def afterAll { System.clearProperty("spark.akka.frameSize") } test("basic creation of StageInfo") { - sc = new SparkContext("local", "DAGSchedulerSuite") val listener = new SaveStageInfo sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) @@ -56,7 +59,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } test("StageInfo with fewer tasks than partitions") { - sc = new SparkContext("local", "DAGSchedulerSuite") val listener = new SaveStageInfo sc.addSparkListener(listener) val rdd1 = sc.parallelize(1 to 100, 4) @@ -72,7 +74,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } test("local metrics") { - sc = new SparkContext("local", "DAGSchedulerSuite") val listener = new SaveStageInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) @@ -135,17 +136,13 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } test("onTaskGettingResult() called when result fetched remotely") { - // Need to use local cluster mode here, because results are not ever returned through the - // block manager when using the LocalScheduler. - sc = new SparkContext("local-cluster[1,1,512]", "test") - val listener = new SaveTaskEvents sc.addSparkListener(listener) // Make a task whose result is larger than the akka frame size System.setProperty("spark.akka.frameSize", "1") val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x) assert(result === 1.to(akkaFrameSize).toArray) @@ -157,10 +154,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } test("onTaskGettingResult() not called when result sent directly") { - // Need to use local cluster mode here, because results are not ever returned through the - // block manager when using the LocalScheduler. - sc = new SparkContext("local-cluster[1,1,512]", "test") - val listener = new SaveTaskEvents sc.addSparkListener(listener) @@ -181,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc class SaveStageInfo extends SparkListener { val stageInfos = Buffer[StageInfo]() - override def onStageCompleted(stage: StageCompleted) { + override def onStageCompleted(stage: SparkListenerStageCompleted) { stageInfos += stage.stage } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index ee150a3107..4b52d9651e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} -import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} -import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv} import org.apache.spark.storage.TaskResultBlockId /** @@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId * Used to test the case where a BlockManager evicts the task result (or dies) before the * TaskResult is retrieved. */ -class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) +class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) extends TaskResultGetter(sparkEnv, scheduler) { var removedResult = false override def enqueueSuccessfulTask( - taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { if (!removedResult) { // Only remove the result once, since we'd like to test the case where the task eventually // succeeds. @@ -44,13 +43,13 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSched case IndirectTaskResult(blockId) => sparkEnv.blockManager.master.removeBlock(blockId) case directResult: DirectTaskResult[_] => - taskSetManager.abort("Internal error: expect only indirect results") + taskSetManager.abort("Internal error: expect only indirect results") } serializedData.rewind() removedResult = true } super.enqueueSuccessfulTask(taskSetManager, tid, serializedData) - } + } } /** @@ -65,24 +64,20 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA System.setProperty("spark.akka.frameSize", "1") } - before { - // Use local-cluster mode because results are returned differently when running with the - // LocalScheduler. - sc = new SparkContext("local-cluster[1,1,512]", "test") - } - override def afterAll { System.clearProperty("spark.akka.frameSize") } test("handling results smaller than Akka frame size") { + sc = new SparkContext("local", "test") val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x) assert(result === 2) } - test("handling results larger than Akka frame size") { + test("handling results larger than Akka frame size") { + sc = new SparkContext("local", "test") val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) assert(result === 1.to(akkaFrameSize).toArray) @@ -92,10 +87,13 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA } test("task retried if result missing from block manager") { + // Set the maximum number of task failures to > 0, so that the task set isn't aborted + // after the result is missing. + sc = new SparkContext("local[1,2]", "test") // If this test hangs, it's probably because no resource offers were made after the task // failed. - val scheduler: ClusterScheduler = sc.taskScheduler match { - case clusterScheduler: ClusterScheduler => + val scheduler: TaskSchedulerImpl = sc.taskScheduler match { + case clusterScheduler: TaskSchedulerImpl => clusterScheduler case _ => assert(false, "Expect local cluster to use ClusterScheduler") @@ -103,7 +101,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA } scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) assert(result === 1.to(akkaFrameSize).toArray) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index b97f2b19b5..1eec6726f4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import scala.collection.mutable @@ -23,7 +23,6 @@ import scala.collection.mutable import org.scalatest.FunSuite import org.apache.spark._ -import org.apache.spark.scheduler._ import org.apache.spark.executor.TaskMetrics import java.nio.ByteBuffer import org.apache.spark.util.{Utils, FakeClock} @@ -56,10 +55,10 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler * A mock ClusterScheduler implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost - * to work, and these are required for locality in ClusterTaskSetManager. + * to work, and these are required for locality in TaskSetManager. */ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) - extends ClusterScheduler(sc) + extends TaskSchedulerImpl(sc) { val startedTasks = new ArrayBuffer[Long] val endedTasks = new mutable.HashMap[Long, TaskEndReason] @@ -79,16 +78,19 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) } -class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { +class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL} - val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong + private val conf = new SparkConf + + val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong + val MAX_TASK_FAILURES = 4 test("TaskSet with no preferences") { sc = new SparkContext("local", "test") val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) - val manager = new ClusterTaskSetManager(sched, taskSet) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) // Offer a host with no CPUs assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None) @@ -114,7 +116,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo sc = new SparkContext("local", "test") val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(3) - val manager = new ClusterTaskSetManager(sched, taskSet) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) // First three offers should all find tasks for (i <- 0 until 3) { @@ -151,7 +153,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo Seq() // Last task has no locality prefs ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1, exec1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -197,7 +199,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo Seq(TaskLocation("host2")) ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -234,7 +236,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo Seq(TaskLocation("host3")) ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -262,7 +264,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -279,17 +281,17 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure. - (0 until manager.MAX_TASK_FAILURES).foreach { index => + (1 to manager.maxTaskFailures).foreach { index => val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) assert(offerResult != None, "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 0) manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost)) - if (index < manager.MAX_TASK_FAILURES) { + if (index < MAX_TASK_FAILURES) { assert(!sched.taskSetsFailed.contains(taskSet.id)) } else { assert(sched.taskSetsFailed.contains(taskSet.id)) @@ -313,6 +315,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo } def createTaskResult(id: Int): DirectTaskResult[Int] = { - new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics) + val valueSer = SparkEnv.get.serializer.newInstance() + new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala deleted file mode 100644 index 1e676c1719..0000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.local - -import java.util.concurrent.Semaphore -import java.util.concurrent.CountDownLatch - -import scala.collection.mutable.HashMap - -import org.scalatest.{BeforeAndAfterEach, FunSuite} - -import org.apache.spark._ - - -class Lock() { - var finished = false - def jobWait() = { - synchronized { - while(!finished) { - this.wait() - } - } - } - - def jobFinished() = { - synchronized { - finished = true - this.notifyAll() - } - } -} - -object TaskThreadInfo { - val threadToLock = HashMap[Int, Lock]() - val threadToRunning = HashMap[Int, Boolean]() - val threadToStarted = HashMap[Int, CountDownLatch]() -} - -/* - * 1. each thread contains one job. - * 2. each job contains one stage. - * 3. each stage only contains one task. - * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure - * it will get cpu core resource, and will wait to finished after user manually - * release "Lock" and then cluster will contain another free cpu cores. - * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue, - * thus it will be scheduled later when cluster has free cpu cores. - */ -class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach { - - override def afterEach() { - super.afterEach() - System.clearProperty("spark.scheduler.mode") - } - - def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) { - - TaskThreadInfo.threadToRunning(threadIndex) = false - val nums = sc.parallelize(threadIndex to threadIndex, 1) - TaskThreadInfo.threadToLock(threadIndex) = new Lock() - TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1) - new Thread { - if (poolName != null) { - sc.setLocalProperty("spark.scheduler.pool", poolName) - } - override def run() { - val ans = nums.map(number => { - TaskThreadInfo.threadToRunning(number) = true - TaskThreadInfo.threadToStarted(number).countDown() - TaskThreadInfo.threadToLock(number).jobWait() - TaskThreadInfo.threadToRunning(number) = false - number - }).collect() - assert(ans.toList === List(threadIndex)) - sem.release() - } - }.start() - } - - test("Local FIFO scheduler end-to-end test") { - System.setProperty("spark.scheduler.mode", "FIFO") - sc = new SparkContext("local[4]", "test") - val sem = new Semaphore(0) - - createThread(1,null,sc,sem) - TaskThreadInfo.threadToStarted(1).await() - createThread(2,null,sc,sem) - TaskThreadInfo.threadToStarted(2).await() - createThread(3,null,sc,sem) - TaskThreadInfo.threadToStarted(3).await() - createThread(4,null,sc,sem) - TaskThreadInfo.threadToStarted(4).await() - // thread 5 and 6 (stage pending)must meet following two points - // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager - // queue before executing TaskThreadInfo.threadToLock(1).jobFinished() - // 2. priority of stage in thread 5 should be prior to priority of stage in thread 6 - // So I just use "sleep" 1s here for each thread. - // TODO: any better solution? - createThread(5,null,sc,sem) - Thread.sleep(1000) - createThread(6,null,sc,sem) - Thread.sleep(1000) - - assert(TaskThreadInfo.threadToRunning(1) === true) - assert(TaskThreadInfo.threadToRunning(2) === true) - assert(TaskThreadInfo.threadToRunning(3) === true) - assert(TaskThreadInfo.threadToRunning(4) === true) - assert(TaskThreadInfo.threadToRunning(5) === false) - assert(TaskThreadInfo.threadToRunning(6) === false) - - TaskThreadInfo.threadToLock(1).jobFinished() - TaskThreadInfo.threadToStarted(5).await() - - assert(TaskThreadInfo.threadToRunning(1) === false) - assert(TaskThreadInfo.threadToRunning(2) === true) - assert(TaskThreadInfo.threadToRunning(3) === true) - assert(TaskThreadInfo.threadToRunning(4) === true) - assert(TaskThreadInfo.threadToRunning(5) === true) - assert(TaskThreadInfo.threadToRunning(6) === false) - - TaskThreadInfo.threadToLock(3).jobFinished() - TaskThreadInfo.threadToStarted(6).await() - - assert(TaskThreadInfo.threadToRunning(1) === false) - assert(TaskThreadInfo.threadToRunning(2) === true) - assert(TaskThreadInfo.threadToRunning(3) === false) - assert(TaskThreadInfo.threadToRunning(4) === true) - assert(TaskThreadInfo.threadToRunning(5) === true) - assert(TaskThreadInfo.threadToRunning(6) === true) - - TaskThreadInfo.threadToLock(2).jobFinished() - TaskThreadInfo.threadToLock(4).jobFinished() - TaskThreadInfo.threadToLock(5).jobFinished() - TaskThreadInfo.threadToLock(6).jobFinished() - sem.acquire(6) - } - - test("Local fair scheduler end-to-end test") { - System.setProperty("spark.scheduler.mode", "FAIR") - val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.scheduler.allocation.file", xmlPath) - - sc = new SparkContext("local[8]", "LocalSchedulerSuite") - val sem = new Semaphore(0) - - createThread(10,"1",sc,sem) - TaskThreadInfo.threadToStarted(10).await() - createThread(20,"2",sc,sem) - TaskThreadInfo.threadToStarted(20).await() - createThread(30,"3",sc,sem) - TaskThreadInfo.threadToStarted(30).await() - - assert(TaskThreadInfo.threadToRunning(10) === true) - assert(TaskThreadInfo.threadToRunning(20) === true) - assert(TaskThreadInfo.threadToRunning(30) === true) - - createThread(11,"1",sc,sem) - TaskThreadInfo.threadToStarted(11).await() - createThread(21,"2",sc,sem) - TaskThreadInfo.threadToStarted(21).await() - createThread(31,"3",sc,sem) - TaskThreadInfo.threadToStarted(31).await() - - assert(TaskThreadInfo.threadToRunning(11) === true) - assert(TaskThreadInfo.threadToRunning(21) === true) - assert(TaskThreadInfo.threadToRunning(31) === true) - - createThread(12,"1",sc,sem) - TaskThreadInfo.threadToStarted(12).await() - createThread(22,"2",sc,sem) - TaskThreadInfo.threadToStarted(22).await() - createThread(32,"3",sc,sem) - - assert(TaskThreadInfo.threadToRunning(12) === true) - assert(TaskThreadInfo.threadToRunning(22) === true) - assert(TaskThreadInfo.threadToRunning(32) === false) - - TaskThreadInfo.threadToLock(10).jobFinished() - TaskThreadInfo.threadToStarted(32).await() - - assert(TaskThreadInfo.threadToRunning(32) === true) - - //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager - // queue so that cluster will assign free cpu core to stage 23 after stage 11 finished. - //2. priority of 23 and 33 will be meaningless as using fair scheduler here. - createThread(23,"2",sc,sem) - createThread(33,"3",sc,sem) - Thread.sleep(1000) - - TaskThreadInfo.threadToLock(11).jobFinished() - TaskThreadInfo.threadToStarted(23).await() - - assert(TaskThreadInfo.threadToRunning(23) === true) - assert(TaskThreadInfo.threadToRunning(33) === false) - - TaskThreadInfo.threadToLock(12).jobFinished() - TaskThreadInfo.threadToStarted(33).await() - - assert(TaskThreadInfo.threadToRunning(33) === true) - - TaskThreadInfo.threadToLock(20).jobFinished() - TaskThreadInfo.threadToLock(21).jobFinished() - TaskThreadInfo.threadToLock(22).jobFinished() - TaskThreadInfo.threadToLock(23).jobFinished() - TaskThreadInfo.threadToLock(30).jobFinished() - TaskThreadInfo.threadToLock(31).jobFinished() - TaskThreadInfo.threadToLock(32).jobFinished() - TaskThreadInfo.threadToLock(33).jobFinished() - - sem.acquire(11) - } -} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c016c51171..3898583275 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -22,12 +22,15 @@ import scala.collection.mutable import com.esotericsoftware.kryo.Kryo import org.scalatest.FunSuite -import org.apache.spark.SharedSparkContext +import org.apache.spark.{SparkConf, SharedSparkContext} import org.apache.spark.serializer.KryoTest._ class KryoSerializerSuite extends FunSuite with SharedSparkContext { + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) + test("basic types") { - val ser = (new KryoSerializer).newInstance() + val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) } @@ -57,7 +60,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("pairs") { - val ser = (new KryoSerializer).newInstance() + val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) } @@ -81,7 +84,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("Scala data structures") { - val ser = (new KryoSerializer).newInstance() + val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) } @@ -104,7 +107,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("ranges") { - val ser = (new KryoSerializer).newInstance() + val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) // Check that very long ranges don't get written one element at a time @@ -125,9 +128,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("custom registrator") { - System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName) - - val ser = (new KryoSerializer).newInstance() + val ser = new KryoSerializer(conf).newInstance() def check[T](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) } @@ -172,6 +173,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11)) } + test("kryo with SerializableHyperLogLog") { + assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3) + } + test("kryo with reduce") { val control = 1 :: 2 :: Nil val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) @@ -186,18 +191,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x assert(10 + control.sum === result) } - - override def beforeAll() { - System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName) - super.beforeAll() - } - - override def afterAll() { - super.afterAll() - System.clearProperty("spark.kryo.registrator") - System.clearProperty("spark.serializer") - } } object KryoTest { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index cb76275e39..b647e8a672 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite { fail() } catch { case e: IllegalStateException => // OK - case _ => fail() + case _: Throwable => fail() } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 484a654108..f60ce270c7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,41 +31,42 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.{SparkConf, SparkContext} class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + private val conf = new SparkConf(false) var store: BlockManager = null var store2: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null var oldArch: String = null - var oldOops: String = null - var oldHeartBeat: String = null // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test - System.setProperty("spark.kryoserializer.buffer.mb", "1") - val serializer = new KryoSerializer + conf.set("spark.kryoserializer.buffer.mb", "1") + val serializer = new KryoSerializer(conf) // Implicitly convert strings to BlockIds for test clarity. implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) before { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf) this.actorSystem = actorSystem - System.setProperty("spark.driver.port", boundPort.toString) - System.setProperty("spark.hostPort", "localhost:" + boundPort) + conf.set("spark.driver.port", boundPort.toString) + conf.set("spark.hostPort", "localhost:" + boundPort) master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))) + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") - oldOops = System.setProperty("spark.test.useCompressedOops", "true") - oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") + conf.set("os.arch", "amd64") + conf.set("spark.test.useCompressedOops", "true") + conf.set("spark.storage.disableBlockManagerHeartBeat", "true") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() // Set some value ... - System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111) + conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111) } after { @@ -86,16 +87,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT master = null if (oldArch != null) { - System.setProperty("os.arch", oldArch) + conf.set("os.arch", oldArch) } else { System.clearProperty("os.arch") } - if (oldOops != null) { - System.setProperty("spark.test.useCompressedOops", oldOops) - } else { - System.clearProperty("spark.test.useCompressedOops") - } + System.clearProperty("spark.test.useCompressedOops") } test("StorageLevel object caching") { @@ -133,7 +130,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 1 manager interaction") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -163,8 +160,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 2 managers interaction") { - store = new BlockManager("exec1", actorSystem, master, serializer, 2000) - store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000) + store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf) + store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf) val peers = master.getPeers(store.blockManagerId, 1) assert(peers.size === 1, "master did not return the other manager as a peer") @@ -179,7 +176,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("removing block") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -227,7 +224,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("removing rdd") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -261,7 +258,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -277,7 +274,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("reregistration on block update") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -296,7 +293,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration doesn't dead lock") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) val a1 = new Array[Byte](400) val a2 = List(new Array[Byte](400)) @@ -333,7 +330,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -352,7 +349,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -371,7 +368,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -390,7 +387,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -413,7 +410,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -426,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -441,7 +438,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -456,7 +453,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -471,7 +468,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -486,7 +483,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -511,7 +508,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -535,7 +532,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -581,7 +578,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 500) + store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) @@ -591,53 +588,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block compression") { try { - System.setProperty("spark.shuffle.compress", "true") - store = new BlockManager("exec1", actorSystem, master, serializer, 2000) + conf.set("spark.shuffle.compress", "true") + store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf) store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") store.stop() store = null - System.setProperty("spark.shuffle.compress", "false") - store = new BlockManager("exec2", actorSystem, master, serializer, 2000) + conf.set("spark.shuffle.compress", "false") + store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf) store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000, "shuffle_0_0_0 was compressed") store.stop() store = null - System.setProperty("spark.broadcast.compress", "true") - store = new BlockManager("exec3", actorSystem, master, serializer, 2000) + conf.set("spark.broadcast.compress", "true") + store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf) store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100, "broadcast_0 was not compressed") store.stop() store = null - System.setProperty("spark.broadcast.compress", "false") - store = new BlockManager("exec4", actorSystem, master, serializer, 2000) + conf.set("spark.broadcast.compress", "false") + store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf) store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed") store.stop() store = null - System.setProperty("spark.rdd.compress", "true") - store = new BlockManager("exec5", actorSystem, master, serializer, 2000) + conf.set("spark.rdd.compress", "true") + store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf) store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed") store.stop() store = null - System.setProperty("spark.rdd.compress", "false") - store = new BlockManager("exec6", actorSystem, master, serializer, 2000) + conf.set("spark.rdd.compress", "false") + store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf) store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager("exec7", actorSystem, master, serializer, 2000) + store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf) store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") store.stop() @@ -651,7 +648,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block store put failure") { // Use Java serializer so we can create an unserializable error. - store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf) // The put should fail since a1 is not serializable. class UnserializableClass diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 0b9056344c..829f389460 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -1,14 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.storage -import java.io.{FileWriter, File} +import java.io.{File, FileWriter} import scala.collection.mutable import com.google.common.io.Files +import org.apache.spark.SparkConf import org.scalatest.{BeforeAndAfterEach, FunSuite} class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { - + private val testConf = new SparkConf(false) val rootDir0 = Files.createTempDir() rootDir0.deleteOnExit() val rootDir1 = Files.createTempDir() @@ -16,7 +34,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { val rootDirs = rootDir0.getName + "," + rootDir1.getName println("Created root dirs: " + rootDirs) + // This suite focuses primarily on consolidation features, + // so we coerce consolidation if not already enabled. + testConf.set("spark.shuffle.consolidateFiles", "true") + val shuffleBlockManager = new ShuffleBlockManager(null) { + override def conf = testConf.clone var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 8f0ec6683b..3764f4d1a0 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -34,7 +34,6 @@ class UISuite extends FunSuite { } val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq()) val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq()) - // Allow some wiggle room in case ports on the machine are under contention assert(boundPort1 > startPort && boundPort1 < startPort + 10) assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala new file mode 100644 index 0000000000..67a57a0e7f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import org.scalatest.FunSuite +import org.apache.spark.scheduler._ +import org.apache.spark.{LocalSparkContext, SparkContext, Success} +import org.apache.spark.scheduler.SparkListenerTaskStart +import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} + +class JobProgressListenerSuite extends FunSuite with LocalSparkContext { + test("test executor id to summary") { + val sc = new SparkContext("local", "test") + val listener = new JobProgressListener(sc) + val taskMetrics = new TaskMetrics() + val shuffleReadMetrics = new ShuffleReadMetrics() + + // nothing in it + assert(listener.stageIdToExecutorSummaries.size == 0) + + // finish this task, should get updated shuffleRead + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 1000) + + // finish a task with unknown executor-id, nothing should happen + taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.stageIdToExecutorSummaries.size == 1) + + // finish this task, should get updated duration + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 2000) + + // finish this task, should get updated duration + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) + .shuffleRead == 1000) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 4e40dcbdee..11ebdc352b 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import org.scalatest.FunSuite import org.scalatest.BeforeAndAfterAll import org.scalatest.PrivateMethodTester +import org.apache.spark.SparkContext class DummyClass1 {} @@ -63,54 +64,53 @@ class SizeEstimatorSuite } test("simple classes") { - assert(SizeEstimator.estimate(new DummyClass1) === 16) - assert(SizeEstimator.estimate(new DummyClass2) === 16) - assert(SizeEstimator.estimate(new DummyClass3) === 24) - assert(SizeEstimator.estimate(new DummyClass4(null)) === 24) - assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48) + expectResult(16)(SizeEstimator.estimate(new DummyClass1)) + expectResult(16)(SizeEstimator.estimate(new DummyClass2)) + expectResult(24)(SizeEstimator.estimate(new DummyClass3)) + expectResult(24)(SizeEstimator.estimate(new DummyClass4(null))) + expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("strings") { - assert(SizeEstimator.estimate(DummyString("")) === 40) - assert(SizeEstimator.estimate(DummyString("a")) === 48) - assert(SizeEstimator.estimate(DummyString("ab")) === 48) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) + expectResult(40)(SizeEstimator.estimate(DummyString(""))) + expectResult(48)(SizeEstimator.estimate(DummyString("a"))) + expectResult(48)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) } test("primitive arrays") { - assert(SizeEstimator.estimate(new Array[Byte](10)) === 32) - assert(SizeEstimator.estimate(new Array[Char](10)) === 40) - assert(SizeEstimator.estimate(new Array[Short](10)) === 40) - assert(SizeEstimator.estimate(new Array[Int](10)) === 56) - assert(SizeEstimator.estimate(new Array[Long](10)) === 96) - assert(SizeEstimator.estimate(new Array[Float](10)) === 56) - assert(SizeEstimator.estimate(new Array[Double](10)) === 96) - assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016) - assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016) + expectResult(32)(SizeEstimator.estimate(new Array[Byte](10))) + expectResult(40)(SizeEstimator.estimate(new Array[Char](10))) + expectResult(40)(SizeEstimator.estimate(new Array[Short](10))) + expectResult(56)(SizeEstimator.estimate(new Array[Int](10))) + expectResult(96)(SizeEstimator.estimate(new Array[Long](10))) + expectResult(56)(SizeEstimator.estimate(new Array[Float](10))) + expectResult(96)(SizeEstimator.estimate(new Array[Double](10))) + expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000))) + expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000))) } test("object arrays") { // Arrays containing nulls should just have one pointer per element - assert(SizeEstimator.estimate(new Array[String](10)) === 56) - assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56) - + expectResult(56)(SizeEstimator.estimate(new Array[String](10))) + expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10))) // For object arrays with non-null elements, each object should take one pointer plus // however many bytes that class takes. (Note that Array.fill calls the code in its // second parameter separately for each object, so we get distinct objects.) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216) - assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296) - assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56) + expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1))) + expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2))) + expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3))) + expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2))) // Past size 100, our samples 100 elements, but we should still get the right size. - assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016) + expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 - assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object - assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object + expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object + expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object // Same thing with huge array containing the same element many times. Note that this won't // return exactly 4032 because it can't tell that *all* the elements will equal the first @@ -128,11 +128,10 @@ class SizeEstimatorSuite val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - assert(SizeEstimator.estimate(DummyString("")) === 40) - assert(SizeEstimator.estimate(DummyString("a")) === 48) - assert(SizeEstimator.estimate(DummyString("ab")) === 48) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56) - + expectResult(40)(SizeEstimator.estimate(DummyString(""))) + expectResult(48)(SizeEstimator.estimate(DummyString("a"))) + expectResult(48)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) } @@ -141,14 +140,13 @@ class SizeEstimatorSuite test("64-bit arch with no compressed oops") { val arch = System.setProperty("os.arch", "amd64") val oops = System.setProperty("spark.test.useCompressedOops", "false") - val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - assert(SizeEstimator.estimate(DummyString("")) === 56) - assert(SizeEstimator.estimate(DummyString("a")) === 64) - assert(SizeEstimator.estimate(DummyString("ab")) === 64) - assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72) + expectResult(56)(SizeEstimator.estimate(DummyString(""))) + expectResult(64)(SizeEstimator.estimate(DummyString("a"))) + expectResult(64)(SizeEstimator.estimate(DummyString("ab"))) + expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh"))) resetOrClear("os.arch", arch) resetOrClear("spark.test.useCompressedOops", oops) diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala index ca3f684668..e9b62ea70d 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala @@ -1,9 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite - -class OpenHashMapSuite extends FunSuite { +import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.util.SizeEstimator + +class OpenHashMapSuite extends FunSuite with ShouldMatchers { + + test("size for specialized, primitive value (int)") { + val capacity = 1024 + val map = new OpenHashMap[String, Int](capacity) + val actualSize = SizeEstimator.estimate(map) + // 64 bit for pointers, 32 bit for ints, and 1 bit for the bitset. + val expectedSize = capacity * (64 + 32 + 1) / 8 + // Make sure we are not allocating a significant amount of memory beyond our expected. + actualSize should be <= (expectedSize * 1.1).toLong + } test("initialization") { val goodMap1 = new OpenHashMap[String, Int](1) diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index 4e11e8a628..1b24f8f287 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -1,9 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.util.collection import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.util.SizeEstimator + +class OpenHashSetSuite extends FunSuite with ShouldMatchers { -class OpenHashSetSuite extends FunSuite { + test("size for specialized, primitive int") { + val loadFactor = 0.7 + val set = new OpenHashSet[Int](64, loadFactor) + for (i <- 0 until 1024) { + set.add(i) + } + assert(set.size === 1024) + assert(set.capacity > 1024) + val actualSize = SizeEstimator.estimate(set) + // 32 bits for the ints + 1 bit for the bitset + val expectedSize = set.capacity * (32 + 1) / 8 + // Make sure we are not allocating a significant amount of memory beyond our expected. + actualSize should be <= (expectedSize * 1.1).toLong + } test("primitive int") { val set = new OpenHashSet[Int] diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala index dfd6aed2c4..3b60decee9 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala @@ -1,9 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.util.SizeEstimator + +class PrimitiveKeyOpenHashMapSuite extends FunSuite with ShouldMatchers { -class PrimitiveKeyOpenHashSetSuite extends FunSuite { + test("size for specialized, primitive key, value (int, int)") { + val capacity = 1024 + val map = new PrimitiveKeyOpenHashMap[Int, Int](capacity) + val actualSize = SizeEstimator.estimate(map) + // 32 bit for keys, 32 bit for values, and 1 bit for the bitset. + val expectedSize = capacity * (32 + 32 + 1) / 8 + // Make sure we are not allocating a significant amount of memory beyond our expected. + actualSize should be <= (expectedSize * 1.1).toLong + } test("initialization") { val goodMap1 = new PrimitiveKeyOpenHashMap[Int, Int](1) |