author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-20 14:24:19 -0800
---|---|---
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-20 14:24:19 -0800
commit | 8512dd3225a3ce9c38608a319d7f85fdf75798b4 (patch) |
tree | 088fa6bcdca8b5970131d69ab2a2bc5ccd679cf9 /core |
parent | fe777eb77dee3c5bc5a7a332098d27f517ad3fe4 (diff) |
parent | 2a87d816a24c62215d682e3a7af65489c0d6e708 (diff) |
download | spark-8512dd3225a3ce9c38608a319d7f85fdf75798b4.tar.gz spark-8512dd3225a3ce9c38608a319d7f85fdf75798b4.tar.bz2 spark-8512dd3225a3ce9c38608a319d7f85fdf75798b4.zip |
Merge branch 'dev' of github.com:radlab/spark into dev-checkpoint
Conflicts:
core/src/main/scala/spark/ParallelCollection.scala
core/src/test/scala/spark/CheckpointSuite.scala
streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/ParallelCollection.scala | 11
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 10
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 16
-rw-r--r-- | core/src/test/scala/spark/JavaAPISuite.java | 2
4 files changed, 29 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 0bc5b2ff11..f5a3c2990b 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -2,6 +2,7 @@ package spark
 
 import scala.collection.immutable.NumericRange
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.Map
 
 private[spark] class ParallelCollectionSplit[T: ClassManifest](
     val rddId: Long,
@@ -24,7 +25,8 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
 private[spark] class ParallelCollection[T: ClassManifest](
     @transient sc : SparkContext,
     @transient data: Seq[T],
-    numSlices: Int)
+    numSlices: Int,
+    locationPrefs : Map[Int,Seq[String]])
   extends RDD[T](sc, Nil) {
   // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
   // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
@@ -41,6 +43,13 @@ private[spark] class ParallelCollection[T: ClassManifest](
 
   override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
 
+  override def getPreferredLocations(s: Split): Seq[String] = {
+    locationPrefs.get(s.index) match {
+      case Some(s) => s
+      case _ => Nil
+    }
+  }
+
   override def clearDependencies() {
     splits_ = null
   }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 362aa04e66..709b5c38d9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -190,7 +190,7 @@ class SparkContext(
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
-    new ParallelCollection[T](this, seq, numSlices)
+    new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
 
   /** Distribute a local Scala collection to form an RDD. */
@@ -198,6 +198,14 @@ class SparkContext(
     parallelize(seq, numSlices)
   }
 
+  /** Distribute a local Scala collection to form an RDD, with one or more
+    * location preferences (hostnames of Spark nodes) for each object.
+    * Create a new partition for each collection item. */
+  def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
+    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
+    new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
+  }
+
   /**
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
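Taken together, the two hunks above wire per-partition location preferences through `ParallelCollection`: the new `makeRDD` overload builds a partition-index-to-hosts map, and the scheduler reads it back through `getPreferredLocations`. Below is a minimal usage sketch; the host names, application name, and `local` master URL are illustrative assumptions, not part of this commit:

```scala
import spark.SparkContext

// Hypothetical driver program; host names and master URL are made up.
object LocationPrefsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "LocationPrefsExample")

    // Pair each element with the hosts that should preferably compute it.
    // makeRDD creates one partition per element, so partition 0 prefers
    // host1, partition 1 prefers host2, and partition 2 accepts either.
    val data = Seq(
      (1, Seq("host1.example.com")),
      (2, Seq("host2.example.com")),
      (3, Seq("host1.example.com", "host2.example.com")))
    val rdd = sc.makeRDD(data)

    // The scheduler consults getPreferredLocations for each split; with a
    // local master the preferences are advisory and the job still runs.
    println(rdd.collect().toList) // List(1, 2, 3)

    sc.stop()
  }
}
```

Partitions without an entry in `locationPrefs` fall back to `Nil`, which is why the plain `parallelize` path can simply pass an empty map.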
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 6bc667bd4c..1f025d175b 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -186,19 +186,19 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
 
     // Test whether the checkpoint file has been created
     assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
-    
+
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
-    
+
     // Test whether the splits have been changed to the new Hadoop splits
     assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList)
 
     // Test whether the number of splits is same as before
     assert(operatedRDD.splits.length === numSplits)
-    
+
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
-    
+
     // Test whether serialized size of the RDD has reduced. If the RDD
     // does not have any dependency to another RDD (e.g., ParallelCollection,
     // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
@@ -211,7 +211,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
         "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
       )
     }
-    
+
     // Test whether serialized size of the splits has reduced. If the splits
     // do not have any non-transient reference to another RDD or another RDD's splits, it
     // does not refer to a lineage and therefore may not reduce in size after checkpointing.
@@ -255,7 +255,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
 
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
-    
+
     // Test whether serialized size of the RDD has reduced because of its parent being
     // checkpointed. If this RDD or its parent RDD do not have any dependency
     // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
@@ -267,7 +267,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
         "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
      )
    }
-    
+
     // Test whether serialized size of the splits has reduced because of its parent being
     // checkpointed. If the splits do not have any non-transient reference to another RDD
     // or another RDD's splits, it does not refer to a lineage and therefore may not reduce
@@ -281,7 +281,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
         "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
      )
    }
-    
+
  }
 
  /**
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..6bd9836a93 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable {
   public void tearDown() {
     sc.stop();
     sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port");
   }
 
   static class ReverseIntComparator implements Comparator<Integer>, Serializable {
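The `JavaAPISuite` hunk clears `spark.master.port` after `sc.stop()` so the next test's Akka actor system can bind a fresh port instead of racing the old one. A hedged Scala sketch of the same teardown pattern, in the style of suites like `CheckpointSuite` above (the suite and test here are hypothetical, not part of this commit):

```scala
import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.SparkContext

// Hypothetical suite illustrating the teardown pattern from the diff above.
class PortTearDownSuite extends FunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  before {
    sc = new SparkContext("local", "PortTearDownSuite")
  }

  after {
    sc.stop()
    sc = null
    // Akka doesn't unbind its port immediately on shutdown, so clear the
    // property to keep the next SparkContext from reusing a stale port.
    System.clearProperty("spark.master.port")
  }

  test("parallelize round-trips a small collection") {
    assert(sc.parallelize(1 to 4, 2).collect().toList === (1 to 4).toList)
  }
}
```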