diff options
-rw-r--r-- | core/src/main/scala/spark/Aggregator.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/CoGroupedRDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/DAGScheduler.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/FetchFailedException.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/spark/HadoopRDD.scala | 16 |
5 files changed, 21 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index a721a136b2..6f99270b1e 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -3,5 +3,5 @@ package spark class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C - ) extends Serializable
\ No newline at end of file + val mergeCombiners: (C, C) => C) + extends Serializable diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index ed51f5ae47..93f453bc5e 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -15,11 +15,12 @@ class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with override def hashCode(): Int = idx } -class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( +class CoGroupAggregator + extends Aggregator[Any, Any, ArrayBuffer[Any]]( { x => ArrayBuffer(x) }, { (b, x) => b += x }, - { (b1, b2) => b1 ++ b2 } - ) with Serializable + { (b1, b2) => b1 ++ b2 }) + with Serializable class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala index fc9e6581bf..8d4116b987 100644 --- a/core/src/main/scala/spark/DAGScheduler.scala +++ b/core/src/main/scala/spark/DAGScheduler.scala @@ -119,8 +119,10 @@ private trait DAGScheduler extends Scheduler with Logging { cacheTracker.registerRDD(r.id, r.splits.size) for (dep <- r.dependencies) { dep match { - case shufDep: ShuffleDependency[_,_,_] => parents += getShuffleMapStage(shufDep) - case _ => visit(dep.rdd) + case shufDep: ShuffleDependency[_,_,_] => + parents += getShuffleMapStage(shufDep) + case _ => + visit(dep.rdd) } } } diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala index db711e099c..a3c4e7873d 100644 --- a/core/src/main/scala/spark/FetchFailedException.scala +++ b/core/src/main/scala/spark/FetchFailedException.scala @@ -13,5 +13,6 @@ class FetchFailedException( override def getCause(): Throwable = cause - def toTaskEndReason: TaskEndReason = FetchFailed(serverUri, shuffleId, mapId, reduceId) + def toTaskEndReason: TaskEndReason = + FetchFailed(serverUri, shuffleId, mapId, reduceId) } diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index 41608e5a4e..cb7923317a 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -1,5 +1,8 @@ package spark +import java.io.EOFException +import java.util.NoSuchElementException + import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text @@ -15,11 +18,9 @@ import org.apache.hadoop.util.ReflectionUtils /** * A Spark split class that wraps around a Hadoop InputSplit. */ -class HadoopSplit( - rddId: Int, - idx: Int, - @transient s: InputSplit) - extends Split with Serializable { +class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) + extends Split + with Serializable { val inputSplit = new SerializableWritable[InputSplit](s) @@ -91,7 +92,8 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case eofe: java.io.EOFException => finished = true + case eof: EOFException => + finished = true } gotNext = true } @@ -106,7 +108,7 @@ class HadoopRDD[K, V]( finished = !reader.next(key, value) } if (finished) { - throw new java.util.NoSuchElementException("End of stream") + throw new NoSuchElementException("End of stream") } gotNext = false (key, value) |