author     Tathagata Das <tathagata.das1565@gmail.com>   2012-12-09 20:31:42 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-12-09 20:31:42 -0800
commit     4be1cdc8b9ca038ab1a99fafa2eedd336e36dcc7 (patch)
tree       349a0b8e99d1166d48ea67c301fdaab4663d2bf1 /core
parent     e42721601898ff199ca1c6cfeae159ad3ef691e3 (diff)
parent     3e796bdd57297134ed40b20d7692cd9c8cd6efba (diff)
Merge pull request #5 from radlab/flume-integration
Flume integration
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/ParallelCollection.scala | 11
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala       | 10
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 9725017b61..4bd9e1bd54 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -2,6 +2,7 @@ package spark
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
+import scala.collection.Map
private[spark] class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long,
@@ -24,7 +25,8 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
private[spark] class ParallelCollection[T: ClassManifest](
@transient sc : SparkContext,
@transient data: Seq[T],
- numSlices: Int)
+ numSlices: Int,
+ locationPrefs : Map[Int,Seq[String]])
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
@@ -40,7 +42,12 @@ private[spark] class ParallelCollection[T: ClassManifest](
override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
- override def preferredLocations(s: Split): Seq[String] = Nil
+ override def preferredLocations(s: Split): Seq[String] = {
+ locationPrefs.get(splits_.indexOf(s)) match {
+ case Some(s) => s
+ case _ => Nil
+ }
+ }
}
private object ParallelCollection {
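
The change above threads a locationPrefs map (split index -> preferred hostnames) through ParallelCollection so that preferredLocations can report placement hints to the scheduler. A minimal standalone sketch of that lookup pattern, using hypothetical names (LocationPrefsSketch, prefsForSplit) rather than the Spark classes in the diff:

import scala.collection.Map

object LocationPrefsSketch {
  // Hypothetical stand-in for the locationPrefs constructor argument:
  // maps a split's index to the hostnames where its data should preferably run.
  val locationPrefs: Map[Int, Seq[String]] =
    Map(0 -> Seq("node1.example.com"),
        1 -> Seq("node2.example.com", "node3.example.com"))

  // Mirrors the new preferredLocations override: look up the split index and
  // fall back to Nil (no preference) when the map has no entry.
  def prefsForSplit(splitIndex: Int): Seq[String] =
    locationPrefs.getOrElse(splitIndex, Nil)

  def main(args: Array[String]): Unit = {
    println(prefsForSplit(0)) // List(node1.example.com)
    println(prefsForSplit(7)) // List() -- unknown split, no preference
  }
}

getOrElse here is equivalent to the Option match in the diff; note that the match form in the commit reuses the name s for the matched Seq, shadowing the Split parameter, which compiles but reads less clearly.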
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d7b46bee38..3ccdbfe10e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -194,7 +194,7 @@ class SparkContext(
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
- new ParallelCollection[T](this, seq, numSlices)
+ new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/** Distribute a local Scala collection to form an RDD. */
@@ -202,6 +202,14 @@ class SparkContext(
parallelize(seq, numSlices)
}
+ /** Distribute a local Scala collection to form an RDD, with one or more
+ * location preferences (hostnames of Spark nodes) for each object.
+ * Create a new partition for each collection item. */
+ def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
+ new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
+ }
+
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
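
For context, the new makeRDD overload could be exercised roughly as follows. This is a sketch, assuming a SparkContext named sc has already been created (for example new SparkContext("local", "prefs-example")) and using placeholder hostnames:

// Sketch only: `sc` is an existing SparkContext; hostnames are placeholders.
val data = Seq(
  ("alpha", Seq("node1.example.com")),                       // prefer node1
  ("beta",  Seq("node2.example.com", "node3.example.com")))  // prefer node2 or node3

// One partition per element, each carrying its preferred hosts.
val rdd = sc.makeRDD(data)

// Preferences only guide scheduling; the RDD's elements are just the values.
println(rdd.collect().mkString(", "))   // alpha, beta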