diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-12-10 11:34:10 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-12-10 11:47:16 +0530 |
commit | 17db6a9041d5e83d7b6fe47f9c36758d0613fcd6 (patch) | |
tree | 6c99fe68332d7bec882be2fd5e50347c3fb3c134 /core/src/main/scala/org/apache | |
parent | c1201f47e0d44e92da42adb23d27f60d9d494642 (diff) | |
download | spark-17db6a9041d5e83d7b6fe47f9c36758d0613fcd6.tar.gz spark-17db6a9041d5e83d7b6fe47f9c36758d0613fcd6.tar.bz2 spark-17db6a9041d5e83d7b6fe47f9c36758d0613fcd6.zip |
Style fixes and addressed review comments at #221
Diffstat (limited to 'core/src/main/scala/org/apache')
10 files changed, 41 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 88a7f24884..d36e1b13a6 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -72,12 +72,11 @@ private[spark] class MapOutputTracker extends Logging { // throw a SparkException if this fails. private def askTracker(message: Any): Any = { try { - val future = if (trackerActor.isLeft ) { - trackerActor.left.get.ask(message)(timeout) - } else { - trackerActor.right.get.ask(message)(timeout) + val future = trackerActor match { + case Left(a: ActorRef) => a.ask(message)(timeout) + case Right(b: ActorSelection) => b.ask(message)(timeout) } - return Await.result(future, timeout) + Await.result(future, timeout) } catch { case e: Exception => throw new SparkException("Error communicating with MapOutputTracker", e) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 62b608c088..bcec41c439 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -17,11 +17,11 @@ package org.apache.spark -import org.apache.spark.util.Utils -import org.apache.spark.rdd.RDD - import scala.reflect.ClassTag +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + /** * An object that defines how the elements in a key-value pair RDD are partitioned by key. * Maps each key to a partition ID, from 0 to `numPartitions - 1`. diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index a38e32b339..6c18a3c245 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -17,23 +17,19 @@ package org.apache.spark.deploy.worker.ui -import akka.actor.ActorRef -import akka.util.Timeout +import java.io.File import scala.concurrent.duration._ -import java.io.{FileInputStream, File} - +import akka.util.Timeout import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.server.{Handler, Server} - +import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker -import org.apache.spark.{Logging} -import org.apache.spark.ui.JettyUtils +import org.apache.spark.ui.{JettyUtils, UIUtils} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils +import org.eclipse.jetty.server.{Handler, Server} /** * Web UI server for the standalone worker. diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 99ea6e8ee8..a712ef1c27 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -17,16 +17,13 @@ package org.apache.spark.rdd +import java.io.IOException + import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.{NullWritable, BytesWritable} -import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.fs.Path -import java.io.{File, IOException, EOFException} -import java.text.NumberFormat private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index e72f86fb13..8df8718f3b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import java.sql.{Connection, ResultSet} import scala.reflect.ClassTag + import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.util.NextIterator diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala index eb3b19907d..8d7c288593 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala @@ -17,9 +17,10 @@ package org.apache.spark.rdd -import org.apache.spark.{Partition, TaskContext} import scala.reflect.ClassTag +import org.apache.spark.{Partition, TaskContext} + private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) { diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index 4a465840c6..d5691f2267 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -17,9 +17,10 @@ package org.apache.spark.rdd -import org.apache.spark.{RangePartitioner, Logging} import scala.reflect.ClassTag +import org.apache.spark.{Logging, RangePartitioner} + /** * Extra functions available on RDDs of (key, value) pairs where the key is sortable through * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 1d109a2496..3682c84598 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -17,9 +17,10 @@ package org.apache.spark.rdd -import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext} import scala.reflect.ClassTag +import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency, + SparkEnv, TaskContext} private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { override val index = idx diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e5de16fc01..e05b842476 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -157,10 +157,9 @@ private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, Actor while (attempts < AKKA_RETRY_ATTEMPTS) { attempts += 1 try { - val future = if (driverActor.isLeft ) { - driverActor.left.get.ask(message)(timeout) - } else { - driverActor.right.get.ask(message)(timeout) + val future = driverActor match { + case Left(a: ActorRef) => a.ask(message)(timeout) + case Right(b: ActorSelection) => b.ask(message)(timeout) } val result = Await.result(future, timeout) if (result == null) { diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index 69519860c6..bf71882ef7 100644 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -1,5 +1,18 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ // Must be in akka.actor package as ActorSystemImpl is protected[akka]. |