-rw-r--r--  core/src/main/scala/spark/Aggregator.scala           |  4
-rw-r--r--  core/src/main/scala/spark/CoGroupedRDD.scala         |  7
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala         |  6
-rw-r--r--  core/src/main/scala/spark/FetchFailedException.scala |  3
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala            | 16
5 files changed, 21 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index a721a136b2..6f99270b1e 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -3,5 +3,5 @@ package spark
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C
- ) extends Serializable
\ No newline at end of file
+ val mergeCombiners: (C, C) => C)
+ extends Serializable
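
Note: only formatting changes here; the old file's missing trailing newline is why "\ No newline at end of file" appears above. For readers new to the class, the three functions form the usual combine-by-key contract: createCombiner seeds a per-key accumulator from the first value, mergeValue folds further values into it, and mergeCombiners merges partial accumulators from different partitions. A standalone sketch of instantiating it (the averaging example and the avg name are illustrative, not part of this commit):

// Accumulate (sum, count) per key so an average can be computed at the end.
val avg = new Aggregator[String, Double, (Double, Int)](
  v => (v, 1),
  (c, v) => (c._1 + v, c._2 + 1),
  (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2))

val partial1 = avg.mergeValue(avg.createCombiner(1.0), 2.0)  // (3.0, 2)
val partial2 = avg.createCombiner(4.0)                       // (4.0, 1)
val combined = avg.mergeCombiners(partial1, partial2)        // (7.0, 3)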
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index ed51f5ae47..93f453bc5e 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -15,11 +15,12 @@ class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with
override def hashCode(): Int = idx
}

-class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
+class CoGroupAggregator
+ extends Aggregator[Any, Any, ArrayBuffer[Any]](
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
- { (b1, b2) => b1 ++ b2 }
- ) with Serializable
+ { (b1, b2) => b1 ++ b2 })
+ with Serializable

class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
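
Note: the reindented CoGroupAggregator collects rather than reduces: the combiner for a key is simply the buffer of every value seen for it, which is what cogroup needs. The same three closures, exercised outside the RDD machinery (a standalone sketch, not code from this file):

import scala.collection.mutable.ArrayBuffer

val createCombiner = (x: Any) => ArrayBuffer(x)
val mergeValue = (b: ArrayBuffer[Any], x: Any) => b += x
val mergeCombiners = (b1: ArrayBuffer[Any], b2: ArrayBuffer[Any]) => b1 ++ b2

val left = mergeValue(createCombiner("a"), "b")      // ArrayBuffer(a, b)
val all = mergeCombiners(left, createCombiner("c"))  // ArrayBuffer(a, b, c)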
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index fc9e6581bf..8d4116b987 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -119,8 +119,10 @@ private trait DAGScheduler extends Scheduler with Logging {
cacheTracker.registerRDD(r.id, r.splits.size)
for (dep <- r.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] => parents += getShuffleMapStage(shufDep)
- case _ => visit(dep.rdd)
+ case shufDep: ShuffleDependency[_,_,_] =>
+ parents += getShuffleMapStage(shufDep)
+ case _ =>
+ visit(dep.rdd)
}
}
}
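
Note: the match being reformatted is the heart of stage construction: a ShuffleDependency ends the current stage (its map stage becomes a parent), while any other dependency is followed further up the lineage within the same stage. A toy version of that traversal, with stand-in Dep/NarrowDep/ShuffleDep types rather than Spark's real dependency classes:

import scala.collection.mutable.HashSet

sealed trait Dep
case class NarrowDep(parent: String) extends Dep
case class ShuffleDep(parent: String, shuffleId: Int) extends Dep

// Toy lineage graph: node -> its dependencies.
val deps = Map(
  "join" -> Seq(ShuffleDep("mapA", 0), ShuffleDep("mapB", 1)),
  "mapA" -> Seq(NarrowDep("input")),
  "mapB" -> Seq(NarrowDep("input")),
  "input" -> Seq())

def parentShuffles(node: String): Set[Int] = {
  val parents = new HashSet[Int]
  val visited = new HashSet[String]
  def visit(n: String): Unit = {
    if (!visited(n)) {
      visited += n
      for (dep <- deps(n)) {
        dep match {
          case ShuffleDep(_, id) =>
            parents += id  // shuffle boundary: becomes a parent stage
          case NarrowDep(p) =>
            visit(p)       // narrow: same stage, keep walking the lineage
        }
      }
    }
  }
  visit(node)
  parents.toSet
}

// parentShuffles("join") == Set(0, 1)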
diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala
index db711e099c..a3c4e7873d 100644
--- a/core/src/main/scala/spark/FetchFailedException.scala
+++ b/core/src/main/scala/spark/FetchFailedException.scala
@@ -13,5 +13,6 @@ class FetchFailedException(

override def getCause(): Throwable = cause

- def toTaskEndReason: TaskEndReason = FetchFailed(serverUri, shuffleId, mapId, reduceId)
+ def toTaskEndReason: TaskEndReason =
+ FetchFailed(serverUri, shuffleId, mapId, reduceId)
}
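
Note: only line wrapping changes here. toTaskEndReason exists so code that catches this exception can hand the scheduler a FetchFailed reason describing the lost map output instead of the raw exception. A hypothetical catch site (runTask and its shape are illustrative, not Spark's API):

def runTask(body: => Unit): Option[TaskEndReason] =
  try {
    body
    None  // task succeeded
  } catch {
    case ffe: FetchFailedException =>
      // Report the failed map output so its stage can be resubmitted.
      Some(ffe.toTaskEndReason)
  }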
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index 41608e5a4e..cb7923317a 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -1,5 +1,8 @@
package spark

+import java.io.EOFException
+import java.util.NoSuchElementException
+
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
@@ -15,11 +18,9 @@ import org.apache.hadoop.util.ReflectionUtils
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-class HadoopSplit(
- rddId: Int,
- idx: Int,
- @transient s: InputSplit)
- extends Split with Serializable {
+class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+ extends Split
+ with Serializable {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -91,7 +92,8 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
- case eofe: java.io.EOFException => finished = true
+ case eof: EOFException =>
+ finished = true
}
gotNext = true
}
@@ -106,7 +108,7 @@ class HadoopRDD[K, V](
finished = !reader.next(key, value)
}
if (finished) {
- throw new java.util.NoSuchElementException("End of stream")
+ throw new NoSuchElementException("End of stream")
}
gotNext = false
(key, value)
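
Note: the last two hunks both touch HadoopRDD's read-ahead iterator over a Hadoop RecordReader: hasNext must fetch a record to learn whether one exists, an EOFException mid-read is treated as normal end of input, and calling next() past the end raises NoSuchElementException. The same pattern, sketched over a plain BufferedReader so it runs standalone (LineIterator is an illustrative name, not a Spark class):

import java.io.{BufferedReader, EOFException, StringReader}
import java.util.NoSuchElementException

// Read-ahead iterator: the fetched line is cached until next() consumes it.
class LineIterator(reader: BufferedReader) extends Iterator[String] {
  private var line: String = null
  private var gotNext = false   // a fetched line is waiting to be returned
  private var finished = false  // the underlying stream is exhausted

  def hasNext: Boolean = {
    if (!gotNext) {
      try {
        line = reader.readLine()
        finished = (line == null)
      } catch {
        case eof: EOFException =>
          finished = true       // truncated input counts as end of stream
      }
      gotNext = true
    }
    !finished
  }

  def next(): String = {
    if (!gotNext) {             // next() called without a preceding hasNext
      line = reader.readLine()
      finished = (line == null)
    }
    if (finished) {
      throw new NoSuchElementException("End of stream")
    }
    gotNext = false
    line
  }
}

// new LineIterator(new BufferedReader(new StringReader("a\nb"))).toList
//   == List("a", "b")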