author	Tathagata Das <tathagata.das1565@gmail.com>	2012-12-20 14:24:19 -0800
committer	Tathagata Das <tathagata.das1565@gmail.com>	2012-12-20 14:24:19 -0800
commit	8512dd3225a3ce9c38608a319d7f85fdf75798b4 (patch)
tree	088fa6bcdca8b5970131d69ab2a2bc5ccd679cf9 /core
parent	fe777eb77dee3c5bc5a7a332098d27f517ad3fe4 (diff)
parent	2a87d816a24c62215d682e3a7af65489c0d6e708 (diff)
Merge branch 'dev' of github.com:radlab/spark into dev-checkpoint
Conflicts:
	core/src/main/scala/spark/ParallelCollection.scala
	core/src/test/scala/spark/CheckpointSuite.scala
	streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/scala/spark/ParallelCollection.scala	11
-rw-r--r--	core/src/main/scala/spark/SparkContext.scala	10
-rw-r--r--	core/src/test/scala/spark/CheckpointSuite.scala	16
-rw-r--r--	core/src/test/scala/spark/JavaAPISuite.java	2
4 files changed, 29 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 0bc5b2ff11..f5a3c2990b 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -2,6 +2,7 @@ package spark
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
+import scala.collection.Map
private[spark] class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long,
@@ -24,7 +25,8 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
private[spark] class ParallelCollection[T: ClassManifest](
@transient sc : SparkContext,
@transient data: Seq[T],
- numSlices: Int)
+ numSlices: Int,
+ locationPrefs : Map[Int,Seq[String]])
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
@@ -41,6 +43,13 @@ private[spark] class ParallelCollection[T: ClassManifest](
override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
+ override def getPreferredLocations(s: Split): Seq[String] = {
+ locationPrefs.get(s.index) match {
+ case Some(s) => s
+ case _ => Nil
+ }
+ }
+
override def clearDependencies() {
splits_ = null
}
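
Note on the change above: the new locationPrefs parameter maps a split index to the hostnames where that split should preferably run, and getPreferredLocations falls back to Nil when no preference was recorded. A minimal standalone sketch of that lookup follows; the hostnames and the helper name are illustrative, not part of the patch.

// Illustrative only: how a split index resolves to preferred hosts.
val locationPrefs: Map[Int, Seq[String]] = Map(
  0 -> Seq("host1.example.com"),
  1 -> Seq("host2.example.com", "host3.example.com")
)

def preferredLocationsFor(index: Int): Seq[String] =
  locationPrefs.getOrElse(index, Nil)

assert(preferredLocationsFor(0) == Seq("host1.example.com"))
assert(preferredLocationsFor(7) == Nil)  // no entry means no locality preference
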
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 362aa04e66..709b5c38d9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -190,7 +190,7 @@ class SparkContext(
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
- new ParallelCollection[T](this, seq, numSlices)
+ new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/** Distribute a local Scala collection to form an RDD. */
@@ -198,6 +198,14 @@ class SparkContext(
parallelize(seq, numSlices)
}
+ /** Distribute a local Scala collection to form an RDD, with one or more
+ * location preferences (hostnames of Spark nodes) for each object.
+ * Create a new partition for each collection item. */
+ def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
+ new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
+ }
+
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
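
A hedged usage sketch of the new makeRDD overload (assumes an existing SparkContext named sc; the records and hostnames are made up): each (element, hosts) pair becomes its own partition, and the hosts become that partition's preferred locations.

// Hypothetical example of the new overload.
val pairs = Seq(
  ("record-a", Seq("host1.example.com")),
  ("record-b", Seq("host2.example.com", "host3.example.com"))
)
val rdd = sc.makeRDD(pairs)  // RDD[String] with one partition per element
// Partition 0 prefers host1.example.com; partition 1 prefers host2/host3.
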
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 6bc667bd4c..1f025d175b 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -186,19 +186,19 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// Test whether the checkpoint file has been created
assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
-
+
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
-
+
// Test whether the splits have been changed to the new Hadoop splits
assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList)
// Test whether the number of splits is same as before
assert(operatedRDD.splits.length === numSplits)
-
+
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
-
+
// Test whether serialized size of the RDD has reduced. If the RDD
// does not have any dependency to another RDD (e.g., ParallelCollection,
// ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
@@ -211,7 +211,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
"[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
}
-
+
// Test whether serialized size of the splits has reduced. If the splits
// do not have any non-transient reference to another RDD or another RDD's splits, it
// does not refer to a lineage and therefore may not reduce in size after checkpointing.
@@ -255,7 +255,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
-
+
// Test whether serialized size of the RDD has reduced because of its parent being
// checkpointed. If this RDD or its parent RDD do not have any dependency
// to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
@@ -267,7 +267,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
"[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
}
-
+
// Test whether serialized size of the splits has reduced because of its parent being
// checkpointed. If the splits do not have any non-transient reference to another RDD
// or another RDD's splits, it does not refer to a lineage and therefore may not reduce
@@ -281,7 +281,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
"[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
)
}
-
+
}
/**
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..6bd9836a93 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable {
public void tearDown() {
sc.stop();
sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
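
The tearDown change above exists to keep consecutive tests from fighting over the same Akka port. A minimal ScalaTest sketch of the same pattern, written in Scala (the suite name and test body are hypothetical, not from this commit):

import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.SparkContext

// Hypothetical suite showing the equivalent tearDown in Scala.
class PortCleanupExampleSuite extends FunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  before {
    sc = new SparkContext("local", "test")
  }

  after {
    if (sc != null) {
      sc.stop()
      sc = null
    }
    // Akka does not unbind its port immediately on shutdown, so clear the
    // property to keep the next SparkContext from racing for the same port.
    System.clearProperty("spark.master.port")
  }

  test("context starts and stops cleanly") {
    assert(sc.parallelize(1 to 4, 2).count() === 4)
  }
}
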