aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-09-11 16:57:17 -0700
committerDenny <dennybritz@gmail.com>2012-09-11 16:57:17 -0700
commit5e4076e3f2eb6b0206119c5d67ac6ee405cee1ad (patch)
treeeeeadcd957958ad0210d86d0e8defde534ab28eb /core/src/main
parent77873d2c8eda58278e136f01f03e154cba40ee79 (diff)
parent943df48348662d1ca17091dd403c5365e27924a8 (diff)
downloadspark-5e4076e3f2eb6b0206119c5d67ac6ee405cee1ad.tar.gz
spark-5e4076e3f2eb6b0206119c5d67ac6ee405cee1ad.tar.bz2
spark-5e4076e3f2eb6b0206119c5d67ac6ee405cee1ad.zip
Merge branch 'dev' into feature/fileserver
Conflicts: core/src/main/scala/spark/SparkContext.scala
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/Accumulators.scala24
-rw-r--r--core/src/main/scala/spark/SparkContext.scala11
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerWorker.scala2
3 files changed, 36 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index d764ffc29d..c157cc8feb 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -3,6 +3,7 @@ package spark
import java.io._
import scala.collection.mutable.Map
+import scala.collection.generic.Growable
/**
* A datatype that can be accumulated, i.e. has an commutative and associative +.
@@ -92,6 +93,29 @@ trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}
+class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+ extends AccumulableParam[R,T] {
+
+ def addAccumulator(growable: R, elem: T) : R = {
+ growable += elem
+ growable
+ }
+
+ def addInPlace(t1: R, t2: R) : R = {
+ t1 ++= t2
+ t1
+ }
+
+ def zero(initialValue: R): R = {
+ // We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
+ // Instead we'll serialize it to a buffer and load it back.
+ val ser = (new spark.JavaSerializer).newInstance()
+ val copy = ser.deserialize[R](ser.serialize(initialValue))
+ copy.clear() // In case it contained stuff
+ copy
+ }
+}
+
/**
* A simpler value of [[spark.Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged.
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2bd07f10d4..758c42fa61 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -8,6 +8,7 @@ import akka.actor.Actor
import akka.actor.Actor._
import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.generic.Growable
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
@@ -315,6 +316,16 @@ class SparkContext(
def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
new Accumulable(initialValue, param)
+ /**
+ * Create an accumulator from a "mutable collection" type.
+ *
+ * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
+ * standard mutable collections. So you can use this with mutable Map, Set, etc.
+ */
+ def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+ val param = new GrowableAccumulableParam[R,T]
+ new Accumulable(initialValue, param)
+ }
// Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index e317ad3642..0eaa558f44 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -34,7 +34,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
/*logDebug("Processed block messages")*/
return Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
- case e: Exception => logError("Exception handling buffer message: " + e.getMessage)
+ case e: Exception => logError("Exception handling buffer message", e)
return None
}
}