diff options
author | root <root@ip-10-191-125-131.ec2.internal> | 2011-10-17 18:07:35 +0000 |
---|---|---|
committer | root <root@ip-10-191-125-131.ec2.internal> | 2011-10-17 18:07:35 +0000 |
commit | 3a0e6c436317457c8483c7106d5ba6551b6367c4 (patch) | |
tree | 88937ec72f9b8e25d072a2842ec900ef97a5ebb6 /core | |
parent | 49505a0b0bc0292725fb84fc6af3f3370d6ea692 (diff) | |
download | spark-3a0e6c436317457c8483c7106d5ba6551b6367c4.tar.gz spark-3a0e6c436317457c8483c7106d5ba6551b6367c4.tar.bz2 spark-3a0e6c436317457c8483c7106d5ba6551b6367c4.zip |
Miscellaneous fixes:
- Executor should initialize logging properly
- groupByKey should allow custom partitioner
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/Executor.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 11 |
2 files changed, 12 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index a2af70989c..e1256a229e 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -22,6 +22,8 @@ class Executor extends org.apache.mesos.Executor with Logging { var threadPool: ExecutorService = null var env: SparkEnv = null + initLogging() + override def init(d: ExecutorDriver, args: ExecutorArgs) { // Read spark.* system properties from executor arg val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 9ae47a3385..6a743bf096 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -77,12 +77,21 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex bufs.asInstanceOf[RDD[(K, Seq[V])]] } + def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner) + bufs.asInstanceOf[RDD[(K, Seq[V])]] + } + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { def createCombiner(v: V) = ArrayBuffer(v) def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, defaultParallelism, partitioner) + createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner) bufs.flatMapValues(buf => buf) } |