author    root <root@ip-10-191-125-131.ec2.internal>  2011-10-17 18:07:35 +0000
committer root <root@ip-10-191-125-131.ec2.internal>  2011-10-17 18:07:35 +0000
commit    3a0e6c436317457c8483c7106d5ba6551b6367c4
tree      88937ec72f9b8e25d072a2842ec900ef97a5ebb6
parent    49505a0b0bc0292725fb84fc6af3f3370d6ea692
Miscellaneous fixes:
- Executor should initialize logging properly
- groupByKey should allow custom partitioner
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Executor.scala          2
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala  11
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index a2af70989c..e1256a229e 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -22,6 +22,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
   var threadPool: ExecutorService = null
   var env: SparkEnv = null
 
+  initLogging()
+
   override def init(d: ExecutorDriver, args: ExecutorArgs) {
     // Read spark.* system properties from executor arg
     val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
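For context, initLogging() comes from the Logging trait the class already mixes in; calling it in the constructor body ensures logging is configured before any callback tries to log. A minimal sketch of the pattern, not Spark's actual Logging internals; initLogging is the only name taken from the diff, everything else is a hypothetical stand-in:

trait Logging {
  def initLogging() {
    // Assumed behavior: configure the logging backend (e.g. a default
    // console appender) if nothing has set it up yet.
  }

  def logInfo(msg: => String) {
    println("INFO: " + msg) // stand-in for a real logger call
  }
}

class Executor extends Logging {
  // The fix: run initLogging() at construction time, so logging is
  // configured before any callback (such as init) tries to log.
  initLogging()

  def init() {
    logInfo("Executor initialized") // safe: logging is already set up
  }
}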
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 9ae47a3385..6a743bf096 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -77,12 +77,21 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
 
+  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+    def createCombiner(v: V) = ArrayBuffer(v)
+    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+    val bufs = combineByKey[ArrayBuffer[V]](
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  }
+
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
     def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, defaultParallelism, partitioner)
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
     bufs.flatMapValues(buf => buf)
   }
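With this commit, callers of groupByKey can supply their own Partitioner instead of relying on the default parallelism. A hedged usage sketch follows; HashPartitioner, the local SparkContext constructor, and the SparkContext._ implicits are assumptions about the surrounding API of this era, not part of the diff:

import spark.{HashPartitioner, SparkContext}
import SparkContext._ // assumed implicit conversion to PairRDDFunctions

object GroupByKeyExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "GroupByKeyExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // New overload from this commit: group values by key, hashing keys
    // into exactly 4 partitions instead of the default parallelism.
    val grouped = pairs.groupByKey(new HashPartitioner(4))

    grouped.collect().foreach(println) // e.g. (a,ArrayBuffer(1, 3)), (b,ArrayBuffer(2))
  }
}

The same change fixes partitionBy, which previously sized the shuffle by defaultParallelism even when the supplied partitioner expected a different partition count.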