aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenry Saputra <hsaputra@apache.org>2014-01-12 19:15:09 -0800
committerHenry Saputra <hsaputra@apache.org>2014-01-12 19:15:09 -0800
commit5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d (patch)
tree216544d119cda6e775e97b3c6eb47be3e45e89d7
parentf1c5eca494f798fc22c46d245435381a89098fe4 (diff)
downloadspark-5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d.tar.gz
spark-5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d.tar.bz2
spark-5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d.zip
Address code review concerns and comments.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala5
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala6
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala6
8 files changed, 18 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index bba873a0b6..4e63117a51 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
}
- var outputPath = new Path(path)
+ val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8830de7273..82527fe663 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
- new Iterator[Array[Byte]] {
+ val stdoutIterator = new Iterator[Array[Byte]] {
def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
def hasNext = _nextObj.length != 0
}
+ stdoutIterator
}
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index a5394a28e0..cefcc3d2d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val prefPartActual = prefPart.get
- if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
+ if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
minPowerOfTwo // prefer balance over locality
- else {
+ } else {
prefPartActual // prefer locality over balance
}
}
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
*/
def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
- throwBalls() // assign partitions (balls) to each group (bins)
+ throwBalls() // assign partitions (balls) to each group (bins)
getPartitions
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index a36abe0670..42f52d7b26 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
case e: Exception => logError("Exception handling buffer message", e)
- return None
+ None
}
}
case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
- return (resultMessage != None)
+ resultMessage != None
}
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
return blockMessage.getData
})
}
- case None => logDebug("No response message received"); return null
+ case None => logDebug("No response message received")
}
null
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 14f89d50b7..f0236ef1e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
locations: Seq[Seq[String]] = Nil
): MyRDD = {
val maxPartition = numPartitions - 1
- new MyRDD(sc, dependencies) {
+ val newRDD = new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Nil
override def toString: String = "DAGSchedulerSuiteRDD " + id
}
+ newRDD
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 3880e68725..29102913c7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
}
type MyRDD = RDD[(Int, Int)]
- def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]]
- ): MyRDD = {
+ def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
val maxPartition = numPartitions - 1
new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e1fe09e3e2..e56bc02897 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
+ return false
}
+
+ true
}
/** Copy the file into HDFS if needed. */
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c084485734..51d9adb9d4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
+ return false
}
+
+ true
}
/** Copy the file into HDFS if needed. */