From 67df7f2fa2e09487fe8dcf39ab80606d95383ea5 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 5 Feb 2013 21:08:21 -0600 Subject: Add private, minor formatting. --- .../scala/spark/network/ConnectionManager.scala | 35 +++++++++------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index c7f226044d..b6ec664d7e 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging { val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort) logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id) - val thisInstance = this val selectorThread = new Thread("connection-manager-thread") { - override def run() { - thisInstance.run() - } + override def run() = ConnectionManager.this.run() } selectorThread.setDaemon(true) selectorThread.start() - def run() { + private def run() { try { while(!selectorThread.isInterrupted) { - for( (connectionManagerId, sendingConnection) <- connectionRequests) { + for ((connectionManagerId, sendingConnection) <- connectionRequests) { sendingConnection.connect() addConnection(sendingConnection) connectionRequests -= connectionManagerId } sendMessageRequests.synchronized { - while(!sendMessageRequests.isEmpty) { + while (!sendMessageRequests.isEmpty) { val (message, connection) = sendMessageRequests.dequeue connection.send(message) } } - while(!keyInterestChangeRequests.isEmpty) { + while (!keyInterestChangeRequests.isEmpty) { val (key, ops) = keyInterestChangeRequests.dequeue val connection = connectionsByKey(key) val lastOps = key.interestOps() @@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging { if (key.isValid) { if (key.isAcceptable) { acceptConnection(key) - } else - if (key.isConnectable) { + } else if (key.isConnectable) { connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect() - } else - if (key.isReadable) { + } else if (key.isReadable) { connectionsByKey(key).read() - } else - if (key.isWritable) { + } else if (key.isWritable) { connectionsByKey(key).write() } } @@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } } - def acceptConnection(key: SelectionKey) { + private def acceptConnection(key: SelectionKey) { val serverChannel = key.channel.asInstanceOf[ServerSocketChannel] val newChannel = serverChannel.accept() val newConnection = new ReceivingConnection(newChannel, selector) @@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]") } - def addConnection(connection: Connection) { + private def addConnection(connection: Connection) { connectionsByKey += ((connection.key, connection)) if (connection.isInstanceOf[SendingConnection]) { val sendingConnection = connection.asInstanceOf[SendingConnection] @@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging { connection.onClose(removeConnection) } - def removeConnection(connection: Connection) { + private def removeConnection(connection: Connection) { connectionsByKey -= connection.key if (connection.isInstanceOf[SendingConnection]) { val sendingConnection = connection.asInstanceOf[SendingConnection] @@ -222,16 +216,16 @@ 
private[spark] class ConnectionManager(port: Int) extends Logging { } } - def handleConnectionError(connection: Connection, e: Exception) { + private def handleConnectionError(connection: Connection, e: Exception) { logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId) removeConnection(connection) } - def changeConnectionKeyInterest(connection: Connection, ops: Int) { + private def changeConnectionKeyInterest(connection: Connection, ops: Int) { keyInterestChangeRequests += ((connection.key, ops)) } - def receiveMessage(connection: Connection, message: Message) { + private def receiveMessage(connection: Connection, message: Message) { val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress) logDebug("Received [" + message + "] from [" + connectionManagerId + "]") val runnable = new Runnable() { @@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging { private[spark] object ConnectionManager { def main(args: Array[String]) { - val manager = new ConnectionManager(9999) manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { println("Received [" + msg + "] from [" + id + "]") -- cgit v1.2.3 From f2bc7480131c7468eb6d3bc6089a4deadf0a2a88 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 5 Feb 2013 21:23:36 -0600 Subject: Add RDD.coalesce. --- core/src/main/scala/spark/RDD.scala | 7 +++++++ core/src/main/scala/spark/api/java/JavaRDDLike.scala | 10 ++++++++++ core/src/test/scala/spark/CheckpointSuite.scala | 4 ++-- core/src/test/scala/spark/RDDSuite.scala | 8 ++++---- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 9d6ea782bd..f0bc85865c 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,6 +20,7 @@ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator import spark.partial.PartialResult +import spark.rdd.CoalescedRDD import spark.rdd.CartesianRDD import spark.rdd.FilteredRDD import spark.rdd.FlatMappedRDD @@ -231,6 +232,12 @@ abstract class RDD[T: ClassManifest]( def distinct(): RDD[T] = distinct(splits.size) + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] = + new CoalescedRDD(this, numSplits) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 60025b459c..295eaa57c0 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -130,6 +130,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, other.classManifest) + /** + * Return a new RDD that is reduced into the default number of partitions. + */ + def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism) + + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits) + /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. 
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0b74607fb8..0d08fd2396 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("CoalescedRDD") { - testCheckpointing(new CoalescedRDD(_, 2)) + testCheckpointing(_.coalesce(2)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, // so only the RDD will reduce in serialized size, not the splits. - testParentCheckpointing(new CoalescedRDD(_, 2), true, false) + testParentCheckpointing(_.coalesce(2), true, false) // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index fe7deb10d6..ffa866de75 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) - val coalesced1 = new CoalescedRDD(data, 2) + val coalesced1 = data.coalesce(2) assert(coalesced1.collect().toList === (1 to 10).toList) assert(coalesced1.glom().collect().map(_.toList).toList === List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) @@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList === List(5, 6, 7, 8, 9)) - val coalesced2 = new CoalescedRDD(data, 3) + val coalesced2 = data.coalesce(3) assert(coalesced2.collect().toList === (1 to 10).toList) assert(coalesced2.glom().collect().map(_.toList).toList === List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) - val coalesced3 = new CoalescedRDD(data, 10) + val coalesced3 = data.coalesce(10) assert(coalesced3.collect().toList === (1 to 10).toList) assert(coalesced3.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. - val coalesced4 = new CoalescedRDD(data, 20) + val coalesced4 = data.coalesce(20) assert(coalesced4.collect().toList === (1 to 10).toList) assert(coalesced4.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) -- cgit v1.2.3 From f4d43cb43e64ec3436a129cf3f7a177374451060 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 5 Feb 2013 21:26:44 -0600 Subject: Remove unneeded zipWithIndex. Also rename r->rdd and remove unneeded extra type info. 
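A minimal, hypothetical sketch (simplified names, not the actual CoGroupedRDD types) of the two cleanups this commit relies on: iterating without zipWithIndex when the index is never used, and letting Scala infer the common supertype of the branches instead of ascribing it on each one.

    sealed trait SplitDep
    case class ShuffleSplitDep(shuffleId: Int) extends SplitDep
    case class NarrowSplitDep(rddName: String, splitIndex: Int) extends SplitDep

    object CleanupSketch {
      def main(args: Array[String]) {
        val rdds = Seq("rddA", "rddB", "rddC")

        // Before: for ((rdd, index) <- rdds.zipWithIndex) { ... } with `index` unused.
        // After: the index is simply dropped.
        for (rdd <- rdds) {
          println("visiting " + rdd)
        }

        // The result type is already inferred as SplitDep (the least upper bound of the
        // branches), so a per-branch ": SplitDep" ascription adds nothing.
        val dep = if (rdds.size > 2) ShuffleSplitDep(42) else NarrowSplitDep("rddA", 0)
        println(dep)
      }
    }
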
--- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 4893fe8d78..021118c8ba 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var deps_ = { val deps = new ArrayBuffer[Dependency[_]] - for ((rdd, index) <- rdds.zipWithIndex) { + for (rdd <- rdds) { if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) deps += new OneToOneDependency(rdd) @@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var splits_ : Array[Split] = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { - array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) => + // Each CoGroupSplit will have a dependency per contributing RDD + array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) => + // Assume each RDD contributed a single dependency, and get it dependencies(j) match { case s: ShuffleDependency[_, _] => - new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep + new ShuffleCoGroupSplitDep(s.shuffleId) case _ => - new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep + new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i)) } }.toList) } @@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } } for ((dep, depNum) <- split.deps.zipWithIndex) dep match { - case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => { + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent for ((k, v) <- rdd.iterator(itsSplit, context)) { getSeq(k.asInstanceOf[K])(depNum) += v -- cgit v1.2.3 From a9c8d53cfa0bd09565799cec88344b286d7cc436 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 5 Feb 2013 22:14:18 -0600 Subject: Clean up RDDs, mainly to use getSplits. Also made sure clearDependencies() was calling super, to ensure the getSplits/getDependencies vars in the RDD base class get cleaned up. 
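A simplified, hypothetical sketch of the pattern the RDDs move to here (illustrative class and member names, not Spark's exact internals): subclasses override getSplits rather than caching splits in their own @transient vars, and any clearDependencies() override calls super so the caches held in the base class are released as well.

    abstract class SketchRDD[T] {
      @transient private var splitsCache: Array[Int] = _
      @transient private var depsCache: Seq[String] = _

      protected def getSplits: Array[Int]
      protected def getDependencies: Seq[String] = Nil

      final def splits: Array[Int] = {
        if (splitsCache == null) splitsCache = getSplits   // computed once, cached in the base class
        splitsCache
      }

      final def dependencies: Seq[String] = {
        if (depsCache == null) depsCache = getDependencies
        depsCache
      }

      // Subclasses that override this must call super, or the base-class caches leak.
      protected def clearDependencies() {
        splitsCache = null
        depsCache = null
      }
    }

    class SketchUnionRDD(var parents: Seq[SketchRDD[String]]) extends SketchRDD[String] {
      override protected def getSplits: Array[Int] =
        (0 until parents.map(_.splits.size).sum).toArray

      override protected def getDependencies: Seq[String] =
        parents.map(p => "dep on " + p)

      override protected def clearDependencies() {
        super.clearDependencies()   // ensures the cached splits/deps above are dropped too
        parents = null              // then forget the parent references
      }
    }
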
--- core/src/main/scala/spark/RDD.scala | 1 - core/src/main/scala/spark/rdd/BlockRDD.scala | 12 ++++-------- core/src/main/scala/spark/rdd/CartesianRDD.scala | 3 ++- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 4 +--- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 21 +++++++-------------- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 13 ++++++------- core/src/main/scala/spark/rdd/HadoopRDD.scala | 7 ++----- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 6 ++---- core/src/main/scala/spark/rdd/SampledRDD.scala | 8 +------- core/src/main/scala/spark/rdd/UnionRDD.scala | 8 ++------ core/src/main/scala/spark/rdd/ZippedRDD.scala | 6 +++--- 11 files changed, 30 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index f0bc85865c..5f99591fd5 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -656,7 +656,6 @@ abstract class RDD[T: ClassManifest]( */ private[spark] def markCheckpointed(checkpointRDD: RDD[_]) { clearDependencies() - dependencies_ = null splits_ = null deps = null // Forget the constructor argument for dependencies too } diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index 2c022f88e0..4214817c65 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -11,10 +11,6 @@ private[spark] class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String]) extends RDD[T](sc, Nil) { - @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => { - new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] - }).toArray - @transient lazy val locations_ = { val blockManager = SparkEnv.get.blockManager /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ @@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St HashMap(blockIds.zip(locations):_*) } - override def getSplits = splits_ + override def getSplits = (0 until blockIds.size).map(i => { + new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] + }).toArray + override def compute(split: Split, context: TaskContext): Iterator[T] = { val blockManager = SparkEnv.get.blockManager @@ -37,8 +36,5 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St override def getPreferredLocations(split: Split) = locations_(split.asInstanceOf[BlockRDDSplit].blockId) - override def clearDependencies() { - splits_ = null - } } diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 0f9ca06531..2f572a1941 100644 --- a/core/src/main/scala/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( val numSplitsInRdd2 = rdd2.splits.size - override def getSplits: Array[Split] = { + override def getSplits = { // create the cross product split val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) for (s1 <- rdd1.splits; s2 <- rdd2.splits) { @@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( ) override def clearDependencies() { + super.clearDependencies() rdd1 = null rdd2 = null } diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 96b593ba7c..7cde523f11 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ 
b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) - @transient val splits_ : Array[Split] = { + override def getSplits = { val dirContents = fs.listStatus(new Path(checkpointPath)) val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val numSplits = splitFiles.size @@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri checkpointData = Some(new RDDCheckpointData[T](this)) checkpointData.get.cpFile = Some(checkpointPath) - override def getSplits = splits_ - override def getPreferredLocations(split: Split): Seq[String] = { val status = fs.getFileStatus(new Path(checkpointPath)) val locations = fs.getFileBlockLocations(status, 0, status.getLen) diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 021118c8ba..d31ce13706 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -43,26 +43,22 @@ private[spark] class CoGroupAggregator class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging { - val aggr = new CoGroupAggregator + private val aggr = new CoGroupAggregator - @transient var deps_ = { - val deps = new ArrayBuffer[Dependency[_]] - for (rdd <- rdds) { + override def getDependencies = { + rdds.map { rdd => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) - deps += new OneToOneDependency(rdd) + new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) - deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) + new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) } } - deps.toList } - override def getDependencies = deps_ - - @transient var splits_ : Array[Split] = { + override def getSplits = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { // Each CoGroupSplit will have a dependency per contributing RDD @@ -79,8 +75,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) array } - override def getSplits = splits_ - override val partitioner = Some(part) override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { @@ -117,8 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } override def clearDependencies() { - deps_ = null - splits_ = null + super.clearDependencies() rdds = null } } diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 4c57434b65..a1aa7a30b0 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest]( maxPartitions: Int) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { + override def getSplits = { val prevSplits = prev.splits if (prevSplits.length < maxPartitions) { prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) } @@ -50,14 +50,13 @@ class CoalescedRDD[T: ClassManifest]( } } - override def getDependencies: 
Seq[Dependency[_]] = List( - new NarrowDependency(prev) { - def getParents(id: Int): Seq[Int] = - splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices - } - ) + override def getDependencies = Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = + splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices + }) override def clearDependencies() { + super.clearDependencies() prev = null } } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index f547f53812..cd948de967 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -45,10 +45,9 @@ class HadoopRDD[K, V]( extends RDD[(K, V)](sc, Nil) { // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) - @transient - val splits_ : Array[Split] = { + override def getSplits = { val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, minSplits) val array = new Array[Split](inputSplits.size) @@ -63,8 +62,6 @@ class HadoopRDD[K, V]( .asInstanceOf[InputFormat[K, V]] } - override def getSplits = splits_ - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopSplit] var reader: RecordReader[K, V] = null diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index c3b155fcbd..2d000f5c68 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -29,7 +29,7 @@ class NewHadoopRDD[K, V]( with HadoopMapReduceUtil { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it - val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) // private val serializableConf = new SerializableWritable(conf) private val jobtrackerId: String = { @@ -39,7 +39,7 @@ class NewHadoopRDD[K, V]( @transient private val jobId = new JobID(jobtrackerId, id) - @transient private val splits_ : Array[Split] = { + override def getSplits = { val inputFormat = inputFormatClass.newInstance val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray @@ -50,8 +50,6 @@ class NewHadoopRDD[K, V]( result } - override def getSplits = splits_ - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopSplit] val conf = confBroadcast.value.value diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala index e24ad23b21..81626d5009 100644 --- a/core/src/main/scala/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/spark/rdd/SampledRDD.scala @@ -19,13 +19,11 @@ class SampledRDD[T: ClassManifest]( seed: Int) extends RDD[T](prev) { - @transient var splits_ : Array[Split] = { + override def getSplits = { val rg = new Random(seed) firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt)) } - override def getSplits = splits_ - override def getPreferredLocations(split: Split) = firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) @@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest]( firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac)) } } - - override def 
clearDependencies() { - splits_ = null - } } diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 26a2d511f2..5ac24d2ffc 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest]( @transient var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { + override def getSplits = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 for (rdd <- rdds; split <- rdd.splits) { @@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest]( array } - override def getDependencies: Seq[Dependency[_]] = { + override def getDependencies = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) { @@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest]( override def getPreferredLocations(s: Split): Seq[String] = s.asInstanceOf[UnionSplit[T]].preferredLocations() - - override def clearDependencies() { - rdds = null - } } diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e5df6d8c72..a079720a93 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -29,10 +29,9 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( sc: SparkContext, var rdd1: RDD[T], var rdd2: RDD[U]) - extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) - with Serializable { + extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - override def getSplits: Array[Split] = { + override def getSplits = { if (rdd1.splits.size != rdd2.splits.size) { throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") } @@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def clearDependencies() { + super.clearDependencies() rdd1 = null rdd2 = null } -- cgit v1.2.3 From da52b16b38a5d6200ef2c6a3b7ba28ddf35a30f8 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 9 Feb 2013 10:11:54 -0600 Subject: Remove RDD.coalesce default arguments. --- core/src/main/scala/spark/RDD.scala | 3 +-- core/src/main/scala/spark/api/java/JavaRDDLike.scala | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 5f99591fd5..dea52eb5c6 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -235,8 +235,7 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that is reduced into `numSplits` partitions. */ - def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] = - new CoalescedRDD(this, numSplits) + def coalesce(numSplits: Int): RDD[T] = new CoalescedRDD(this, numSplits) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 295eaa57c0..d3a4b62553 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -130,11 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, other.classManifest) - /** - * Return a new RDD that is reduced into the default number of partitions. 
- */ - def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism) - /** * Return a new RDD that is reduced into `numSplits` partitions. */ -- cgit v1.2.3 From 2a18cd826c42d7c6b35eaedde1e4c423b6a1b1e5 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 9 Feb 2013 10:12:04 -0600 Subject: Add back return types. --- core/src/main/scala/spark/rdd/BlockRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/CartesianRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 2 +- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 12 +++++++----- core/src/main/scala/spark/rdd/FilteredRDD.scala | 2 +- core/src/main/scala/spark/rdd/FlatMappedRDD.scala | 2 +- core/src/main/scala/spark/rdd/GlommedRDD.scala | 2 +- core/src/main/scala/spark/rdd/HadoopRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/MapPartitionsRDD.scala | 2 +- .../src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 2 +- core/src/main/scala/spark/rdd/MappedRDD.scala | 2 +- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/PartitionPruningRDD.scala | 2 +- core/src/main/scala/spark/rdd/PipedRDD.scala | 2 +- core/src/main/scala/spark/rdd/SampledRDD.scala | 6 +++--- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 4 +++- core/src/main/scala/spark/rdd/UnionRDD.scala | 4 ++-- core/src/main/scala/spark/rdd/ZippedRDD.scala | 2 +- 19 files changed, 35 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index 4214817c65..17989c5ce5 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -18,7 +18,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St HashMap(blockIds.zip(locations):_*) } - override def getSplits = (0 until blockIds.size).map(i => { + override def getSplits: Array[Split] = (0 until blockIds.size).map(i => { new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] }).toArray @@ -33,7 +33,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St } } - override def getPreferredLocations(split: Split) = + override def getPreferredLocations(split: Split): Seq[String] = locations_(split.asInstanceOf[BlockRDDSplit].blockId) } diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 2f572a1941..41cbbd0093 100644 --- a/core/src/main/scala/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( val numSplitsInRdd2 = rdd2.splits.size - override def getSplits = { + override def getSplits: Array[Split] = { // create the cross product split val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) for (s1 <- rdd1.splits; s2 <- rdd2.splits) { @@ -45,7 +45,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( array } - override def getPreferredLocations(split: Split) = { + override def getPreferredLocations(split: Split): Seq[String] = { val currSplit = split.asInstanceOf[CartesianSplit] rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) } diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 7cde523f11..3558d4673f 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -20,7 +20,7 @@ class 
CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) - override def getSplits = { + override def getSplits: Array[Split] = { val dirContents = fs.listStatus(new Path(checkpointPath)) val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val numSplits = splitFiles.size diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index d31ce13706..0a1e2cbee0 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -45,7 +45,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) private val aggr = new CoGroupAggregator - override def getDependencies = { + override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) @@ -58,7 +58,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } } - override def getSplits = { + override def getSplits: Array[Split] = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { // Each CoGroupSplit will have a dependency per contributing RDD diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index a1aa7a30b0..fcd26da43a 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest]( maxPartitions: Int) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies - override def getSplits = { + override def getSplits: Array[Split] = { val prevSplits = prev.splits if (prevSplits.length < maxPartitions) { prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) } @@ -50,10 +50,12 @@ class CoalescedRDD[T: ClassManifest]( } } - override def getDependencies = Seq(new NarrowDependency(prev) { - def getParents(id: Int): Seq[Int] = - splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices - }) + override def getDependencies: Seq[Dependency[_]] = { + Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = + splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices + }) + } override def clearDependencies() { super.clearDependencies() diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala index 6dbe235bd9..93e398ea2b 100644 --- a/core/src/main/scala/spark/rdd/FilteredRDD.scala +++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala @@ -7,7 +7,7 @@ private[spark] class FilteredRDD[T: ClassManifest]( f: T => Boolean) extends RDD[T](prev) { - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override val partitioner = prev.partitioner // Since filter cannot change a partition's keys diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala index 1b604c66e2..8c2a610593 100644 --- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala +++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala @@ -9,7 +9,7 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( f: T => TraversableOnce[U]) extends RDD[U](prev) { - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override def compute(split: Split, 
context: TaskContext) = firstParent[T].iterator(split, context).flatMap(f) diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala index 051bffed19..70b9b4e34e 100644 --- a/core/src/main/scala/spark/rdd/GlommedRDD.scala +++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala @@ -5,7 +5,7 @@ import spark.{RDD, Split, TaskContext} private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev) { - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override def compute(split: Split, context: TaskContext) = Array(firstParent[T].iterator(split, context).toArray).iterator diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index cd948de967..854993737b 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -47,7 +47,7 @@ class HadoopRDD[K, V]( // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) - override def getSplits = { + override def getSplits: Array[Split] = { val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, minSplits) val array = new Array[Split](inputSplits.size) @@ -106,7 +106,7 @@ class HadoopRDD[K, V]( } } - override def getPreferredLocations(split: Split) = { + override def getPreferredLocations(split: Split): Seq[String] = { // TODO: Filtering out "localhost" in case of file:// URLs val hadoopSplit = split.asInstanceOf[HadoopSplit] hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost") diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala index 073f7d7d2a..7b0b4525c7 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -13,7 +13,7 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override def compute(split: Split, context: TaskContext) = f(firstParent[T].iterator(split, context)) diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala index 2ddc3d01b6..c6dc1080a9 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala @@ -15,7 +15,7 @@ class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest]( preservesPartitioning: Boolean ) extends RDD[U](prev) { - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override val partitioner = if (preservesPartitioning) prev.partitioner else None diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala index 5466c9c657..6074f411e3 100644 --- a/core/src/main/scala/spark/rdd/MappedRDD.scala +++ b/core/src/main/scala/spark/rdd/MappedRDD.scala @@ -6,7 +6,7 @@ private[spark] class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U) extends RDD[U](prev) { - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override def compute(split: Split, context: 
TaskContext) = firstParent[T].iterator(split, context).map(f) diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 2d000f5c68..345ae79d74 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -39,7 +39,7 @@ class NewHadoopRDD[K, V]( @transient private val jobId = new JobID(jobtrackerId, id) - override def getSplits = { + override def getSplits: Array[Split] = { val inputFormat = inputFormatClass.newInstance val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray @@ -83,7 +83,7 @@ class NewHadoopRDD[K, V]( } } - override def getPreferredLocations(split: Split) = { + override def getPreferredLocations(split: Split): Seq[String] = { val theSplit = split.asInstanceOf[NewHadoopSplit] theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost") } diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala index a50ce75171..d1553181c1 100644 --- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala @@ -37,6 +37,6 @@ class PartitionPruningRDD[T: ClassManifest]( override def compute(split: Split, context: TaskContext) = firstParent[T].iterator( split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context) - override protected def getSplits = + override protected def getSplits: Array[Split] = getDependencies.head.asInstanceOf[PruneDependency[T]].partitions } diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala index 6631f83510..56032a8659 100644 --- a/core/src/main/scala/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/spark/rdd/PipedRDD.scala @@ -27,7 +27,7 @@ class PipedRDD[T: ClassManifest]( // using a standard StringTokenizer (i.e. 
by spaces) def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command)) - override def getSplits = firstParent[T].splits + override def getSplits: Array[Split] = firstParent[T].splits override def compute(split: Split, context: TaskContext): Iterator[String] = { val pb = new ProcessBuilder(command) diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala index 81626d5009..f2a144e2e0 100644 --- a/core/src/main/scala/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/spark/rdd/SampledRDD.scala @@ -19,15 +19,15 @@ class SampledRDD[T: ClassManifest]( seed: Int) extends RDD[T](prev) { - override def getSplits = { + override def getSplits: Array[Split] = { val rg = new Random(seed) firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt)) } - override def getPreferredLocations(split: Split) = + override def getPreferredLocations(split: Split): Seq[String] = firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) - override def compute(splitIn: Split, context: TaskContext) = { + override def compute(splitIn: Split, context: TaskContext): Iterator[T] = { val split = splitIn.asInstanceOf[SampledRDDSplit] if (withReplacement) { // For large datasets, the expected number of occurrences of each element in a sample with diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index d396478673..bf69b5150b 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -22,7 +22,9 @@ class ShuffledRDD[K, V]( override val partitioner = Some(part) - override def getSplits = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + override def getSplits: Array[Split] = { + Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + } override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = { val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 5ac24d2ffc..ebc0068228 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest]( @transient var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies - override def getSplits = { + override def getSplits: Array[Split] = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 for (rdd <- rdds; split <- rdd.splits) { @@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest]( array } - override def getDependencies = { + override def getDependencies: Seq[Dependency[_]] = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) { diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index a079720a93..1ce70268bb 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -31,7 +31,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( var rdd2: RDD[U]) extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - override def getSplits = { + override def getSplits: Array[Split] = { if (rdd1.splits.size != rdd2.splits.size) { throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") } -- cgit v1.2.3 From fb7599870f4e3ee4e5a1e3c6e74ac2eaa2aaabf0 Mon Sep 
17 00:00:00 2001 From: Stephen Haberman Date: Sat, 9 Feb 2013 16:10:52 -0600 Subject: Fix JavaRDDLike.coalesce return type. --- core/src/main/scala/spark/api/java/JavaRDDLike.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index d3a4b62553..9e52c224dd 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -133,7 +133,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround /** * Return a new RDD that is reduced into `numSplits` partitions. */ - def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits) + def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements -- cgit v1.2.3 From 4619ee0787066da15628970bd55cb8cec31a372c Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 9 Feb 2013 20:05:42 -0600 Subject: Move JavaRDDLike.coalesce into the right places. --- core/src/main/scala/spark/api/java/JavaDoubleRDD.scala | 5 +++++ core/src/main/scala/spark/api/java/JavaPairRDD.scala | 5 +++++ core/src/main/scala/spark/api/java/JavaRDD.scala | 5 +++++ core/src/main/scala/spark/api/java/JavaRDDLike.scala | 5 ----- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index 843e1bd18b..2810631b41 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -52,6 +52,11 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD = fromRDD(srdd.filter(x => f(x).booleanValue())) + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numSplits)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 8ce32e0e2f..8a123bdb47 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -62,6 +62,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numSplits)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index ac31350ec3..23e7ae2726 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -38,6 +38,11 @@ JavaRDDLike[T, JavaRDD[T]] { def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] = wrapRDD(rdd.filter((x => f(x).booleanValue()))) + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits) + /** * Return a sampled subset of this RDD. 
*/ diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 9e52c224dd..60025b459c 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -130,11 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, other.classManifest) - /** - * Return a new RDD that is reduced into `numSplits` partitions. - */ - def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits) - /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. -- cgit v1.2.3
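
Taken together, the series leaves coalesce defined on RDD (Scala) and on each concrete Java wrapper (JavaRDD, JavaPairRDD, JavaDoubleRDD) so the return type is preserved. A hedged usage sketch in Scala, mirroring the RDDSuite test above and assuming the local-mode API of the Spark version these patches target:

    import spark.SparkContext

    object CoalesceExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "coalesce-example")
        val data = sc.parallelize(1 to 10, 10)      // 10 input partitions
        val coalesced = data.coalesce(2)            // narrow dependency, no shuffle
        // Expect two partitions, e.g. List(1..5) and List(6..10), as in RDDSuite
        println(coalesced.glom().collect().map(_.toList).toList)
        // Asking for more partitions than the parent has just keeps the original count
        println(data.coalesce(20).glom().collect().size)
        sc.stop()
      }
    }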