From 5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Sun, 12 Jan 2014 19:15:09 -0800 Subject: Address code review concerns and comments. --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 6 +++--- .../main/scala/org/apache/spark/storage/BlockManagerWorker.scala | 6 +++--- .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 ++- core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 5 +---- yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++--- .../stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++--- 8 files changed, 18 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index bba873a0b6..4e63117a51 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -189,7 +189,7 @@ object SparkHadoopWriter { if (path == null) { throw new IllegalArgumentException("Output path is null") } - var outputPath = new Path(path) + val outputPath = new Path(path) val fs = outputPath.getFileSystem(conf) if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 8830de7273..82527fe663 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) - new Iterator[Array[Byte]] { + val stdoutIterator = new Iterator[Array[Byte]] { def next(): Array[Byte] = { val obj = _nextObj if (hasNext) { @@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag]( def hasNext = _nextObj.length != 0 } + stdoutIterator } val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index a5394a28e0..cefcc3d2d9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val prefPartActual = prefPart.get - if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows + if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows minPowerOfTwo // prefer balance over locality - else { + } else { prefPartActual // prefer locality over balance } } @@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc */ def run(): Array[PartitionGroup] = { setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) - throwBalls() // assign partitions (balls) to each group (bins) + throwBalls() // assign partitions (balls) to each group (bins) getPartitions } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index a36abe0670..42f52d7b26 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) - return None + None } } case otherMessage: Any => { @@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging { val blockMessageArray = new BlockMessageArray(blockMessage) val resultMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) - return (resultMessage != None) + resultMessage != None } def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = { @@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging { return blockMessage.getData }) } - case None => logDebug("No response message received"); return null + case None => logDebug("No response message received") } null } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 14f89d50b7..f0236ef1e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont locations: Seq[Seq[String]] = Nil ): MyRDD = { val maxPartition = numPartitions - 1 - new MyRDD(sc, dependencies) { + val newRDD = new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { @@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Nil override def toString: String = "DAGSchedulerSuiteRDD " + id } + newRDD } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 3880e68725..29102913c7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) } type MyRDD = RDD[(Int, Int)] - def makeRdd( - numPartitions: Int, - dependencies: List[Dependency[_]] - ): MyRDD = { + def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = { val maxPartition = numPartitions - 1 new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e1fe09e3e2..e56bc02897 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - false - } else { - true + return false } + + true } /** Copy the file into HDFS if needed. */ diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c084485734..51d9adb9d4 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - false - } else { - true + return false } + + true } /** Copy the file into HDFS if needed. */ -- cgit v1.2.3