author    | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-29 18:20:45 -0700
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-29 18:20:45 -0700
commit    | 5932a87cac3870f7b08947b1c768f80a65837008 (patch)
tree      | 0c332e17dcdecd66b690a469b25d03fb4bf48a08
parent    | 1f19fbb8db96135efc79fe56fcd96f1f02598b86 (diff)
parent    | d7f089323aaee839ab92673ff0d3d26bc9d81ef3 (diff)
Merge remote-tracking branch 'upstream/dev' into dev
-rw-r--r-- | core/src/main/scala/spark/Accumulators.scala | 105
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 15
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaSparkContext.scala | 6
-rw-r--r-- | core/src/main/scala/spark/util/Vector.scala (renamed from examples/src/main/scala/spark/examples/Vector.scala) | 39
-rw-r--r-- | core/src/test/scala/spark/AccumulatorSuite.scala | 92
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 9
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 3
-rw-r--r-- | examples/src/main/scala/spark/examples/LocalFileLR.scala | 2
-rw-r--r-- | examples/src/main/scala/spark/examples/LocalKMeans.scala | 3
-rw-r--r-- | examples/src/main/scala/spark/examples/LocalLR.scala | 2
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkHdfsLR.scala | 2
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkKMeans.scala | 2
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkLR.scala | 2
-rwxr-xr-x | run | 5
14 files changed, 246 insertions, 41 deletions
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index a2003d8049..a155adaa87 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -4,21 +4,39 @@ import java.io._
 
 import scala.collection.mutable.Map
 
-class Accumulator[T] (
+class Accumulable[T,R] (
   @transient initialValue: T,
-  param: AccumulatorParam[T])
+  param: AccumulableParam[T,R])
   extends Serializable {
 
   val id = Accumulators.newId
   @transient
-  var value_ = initialValue // Current value on master
+  private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   var deserialized = false
 
   Accumulators.register(this, true)
 
-  def += (term: T) { value_ = param.addInPlace(value_, term) }
-  def value = this.value_
+  /**
+   * add more data to this accumulator / accumulable
+   * @param term the data to add
+   */
+  def += (term: R) { value_ = param.addAccumulator(value_, term) }
+
+  /**
+   * merge two accumulable objects together
+   *
+   * Normally, a user will not want to use this version, but will instead call `+=`.
+   * @param term the other Accumulable that will get merged with this
+   */
+  def ++= (term: T) { value_ = param.addInPlace(value_, term)}
+  def value = {
+    if (!deserialized) value_
+    else throw new UnsupportedOperationException("Can't use read value in task")
+  }
+
+  private[spark] def localValue = value_
+
   def value_= (t: T) {
     if (!deserialized) value_ = t
     else throw new UnsupportedOperationException("Can't use value_= in task")
@@ -35,17 +53,58 @@ class Accumulator[T] (
   override def toString = value_.toString
 }
 
-trait AccumulatorParam[T] extends Serializable {
-  def addInPlace(t1: T, t2: T): T
-  def zero(initialValue: T): T
+class Accumulator[T](
+    @transient initialValue: T,
+    param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
+
+/**
+ * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
+ * as the accumulated value
+ * @tparam T
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T,T] {
+  def addAccumulator(t1: T, t2: T) : T = {
+    addInPlace(t1, t2)
+  }
+}
+
+/**
+ * A datatype that can be accumulated, ie. has a commutative & associative +.
+ *
+ * You must define how to add data, and how to merge two of these together.  For some datatypes, these might be
+ * the same operation (eg., a counter).  In that case, you might want to use [[spark.AccumulatorParam]].  They won't
+ * always be the same, though -- eg., imagine you are accumulating a set.  You will add items to the set, and you
+ * will union two sets together.
+ *
+ * @tparam R the full accumulated data
+ * @tparam T partial data that can be added in
+ */
+trait AccumulableParam[R,T] extends Serializable {
+  /**
+   * Add additional data to the accumulator value.
+   * @param t1 the current value of the accumulator
+   * @param t2 the data to be added to the accumulator
+   * @return the new value of the accumulator
+   */
+  def addAccumulator(t1: R, t2: T) : R
+
+  /**
+   * merge two accumulated values together
+   * @param t1 one set of accumulated data
+   * @param t2 another set of accumulated data
+   * @return both data sets merged together
+   */
+  def addInPlace(t1: R, t2: R): R
+
+  def zero(initialValue: R): R
 }
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulator[_]]()
-  val localAccums = Map[Thread, Map[Long, Accumulator[_]]]()
+  val originals = Map[Long, Accumulable[_,_]]()
+  val localAccums = Map[Thread, Map[Long, Accumulable[_,_]]]()
   var lastId: Long = 0
 
   def newId: Long = synchronized {
@@ -53,14 +112,12 @@ private object Accumulators {
     return lastId
   }
 
-  def register(a: Accumulator[_], original: Boolean) {
-    synchronized {
-      if (original) {
-        originals(a.id) = a
-      } else {
-        val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
-        accums(a.id) = a
-      }
+  def register(a: Accumulable[_,_], original: Boolean): Unit = synchronized {
+    if (original) {
+      originals(a.id) = a
+    } else {
+      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
+      accums(a.id) = a
     }
   }
 
@@ -75,18 +132,16 @@ private object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
-      ret(id) = accum.value
+      ret(id) = accum.localValue
     }
     return ret
   }
 
   // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]) {
-    synchronized {
-      for ((id, value) <- values) {
-        if (originals.contains(id)) {
-          originals(id).asInstanceOf[Accumulator[Any]] += value
-        }
+  def add(values: Map[Long, Any]): Unit = synchronized {
+    for ((id, value) <- values) {
+      if (originals.contains(id)) {
+        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
       }
     }
   }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bfd3e8d732..d9c055cd40 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -274,13 +274,26 @@ class SparkContext(
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds)
+  def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+
+  /** Build the union of a list of RDDs. */
+  def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+    new UnionRDD(this, Seq(first) ++ rest)
 
   // Methods for creating shared variables
 
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
+  /**
+   * create an accumulatable shared variable, with a `+=` method
+   * @tparam T accumulator type
+   * @tparam R type that can be added to the accumulator
+   */
+  def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
+    new Accumulable(initialValue, param)
+
   // Keep around a weak hash map of values to Cached versions?
   def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
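The two files above carry the heart of the change: the old Accumulator[T] becomes a thin subclass of a generic Accumulable[T,R], whose AccumulableParam separates adding one element (addAccumulator) from merging two partial results (addInPlace), and SparkContext gains an accumulable factory alongside accumulator. A minimal sketch of how the new API fits together, modelled on the SetAccum helper in the new AccumulatorSuite further down; the names SetParam and seen are illustrative, not part of the patch:

```scala
import scala.collection.mutable
import spark.{Accumulable, AccumulableParam, SparkContext}

// Illustrative param object: accumulate individual Ints into a mutable Set[Int].
implicit object SetParam extends AccumulableParam[mutable.Set[Int], Int] {
  // called by `+=` inside tasks: fold one element into the running set
  def addAccumulator(set: mutable.Set[Int], elem: Int): mutable.Set[Int] = { set += elem; set }
  // called on the driver (via `++=`) to merge per-task partial sets
  def addInPlace(s1: mutable.Set[Int], s2: mutable.Set[Int]): mutable.Set[Int] = { s1 ++= s2; s1 }
  def zero(initial: mutable.Set[Int]): mutable.Set[Int] = new mutable.HashSet[Int]()
}

val sc = new SparkContext("local", "accumulable sketch")
val seen: Accumulable[mutable.Set[Int], Int] = sc.accumulable(new mutable.HashSet[Int]())
sc.parallelize(1 to 100).foreach(x => seen += x)  // tasks add single elements
println(seen.value.size)                          // the merged set is readable only on the driver
```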
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 2d43bfa4ef..08c92b145e 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
     implicit val cm: ClassManifest[T] = first.classManifest
-    sc.union(rdds: _*)(cm)
+    sc.union(rdds)(cm)
   }
 
   override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     implicit val cm: ClassManifest[(K, V)] = first.classManifest
     implicit val kcm: ClassManifest[K] = first.kManifest
     implicit val vcm: ClassManifest[V] = first.vManifest
-    new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
+    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
   }
 
   override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
     val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
-    new JavaDoubleRDD(sc.union(rdds: _*))
+    new JavaDoubleRDD(sc.union(rdds))
   }
 
   def intAccumulator(initialValue: Int): Accumulator[Int] =
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index 2abccbafce..4e95ac2ac6 100644
--- a/examples/src/main/scala/spark/examples/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -1,8 +1,8 @@
-package spark.examples
+package spark.util
 
 class Vector(val elements: Array[Double]) extends Serializable {
   def length = elements.length
-  
+
   def apply(index: Int) = elements(index)
 
   def + (other: Vector): Vector = {
@@ -29,12 +29,43 @@ class Vector(val elements: Array[Double]) extends Serializable {
     return ans
   }
 
+  /**
+   * return (this + plus) dot other, but without creating any intermediate storage
+   * @param plus
+   * @param other
+   * @return
+   */
+  def plusDot(plus: Vector, other: Vector): Double = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    if (length != plus.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var ans = 0.0
+    var i = 0
+    while (i < length) {
+      ans += (this(i) + plus(i)) * other(i)
+      i += 1
+    }
+    return ans
+  }
+
+  def +=(other: Vector) {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var ans = 0.0
+    var i = 0
+    while (i < length) {
+      elements(i) += other(i)
+      i += 1
+    }
+  }
+
   def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
 
   def / (d: Double): Vector = this * (1 / d)
 
   def unary_- = this * -1
-  
+
   def sum = elements.reduceLeft(_ + _)
 
   def squaredDist(other: Vector): Double = {
@@ -76,6 +107,8 @@ object Vector {
 
   implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
     def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+    def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
   }
+
 }
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
new file mode 100644
index 0000000000..d55969c261
--- /dev/null
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -0,0 +1,92 @@
+package spark
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import collection.mutable
+import java.util.Random
+import scala.math.exp
+import scala.math.signum
+import spark.SparkContext._
+
+class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
+
+  var sc: SparkContext = null
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+  }
+
+  test ("basic accumulation"){
+    sc = new SparkContext("local", "test")
+    val acc : Accumulator[Int] = sc.accumulator(0)
+
+    val d = sc.parallelize(1 to 20)
+    d.foreach{x => acc += x}
+    acc.value should be (210)
+  }
+
+  test ("value not assignable from tasks") {
+    sc = new SparkContext("local", "test")
+    val acc : Accumulator[Int] = sc.accumulator(0)
+
+    val d = sc.parallelize(1 to 20)
+    evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
+  }
+
+  test ("add value to collection accumulators") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val d = sc.parallelize(1 to maxI)
+      d.foreach {
+        x => acc += x
+      }
+      val v = acc.value.asInstanceOf[mutable.Set[Int]]
+      for (i <- 1 to maxI) {
+        v should contain(i)
+      }
+      sc.stop()
+      sc = null
+    }
+  }
+
+
+  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
+    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
+      t1 ++= t2
+      t1
+    }
+    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
+      t1 += t2
+      t1
+    }
+    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
+      new mutable.HashSet[Any]()
+    }
+  }
+
+
+  test ("value not readable in tasks") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val d = sc.parallelize(1 to maxI)
+      evaluating {
+        d.foreach {
+          x => acc.value += x
+        }
+      } should produce [SparkException]
+      sc.stop()
+      sc = null
+    }
+  }
+
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3924a6890b..4a79c086e9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(partitionSums.collect().toList === List(3, 7))
   }
 
+  test("SparkContext.union") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+    assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+  }
+
   test("aggregate") {
     sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
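With Vector moved into spark.util and its VectorAccumParam completed with a zero method, a Vector can now serve directly as an accumulator value, which is what the reworked examples rely on. A small sketch under those assumptions (a local SparkContext and three arbitrary sample vectors; none of the names below appear in the patch):

```scala
import spark.SparkContext
import spark.util.Vector

val sc = new SparkContext("local", "vector accumulator sketch")

// The implicit VectorAccumParam in object Vector supplies both addInPlace and zero,
// so sc.accumulator can sum Vectors element-wise across tasks.
val total = sc.accumulator(Vector.zeros(3))
val points = sc.parallelize(Seq(
  new Vector(Array(1.0, 0.0, 0.0)),
  new Vector(Array(0.0, 2.0, 0.0)),
  new Vector(Array(0.0, 0.0, 3.0))))
points.foreach(p => total += p)
println(total.value)  // element-wise sum of the three points
```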
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 027d1423d4..1ed5519d37 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -215,6 +215,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
+    Thread.sleep(100)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
     assert(store.get("list2").get.size === 2)
@@ -224,7 +225,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     assert(store.get("list4").get.size === 2)
   }
 
-  test("ByteBufferInputStream bugs") {
+  test("negative byte values in ByteBufferInputStream") {
     val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte).toArray)
     val stream = new ByteBufferInputStream(buffer)
     val temp = new Array[Byte](10)
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
index b819fe80fe..f958ef9f72 100644
--- a/examples/src/main/scala/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -1,7 +1,7 @@
 package spark.examples
 
 import java.util.Random
-import Vector._
+import spark.util.Vector
 
 object LocalFileLR {
   val D = 10   // Numer of dimensions
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index 7e8e7a6959..b442c604cd 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -1,8 +1,7 @@
 package spark.examples
 
 import java.util.Random
-import Vector._
-import spark.SparkContext
+import spark.util.Vector
 import spark.SparkContext._
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 72c5009109..f2ac2b3e06 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -1,7 +1,7 @@
 package spark.examples
 
 import java.util.Random
-import Vector._
+import spark.util.Vector
 
 object LocalLR {
   val N = 10000  // Number of data points
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 13b6ec1d3f..5b2bc84d69 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -2,7 +2,7 @@ package spark.examples
 
 import java.util.Random
 import scala.math.exp
-import Vector._
+import spark.util.Vector
 import spark._
 
 object SparkHdfsLR {
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 5eb1c95a16..adce551322 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -1,8 +1,8 @@
 package spark.examples
 
 import java.util.Random
-import Vector._
 import spark.SparkContext
+import spark.util.Vector
 import spark.SparkContext._
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index 7715e5a713..19123db738 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -2,7 +2,7 @@ package spark.examples
 
 import java.util.Random
 import scala.math.exp
-import Vector._
+import spark.util.Vector
 import spark._
 
 object SparkLR {
diff --git a/run b/run
--- a/run
+++ b/run
@@ -78,6 +78,7 @@ export CLASSPATH # Needed for spark-shell
 # when we exit, so we allow it to set a variable to launch with scala.
 if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
   RUNNER="${SCALA_HOME}/bin/scala"
+  EXTRA_ARGS=""  # Java options will be passed to scala as JAVA_OPTS
 else
   CLASSPATH+=":$SCALA_HOME/lib/scala-library.jar"
   CLASSPATH+=":$SCALA_HOME/lib/scala-compiler.jar"
@@ -87,6 +88,8 @@ else
   else
     RUNNER=java
   fi
+  # The JVM doesn't read JAVA_OPTS by default so we need to pass it in
+  EXTRA_ARGS="$JAVA_OPTS"
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
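One consequence of the SparkContext.union change that is easy to miss in the Java wrapper hunks above: passing a Seq now selects a dedicated overload instead of being expanded with `: _*`, while the varargs form remains for callers holding individual RDDs. A short sketch mirroring the new RDDSuite test, assuming only a local SparkContext:

```scala
import spark.SparkContext

val sc = new SparkContext("local", "union sketch")
val a = sc.parallelize(1 to 3)
val b = sc.parallelize(4 to 6)

sc.union(a, b).collect()       // varargs overload: union(first, rest*)
sc.union(Seq(a, b)).collect()  // Seq overload, the one JavaSparkContext now calls
sc.union(a).collect()          // a single RDD still works, via the varargs overload with an empty rest
```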