author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-05 22:00:22 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-05 22:00:22 -0700
commit    70f02fa91264296bab0b38492891a514907c23c2 (patch)
tree      4b600cc80630cdaebea6c249a8cc33772fb2389c /core
parent    69588baf65ffb26c980468dcbce4f50c8163efbb (diff)
parent    95ef307ef56543bdceb33ec87c09529decfb2a93 (diff)
Merge branch 'dev' of github.com:mesos/spark into dev
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala | 76
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 17
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala | 19
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala (renamed from core/src/main/scala/spark/BlockRDD.scala) | 8
-rw-r--r--  core/src/main/scala/spark/rdd/CartesianRDD.scala (renamed from core/src/main/scala/spark/CartesianRDD.scala) | 7
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala (renamed from core/src/main/scala/spark/CoGroupedRDD.scala) | 13
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala (renamed from core/src/main/scala/spark/CoalescedRDD.scala) | 6
-rw-r--r--  core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala (renamed from core/src/main/scala/spark/DoubleRDDFunctions.scala) | 5
-rw-r--r--  core/src/main/scala/spark/rdd/FilteredRDD.scala | 12
-rw-r--r--  core/src/main/scala/spark/rdd/FlatMappedRDD.scala | 16
-rw-r--r--  core/src/main/scala/spark/rdd/GlommedRDD.scala | 12
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala (renamed from core/src/main/scala/spark/HadoopRDD.scala) | 8
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsRDD.scala | 16
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 21
-rw-r--r--  core/src/main/scala/spark/rdd/MappedRDD.scala | 16
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala (renamed from core/src/main/scala/spark/NewHadoopRDD.scala) | 8
-rw-r--r--  core/src/main/scala/spark/rdd/PairRDDFunctions.scala (renamed from core/src/main/scala/spark/PairRDDFunctions.scala) | 15
-rw-r--r--  core/src/main/scala/spark/rdd/PipedRDD.scala (renamed from core/src/main/scala/spark/PipedRDD.scala) | 8
-rw-r--r--  core/src/main/scala/spark/rdd/SampledRDD.scala (renamed from core/src/main/scala/spark/SampledRDD.scala) | 6
-rw-r--r--  core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala (renamed from core/src/main/scala/spark/SequenceFileRDDFunctions.scala) | 6
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala (renamed from core/src/main/scala/spark/ShuffledRDD.scala) | 9
-rw-r--r--  core/src/main/scala/spark/rdd/UnionRDD.scala (renamed from core/src/main/scala/spark/UnionRDD.scala) | 8
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala | 8
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala | 2
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala | 5
25 files changed, 228 insertions, 99 deletions
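
The bulk of this merge is a package move: the concrete RDD implementations and the *RDDFunctions helpers leave the top-level spark package for a new spark.rdd package, and each moved file gains explicit imports for the spark.* types it still uses. The public RDD API is untouched; only code that named these classes directly needs new imports. A minimal, hypothetical client sketch (the object name and data are illustrative, not part of this commit):

    import spark.SparkContext
    import spark.SparkContext._        // implicit conversions such as rddToPairRDDFunctions
    import spark.rdd.CoalescedRDD      // was spark.CoalescedRDD before this merge

    object RddPackageMoveSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "rdd-package-move")
        val nums = sc.parallelize(1 to 100, 10)
        // The transformations themselves are unchanged; only the classes that
        // implement them (MappedRDD, FilteredRDD, ...) now live in spark.rdd.
        val evens = nums.map(_ * 2).filter(_ % 4 == 0)
        println(evens.count())
        sc.stop()
      }
    }

The sketches below reuse this sc where they refer to one.
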
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3244753bfe..f0d2b2d783 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,17 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
+import spark.rdd.BlockRDD
+import spark.rdd.CartesianRDD
+import spark.rdd.FilteredRDD
+import spark.rdd.FlatMappedRDD
+import spark.rdd.GlommedRDD
+import spark.rdd.MappedRDD
+import spark.rdd.MapPartitionsRDD
+import spark.rdd.MapPartitionsWithSplitRDD
+import spark.rdd.PipedRDD
+import spark.rdd.SampledRDD
+import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import SparkContext._
@@ -413,67 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
-}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => U)
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).map(f)
-}
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => TraversableOnce[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).flatMap(f)
-}
-
-private[spark]
-class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).filter(f)
-}
-
-private[spark]
-class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
-}
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: Iterator[T] => Iterator[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = f(prev.iterator(split))
-}
-
-/**
- * A variant of the MapPartitionsRDD that passes the split index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: (Int, Iterator[T]) => Iterator[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = f(split.index, prev.iterator(split))
-}
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 37ba308546..84fc541f82 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -4,12 +4,11 @@ import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.net.{URI, URLClassLoader}
-import akka.actor.Actor
-import akka.actor.Actor._
-
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.generic.Growable
+import akka.actor.Actor
+import akka.actor.Actor._
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
@@ -27,20 +26,22 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TextInputFormat
-
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-
import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
-
import spark.deploy.LocalSparkCluster
-
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
-
+import spark.rdd.DoubleRDDFunctions
+import spark.rdd.HadoopRDD
+import spark.rdd.NewHadoopRDD
+import spark.rdd.OrderedRDDFunctions
+import spark.rdd.PairRDDFunctions
+import spark.rdd.SequenceFileRDDFunctions
+import spark.rdd.UnionRDD
import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 84ec386ce4..3c4399493c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -1,13 +1,5 @@
package spark.api.java
-import spark.SparkContext.rddToPairRDDFunctions
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.storage.StorageLevel
-import spark._
-
import java.util.{List => JList}
import java.util.Comparator
@@ -19,6 +11,17 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+import spark.HashPartitioner
+import spark.Partitioner
+import spark.RDD
+import spark.SparkContext.rddToPairRDDFunctions
+
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index afc732234f..cb73976aed 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -1,7 +1,13 @@
-package spark
+package spark.rdd
import scala.collection.mutable.HashMap
+import spark.Dependency
+import spark.RDD
+import spark.SparkContext
+import spark.SparkEnv
+import spark.Split
+
private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
}
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 52aab5f32f..7c354b6b2e 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,4 +1,9 @@
-package spark
+package spark.rdd
+
+import spark.NarrowDependency
+import spark.RDD
+import spark.SparkContext
+import spark.Split
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index daba719b14..8fa0749184 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,11 +1,22 @@
-package spark
+package spark.rdd
import java.net.URL
import java.io.EOFException
import java.io.ObjectInputStream
+
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import spark.Aggregator
+import spark.Dependency
+import spark.Logging
+import spark.OneToOneDependency
+import spark.Partitioner
+import spark.RDD
+import spark.ShuffleDependency
+import spark.SparkEnv
+import spark.Split
+
private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index f1ae346a44..0967f4f5df 100644
--- a/core/src/main/scala/spark/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -1,4 +1,8 @@
-package spark
+package spark.rdd
+
+import spark.NarrowDependency
+import spark.RDD
+import spark.Split
private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala
index 1fbf66b7de..d232ddeb7c 100644
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala
@@ -1,10 +1,13 @@
-package spark
+package spark.rdd
import spark.partial.BoundedDouble
import spark.partial.MeanEvaluator
import spark.partial.PartialResult
import spark.partial.SumEvaluator
+import spark.Logging
+import spark.RDD
+import spark.TaskContext
import spark.util.StatCounter
/**
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000000..dfe9dc73f3
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).filter(f)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000000..3534dc8057
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: T => TraversableOnce[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).flatMap(f)
+}
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000000..e30564f2da
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 6d448116a9..bf29a1f075 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import java.io.EOFException
import java.util.NoSuchElementException
@@ -15,6 +15,12 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
+import spark.Dependency
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext
+import spark.Split
+
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
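
For context, HadoopRDD is what backs sc.textFile and sc.hadoopFile, and the wrapper mentioned in the comment above is the Split carried by each partition. A hedged sketch (the HDFS path is illustrative; sc is the context from the sketch after the diffstat):

    // Each partition of the resulting RDD corresponds to one Hadoop InputSplit,
    // wrapped in the split class defined in this file.
    val logLines = sc.textFile("hdfs://namenode:9000/logs/part-*")  // illustrative path
    println(logLines.filter(_.contains("ERROR")).count())
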
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000000..b2c7a1cb9e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: Iterator[T] => Iterator[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = f(prev.iterator(split))
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
new file mode 100644
index 0000000000..adc541694e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -0,0 +1,21 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+/**
+ * A variant of the MapPartitionsRDD that passes the split index into the
+ * closure. This can be used to generate or collect partition specific
+ * information such as the number of tuples in a partition.
+ */
+private[spark]
+class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: (Int, Iterator[T]) => Iterator[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = f(split.index, prev.iterator(split))
+}
\ No newline at end of file
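
The comment above describes the intended use; a sketch of the partition-count example it mentions, assuming RDD.mapPartitionsWithSplit is the public wrapper around this class (sc as before):

    // One (splitIndex, tupleCount) pair per partition.
    val perPartition = sc.parallelize(1 to 1000, 8)
      .mapPartitionsWithSplit((idx, iter) => Iterator((idx, iter.size)))
    println(perPartition.collect().mkString(", "))
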
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
new file mode 100644
index 0000000000..59bedad8ef
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: T => U)
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).map(f)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 9072698357..dcbceab246 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
@@ -13,6 +13,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
+import spark.Dependency
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext
+import spark.Split
+
private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala
index 80d62caf25..2a94ea263a 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import java.io.EOFException
import java.io.ObjectInputStream
@@ -34,9 +34,20 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import spark.SparkContext._
import spark.partial.BoundedDouble
import spark.partial.PartialResult
+import spark.Aggregator
+import spark.HashPartitioner
+import spark.Logging
+import spark.OneToOneDependency
+import spark.Partitioner
+import spark.RangePartitioner
+import spark.RDD
+import spark.SerializableWritable
+import spark.SparkContext._
+import spark.SparkException
+import spark.Split
+import spark.TaskContext
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
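
PairRDDFunctions is reached through the rddToPairRDDFunctions implicit conversion imported via SparkContext._ (the same import visible in the JavaPairRDD hunk above); a brief sketch (sc as before):

    import spark.SparkContext._   // brings rddToPairRDDFunctions into scope

    // An RDD[(String, Int)] picks up reduceByKey/groupByKey/join etc. from
    // spark.rdd.PairRDDFunctions via the implicit conversion.
    val wordCounts = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    println(wordCounts.collect().mkString(", "))
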
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 3103d7889b..98ea0c92d6 100644
--- a/core/src/main/scala/spark/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import java.io.PrintWriter
import java.util.StringTokenizer
@@ -8,6 +8,12 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
+import spark.OneToOneDependency
+import spark.RDD
+import spark.SparkEnv
+import spark.Split
+
+
/**
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
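
A sketch of the behaviour described in the comment above, assuming RDD.pipe(command) is the public entry point backed by this class (sc as before; needs a Unix-like environment for tr):

    // Each element of a parent partition is written to the command's stdin on
    // its own line; the command's stdout becomes the new RDD of strings.
    val shouted = sc.parallelize(Seq("spark", "rdd", "pipe"), 2).pipe("tr a-z A-Z")
    println(shouted.collect().mkString(", "))   // SPARK, RDD, PIPE
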
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index ac10aed477..87a5268f27 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -1,9 +1,13 @@
-package spark
+package spark.rdd
import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
+import spark.RDD
+import spark.OneToOneDependency
+import spark.Split
+
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala
index ea7171d3a1..24c731fa92 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import java.io.EOFException
import java.net.URL
@@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.Text
-import SparkContext._
+import spark.Logging
+import spark.RDD
+import spark.SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
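
The comment above refers to the implicit conversion that adds saveAsSequenceFile to RDDs of Writable-convertible (key, value) pairs; a hedged sketch (output path is illustrative, sc as before):

    import spark.SparkContext._   // implicit conversion to SequenceFileRDDFunctions

    // Int and String have Writable conversions, so this pair RDD can be written
    // out as a Hadoop SequenceFile.
    sc.parallelize(Seq(1 -> "alpha", 2 -> "beta"))
      .saveAsSequenceFile("/tmp/seqfile-sketch")   // illustrative path
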
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 1a9f4cfec3..769ccf8caa 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,8 +1,15 @@
-package spark
+package spark.rdd
import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
+import spark.Aggregator
+import spark.Partitioner
+import spark.RangePartitioner
+import spark.RDD
+import spark.ShuffleDependency
+import spark.SparkEnv
+import spark.Split
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 3e795ea2a2..4ba2848491 100644
--- a/core/src/main/scala/spark/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -1,7 +1,13 @@
-package spark
+package spark.rdd
import scala.collection.mutable.ArrayBuffer
+import spark.Dependency
+import spark.RangeDependency
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
private[spark] class UnionSplit[T: ClassManifest](
idx: Int,
rdd: RDD[T],
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b920e53534..403e675f37 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
sc = null
}
+ System.clearProperty("spark.master.port")
}
test ("basic accumulation"){
@@ -91,7 +92,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val maxI = 1000
for (nThreads <- List(1, 10)) {
// test single & multi-threaded
- val sc = new SparkContext("local[" + nThreads + "]", "test")
+ sc = new SparkContext("local[" + nThreads + "]", "test")
val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
@@ -110,6 +111,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
mapAcc.value should contain (i -> i.toString)
}
sc.stop()
+ sc = null
}
}
@@ -117,7 +119,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
- val sc = new SparkContext("local[" + nThreads + "]", "test")
+ sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
val d = sc.parallelize(groupedInts)
@@ -125,6 +127,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
+ sc.stop()
+ sc = null
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ade457c0f9..95e402627c 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,6 +3,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+
+import spark.rdd.CoalescedRDD
import SparkContext._
class RDDSuite extends FunSuite with BeforeAndAfter {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 90760b8a85..068607824b 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,5 +1,7 @@
package spark
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
@@ -10,8 +12,7 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
-import scala.collection.mutable.ArrayBuffer
-
+import spark.rdd.ShuffledAggregatedRDD
import SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {