author     Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2012-07-29 18:20:45 -0700
committer  Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2012-07-29 18:20:45 -0700
commit     5932a87cac3870f7b08947b1c768f80a65837008 (patch)
tree       0c332e17dcdecd66b690a469b25d03fb4bf48a08
parent     1f19fbb8db96135efc79fe56fcd96f1f02598b86 (diff)
parent     d7f089323aaee839ab92673ff0d3d26bc9d81ef3 (diff)
Merge remote-tracking branch 'upstream/dev' into dev
-rw-r--r--  core/src/main/scala/spark/Accumulators.scala               | 105
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala               |  15
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala  |   6
-rw-r--r--  core/src/main/scala/spark/util/Vector.scala (renamed from examples/src/main/scala/spark/examples/Vector.scala) | 39
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala           |  92
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                   |   9
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala  |   3
-rw-r--r--  examples/src/main/scala/spark/examples/LocalFileLR.scala   |   2
-rw-r--r--  examples/src/main/scala/spark/examples/LocalKMeans.scala   |   3
-rw-r--r--  examples/src/main/scala/spark/examples/LocalLR.scala       |   2
-rw-r--r--  examples/src/main/scala/spark/examples/SparkHdfsLR.scala   |   2
-rw-r--r--  examples/src/main/scala/spark/examples/SparkKMeans.scala   |   2
-rw-r--r--  examples/src/main/scala/spark/examples/SparkLR.scala       |   2
-rwxr-xr-x  run                                                        |   5
14 files changed, 246 insertions, 41 deletions
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index a2003d8049..a155adaa87 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -4,21 +4,39 @@ import java.io._
import scala.collection.mutable.Map
-class Accumulator[T] (
+class Accumulable[T,R] (
@transient initialValue: T,
- param: AccumulatorParam[T])
+ param: AccumulableParam[T,R])
extends Serializable {
val id = Accumulators.newId
@transient
- var value_ = initialValue // Current value on master
+ private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
var deserialized = false
Accumulators.register(this, true)
- def += (term: T) { value_ = param.addInPlace(value_, term) }
- def value = this.value_
+ /**
+ * add more data to this accumulator / accumulable
+ * @param term the data to add
+ */
+ def += (term: R) { value_ = param.addAccumulator(value_, term) }
+
+ /**
+ * merge two accumulable objects together
+ *
+ * Normally, a user will not want to use this version, but will instead call `+=`.
+ * @param term the other Accumulable that will get merged with this
+ */
+ def ++= (term: T) { value_ = param.addInPlace(value_, term)}
+ def value = {
+ if (!deserialized) value_
+ else throw new UnsupportedOperationException("Can't use read value in task")
+ }
+
+ private[spark] def localValue = value_
+
def value_= (t: T) {
if (!deserialized) value_ = t
else throw new UnsupportedOperationException("Can't use value_= in task")
@@ -35,17 +53,58 @@ class Accumulator[T] (
override def toString = value_.toString
}
-trait AccumulatorParam[T] extends Serializable {
- def addInPlace(t1: T, t2: T): T
- def zero(initialValue: T): T
+class Accumulator[T](
+ @transient initialValue: T,
+ param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
+
+/**
+ * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
+ * as the accumulated value
+ * @tparam T
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T,T] {
+ def addAccumulator(t1: T, t2: T) : T = {
+ addInPlace(t1, t2)
+ }
+}
+
+/**
+ * A datatype that can be accumulated, ie. has a commutative & associative +.
+ *
+ * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
+ * the same operation (eg., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
+ * always be the same, though -- eg., imagine you are accumulating a set. You will add items to the set, and you
+ * will union two sets together.
+ *
+ * @tparam R the full accumulated data
+ * @tparam T partial data that can be added in
+ */
+trait AccumulableParam[R,T] extends Serializable {
+ /**
+ * Add additional data to the accumulator value.
+ * @param t1 the current value of the accumulator
+ * @param t2 the data to be added to the accumulator
+ * @return the new value of the accumulator
+ */
+ def addAccumulator(t1: R, t2: T) : R
+
+ /**
+ * merge two accumulated values together
+ * @param t1 one set of accumulated data
+ * @param t2 another set of accumulated data
+ * @return both data sets merged together
+ */
+ def addInPlace(t1: R, t2: R): R
+
+ def zero(initialValue: R): R
}
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
- val originals = Map[Long, Accumulator[_]]()
- val localAccums = Map[Thread, Map[Long, Accumulator[_]]]()
+ val originals = Map[Long, Accumulable[_,_]]()
+ val localAccums = Map[Thread, Map[Long, Accumulable[_,_]]]()
var lastId: Long = 0
def newId: Long = synchronized {
@@ -53,14 +112,12 @@ private object Accumulators {
return lastId
}
- def register(a: Accumulator[_], original: Boolean) {
- synchronized {
- if (original) {
- originals(a.id) = a
- } else {
- val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
- accums(a.id) = a
- }
+ def register(a: Accumulable[_,_], original: Boolean): Unit = synchronized {
+ if (original) {
+ originals(a.id) = a
+ } else {
+ val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
+ accums(a.id) = a
}
}
@@ -75,18 +132,16 @@ private object Accumulators {
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
- ret(id) = accum.value
+ ret(id) = accum.localValue
}
return ret
}
// Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]) {
- synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- originals(id).asInstanceOf[Accumulator[Any]] += value
- }
+ def add(values: Map[Long, Any]): Unit = synchronized {
+ for ((id, value) <- values) {
+ if (originals.contains(id)) {
+ originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
}
}
}
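
The hunk above splits the old Accumulator into Accumulable[T,R] plus an AccumulableParam[R,T], so tasks can add values of one type (R) into an accumulated value of another type (T). As a rough usage sketch — not part of this commit, it simply follows the SetAccum pattern from AccumulatorSuite further down; the names IntSetParam and seen are illustrative:

import scala.collection.mutable
import spark.{Accumulable, AccumulableParam, SparkContext}

// Accumulates individual Ints (the "partial data") into a mutable.Set[Int] (the full value).
implicit object IntSetParam extends AccumulableParam[mutable.Set[Int], Int] {
  // Add one element produced by a task.
  def addAccumulator(set: mutable.Set[Int], x: Int): mutable.Set[Int] = { set += x; set }
  // Merge per-task sets back into the master's set.
  def addInPlace(s1: mutable.Set[Int], s2: mutable.Set[Int]): mutable.Set[Int] = { s1 ++= s2; s1 }
  def zero(initial: mutable.Set[Int]): mutable.Set[Int] = new mutable.HashSet[Int]()
}

val sc = new SparkContext("local", "accumulable-example")
val seen: Accumulable[mutable.Set[Int], Int] = sc.accumulable(mutable.Set[Int]())
sc.parallelize(1 to 100).foreach(x => seen += x)   // tasks add elements with +=
println(seen.value.size)                           // value is only readable on the master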
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bfd3e8d732..d9c055cd40 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -274,13 +274,26 @@ class SparkContext(
}
/** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds)
+ def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+
+ /** Build the union of a list of RDDs. */
+ def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+ new UnionRDD(this, Seq(first) ++ rest)
// Methods for creating shared variables
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
+ /**
+ * create an accumulatable shared variable, with a `+=` method
+ * @tparam T accumulator type
+ * @tparam R type that can be added to the accumulator
+ */
+ def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
+ new Accumulable(initialValue, param)
+
+
// Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
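
For reference, a quick sketch of the two union signatures introduced above (illustrative only, not part of the commit):

val sc = new SparkContext("local", "union-example")
val a = sc.parallelize(1 to 3)
val b = sc.parallelize(4 to 6)
sc.union(Seq(a, b)).collect()   // new Seq overload: handy when the RDDs are already in a collection
sc.union(a, b).collect()        // varargs overload: first RDD plus the rest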
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 2d43bfa4ef..08c92b145e 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[T] = first.classManifest
- sc.union(rdds: _*)(cm)
+ sc.union(rdds)(cm)
}
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicit val cm: ClassManifest[(K, V)] = first.classManifest
implicit val kcm: ClassManifest[K] = first.kManifest
implicit val vcm: ClassManifest[V] = first.vManifest
- new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
+ new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
}
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
- new JavaDoubleRDD(sc.union(rdds: _*))
+ new JavaDoubleRDD(sc.union(rdds))
}
def intAccumulator(initialValue: Int): Accumulator[Int] =
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 2abccbafce..4e95ac2ac6 100644
--- a/examples/src/main/scala/spark/examples/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -1,8 +1,8 @@
-package spark.examples
+package spark.util
class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
-
+
def apply(index: Int) = elements(index)
def + (other: Vector): Vector = {
@@ -29,12 +29,43 @@ class Vector(val elements: Array[Double]) extends Serializable {
return ans
}
+ /**
+ * return (this + plus) dot other, but without creating any intermediate storage
+ * @param plus
+ * @param other
+ * @return
+ */
+ def plusDot(plus: Vector, other: Vector): Double = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ if (length != plus.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ var ans = 0.0
+ var i = 0
+ while (i < length) {
+ ans += (this(i) + plus(i)) * other(i)
+ i += 1
+ }
+ return ans
+ }
+
+ def +=(other: Vector) {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ var ans = 0.0
+ var i = 0
+ while (i < length) {
+ elements(i) += other(i)
+ i += 1
+ }
+ }
+
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
def / (d: Double): Vector = this * (1 / d)
def unary_- = this * -1
-
+
def sum = elements.reduceLeft(_ + _)
def squaredDist(other: Vector): Double = {
@@ -76,6 +107,8 @@ object Vector {
implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+
def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
}
+
}
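
The new plusDot and += methods let the LR- and KMeans-style examples accumulate gradients without allocating an intermediate Vector per data point. A rough sketch of their intended use, with hypothetical values (not part of the commit):

import java.util.Random
import spark.util.Vector

val rand = new Random(42)
val w = Vector(10, _ => 2 * rand.nextDouble() - 1)   // random initial weights
val x = Vector(10, _ => rand.nextDouble())           // one data point
val delta = Vector.zeros(10)
val gradient = Vector.zeros(10)
val margin = w.plusDot(delta, x)   // (w + delta) dot x, without building a temporary (w + delta)
gradient += x * margin             // accumulate in place instead of gradient = gradient + ...

Because Vector also ships the implicit VectorAccumParam kept above, it can be used directly as the value type of a Spark accumulator.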
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
new file mode 100644
index 0000000000..d55969c261
--- /dev/null
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -0,0 +1,92 @@
+package spark
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import collection.mutable
+import java.util.Random
+import scala.math.exp
+import scala.math.signum
+import spark.SparkContext._
+
+class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
+
+ var sc: SparkContext = null
+
+ after {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ }
+
+ test ("basic accumulation"){
+ sc = new SparkContext("local", "test")
+ val acc : Accumulator[Int] = sc.accumulator(0)
+
+ val d = sc.parallelize(1 to 20)
+ d.foreach{x => acc += x}
+ acc.value should be (210)
+ }
+
+ test ("value not assignable from tasks") {
+ sc = new SparkContext("local", "test")
+ val acc : Accumulator[Int] = sc.accumulator(0)
+
+ val d = sc.parallelize(1 to 20)
+ evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
+ }
+
+ test ("add value to collection accumulators") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val d = sc.parallelize(1 to maxI)
+ d.foreach {
+ x => acc += x
+ }
+ val v = acc.value.asInstanceOf[mutable.Set[Int]]
+ for (i <- 1 to maxI) {
+ v should contain(i)
+ }
+ sc.stop()
+ sc = null
+ }
+ }
+
+
+ implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
+ def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
+ new mutable.HashSet[Any]()
+ }
+ }
+
+
+ test ("value not readable in tasks") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val d = sc.parallelize(1 to maxI)
+ evaluating {
+ d.foreach {
+ x => acc.value += x
+ }
+ } should produce [SparkException]
+ sc.stop()
+ sc = null
+ }
+ }
+
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3924a6890b..4a79c086e9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(partitionSums.collect().toList === List(3, 7))
}
+ test("SparkContext.union") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
+ assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+ assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
+ assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+ }
+
test("aggregate") {
sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 027d1423d4..1ed5519d37 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -215,6 +215,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
+ Thread.sleep(100)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)
@@ -224,7 +225,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
assert(store.get("list4").get.size === 2)
}
- test("ByteBufferInputStream bugs") {
+ test("negative byte values in ByteBufferInputStream") {
val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte).toArray)
val stream = new ByteBufferInputStream(buffer)
val temp = new Array[Byte](10)
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
index b819fe80fe..f958ef9f72 100644
--- a/examples/src/main/scala/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -1,7 +1,7 @@
package spark.examples
import java.util.Random
-import Vector._
+import spark.util.Vector
object LocalFileLR {
val D = 10 // Numer of dimensions
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index 7e8e7a6959..b442c604cd 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -1,8 +1,7 @@
package spark.examples
import java.util.Random
-import Vector._
-import spark.SparkContext
+import spark.util.Vector
import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 72c5009109..f2ac2b3e06 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -1,7 +1,7 @@
package spark.examples
import java.util.Random
-import Vector._
+import spark.util.Vector
object LocalLR {
val N = 10000 // Number of data points
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 13b6ec1d3f..5b2bc84d69 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -2,7 +2,7 @@ package spark.examples
import java.util.Random
import scala.math.exp
-import Vector._
+import spark.util.Vector
import spark._
object SparkHdfsLR {
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 5eb1c95a16..adce551322 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -1,8 +1,8 @@
package spark.examples
import java.util.Random
-import Vector._
import spark.SparkContext
+import spark.util.Vector
import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index 7715e5a713..19123db738 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -2,7 +2,7 @@ package spark.examples
import java.util.Random
import scala.math.exp
-import Vector._
+import spark.util.Vector
import spark._
object SparkLR {
diff --git a/run b/run
index d386892b95..8f7256b4e5 100755
--- a/run
+++ b/run
@@ -78,6 +78,7 @@ export CLASSPATH # Needed for spark-shell
# when we exit, so we allow it to set a variable to launch with scala.
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
RUNNER="${SCALA_HOME}/bin/scala"
+ EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
else
CLASSPATH+=":$SCALA_HOME/lib/scala-library.jar"
CLASSPATH+=":$SCALA_HOME/lib/scala-compiler.jar"
@@ -87,6 +88,8 @@ else
else
RUNNER=java
fi
+ # The JVM doesn't read JAVA_OPTS by default so we need to pass it in
+ EXTRA_ARGS="$JAVA_OPTS"
fi
-exec "$RUNNER" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"