-rw-r--r--core/src/main/scala/spark/CacheTracker.scala14
-rw-r--r--core/src/main/scala/spark/RDD.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala32
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala212
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala25
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala124
-rw-r--r--core/src/main/scala/spark/api/java/JavaSparkContext.scala69
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala6
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala22
-rw-r--r--core/src/main/scala/spark/storage/BlockStore.scala5
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala12
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala16
-rw-r--r--core/src/test/scala/spark/CacheTrackerSuite.scala2
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala10
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala16
-rw-r--r--docs/_config.yml2
-rw-r--r--docs/bagel-programming-guide.md5
-rw-r--r--docs/quick-start.md15
-rw-r--r--docs/running-on-yarn.md6
-rw-r--r--docs/scala-programming-guide.md8
-rw-r--r--project/SparkBuild.scala56
-rw-r--r--project/plugins.sbt4
23 files changed, 593 insertions, 76 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9cbe3730a..c5db6ce63a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// TODO: fetch any remote copy of the split that may be available
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
+ val elements = new ArrayBuffer[Any]
+ elements ++= rdd.compute(split)
try {
- // BlockManager will iterate over results from compute to create RDD
- blockManager.put(key, rdd.compute(split), storageLevel, true)
+ // Try to put this block in the blockManager
+ blockManager.put(key, elements, storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
- blockManager.get(key) match {
- case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
- case None =>
- logWarning("loading partition failed after computing it " + key)
- return null
- }
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
+ return elements.iterator.asInstanceOf[Iterator[T]]
}
}
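
The hunk above drops the old put-then-get round trip: the computed partition is buffered into an ArrayBuffer once, handed to the BlockManager, and the same buffer backs the iterator returned to the caller. A minimal, self-contained sketch of that pattern (the names compute/put here are placeholders, not Spark API):

    import scala.collection.mutable.ArrayBuffer

    object CachePattern {
      // Consume the computed iterator exactly once, store the buffered values,
      // and serve the caller from the same buffer instead of a second get().
      def cacheAndReturn[T](compute: () => Iterator[T], put: ArrayBuffer[Any] => Unit): Iterator[T] = {
        val elements = new ArrayBuffer[Any]
        elements ++= compute()
        put(elements)
        elements.iterator.asInstanceOf[Iterator[T]]
      }
    }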
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 984738ef73..c9334f68a8 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -461,7 +461,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
return buf.toArray
}
- /*
+ /**
* Return the first element in this RDD.
*/
def first(): T = take(1) match {
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 9731bb4eac..4ad006c4a7 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -22,8 +22,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
import JavaDoubleRDD.fromRDD
+ /** Persist this RDD with the default storage level (MEMORY_ONLY). */
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
+ /**
+ * Set this RDD's storage level to persist its values across operations after the first time
+ * it is computed. Can only be called once on each RDD.
+ */
def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
// first() has to be overriden here in order for its return type to be Double instead of Object.
@@ -31,38 +36,63 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
// Transformations (return a new RDD)
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
+ /**
+ * Return a new RDD containing only the elements that satisfy a predicate.
+ */
def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
fromRDD(srdd.filter(x => f(x).booleanValue()))
+ /**
+ * Return a sampled subset of this RDD.
+ */
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
fromRDD(srdd.sample(withReplacement, fraction, seed))
+ /**
+ * Return the union of this RDD and another one. Any identical elements will appear multiple
+ * times (use `.distinct()` to eliminate them).
+ */
def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd))
// Double RDD functions
+ /** Return the sum of the elements in this RDD. */
def sum(): Double = srdd.sum()
+ /** Return a [[spark.StatCounter]] describing the elements in this RDD. */
def stats(): StatCounter = srdd.stats()
+ /** Return the mean of the elements in this RDD. */
def mean(): Double = srdd.mean()
+ /** Return the variance of the elements in this RDD. */
def variance(): Double = srdd.variance()
+ /** Return the standard deviation of the elements in this RDD. */
def stdev(): Double = srdd.stdev()
+ /** Return the approximate mean of the elements in this RDD. */
def meanApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
srdd.meanApprox(timeout, confidence)
+ /** Return the approximate mean of the elements in this RDD. */
def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
+ /** Return the approximate sum of the elements in this RDD. */
def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
srdd.sumApprox(timeout, confidence)
-
+
+ /** Return the approximate sum of the elements in this RDD. */
def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
}
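
For context, a hedged example of how the statistics methods documented above are typically used, via the Scala API that JavaDoubleRDD wraps; it assumes an existing SparkContext named sc:

    import spark.SparkContext._   // brings the double-RDD operations into scope

    val nums  = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
    val total = nums.sum()    // 10.0
    val avg   = nums.mean()   // 2.5
    val stats = nums.stats()  // a StatCounter with count, mean, variance and stdev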
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index d361de8f8f..cfaa26ff52 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -34,23 +34,44 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
// Common RDD functions
+ /** Persist this RDD with the default storage level (MEMORY_ONLY). */
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
+ /**
+ * Set this RDD's storage level to persist its values across operations after the first time
+ * it is computed. Can only be called once on each RDD.
+ */
def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.persist(newLevel))
// Transformations (return a new RDD)
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
+ /**
+ * Return a new RDD containing only the elements that satisfy a predicate.
+ */
def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+ /**
+ * Return a sampled subset of this RDD.
+ */
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))
+ /**
+ * Return the union of this RDD and another one. Any identical elements will appear multiple
+ * times (use `.distinct()` to eliminate them).
+ */
def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.union(other.rdd))
@@ -61,7 +82,21 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def first(): (K, V) = rdd.first()
// Pair RDD functions
-
+
+ /**
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
+ * "combined type" C * Note that V and C can be different -- for example, one might group an
+ * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
+ * functions:
+ *
+ * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+ * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+ * - `mergeCombiners`, to combine two C's into a single one.
+ *
+ * In addition, users can control the partitioning of the output RDD, and whether to perform
+ * map-side aggregation (if a mapper can produce multiple items with the same key).
+ */
def combineByKey[C](createCombiner: Function[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
@@ -76,50 +111,113 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
))
}
+ /**
+ * Simplified version of combineByKey that hash-partitions the output RDD.
+ */
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
numSplits: Int): JavaPairRDD[K, C] =
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce.
+ */
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.reduceByKey(partitioner, func))
+ /**
+ * Merge the values for each key using an associative reduce function, but return the results
+ * immediately to the master as a Map. This will also perform the merging locally on each mapper
+ * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+ */
def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
mapAsJavaMap(rdd.reduceByKeyLocally(func))
+ /** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
+ /**
+ * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * not finish within a timeout.
+ */
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
+ /**
+ * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * not finish within a timeout.
+ */
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
+ */
def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] =
fromRDD(rdd.reduceByKey(func, numSplits))
+ /**
+ * Group the values for each key in the RDD into a single sequence. Allows controlling the
+ * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+ */
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
+ /**
+ * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+ * resulting RDD into `numSplits` partitions.
+ */
def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numSplits)))
+ /**
+ * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
+ * is true, Spark will group values of the same key together on the map side before the
+ * repartitioning, to only send each key over the network once. If a large number of
+ * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
+ * be set to true.
+ */
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.partitionBy(partitioner))
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+ */
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
fromRDD(rdd.join(other, partitioner))
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
+ * partition the output RDD.
+ */
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other, partitioner))
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
+ * partition the output RDD.
+ */
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other, partitioner))
+ /**
+ * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
+ * parallelism level.
+ */
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
@@ -128,40 +226,94 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners))
}
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+ */
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
val partitioner = rdd.defaultPartitioner(rdd)
fromRDD(reduceByKey(partitioner, func))
}
+ /**
+ * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+ * resulting RDD with the default parallelism level.
+ */
def groupByKey(): JavaPairRDD[K, JList[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Performs a hash join across the cluster.
+ */
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
fromRDD(rdd.join(other))
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Performs a hash join across the cluster.
+ */
def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] =
fromRDD(rdd.join(other, numSplits))
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+ * using the default level of parallelism.
+ */
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other))
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+ * into `numSplits` partitions.
+ */
def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other, numSplits))
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+ * RDD using the default parallelism level.
+ */
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other))
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+ * RDD into the given number of partitions.
+ */
def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other, numSplits))
+ /**
+ * Return the key-value pairs in this RDD to the master as a Map.
+ */
def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
fromRDD(rdd.mapValues(f))
}
+ /**
+ * Pass each value in the key-value pair RDD through a flatMap function without changing the
+ * keys; this also retains the original RDD's partitioning.
+ */
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
@@ -170,37 +322,68 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.flatMapValues(fn))
}
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])]
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits)))
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits)))
+ /** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
+ /** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
+ /**
+ * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
+ * RDD has a known partitioner by only searching the partition that the key maps to.
+ */
def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
+ /** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
path: String,
keyClass: Class[_],
@@ -210,6 +393,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
+ /** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
path: String,
keyClass: Class[_],
@@ -218,6 +402,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
+ /** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
keyClass: Class[_],
@@ -227,6 +412,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
+ /** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
keyClass: Class[_],
@@ -235,6 +421,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
+ /**
+ * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+ * that storage system. The JobConf should set an OutputFormat and any output paths required
+ * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+ * MapReduce job.
+ */
def saveAsHadoopDataset(conf: JobConf) {
rdd.saveAsHadoopDataset(conf)
}
@@ -243,13 +435,31 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
// Ordered RDD Functions
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
sortByKey(comp, true)
}
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
class KeyOrdering(val a: K) extends Ordered[K] {
override def compare(b: K) = comp.compare(a, b)
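
The combineByKey contract documented above (createCombiner, mergeValue, mergeCombiners) is easiest to see with a concrete, hedged example against the Scala pair-RDD API that JavaPairRDD delegates to; sc is an assumed SparkContext:

    import spark.SparkContext._   // pair-RDD operations via implicit conversion

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val perKeyLists = pairs.combineByKey(
      (v: Int) => List(v),                          // createCombiner: turn a V into a C
      (c: List[Int], v: Int) => v :: c,             // mergeValue: merge a V into a C
      (c1: List[Int], c2: List[Int]) => c1 ::: c2)  // mergeCombiners: combine two C's
    // perKeyLists: RDD[(String, List[Int])], e.g. ("a", List(3, 1)) and ("b", List(2))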
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index b3e1977bcb..50be5a6cbe 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -11,22 +11,43 @@ JavaRDDLike[T, JavaRDD[T]] {
// Common RDD functions
+ /** Persist this RDD with the default storage level (MEMORY_ONLY). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
+ /**
+ * Set this RDD's storage level to persist its values across operations after the first time
+ * it is computed. Can only be called once on each RDD.
+ */
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
// Transformations (return a new RDD)
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
-
+
+ /**
+ * Return a new RDD containing only the elements that satisfy a predicate.
+ */
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
wrapRDD(rdd.filter((x => f(x).booleanValue())))
+ /**
+ * Return a sampled subset of this RDD.
+ */
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
-
+
+ /**
+ * Return the union of this RDD and another one. Any identical elements will appear multiple
+ * times (use `.distinct()` to eliminate them).
+ */
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
}
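
A short hedged illustration of the transformations documented above (filter, union, distinct, sample), again via the underlying Scala RDD API with an assumed SparkContext sc:

    val a = sc.parallelize(Seq(1, 2, 2, 3))
    val b = sc.parallelize(Seq(3, 4))

    val evens   = a.filter(_ % 2 == 0)      // keep only elements matching the predicate
    val unioned = a.union(b)                // duplicates are kept: 1, 2, 2, 3, 3, 4
    val unique  = unioned.distinct()        // remove duplicates
    val sampled = a.sample(false, 0.5, 42)  // withReplacement, fraction, seed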
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 785dd96394..43d0ca0e2f 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -19,41 +19,71 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def rdd: RDD[T]
+ /** Set of partitions in this RDD. */
def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq)
+ /** The [[spark.SparkContext]] that this RDD was created on. */
def context: SparkContext = rdd.context
-
+
+ /** A unique ID for this RDD (within its SparkContext). */
def id: Int = rdd.id
+ /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel: StorageLevel = rdd.getStorageLevel
+ /**
+ * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
+ * This should ''not'' be called by users directly, but is available for implementors of custom
+ * subclasses of RDD.
+ */
def iterator(split: Split): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split))
// Transformations (return a new RDD)
+ /**
+ * Return a new RDD by applying a function to all elements of this RDD.
+ */
def map[R](f: JFunction[T, R]): JavaRDD[R] =
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
+ /**
+ * Return a new RDD by applying a function to all elements of this RDD.
+ */
def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
+ /**
+ * Return a new RDD by applying a function to all elements of this RDD.
+ */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
+ /**
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
+ */
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
}
+ /**
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
+ */
def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
}
+ /**
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
+ */
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
@@ -61,22 +91,35 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
}
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]):
JavaPairRDD[K, V] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
+ /**
+ * Return an RDD created by coalescing all elements within each partition into an array.
+ */
def glom(): JavaRDD[JList[T]] =
new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
@@ -84,6 +127,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
other.classManifest)
+ /**
+ * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+ * mapping to that key.
+ */
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
@@ -92,6 +139,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
}
+ /**
+ * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+ * mapping to that key.
+ */
def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
@@ -100,56 +151,114 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm)
}
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: JList[String]): JavaRDD[String] =
rdd.pipe(asScalaBuffer(command))
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
// Actions (launch a job to return a value to the user program)
-
+
+ /**
+ * Applies a function f to all elements of this RDD.
+ */
def foreach(f: VoidFunction[T]) {
val cleanF = rdd.context.clean(f)
rdd.foreach(cleanF)
}
+ /**
+ * Return an array that contains all of the elements in this RDD.
+ */
def collect(): JList[T] = {
import scala.collection.JavaConversions._
val arr: java.util.Collection[T] = rdd.collect().toSeq
new java.util.ArrayList(arr)
}
-
+
+ /**
+ * Reduces the elements of this RDD using the specified associative binary operator.
+ */
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
+ /**
+ * Aggregate the elements of each partition, and then the results for all the partitions, using a
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * modify t1 and return it as its result value to avoid object allocation; however, it should not
+ * modify t2.
+ */
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
rdd.fold(zeroValue)(f)
+ /**
+ * Aggregate the elements of each partition, and then the results for all the partitions, using
+ * given combine functions and a neutral "zero value". This function can return a different result
+ * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
+ * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
+ * allowed to modify and return their first argument instead of creating a new U to avoid memory
+ * allocation.
+ */
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U]): U =
rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
+ /**
+ * Return the number of elements in the RDD.
+ */
def count(): Long = rdd.count()
+ /**
+ * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * within a timeout, even if not all tasks have finished.
+ */
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
rdd.countApprox(timeout, confidence)
+ /**
+ * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * within a timeout, even if not all tasks have finished.
+ */
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
rdd.countApprox(timeout)
+ /**
+ * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
+ * combine step happens locally on the master, equivalent to running a single reduce task.
+ */
def countByValue(): java.util.Map[T, java.lang.Long] =
mapAsJavaMap(rdd.countByValue().map((x => (x._1, new lang.Long(x._2)))))
+ /**
+ * (Experimental) Approximate version of countByValue().
+ */
def countByValueApprox(
timeout: Long,
confidence: Double
): PartialResult[java.util.Map[T, BoundedDouble]] =
rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap)
+ /**
+ * (Experimental) Approximate version of countByValue().
+ */
def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
rdd.countByValueApprox(timeout).map(mapAsJavaMap)
+ /**
+ * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
+ * it will be slow if a lot of partitions are required. In that case, use collect() to get the
+ * whole RDD instead.
+ */
def take(num: Int): JList[T] = {
import scala.collection.JavaConversions._
val arr: java.util.Collection[T] = rdd.take(num).toSeq
@@ -162,9 +271,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}
+ /**
+ * Return the first element in this RDD.
+ */
def first(): T = rdd.first()
+ /**
+ * Save this RDD as a text file, using string representations of elements.
+ */
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
+ /**
+ * Save this RDD as a SequenceFile of serialized objects.
+ */
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
}
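
The fold/aggregate documentation above is dense, so here is a brief hedged example of the two actions in the Scala API (sc assumed):

    val words = sc.parallelize(Seq("spark", "rdd", "api"))

    // aggregate takes a zero value, a per-partition seqOp (U, T) => U,
    // and a cross-partition combOp (U, U) => U.
    val totalChars = words.aggregate(0)((acc, w) => acc + w.length, _ + _)   // 11

    // fold keeps one element type throughout; the zero value should be neutral
    // because it is applied once per partition.
    val longest = words.fold("")((x, y) => if (x.length >= y.length) x else y)  // "spark"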
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 4a7d945a8d..c78b09c750 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -18,26 +18,47 @@ import scala.collection.JavaConversions
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+ /**
+ * @constructor Returns a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ */
def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
+ /**
+ * @constructor Returns a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jarFile A path to a local jar file containing this job
+ */
def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) =
this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile)))
+ /**
+ * @constructor Returns a new SparkContext.
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars A set of jar files relating to this job
+ */
def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) =
this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq))
val env = sc.env
+ /** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}
+ /** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
parallelize(list, sc.defaultParallelism)
-
+ /** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
: JavaPairRDD[K, V] = {
implicit val kcm: ClassManifest[K] =
@@ -47,21 +68,32 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
}
+ /** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
parallelizePairs(list, sc.defaultParallelism)
+ /** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
numSlices))
+ /** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
parallelizeDoubles(list, sc.defaultParallelism)
+ /**
+ * Read a text file from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI, and return it as an RDD of Strings.
+ */
def textFile(path: String): JavaRDD[String] = sc.textFile(path)
+ /**
+ * Read a text file from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI, and return it as an RDD of Strings.
+ */
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
- /**Get an RDD for a Hadoop SequenceFile with given key and value types */
+ /**Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -72,6 +104,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
+ /**Get an RDD for a Hadoop SequenceFile. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
@@ -92,6 +125,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.objectFile(path, minSplits)(cm)
}
+ /**
+ * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
+ * BytesWritable values that contain a serialized partition. This is still an experimental storage
+ * format and may not be supported exactly as is in future Spark releases. It will also be pretty
+ * slow if you use the default serializer (Java serialization), though the nice thing about it is
+ * that there's very little effort required to save arbitrary objects.
+ */
def objectFile[T](path: String): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
@@ -180,12 +220,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
+ /** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[T] = first.classManifest
sc.union(rdds)(cm)
}
+ /** Build the union of two or more RDDs. */
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
@@ -195,26 +237,49 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
}
+ /** Build the union of two or more RDDs. */
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds))
}
+ /**
+ * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
+ * to using the `+=` method. Only the master can access the accumulator's `value`.
+ */
def intAccumulator(initialValue: Int): Accumulator[Int] =
sc.accumulator(initialValue)(IntAccumulatorParam)
+ /**
+ * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
+ * to using the `+=` method. Only the master can access the accumulator's `value`.
+ */
def doubleAccumulator(initialValue: Double): Accumulator[Double] =
sc.accumulator(initialValue)(DoubleAccumulatorParam)
+ /**
+ * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
+ * to using the `+=` method. Only the master can access the accumulator's `value`.
+ */
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)
+ /**
+ * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
+ * reading it in distributed functions. The variable will be sent to each cluster only once.
+ */
def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
+ /** Shut down the SparkContext. */
def stop() {
sc.stop()
}
+ /**
+ * Get Spark's home location from either a value set through the constructor,
+ * or the spark.home Java property, or the SPARK_HOME environment variable
+ * (in that order of preference). If none of these is set, return None.
+ */
def getSparkHome(): Option[String] = sc.getSparkHome()
}
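
A hedged sketch of the accumulator and broadcast usage described in the docs above, written against the Scala API (sc is an assumed SparkContext):

    import spark.SparkContext._   // provides the implicit IntAccumulatorParam

    val badKeys = sc.accumulator(0)             // tasks may only add; the master reads .value
    val table   = sc.broadcast(Map("a" -> 1))   // read-only, shipped to each node once

    sc.parallelize(Seq("a", "b", "c")).foreach { k =>
      if (!table.value.contains(k)) badKeys += 1
    }
    println(badKeys.value)                      // reading value is only valid on the master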
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 6ecf9fa8da..dfdb22024e 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -141,12 +141,12 @@ private[spark] class Executor extends Logging {
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name)
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentFiles(name) = timestamp
}
- for ((name, timestamp) <- newJars if currentFiles.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name)
+ for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentJars(name) = timestamp
// Add it to our class loader
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 2b38d8b52e..b84b4dc2ed 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -109,12 +109,12 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name)
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentFiles(name) = timestamp
}
- for ((name, timestamp) <- newJars if currentFiles.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name)
+ for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentJars(name) = timestamp
// Add it to our class loader
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 91b7bebfb3..8a111f44c9 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
- memoryStore.putValues(blockId, iterator, level, true).data match {
+ // TODO: Consider creating a putValues that also takes in a iterator ?
+ val elements = new ArrayBuffer[Any]
+ elements ++= iterator
+ memoryStore.putValues(blockId, elements, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
@@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
+ : Long = {
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ put(blockId, elements, level, tellMaster)
+ }
+
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
- : Long = {
+ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ tellMaster: Boolean = true) : Long = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
@@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
@@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
- case Left(iterator) =>
- diskStore.putValues(blockId, iterator, level, false)
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
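
The BlockManager change above keeps two put entry points: the Iterator overload buffers into an ArrayBuffer and forwards to the ArrayBuffer overload, which is what dropFromMemory now relies on. A hedged sketch of how callers use them (blockManager is an assumed BlockManager instance):

    import scala.collection.mutable.ArrayBuffer
    import spark.storage.StorageLevel

    val values = List("x", "y", "z")

    // Iterator form: buffered into an ArrayBuffer internally before storage.
    blockManager.put("block-1", values.iterator, StorageLevel.MEMORY_ONLY, true)

    // ArrayBuffer form: the same buffer can later be handed to dropFromMemory
    // as Left(elements) without re-materializing an iterator.
    val elements = new ArrayBuffer[Any]
    elements ++= values
    blockManager.put("block-2", elements, StorageLevel.MEMORY_AND_DISK, true)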
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 1286600cd1..096bf8bdd9 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,7 @@
package spark.storage
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import spark.Logging
@@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
- : PutResult
+ def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ returnValues: Boolean) : PutResult
/**
* Return the size of a block in bytes.
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index fd92a3dc67..8ba64e4b76 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -3,11 +3,15 @@ package spark.storage
import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import java.util.{Random, Date}
-import spark.Utils
import java.text.SimpleDateFormat
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Utils
+
/**
* Stores BlockManager blocks on disk.
*/
@@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
@@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
- objOut.writeAll(values)
+ objOut.writeAll(values.iterator)
objOut.close()
val length = file.length()
logDebug("Block %s stored as %s file on disk in %d ms".format(
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index e9288fdf43..773970446a 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
if (level.deserialized) {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
- PutResult(sizeEstimate, Left(elements.iterator))
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.iterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values)
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
for (blockId <- selectedBlocks) {
val entry = entries.get(blockId)
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 426c0d26e9..467605981b 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
} catch {
case e: Exception =>
throw new SparkException("Error communicating with actor", e)
- }
+ }
}
test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 433d2fdc19..76b0884481 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -27,6 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc = null
}
System.clearProperty("spark.reducer.maxMbInFlight")
+ System.clearProperty("spark.storage.memoryFraction")
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
@@ -156,4 +157,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data.count() === 1000)
assert(data.count() === 1000)
}
+
+ test("compute without caching with low memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.0001")
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ }
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 31b33eae09..b9c19e61cd 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)
diff --git a/docs/_config.yml b/docs/_config.yml
index 7c18ab3fa9..07d068fecb 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -3,6 +3,6 @@ markdown: kramdown
# These allow the documentation to be updated with new releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.6.0
+SPARK_VERSION: 0.6.0-SNAPSHOT
SCALA_VERSION: 2.9.2
MESOS_VERSION: 0.9.0-incubating
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index cfaa73f45d..e8d3865d75 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -14,8 +14,9 @@ This guide shows the programming model and features of Bagel by walking through
To write a Bagel application, you will need to add Spark, its dependencies, and Bagel to your CLASSPATH:
1. Run `sbt/sbt update` to fetch Spark's dependencies, if you haven't already done so.
-2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/scala_2.8.1/Spark Core-assembly-0.3-SNAPSHOT.jar`) and Bagel into a second JAR (`bagel/target/scala_2.8.1/Bagel-assembly-0.3-SNAPSHOT.jar`).
-3. Add these two JARs to your CLASSPATH.
+2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar`)
+3. Run `sbt/sbt package` to build the Bagel JAR (`bagel/target/scala_{{site.SCALA_VERSION}}/spark-bagel_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar`).
+4. Add these two JARs to your CLASSPATH; one sbt-based way to do this is sketched below.
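For sbt-based applications, one way to handle step 4 is to list the two JARs as unmanaged dependencies in the build definition. A sketch, assuming an sbt build and illustrative JAR names (substitute the versions actually produced by the commands above):

{% highlight scala %}
// Sketch only: adds the Spark assembly and Bagel JARs to an sbt project's
// compile classpath; adjust the paths to match your checkout.
unmanagedJars in Compile += Attributed.blank(file("core/target/spark-core-assembly-0.6.0-SNAPSHOT.jar"))
unmanagedJars in Compile += Attributed.blank(file("bagel/target/scala_2.9.2/spark-bagel_2.9.2-0.6.0-SNAPSHOT.jar"))
{% endhighlight %}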
## Programming Model
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 7d35fb01bb..5625fc2ddf 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -101,13 +101,9 @@ res9: Long = 15
It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
# A Standalone Job in Scala
-Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, please reference the Spark assembly JAR in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory:
+Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with Maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
-{% highlight bash %}
-$ sbt/sbt publish-local
-{% endhighlight %}
-
-Next, we'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
+We'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`:
{% highlight scala %}
/*** SimpleJob.scala ***/
@@ -159,12 +155,9 @@ Lines with a: 8422, Lines with b: 1836
This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
# A Standalone Job In Java
-Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you using other build systems, please reference the Spark assembly JAR in the developer guide. The first step is to publish Spark to our local Ivy/Maven repositories. From the Spark directory:
+Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
-{% highlight bash %}
-$ sbt/sbt publish-local
-{% endhighlight %}
-Next, we'll create a very simple Spark job, `SimpleJob.java`:
+We'll create a very simple Spark job, `SimpleJob.java`:
{% highlight java %}
/*** SimpleJob.java ***/
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index dd094ab131..6fb81b6004 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -19,7 +19,7 @@ branch of Spark, called `yarn`, which you can do as follows:
- In order to distribute Spark within the cluster, it must be packaged into a single JAR file. This can be done by running `sbt/sbt assembly`
- Your application code must be packaged into a separate JAR file.
-If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}-SNAPSHOT.jar` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
+If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar` file can be generated by running `sbt/sbt package`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the JAR generated by the sbt package command will obviously be different.
# Launching Spark on YARN
@@ -35,8 +35,8 @@ The command to launch the YARN Client is as follows:
For example:
- SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}-SNAPSHOT.jar ./run spark.deploy.yarn.Client \
- --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}-SNAPSHOT.jar \
+ SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
+ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
--class spark.examples.SparkPi \
--args standalone \
--num-workers 3 \
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 8c084528d7..73f8b123be 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -17,7 +17,13 @@ This guide shows each of these features and walks through some samples. It assum
# Linking with Spark
-To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. The easiest way to do this is to run `sbt/sbt assembly` to build both Spark and its dependencies into one JAR (`core/target/spark-core-assembly-0.6.0.jar`), then add this to your CLASSPATH. Alternatively, you can publish Spark to the Maven cache on your machine using `sbt/sbt publish-local`. It will be an artifact called `spark-core` under the organization `org.spark-project`.
+To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. If you use sbt or Maven, Spark is available through Maven Central at:
+
+ groupId = org.spark-project
+ artifactId = spark-core_{{site.SCALA_VERSION}}
+ version = {{site.SPARK_VERSION}}
+
+For other build systems or environments, you can run `sbt/sbt assembly` to build both Spark and its dependencies into one JAR (`core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar`), then add this to your CLASSPATH.
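For sbt users, the same coordinates can be declared directly in the build definition. A sketch, assuming the artifact is cross-versioned so the usual `%%` convention applies, and that the version string matches the release you are building against:

{% highlight scala %}
// Sketch only: pulls spark-core from a Maven repository; replace the version
// with the Spark release you are using.
libraryDependencies += "org.spark-project" %% "spark-core" % "0.6.0-SNAPSHOT"
{% endhighlight %}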
In addition, you'll need to import some Spark classes and implicit conversions. Add the following lines at the top of your program:
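For reference, a minimal sketch of those imports, based on the 0.6-era `spark` package layout used throughout this patch:

{% highlight scala %}
import spark.SparkContext
import SparkContext._
{% endhighlight %}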
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4184008506..5aefdd2eed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,13 +4,15 @@ import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
import twirl.sbt.TwirlPlugin._
+// For Sonatype publishing
+// import com.jsuereth.pgp.sbtplugin.PgpKeys._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
val HADOOP_VERSION = "0.20.205.0"
- lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
+ lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
lazy val core = Project("core", file("core"), settings = coreSettings)
@@ -33,7 +35,51 @@ object SparkBuild extends Build {
retrieveManaged := true,
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
- publishTo <<= baseDirectory { base => Some(Resolver.file("Local", base / "target" / "maven" asFile)(Patterns(true, Resolver.mavenStyleBasePattern))) },
+
+ /* For Sonatype publishing
+ resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
+ "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
+
+ publishMavenStyle := true,
+
+ useGpg in Global := true,
+
+ pomExtra := (
+ <url>http://spark-project.org/</url>
+ <licenses>
+ <license>
+ <name>BSD License</name>
+ <url>https://github.com/mesos/spark/blob/master/LICENSE</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ <scm>
+ <connection>scm:git:git@github.com:mesos/spark.git</connection>
+ <url>scm:git:git@github.com:mesos/spark.git</url>
+ </scm>
+ <developers>
+ <developer>
+ <id>matei</id>
+ <name>Matei Zaharia</name>
+ <email>matei.zaharia@gmail.com</email>
+ <url>http://www.cs.berkeley.edu/~matei</url>
+ <organization>U.C. Berkeley Computer Science</organization>
+ <organizationUrl>http://www.cs.berkeley.edu/</organizationUrl>
+ </developer>
+ </developers>
+ ),
+
+ publishTo <<= version { (v: String) =>
+ val nexus = "https://oss.sonatype.org/"
+ if (v.trim.endsWith("SNAPSHOT"))
+ Some("sonatype-snapshots" at nexus + "content/repositories/snapshots")
+ else
+ Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
+ },
+
+ credentials += Credentials(Path.userHome / ".sbt" / "sonatype.credentials"),
+ */
+
libraryDependencies ++= Seq(
"org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
"org.scalatest" %% "scalatest" % "1.6.1" % "test",
@@ -64,6 +110,7 @@ object SparkBuild extends Build {
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
),
+
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1",
"log4j" % "log4j" % "1.2.16",
@@ -85,6 +132,10 @@ object SparkBuild extends Build {
)
) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
+ def rootSettings = sharedSettings ++ Seq(
+ publish := {}
+ )
+
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
@@ -103,5 +154,4 @@ object SparkBuild extends Build {
case _ => MergeStrategy.first
}
)
-
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 896fa4834f..36497f8eda 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -11,3 +11,7 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")
+
+// For Sonatype publishing
+// resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
+// addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6")