author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-30 11:24:01 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-30 11:24:01 -0700
commit     f471c82558347216985a8b7015309c4cce099cc4 (patch)
tree       aeab3b7688a2e033e1f3b3a70d80c3a8c2a4181c
parent     d7f089323aaee839ab92673ff0d3d26bc9d81ef3 (diff)
Various reorganization and formatting fixes
-rw-r--r--  core/src/main/scala/spark/Accumulators.scala             | 102
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala |   2
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala             |  13
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                 |  19
4 files changed, 79 insertions(+), 57 deletions(-)
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index a155adaa87..d764ffc29d 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -4,9 +4,22 @@ import java.io._
import scala.collection.mutable.Map
-class Accumulable[T,R] (
- @transient initialValue: T,
- param: AccumulableParam[T,R])
+/**
+ * A datatype that can be accumulated, i.e. has a commutative and associative +.
+ *
+ * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
+ * the same operation (e.g., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
+ * always be the same, though -- e.g., imagine you are accumulating a set. You will add items to the set, and you
+ * will union two sets together.
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `T`
+ * @tparam R the full accumulated data
+ * @tparam T partial data that can be added in
+ */
+class Accumulable[R, T] (
+ @transient initialValue: R,
+ param: AccumulableParam[R, T])
extends Serializable {
val id = Accumulators.newId
@@ -18,28 +31,28 @@ class Accumulable[T,R] (
Accumulators.register(this, true)
/**
- * add more data to this accumulator / accumulable
+ * Add more data to this accumulator / accumulable
* @param term the data to add
*/
- def += (term: R) { value_ = param.addAccumulator(value_, term) }
+ def += (term: T) { value_ = param.addAccumulator(value_, term) }
/**
- * merge two accumulable objects together
+ * Merge two accumulable objects together
*
* Normally, a user will not want to use this version, but will instead call `+=`.
* @param term the other Accumulable that will get merged with this
*/
- def ++= (term: T) { value_ = param.addInPlace(value_, term)}
+ def ++= (term: R) { value_ = param.addInPlace(value_, term)}
def value = {
if (!deserialized) value_
- else throw new UnsupportedOperationException("Can't use read value in task")
+ else throw new UnsupportedOperationException("Can't read accumulator value in task")
}
private[spark] def localValue = value_
- def value_= (t: T) {
- if (!deserialized) value_ = t
- else throw new UnsupportedOperationException("Can't use value_= in task")
+ def value_= (r: R) {
+ if (!deserialized) value_ = r
+ else throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
// Called by Java when deserializing an object
@@ -53,58 +66,61 @@ class Accumulable[T,R] (
override def toString = value_.toString
}
-class Accumulator[T](
- @transient initialValue: T,
- param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
-
-/**
- * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value
- * @tparam T
- */
-trait AccumulatorParam[T] extends AccumulableParam[T,T] {
- def addAccumulator(t1: T, t2: T) : T = {
- addInPlace(t1, t2)
- }
-}
-
/**
- * A datatype that can be accumulated, ie. has a commutative & associative +.
- *
- * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
- * the same operation (eg., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
- * always be the same, though -- eg., imagine you are accumulating a set. You will add items to the set, and you
- * will union two sets together.
+ * Helper object defining how to accumulate values of a particular type.
*
* @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
-trait AccumulableParam[R,T] extends Serializable {
+trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value.
- * @param t1 the current value of the accumulator
- * @param t2 the data to be added to the accumulator
+ * @param r the current value of the accumulator
+ * @param t the data to be added to the accumulator
* @return the new value of the accumulator
*/
- def addAccumulator(t1: R, t2: T) : R
+ def addAccumulator(r: R, t: T) : R
/**
- * merge two accumulated values together
- * @param t1 one set of accumulated data
- * @param t2 another set of accumulated data
+ * Merge two accumulated values together
+ * @param r1 one set of accumulated data
+ * @param r2 another set of accumulated data
* @return both data sets merged together
*/
- def addInPlace(t1: R, t2: R): R
+ def addInPlace(r1: R, r2: R): R
def zero(initialValue: R): R
}
+/**
+ * A simpler version of [[spark.Accumulable]] where the result type being accumulated is the same
+ * as the type of the elements being merged in.
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `T`
+ * @tparam T result type
+ */
+class Accumulator[T](
+ @transient initialValue: T,
+ param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
+
+/**
+ * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
+ * as the accumulated value
+ * @tparam T type of value to accumulate
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+ def addAccumulator(t1: T, t2: T) : T = {
+ addInPlace(t1, t2)
+ }
+}
+
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
- val originals = Map[Long, Accumulable[_,_]]()
- val localAccums = Map[Thread, Map[Long, Accumulable[_,_]]]()
+ val originals = Map[Long, Accumulable[_, _]]()
+ val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
def newId: Long = synchronized {
@@ -112,7 +128,7 @@ private object Accumulators {
return lastId
}
- def register(a: Accumulable[_,_], original: Boolean): Unit = synchronized {
+ def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
originals(a.id) = a
} else {
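
A minimal illustrative sketch of the AccumulableParam contract documented above (add a single element with addAccumulator, union two partial results with addInPlace). The SetAccumulableParam name and the choice of mutable.HashSet are assumptions for illustration only; they are not part of this patch.

import scala.collection.mutable

// Hypothetical param that accumulates Ints into a mutable.HashSet[Int]:
// addAccumulator adds one element, addInPlace unions two partial sets,
// and zero produces an empty set for a fresh task-local copy.
class SetAccumulableParam extends AccumulableParam[mutable.HashSet[Int], Int] {
  def addAccumulator(r: mutable.HashSet[Int], t: Int): mutable.HashSet[Int] = { r += t; r }
  def addInPlace(r1: mutable.HashSet[Int], r2: mutable.HashSet[Int]): mutable.HashSet[Int] = { r1 ++= r2; r1 }
  def zero(initialValue: mutable.HashSet[Int]): mutable.HashSet[Int] = new mutable.HashSet[Int]()
}
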
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 9da73c4b02..ea7171d3a1 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -31,7 +31,7 @@ import SparkContext._
* we need more implicit parameters to convert our keys and values to Writable.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
- self: RDD[(K,V)])
+ self: RDD[(K, V)])
extends Logging
with Serializable {
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 32f37822a5..b0f5e12a76 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -134,7 +134,7 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
@@ -286,7 +286,7 @@ class SparkContext(
new Accumulator(initialValue, param)
/**
- * create an accumulatable shared variable, with a `+=` method
+ * Create an accumulable shared variable, with a `+=` method
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
@@ -303,14 +303,7 @@ class SparkContext(
dagScheduler = null
taskScheduler = null
// TODO: Broadcast.stop(), Cache.stop()?
- env.mapOutputTracker.stop()
- env.cacheTracker.stop()
- env.shuffleFetcher.stop()
- env.shuffleManager.stop()
- env.blockManager.stop()
- BlockManagerMaster.stopBlockManagerMaster()
- env.actorSystem.shutdown()
- env.actorSystem.awaitTermination()
+ env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
logInfo("Successfully stopped SparkContext")
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 602fcca0f9..25593c596b 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -21,7 +21,20 @@ class SparkEnv (
) {
/** No-parameter constructor for unit tests. */
- def this() = this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null)
+ def this() = {
+ this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null)
+ }
+
+ def stop() {
+ mapOutputTracker.stop()
+ cacheTracker.stop()
+ shuffleFetcher.stop()
+ shuffleManager.stop()
+ blockManager.stop()
+ BlockManagerMaster.stopBlockManagerMaster()
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
}
object SparkEnv {
@@ -54,8 +67,8 @@ object SparkEnv {
val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
BlockManagerMaster.startBlockManagerMaster(actorSystem, isMaster, isLocal)
-
- var blockManager = new BlockManager(serializer)
+
+ val blockManager = new BlockManager(serializer)
val connectionManager = blockManager.connectionManager