author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-10-05 22:00:22 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-10-05 22:00:22 -0700
commit     70f02fa91264296bab0b38492891a514907c23c2
tree       4b600cc80630cdaebea6c249a8cc33772fb2389c
parent     69588baf65ffb26c980468dcbce4f50c8163efbb
parent     95ef307ef56543bdceb33ec87c09529decfb2a93
Merge branch 'dev' of github.com:mesos/spark into dev
25 files changed, 228 insertions, 99 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3244753bfe..f0d2b2d783 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,17 @@ import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
 import spark.partial.GroupedCountEvaluator
 import spark.partial.PartialResult
+import spark.rdd.BlockRDD
+import spark.rdd.CartesianRDD
+import spark.rdd.FilteredRDD
+import spark.rdd.FlatMappedRDD
+import spark.rdd.GlommedRDD
+import spark.rdd.MappedRDD
+import spark.rdd.MapPartitionsRDD
+import spark.rdd.MapPartitionsWithSplitRDD
+import spark.rdd.PipedRDD
+import spark.rdd.SampledRDD
+import spark.rdd.UnionRDD
 import spark.storage.StorageLevel
 
 import SparkContext._
@@ -413,67 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   }
-}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: T => U)
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).map(f)
-}
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: T => TraversableOnce[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).flatMap(f)
-}
-
-private[spark]
-class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).filter(f)
-}
-
-private[spark]
-class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
-}
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: Iterator[T] => Iterator[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = f(prev.iterator(split))
-}
-
-/**
- * A variant of the MapPartitionsRDD that passes the split index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: (Int, Iterator[T]) => Iterator[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = f(split.index, prev.iterator(split))
-}
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 37ba308546..84fc541f82 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -4,12 +4,11 @@ import java.io._
 import java.util.concurrent.atomic.AtomicInteger
 import java.net.{URI, URLClassLoader}
 
-import akka.actor.Actor
-import akka.actor.Actor._
-
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.generic.Growable
 
+import akka.actor.Actor
+import akka.actor.Actor._
 import org.apache.hadoop.fs.{FileUtil, Path}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.InputFormat
@@ -27,20 +26,22 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.TextInputFormat
-
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-
 import org.apache.mesos.{Scheduler, MesosNativeLibrary}
 
 import spark.broadcast._
-
 import spark.deploy.LocalSparkCluster
-
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
-
+import spark.rdd.DoubleRDDFunctions
+import spark.rdd.HadoopRDD
+import spark.rdd.NewHadoopRDD
+import spark.rdd.OrderedRDDFunctions
+import spark.rdd.PairRDDFunctions
+import spark.rdd.SequenceFileRDDFunctions
+import spark.rdd.UnionRDD
 import spark.scheduler.ShuffleMapTask
 import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 84ec386ce4..3c4399493c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -1,13 +1,5 @@
 package spark.api.java
 
-import spark.SparkContext.rddToPairRDDFunctions
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.storage.StorageLevel
-import spark._
-
 import java.util.{List => JList}
 import java.util.Comparator
 
@@ -19,6 +11,17 @@ import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.conf.Configuration
 
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+import spark.HashPartitioner
+import spark.Partitioner
+import spark.RDD
+import spark.SparkContext.rddToPairRDDFunctions
+
 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
     implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index afc732234f..cb73976aed 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -1,7 +1,13 @@
-package spark
+package spark.rdd
 
 import scala.collection.mutable.HashMap
 
+import spark.Dependency
+import spark.RDD
+import spark.SparkContext
+import spark.SparkEnv
+import spark.Split
+
 private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
   val index = idx
 }
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 52aab5f32f..7c354b6b2e 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,4 +1,9 @@
-package spark
+package spark.rdd
+
+import spark.NarrowDependency
+import spark.RDD
+import spark.SparkContext
+import spark.Split
 
 private[spark]
 class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index daba719b14..8fa0749184 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,11 +1,22 @@
-package spark
+package spark.rdd
 
 import java.net.URL
 import java.io.EOFException
 import java.io.ObjectInputStream
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import spark.Aggregator
+import spark.Dependency
+import spark.Logging
+import spark.OneToOneDependency
+import spark.Partitioner
+import spark.RDD
+import spark.ShuffleDependency
+import spark.SparkEnv
+import spark.Split
+
 private[spark] sealed trait CoGroupSplitDep extends Serializable
 private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index f1ae346a44..0967f4f5df 100644
--- a/core/src/main/scala/spark/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -1,4 +1,8 @@
-package spark
+package spark.rdd
+
+import spark.NarrowDependency
+import spark.RDD
+import spark.Split
 
 private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
 
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala
index 1fbf66b7de..d232ddeb7c 100644
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala
@@ -1,10 +1,13 @@
-package spark
+package spark.rdd
 
 import spark.partial.BoundedDouble
 import spark.partial.MeanEvaluator
 import spark.partial.PartialResult
 import spark.partial.SumEvaluator
 
+import spark.Logging
+import spark.RDD
+import spark.TaskContext
 import spark.util.StatCounter
 
 /**
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000000..dfe9dc73f3
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).filter(f)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000000..3534dc8057
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: T => TraversableOnce[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).flatMap(f)
+}
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000000..e30564f2da
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 6d448116a9..bf29a1f075 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import java.io.EOFException
 import java.util.NoSuchElementException
@@ -15,6 +15,12 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
+import spark.Dependency
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext
+import spark.Split
+
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000000..b2c7a1cb9e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: Iterator[T] => Iterator[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = f(prev.iterator(split))
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
new file mode 100644
index 0000000000..adc541694e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -0,0 +1,21 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+/**
+ * A variant of the MapPartitionsRDD that passes the split index into the
+ * closure. This can be used to generate or collect partition specific
+ * information such as the number of tuples in a partition.
+ */
+private[spark]
+class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: (Int, Iterator[T]) => Iterator[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = f(split.index, prev.iterator(split))
+}
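The doc comment on MapPartitionsWithSplitRDD above describes collecting per-partition information such as tuple counts. As a rough, hypothetical sketch (assuming the public RDD.mapPartitionsWithSplit method that wraps this class, and an existing SparkContext named sc, neither of which is shown in this commit), counting the tuples in each partition might look like this:

    // Hypothetical example, not part of this commit: count tuples per split
    // using the split index that is passed into the closure.
    val data = sc.parallelize(1 to 100, 4)
    val tuplesPerSplit = data
      .mapPartitionsWithSplit((index, iter) => Iterator((index, iter.size)))
      .collect()
    // e.g. Array((0,25), (1,25), (2,25), (3,25))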
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
new file mode 100644
index 0000000000..59bedad8ef
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MappedRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: T => U)
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).map(f)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 9072698357..dcbceab246 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
@@ -13,6 +13,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
 import java.util.Date
 import java.text.SimpleDateFormat
 
+import spark.Dependency
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext
+import spark.Split
+
 private[spark]
 class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
   extends Split {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala
index 80d62caf25..2a94ea263a 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import java.io.EOFException
 import java.io.ObjectInputStream
@@ -34,9 +34,20 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.TaskAttemptID
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
-import spark.SparkContext._
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
+import spark.Aggregator
+import spark.HashPartitioner
+import spark.Logging
+import spark.OneToOneDependency
+import spark.Partitioner
+import spark.RangePartitioner
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext._
+import spark.SparkException
+import spark.Split
+import spark.TaskContext
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 3103d7889b..98ea0c92d6 100644
--- a/core/src/main/scala/spark/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import java.io.PrintWriter
 import java.util.StringTokenizer
@@ -8,6 +8,12 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
+import spark.OneToOneDependency
+import spark.RDD
+import spark.SparkEnv
+import spark.Split
+
+
 /**
  * An RDD that pipes the contents of each parent partition through an external command
  * (printing them one per line) and returns the output as a collection of strings.
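PairRDDFunctions, moved above into spark.rdd, is attached to RDDs of (key, value) pairs through the rddToPairRDDFunctions implicit conversion that the diff re-imports from SparkContext where needed. As a hedged sketch of how callers typically pick up that conversion (assuming an existing SparkContext named sc and the reduceByKey method defined in PairRDDFunctions):

    // Hypothetical example, not part of this commit: importing SparkContext._
    // brings rddToPairRDDFunctions into scope, so reduceByKey becomes
    // available on an RDD[(String, Int)].
    import spark.SparkContext._
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
    val counts = pairs.reduceByKey(_ + _)
    counts.collect().foreach(println)   // e.g. (a,2), (b,1)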
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index ac10aed477..87a5268f27 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -1,9 +1,13 @@
-package spark
+package spark.rdd
 
 import java.util.Random
 import cern.jet.random.Poisson
 import cern.jet.random.engine.DRand
 
+import spark.RDD
+import spark.OneToOneDependency
+import spark.Split
+
 private[spark]
 class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
   override val index: Int = prev.index
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala
index ea7171d3a1..24c731fa92 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import java.io.EOFException
 import java.net.URL
@@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.Text
 
-import SparkContext._
+import spark.Logging
+import spark.RDD
+import spark.SparkContext._
 
 /**
  * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 1a9f4cfec3..769ccf8caa 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,8 +1,15 @@
-package spark
+package spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
 import java.util.{HashMap => JHashMap}
 
+import spark.Aggregator
+import spark.Partitioner
+import spark.RangePartitioner
+import spark.RDD
+import spark.ShuffleDependency
+import spark.SparkEnv
+import spark.Split
 
 private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
   override val index = idx
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 3e795ea2a2..4ba2848491 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -1,7 +1,13 @@
-package spark
+package spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
 
+import spark.Dependency
+import spark.RangeDependency
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
 private[spark] class UnionSplit[T: ClassManifest](
     idx: Int,
     rdd: RDD[T],
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b920e53534..403e675f37 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc.stop()
       sc = null
     }
+    System.clearProperty("spark.master.port")
   }
 
   test ("basic accumulation"){
@@ -91,7 +92,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     val maxI = 1000
     for (nThreads <- List(1, 10)) { // test single & multi-threaded
-      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      sc = new SparkContext("local[" + nThreads + "]", "test")
       val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
       val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
       val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
@@ -110,6 +111,7 @@
         mapAcc.value should contain (i -> i.toString)
       }
       sc.stop()
+      sc = null
     }
   }
 
@@ -117,7 +119,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     import SetAccum._
     val maxI = 1000
     for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      sc = new SparkContext("local[" + nThreads + "]", "test")
       val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
       val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
       val d = sc.parallelize(groupedInts)
@@ -125,6 +127,8 @@
       x => acc.localValue ++= x
     }
     acc.value should be ( (0 to maxI).toSet)
+    sc.stop()
+    sc = null
   }
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ade457c0f9..95e402627c 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,6 +3,8 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
+
+import spark.rdd.CoalescedRDD
 import SparkContext._
 
 class RDDSuite extends FunSuite with BeforeAndAfter {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 90760b8a85..068607824b 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,5 +1,7 @@
 package spark
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.ShouldMatchers
@@ -10,8 +12,7 @@ import org.scalacheck.Prop._
 
 import com.google.common.io.Files
 
-import scala.collection.mutable.ArrayBuffer
-
+import spark.rdd.ShuffledAggregatedRDD
 import SparkContext._
 
 class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
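Taken together, the commit moves the concrete RDD implementations into the new spark.rdd package and adjusts imports; nothing shown here changes the public RDD API. As a hedged illustration (assuming a local SparkContext of the kind used in the test suites above), caller code keeps working unchanged because map and filter still return the relocated spark.rdd implementations through the same public methods:

    // Hypothetical example, not part of this commit: the package move is
    // transparent to callers of the public API.
    import spark.SparkContext
    val sc = new SparkContext("local", "repackaging-example")
    val result = sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5).collect()
    println(result.mkString(", "))
    sc.stop()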