aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala55
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/FetchFailedException.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/network/Connection.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala68
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/network/SenderTest.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala32
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobResult.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/Serializer.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageUtils.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/util/CompletionIterator.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/Distribution.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/util/StatCounter.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/Vector.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/BitSet.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala13
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala19
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala3
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala14
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala3
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala4
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala25
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala7
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala19
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala23
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala31
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala8
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala20
-rw-r--r--project/SparkBuild.scala4
-rw-r--r--project/build.properties2
-rw-r--r--project/plugins.sbt11
-rw-r--r--project/project/SparkPluginBuild.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala15
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkImports.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala2
-rw-r--r--scalastyle-config.xml126
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala17
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala6
119 files changed, 795 insertions, 460 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 44e26bbb9e..281216612f 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,22 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
* @param sc [[org.apache.spark.SparkContext]] to use for the program.
- * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
- * the vertex id.
- * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
- * empty array, i.e. sc.parallelize(Array[K, Message]()).
- * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
- * message before sending (which often involves network I/O).
- * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
- * and provides the result to each vertex in the next superstep.
+ * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+ * Key will be the vertex id.
+ * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+ * this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+ * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+ * given vertex into one message before sending (which often involves network I/O).
+ * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+ * after each superstep and provides the result to each vertex in the next
+ * superstep.
* @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
- * Defaults to caching in memory.
- * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
- * optional Aggregator and the current superstep,
+ * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+ * intermediate RDDs in each superstep. Defaults to caching in memory.
+ * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+ * the Vertex, optional Aggregator and the current superstep,
* and returns a set of (Vertex, outgoing Messages) pairs
* @tparam K key
* @tparam V vertex type
@@ -71,7 +72,7 @@ object Bagel extends Logging {
var msgs = messages
var noActivity = false
do {
- logInfo("Starting superstep "+superstep+".")
+ logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
val aggregated = agg(verts, aggregator)
@@ -97,7 +98,8 @@ object Bagel extends Logging {
verts
}
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
+ * storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -106,8 +108,8 @@ object Bagel extends Logging {
partitioner: Partitioner,
numPartitions: Int
)(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+ compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
+ combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +129,8 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
- * and default storage level
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+ * [[org.apache.spark.HashPartitioner]] and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -138,9 +140,13 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
+ DEFAULT_STORAGE_LEVEL)(compute)
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+ /**
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+ * default [[org.apache.spark.HashPartitioner]]
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -158,7 +164,8 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+ * default [[org.apache.spark.HashPartitioner]],
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +178,8 @@ object Bagel extends Logging {
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+ * the default [[org.apache.spark.HashPartitioner]]
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +235,9 @@ object Bagel extends Logging {
})
numMsgs += newMsgs.size
- if (newVert.active)
+ if (newVert.active) {
numActiveVerts += 1
+ }
Some((newVert, newMsgs))
}.persist(storageLevel)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8e5dd8a850..15a0d24fd9 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -31,8 +31,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[RDDBlockId]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
- def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
- : Iterator[T] = {
+ def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+ storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
index d242047502..8eaa26bdb1 100644
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
@@ -25,7 +25,8 @@ private[spark] class FetchFailedException(
cause: Throwable)
extends Exception {
- def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
+ def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
+ cause: Throwable) =
this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
cause)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 30d182b008..8d6db0fca2 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 566472e597..25f7a5ed1c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -63,9 +63,9 @@ import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, Metada
*/
class SparkContext(
config: SparkConf,
- // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
- // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
- // a map from hostname to a list of input format splits on the host.
+ // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+ // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+ // contains a map from hostname to a list of input format splits on the host.
val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {
@@ -552,10 +552,11 @@ class SparkContext(
/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
- * BytesWritable values that contain a serialized partition. This is still an experimental storage
- * format and may not be supported exactly as is in future Spark releases. It will also be pretty
- * slow if you use the default serializer (Java serialization), though the nice thing about it is
- * that there's very little effort required to save arbitrary objects.
+ * BytesWritable values that contain a serialized partition. This is still an experimental
+ * storage format and may not be supported exactly as is in future Spark releases. It will also
+ * be pretty slow if you use the default serializer (Java serialization),
+ * though the nice thing about it is that there's very little effort required to save arbitrary
+ * objects.
*/
def objectFile[T: ClassTag](
path: String,
@@ -1043,7 +1044,7 @@ object SparkContext {
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long) = t1 + t2
- def zero(initialValue: Long) = 0l
+ def zero(initialValue: Long) = 0L
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@@ -1109,7 +1110,8 @@ object SparkContext {
implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
- implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+ implicit def booleanWritableConverter() =
+ simpleWritableConverter[Boolean, BooleanWritable](_.get)
implicit def bytesWritableConverter() = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
@@ -1258,7 +1260,8 @@ object SparkContext {
case "yarn-client" =>
val scheduler = try {
- val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+ val clazz =
+ Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
@@ -1269,7 +1272,8 @@ object SparkContext {
}
val backend = try {
- val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+ val clazz =
+ Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ed788560e7..6ae020f6a2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -96,7 +96,7 @@ object SparkEnv extends Logging {
@volatile private var lastSetSparkEnv : SparkEnv = _
def set(e: SparkEnv) {
- lastSetSparkEnv = e
+ lastSetSparkEnv = e
env.set(e)
}
@@ -112,7 +112,7 @@ object SparkEnv extends Logging {
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal: SparkEnv = {
- env.get()
+ env.get()
}
private[spark] def create(
@@ -168,7 +168,8 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf)), conf)
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
+ serializer, conf)
val connectionManager = blockManager.connectionManager
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index b0dedc6f4e..33737e1960 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -148,8 +148,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def sum(): Double = srdd.sum()
/**
- * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
- * of the RDD's elements in one operation.
+ * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+ * count of the RDD's elements in one operation.
*/
def stats(): StatCounter = srdd.stats()
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index f430a33db1..5b1bf9476e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -88,7 +88,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
+ def distinct(numPartitions: Int): JavaPairRDD[K, V] =
+ new JavaPairRDD[K, V](rdd.distinct(numPartitions))
/**
* Return a new RDD containing only the elements that satisfy a predicate.
@@ -210,25 +211,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value" which
+ * may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
- def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
- fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
+ def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
+ : JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value" which
+ * may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value"
+ * which may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue)(func))
@@ -375,7 +376,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@@ -397,7 +399,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@@ -439,8 +442,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
- def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
+ partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
/**
@@ -462,8 +465,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
- = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
+ : JavaPairRDD[K, (JList[V], JList[W])] =
+ fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 4db7339e67..fcb9729c10 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -76,7 +76,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
- preservesPartitioning))
+ preservesPartitioning))
/**
* Return a new RDD by applying a function to all elements of this RDD.
@@ -134,7 +134,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+ preservesPartitioning: Boolean): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
}
@@ -160,16 +161,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+ preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+ new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
+ .map((x: java.lang.Double) => x.doubleValue()))
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
- JavaPairRDD[K2, V2] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+ preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
}
@@ -294,7 +297,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
- * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+ * Reduces the elements of this RDD using the specified commutative and associative binary
+ * operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5a426b9835..22dc9c9e2e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -362,15 +362,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
doubleAccumulator(initialValue)
/**
- * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
+ * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+ * values to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)
/**
- * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks can
- * "add" values with `add`. Only the master can access the accumuable's `value`.
+ * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
+ * can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
sc.accumulable(initialValue)(param)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index 2be4e323be..35eca62ecd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -23,7 +23,8 @@ import org.apache.spark.Partitioner
import org.apache.spark.util.Utils
/**
- * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API.
+ * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the
+ * Python API.
*
* Stores the unique id() of the Python-side partitioning function so that it is incorporated into
* equality comparisons. Correctness requires that the id is a unique identifier for the
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9cbd26b607..33667a998e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -91,8 +91,9 @@ private[spark] class PythonRDD[T: ClassTag](
// Kill the Python worker process:
worker.shutdownOutput()
case e: IOException =>
- // This can happen for legitimate reasons if the Python code stops returning data before we are done
- // passing elements through, e.g., for take(). Just log a message to say it happened.
+ // This can happen for legitimate reasons if the Python code stops returning data
+ // before we are done passing elements through, e.g., for take(). Just log a message
+ // to say it happened.
logInfo("stdin writer to Python finished early")
logDebug("stdin writer to Python finished early", e)
}
@@ -132,7 +133,8 @@ private[spark] class PythonRDD[T: ClassTag](
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
- logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
+ logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
+ init, finish))
read
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
@@ -184,7 +186,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
override def compute(split: Partition, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (Utils.deserializeLongValue(a), b)
- case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
+ case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
}
val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
}
@@ -274,7 +276,8 @@ private[spark] object PythonRDD {
}
-private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
+private
+class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index d351dfc1f5..ec997255d5 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -187,8 +187,9 @@ extends Logging {
val bais = new ByteArrayInputStream(byteArray)
var blockNum = (byteArray.length / BLOCK_SIZE)
- if (byteArray.length % BLOCK_SIZE != 0)
+ if (byteArray.length % BLOCK_SIZE != 0) {
blockNum += 1
+ }
var retVal = new Array[TorrentBlock](blockNum)
var blockID = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index db67c6d1bb..3db970ca73 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -101,16 +101,16 @@ private[spark] class ClientArguments(args: Array[String]) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
val usage =
- s"""
- |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
- |Usage: DriverClient kill <active-master> <driver-id>
- |
- |Options:
- | -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
- | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
- | -s, --supervise Whether to restart the driver on failure
- | -v, --verbose Print more debugging output
- """.stripMargin
+ s"""
+ |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+ |Usage: DriverClient kill <active-master> <driver-id>
+ |
+ |Options:
+ | -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
+ | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
+ | -s, --supervise Whether to restart the driver on failure
+ | -v, --verbose Print more debugging output
+ """.stripMargin
System.err.println(usage)
System.exit(exitCode)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 4dfb19ed8a..7de7c4864e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -1,20 +1,18 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.spark.deploy
@@ -306,7 +304,8 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
}
- logInfo("Ran %s tests, %s passed and %s failed".format(numPassed+numFailed, numPassed, numFailed))
+ logInfo("Ran %s tests, %s passed and %s failed".format(numPassed + numFailed, numPassed,
+ numFailed))
}
private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index ffc0cb0903..488843a32c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -33,7 +33,8 @@ import scala.collection.mutable.ArrayBuffer
* fault recovery without spinning up a lot of processes.
*/
private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
+ extends Logging {
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1415e2f3d1..8901806de9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -132,7 +132,8 @@ private[spark] class AppClient(
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id
- logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
+ logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
+ cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 55d4ef1b31..2f2cbd182c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
/** Dead means that we couldn't find any Masters to connect to, and have given up. */
def dead(): Unit
- def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
+ def executorAdded(
+ fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2ef167ffc0..82bf655212 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -149,10 +149,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
- state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
+ state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
- else
+ } else {
RecoveryState.RECOVERING
+ }
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
@@ -165,7 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}
- case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
+ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress)
+ => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -181,9 +183,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
schedule()
} else {
val workerAddress = worker.actor.path.address
- logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
- workerAddress)
- sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
+ logWarning("Worker registration failed. Attempted to re-register worker at same " +
+ "address: " + workerAddress)
+ sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ + workerAddress)
}
}
}
@@ -641,8 +644,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.id, WORKER_TIMEOUT/1000))
removeWorker(worker)
} else {
- if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+ if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index a9af8df552..64ecf22399 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -57,7 +57,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
- val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class")
+ val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
+ "Main Class")
val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
@@ -103,13 +104,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>
<div>
- {if (hasDrivers)
- <div class="row-fluid">
- <div class="span12">
- <h4> Running Drivers </h4>
- {activeDriversTable}
- </div>
- </div>
+ {if (hasDrivers) {
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Running Drivers </h4>
+ {activeDriversTable}
+ </div>
+ </div>
+ }
}
</div>
@@ -121,13 +123,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>
<div>
- {if (hasDrivers)
- <div class="row-fluid">
- <div class="span12">
- <h4> Completed Drivers </h4>
- {completedDriversTable}
- </div>
- </div>
+ {if (hasDrivers) {
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Completed Drivers </h4>
+ {completedDriversTable}
+ </div>
+ </div>
+ }
}
</div>;
@@ -175,7 +178,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
- <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}</td>
+ <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+ </td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
{driver.desc.cores}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 460883ec7a..f411eb9cec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -49,7 +49,8 @@ object CommandUtils extends Logging {
val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
+ .map(Utils.splitCommandString).getOrElse(Nil)
val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6f6c101547..a26e47950a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -45,4 +45,4 @@ object DriverWrapper {
System.exit(-1)
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 1dc39c450e..530c147000 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -69,4 +69,4 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
case e => logWarning(s"Received unexpected actor system event: $e")
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 925c6fb183..3089acffb8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -84,7 +84,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
{runningExecutorTable}
</div>
</div>
-
+ // scalastyle:off
<div>
{if (hasDrivers)
<div class="row-fluid"> <!-- Running Drivers -->
@@ -113,7 +113,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</div>
}
</div>;
-
+ // scalastyle:on
UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
workerState.host, workerState.port))
}
@@ -133,10 +133,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</ul>
</td>
<td>
- <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
- .format(executor.appId, executor.execId)}>stdout</a>
- <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
- .format(executor.appId, executor.execId)}>stderr</a>
+ <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.appId, executor.execId)}>stdout</a>
+ <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.appId, executor.execId)}>stderr</a>
</td>
</tr>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index c23b75d757..86688e4424 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -187,7 +187,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte+logPageLength, logLength)
+ val endByte = math.min(startByte + logPageLength, logLength)
(startByte, endByte)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7efd74e1b..989d666f15 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -205,7 +205,7 @@ private[spark] class Executor(
}
attemptedTask = Some(task)
- logDebug("Task " + taskId +"'s epoch is " + task.epoch)
+ logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// Run the actual task and measure its runtime.
@@ -233,7 +233,8 @@ private[spark] class Executor(
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates,
+ task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index e5c9bbbe28..210f3dbeeb 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -50,10 +50,11 @@ object ExecutorExitCode {
"Failed to create local directory (bad spark.local.dir?)"
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
- if (exitCode > 128)
+ if (exitCode > 128) {
" (died from signal " + (exitCode - 128) + "?)"
- else
+ } else {
""
+ }
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 97176e4f5b..c2e973e173 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -55,7 +55,8 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
override def getValue: Int = executor.threadPool.getPoolSize()
})
- // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously
+ // been in th pool
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0c8f4662a5..455339943f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -64,7 +64,8 @@ class TaskMetrics extends Serializable {
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
/**
- * If this task writes to shuffle output, metrics on the written shuffle data will be collected here
+ * If this task writes to shuffle output, metrics on the written shuffle data will be collected
+ * here
*/
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9930537b34..de233e416a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -56,7 +56,8 @@ import org.apache.spark.metrics.source.Source
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
- * [sink|source] means this property belongs to source or sink. This field can only be source or sink.
+ * [sink|source] means this property belongs to source or sink. This field can only be
+ * source or sink.
*
* [name] specify the name of sink or source, it is custom defined.
*
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index cba8477ed5..ae2007e41b 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -211,7 +211,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
return chunk
} else {
- /*logInfo("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + "]")*/
message.finishTime = System.currentTimeMillis
logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
"] in " + message.timeTaken )
@@ -238,7 +237,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
message.startTime = System.currentTimeMillis
}
logTrace(
- "Sending chunk from [" + message+ "] to [" + getRemoteConnectionManagerId() + "]")
+ "Sending chunk from [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
return chunk
} else {
message.finishTime = System.currentTimeMillis
@@ -349,8 +348,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
outbox.getChunk() match {
case Some(chunk) => {
val buffers = chunk.buffers
- // If we have 'seen' pending messages, then reset flag - since we handle that as normal
- // registering of event (below)
+ // If we have 'seen' pending messages, then reset flag - since we handle that as
+ // normal registering of event (below)
if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
currentBuffers ++= buffers
}
@@ -404,7 +403,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
} catch {
case e: Exception =>
- logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(), e)
+ logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
+ e)
callOnExceptionCallback(e)
close()
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e6e01783c8..24d0a7deb5 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -65,7 +65,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
- // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
+ // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
+ // which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.connect.threads.min", 1),
conf.getInt("spark.core.connection.connect.threads.max", 8),
@@ -73,8 +74,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
- private val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
- private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
+ private val connectionsByKey = new HashMap[SelectionKey, Connection]
+ with SynchronizedMap[SelectionKey, Connection]
+ private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
+ with SynchronizedMap[ConnectionManagerId, SendingConnection]
private val messageStatuses = new HashMap[Int, MessageStatus]
private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
private val registerRequests = new SynchronizedQueue[SendingConnection]
@@ -173,7 +176,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (conn == null) return
// prevent other events from being triggered
- // Since we are still trying to connect, we do not need to do the additional steps in triggerWrite
+ // Since we are still trying to connect, we do not need to do the additional steps in
+ // triggerWrite
conn.changeConnectionKeyInterest(0)
handleConnectExecutor.execute(new Runnable {
@@ -188,8 +192,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
// fallback to previous behavior : we should not really come here since this method was
- // triggered since channel became connectable : but at times, the first finishConnect need not
- // succeed : hence the loop to retry a few 'times'.
+ // triggered since channel became connectable : but at times, the first finishConnect need
+ // not succeed : hence the loop to retry a few 'times'.
conn.finishConnect(true)
}
} )
@@ -258,8 +262,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
- logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
- "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+ logTrace("Changed key for connection to [" +
+ connection.getRemoteConnectionManagerId() + "] changed from [" +
+ intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}
} else {
@@ -282,7 +287,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
try {
selector.select()
} catch {
- // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
+ // Explicitly only dealing with CancelledKeyException here since other exceptions
+ // should be dealt with differently.
case e: CancelledKeyException => {
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
@@ -310,7 +316,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
if (selectedKeysCount == 0) {
- logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
+ logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size +
+ " keys")
}
if (selectorThread.isInterrupted) {
logInfo("Selector thread was interrupted!")
@@ -341,7 +348,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
throw new CancelledKeyException()
}
} catch {
- // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+ // weird, but we saw this happening - even though key.isValid was true,
+ // key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
@@ -437,9 +445,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
assert (sendingConnectionManagerId == remoteConnectionManagerId)
messageStatuses.synchronized {
- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
- logInfo("Notifying " + s)
- s.synchronized {
+ for (s <- messageStatuses.values if
+ s.connectionManagerId == sendingConnectionManagerId) {
+ logInfo("Notifying " + s)
+ s.synchronized {
s.attempted = true
s.acked = false
s.markDone()
@@ -458,7 +467,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
def handleConnectionError(connection: Connection, e: Exception) {
- logInfo("Handling connection error on connection to " + connection.getRemoteConnectionManagerId())
+ logInfo("Handling connection error on connection to " +
+ connection.getRemoteConnectionManagerId())
removeConnection(connection)
}
@@ -495,7 +505,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
status
}
case None => {
- throw new Exception("Could not find reference for received ack message " + message.id)
+ throw new Exception("Could not find reference for received ack message " +
+ message.id)
null
}
}
@@ -517,7 +528,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (ackMessage.isDefined) {
if (!ackMessage.get.isInstanceOf[BufferMessage]) {
- logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass())
+ logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
+ + ackMessage.get.getClass())
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
logDebug("Response to " + bufferMessage + " does not have ack id set")
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
@@ -535,14 +547,16 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
- val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
+ val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
+ connectionManagerId.port)
val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId)
registerRequests.enqueue(newConnection)
newConnection
}
- // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it useful in our test-env ...
- // If we do re-add it, we should consistently use it everywhere I guess ?
+ // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it
+ // useful in our test-env ... If we do re-add it, we should consistently use it everywhere I
+ // guess ?
val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
message.senderAddress = id.toSocketAddress()
logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
@@ -558,15 +572,17 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Option[Message]] = {
val promise = Promise[Option[Message]]
- val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))
- messageStatuses.synchronized {
+ val status = new MessageStatus(
+ message, connectionManagerId, s => promise.success(s.ackMessage))
+ messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
sendMessage(connectionManagerId, message)
promise.future
}
- def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = {
+ def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
+ message: Message): Option[Message] = {
Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)
}
@@ -656,7 +672,8 @@ private[spark] object ConnectionManager {
val tput = mb * 1000.0 / ms
println("--------------------------")
println("Started at " + startTime + ", finished at " + finishTime)
- println("Sent " + count + " messages of size " + size + " in " + ms + " ms (" + tput + " MB/s)")
+ println("Sent " + count + " messages of size " + size + " in " + ms + " ms " +
+ "(" + tput + " MB/s)")
println("--------------------------")
println()
}
@@ -667,7 +684,8 @@ private[spark] object ConnectionManager {
println("--------------------------")
val size = 10 * 1024 * 1024
val count = 10
- val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
+ val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(
+ Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
buffers.foreach(_.flip)
val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 4f5742d29b..820045aa21 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -30,14 +30,14 @@ import scala.concurrent.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
- //<mesos cluster> - the master URL
- //<slaves file> - a list slaves to run connectionTest on
- //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
- //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
- //[count] - how many times to run, default is 3
- //[await time in seconds] : await time (in seconds), default is 600
+ // <mesos cluster> - the master URL <slaves file> - a list slaves to run connectionTest on
+ // [num of tasks] - the number of parallel tasks to be initiated default is number of slave
+ // hosts [size of msg in MB (integer)] - the size of messages to be sent in each task,
+ // default is 10 [count] - how many times to run, default is 3 [await time in seconds] :
+ // await time (in seconds), default is 600
if (args.length < 2) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
+ println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] " +
+ "[size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
@@ -56,7 +56,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
- println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+ println("Running " + count + " rounds of test: " + "parallel tasks = " + tasknum + ", " +
+ "msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
i => SparkEnv.get.connectionManager.id).collect()
println("\nSlave ConnectionManagerIds")
@@ -76,7 +77,8 @@ private[spark] object ConnectionManagerTest extends Logging{
buffer.flip
val startTime = System.currentTimeMillis
- val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => {
+ val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId =>
+ {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
@@ -87,7 +89,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
- val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+ val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms *
+ 1000.0) + " MB/s"
logInfo(resultStr)
resultStr
}).collect()
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index dcbd183c88..9e03956ba0 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -52,17 +52,20 @@ private[spark] object SenderTest {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
- val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
- case Some(response) =>
- val buffer = response.asInstanceOf[BufferMessage].buffers(0)
- new String(buffer.array)
- case None => "none"
- }
+ val responseStr =
+ manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
+ case Some(response) =>
+ val buffer = response.asInstanceOf[BufferMessage].buffers(0)
+ new String(buffer.array)
+ case None => "none"
+ }
val finishTime = System.currentTimeMillis
val mb = size / 1024.0 / 1024.0
val ms = finishTime - startTime
- /*val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"*/
- val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
+ // val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms
+ // * 1000.0) + " MB/s"
+ val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms *
+ 1000.0).toInt + "MB/s) | Response = " + responseStr
println(resultStr)
})
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 30e578dd93..8f9d1d5a84 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -60,7 +60,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
checkpointData.get.cpFile = Some(checkpointPath)
override def getPreferredLocations(split: Partition): Seq[String] = {
- val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
+ val status = fs.getFileStatus(new Path(checkpointPath,
+ CheckpointRDD.splitIdToFile(split.index)))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index cefcc3d2d9..42e1ef8375 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -197,8 +197,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
// return the next preferredLocation of some partition of the RDD
def next(): (String, Partition) = {
- if (it.hasNext)
+ if (it.hasNext) {
it.next()
+ }
else {
it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
it.next()
@@ -290,8 +291,10 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
- if (prefPart== None) // if no preferred locations, just use basic power of two
- return minPowerOfTwo
+ if (prefPart == None) {
+ // if no preferred locations, just use basic power of two
+ return minPowerOfTwo
+ }
val prefPartActual = prefPart.get
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 688c310ee9..20713b4249 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -37,8 +37,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}
/**
- * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
- * of the RDD's elements in one operation.
+ * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+ * count of the RDD's elements in one operation.
*/
def stats(): StatCounter = {
self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 370061492d..10d519e697 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -705,7 +705,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
- valueClass.getSimpleName+ ")")
+ valueClass.getSimpleName + ")")
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 09d0a8189d..56c7777600 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -39,7 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
override def equals(other: Any): Boolean = other match {
- case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && this.slice == that.slice)
+ case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
+ this.slice == that.slice)
case _ => false
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 4c625d062e..f4364329a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -23,8 +23,8 @@ import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partitio
/**
- * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions
- * of parent RDDs.
+ * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
+ * corresponding partitions of parent RDDs.
*/
private[spark]
class PartitionerAwareUnionRDDPartition(
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index bc688110f4..73e8769c09 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -35,10 +35,10 @@ private[spark] object CheckpointState extends Enumeration {
}
/**
- * This class contains all the information related to RDD checkpointing. Each instance of this class
- * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as,
- * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
- * of the checkpointed RDD.
+ * This class contains all the information related to RDD checkpointing. Each instance of this
+ * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
+ * as well as, manages the post-checkpoint state by providing the updated partitions,
+ * iterator and preferred locations of the checkpointed RDD.
*/
private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
extends Logging with Serializable {
@@ -97,7 +97,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
- "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+ "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 2d1bd5b481..c9b4c768a9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -71,7 +71,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
+ valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21d16fabef..80211541a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1082,8 +1082,9 @@ class DAGScheduler(
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocs(n.rdd, inPart)
- if (locs != Nil)
+ if (locs != Nil) {
return locs
+ }
}
case _ =>
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index cc10cc0849..23447f1bbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConversions._
* Parses and holds information about inputFormat (and files) specified as a parameter.
*/
class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
- val path: String) extends Logging {
+ val path: String) extends Logging {
var mapreduceInputFormat: Boolean = false
var mapredInputFormat: Boolean = false
@@ -41,7 +41,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
validate()
override def toString: String = {
- "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
+ "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", " +
+ "path : " + path
}
override def hashCode(): Int = {
@@ -50,8 +51,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
hashCode
}
- // Since we are not doing canonicalization of path, this can be wrong : like relative vs absolute path
- // .. which is fine, this is best case effort to remove duplicates - right ?
+ // Since we are not doing canonicalization of path, this can be wrong : like relative vs
+ // absolute path .. which is fine, this is best case effort to remove duplicates - right ?
override def equals(other: Any): Boolean = other match {
case that: InputFormatInfo => {
// not checking config - that should be fine, right ?
@@ -65,22 +66,26 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path " + path)
try {
- if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+ if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(
+ inputFormatClazz)) {
logDebug("inputformat is from mapreduce package")
mapreduceInputFormat = true
}
- else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+ else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(
+ inputFormatClazz)) {
logDebug("inputformat is from mapred package")
mapredInputFormat = true
}
else {
throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
- " is NOT a supported input format ? does not implement either of the supported hadoop api's")
+ " is NOT a supported input format ? does not implement either of the supported hadoop " +
+ "api's")
}
}
catch {
case e: ClassNotFoundException => {
- throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
+ throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
+ " cannot be found ?", e)
}
}
}
@@ -125,8 +130,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
private def findPreferredLocations(): Set[SplitInfo] = {
- logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
- ", inputFormatClazz : " + inputFormatClazz)
+ logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " +
+ mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
prefLocsFromMapreduceInputFormat()
}
@@ -150,8 +155,8 @@ object InputFormatInfo {
c) Compute rack info for each host and update rack -> count map based on (b).
d) Allocate nodes based on (c)
e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
- (even if data locality on that is very high) : this is to prevent fragility of job if a single
- (or small set of) hosts go down.
+ (even if data locality on that is very high) : this is to prevent fragility of job if a
+ single (or small set of) hosts go down.
go to (a) until required nodes are allocated.
@@ -159,7 +164,8 @@ object InputFormatInfo {
PS: I know the wording here is weird, hopefully it makes some sense !
*/
- def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
+ def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]]
+ = {
val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
for (inputSplit <- formats) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f8fa5a9f7a..b909b66a5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -45,10 +45,11 @@ class JobLogger(val user: String, val logDirName: String)
String.valueOf(System.currentTimeMillis()))
private val logDir =
- if (System.getenv("SPARK_LOG_DIR") != null)
+ if (System.getenv("SPARK_LOG_DIR") != null) {
System.getenv("SPARK_LOG_DIR")
- else
+ } else {
"/tmp/spark-%s".format(user)
+ }
private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIDToJobID = new HashMap[Int, Int]
@@ -116,7 +117,7 @@ class JobLogger(val user: String, val logDirName: String)
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
- writeInfo = DATE_FORMAT.format(date) + ": " +info
+ writeInfo = DATE_FORMAT.format(date) + ": " + info
}
jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
}
@@ -235,7 +236,8 @@ class JobLogger(val user: String, val logDirName: String)
* @param stage Root stage of the job
* @param indent Indent number before info, default is 0
*/
- protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
+ protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
+ {
val stageInfo = if (stage.isShuffleMap) {
"STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index c381348a8d..d94f6ad924 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -23,4 +23,5 @@ package org.apache.spark.scheduler
private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage])
+ extends JobResult
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 28f3ba53b8..0544f81f1c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -36,7 +36,8 @@ private[spark] object ResultTask {
val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
- def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
+ def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
+ : Array[Byte] = {
synchronized {
val old = serializedInfoCache.get(stageId).orNull
if (old != null) {
@@ -55,7 +56,8 @@ private[spark] object ResultTask {
}
}
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
+ def deserializeInfo(stageId: Int, bytes: Array[Byte])
+ : (RDD[_], (TaskContext, Iterator[_]) => _) = {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 3cf995ea74..a546193d5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -148,6 +148,6 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
}
}
parentPool.addSchedulable(manager)
- logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
+ logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d8e97c3b7c..d25f0a6354 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -37,8 +37,8 @@ case class SparkListenerTaskGettingResult(
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
-case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], properties: Properties = null)
- extends SparkListenerEvents
+case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int],
+ properties: Properties = null) extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
@@ -99,11 +99,14 @@ class StatsReportListener extends SparkListener with Logging {
showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
//shuffle write
- showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+ showBytesDistribution("shuffle bytes written:",
+ (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
//fetch & io
- showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
- showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+ showMillisDistribution("fetch wait time:",
+ (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+ showBytesDistribution("remote bytes read:",
+ (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
//runtime breakdown
@@ -111,8 +114,10 @@ class StatsReportListener extends SparkListener with Logging {
val runtimePcts = stageCompleted.stage.taskInfos.map{
case (info, metrics) => RuntimePercentage(info.duration, metrics)
}
- showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
- showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
+ showDistribution("executor (non-fetch) time pct: ",
+ Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+ showDistribution("fetch wait time pct: ",
+ Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
}
@@ -147,7 +152,8 @@ private[spark] object StatsReportListener extends Logging {
logInfo("\t" + quantiles.mkString("\t"))
}
- def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
+ def showDistribution(heading: String,
+ dOpt: Option[Distribution], formatNumber: Double => String) {
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}
@@ -156,7 +162,8 @@ private[spark] object StatsReportListener extends Logging {
showDistribution(heading, dOpt, f _)
}
- def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+ def showDistribution(heading:String, format: String,
+ getMetric: (TaskInfo,TaskMetrics) => Option[Double])
(implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}
@@ -175,7 +182,8 @@ private[spark] object StatsReportListener extends Logging {
}
def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
- showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+ showDistribution(heading, dOpt,
+ (d => StatsReportListener.millisToString(d.toLong)): Double => String)
}
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
@@ -212,7 +220,7 @@ private object RuntimePercentage {
val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
val fetch = fetchTime.map{_ / denom}
- val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
+ val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
RuntimePercentage(exec, fetch, other)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 520c0b29e3..a78b0186b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -63,8 +63,9 @@ private[spark] class Stage(
def addOutputLoc(partition: Int, status: MapStatus) {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
- if (prevList == Nil)
+ if (prevList == Nil) {
numAvailableOutputs += 1
+ }
}
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index c4d1ad5733..8f320e5c7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,7 +29,8 @@ import org.apache.spark.executor.TaskMetrics
*/
class StageInfo(
stage: Stage,
- val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+ val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
+ mutable.Buffer[(TaskInfo, TaskMetrics)]()
) {
val stageId = stage.id
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 3c22edd524..91c27d7b8e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,16 +70,17 @@ class TaskInfo(
def running: Boolean = !finished
def status: String = {
- if (running)
+ if (running) {
"RUNNING"
- else if (gettingResult)
+ } else if (gettingResult) {
"GET RESULT"
- else if (failed)
+ } else if (failed) {
"FAILED"
- else if (successful)
+ } else if (successful) {
"SUCCESS"
- else
+ } else {
"UNKNOWN"
+ }
}
def duration: Long = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 9d3e615826..5724ec9d1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,7 +35,8 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
+ var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 17b6d97e90..1cdfed1d70 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -36,7 +36,8 @@ private[spark] trait TaskScheduler {
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).
- // Yarn uses this to bootstrap allocation of resources based on preferred locations, wait for slave registerations, etc.
+ // Yarn uses this to bootstrap allocation of resources based on preferred locations,
+ // wait for slave registerations, etc.
def postStartHook() { }
// Disconnect from the cluster.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3f0ee7a6d4..21b2ff1682 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -80,7 +80,7 @@ private[spark] class TaskSetManager(
var minShare = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_"+taskSet.stageId.toString
+ var name = "TaskSet_" + taskSet.stageId.toString
var parent: Pool = null
val runningTasksSet = new HashSet[Long]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0208388e86..78204103a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -120,7 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! true
case DisassociatedEvent(_, address, _) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_,
+ "remote Akka client disassociated"))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 33aac52051..04f35cca08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,8 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
- val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
- "http://" + sc.ui.appUIAddress)
+ val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
+ sparkHome, "http://" + sc.ui.appUIAddress)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -84,7 +84,8 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
+ memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index c27049bdb5..4401f6df47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -136,7 +136,8 @@ private[spark] class CoarseMesosSchedulerBackend(
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+ ("cd %s*; " +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c14cd47556..2d0b255385 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -60,7 +60,8 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
try {
for (regCls <- conf.getOption("spark.kryo.registrator")) {
logDebug("Running user registrator: " + regCls)
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ val reg = Class.forName(regCls, true, classLoader).newInstance()
+ .asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 9a5e3cb77e..a38a2b59db 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -27,11 +27,12 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
/**
* A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
- * guaranteed to only be called from one thread at a time.
+ * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
+ * serialization and are guaranteed to only be called from one thread at a time.
*
* Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
- * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes
+ * precedence.
*/
trait Serializer {
def newInstance(): SerializerInstance
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4fa2ab96d9..aa62ab5aba 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -76,9 +76,9 @@ object BlockFetcherIterator {
import blockManager._
- private var _remoteBytesRead = 0l
- private var _remoteFetchTime = 0l
- private var _fetchWaitTime = 0l
+ private var _remoteBytesRead = 0L
+ private var _remoteFetchTime = 0L
+ private var _fetchWaitTime = 0L
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ed53558566..542deb98c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -206,8 +206,9 @@ private[spark] class BlockManager(
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*
- * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
- * This ensures that update in master will compensate for the increase in memory on slave.
+ * droppedMemorySize exists to account for when block is dropped from memory to disk (so it
+ * is still valid). This ensures that update in master will compensate for the increase in
+ * memory on slave.
*/
def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) {
val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
@@ -224,7 +225,8 @@ private[spark] class BlockManager(
* which will be true if the block was successfully recorded and false if
* the slave needs to re-register.
*/
- private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
+ private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo,
+ droppedMemorySize: Long = 0L): Boolean = {
val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
info.level match {
case null =>
@@ -282,14 +284,15 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- return diskStore.getBytes(blockId) match {
+ diskStore.getBytes(blockId) match {
case Some(bytes) =>
Some(bytes)
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
+ } else {
+ doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
@@ -701,7 +704,8 @@ private[spark] class BlockManager(
diskStore.putBytes(blockId, bytes, level)
}
}
- val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+ val droppedMemorySize =
+ if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockWasRemoved = memoryStore.remove(blockId)
if (!blockWasRemoved) {
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 2c1a4e2f5d..893418fb8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -61,8 +61,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
import context.dispatcher
- timeoutCheckingTask = context.system.scheduler.schedule(
- 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+ checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
super.preStart()
}
@@ -169,8 +169,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
- (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 365866d1e3..7cf754fb20 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -57,9 +57,9 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
diskSpaceUsed / 1024 / 1024
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 59329361f3..5ded9ab359 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -25,7 +25,8 @@ import org.apache.spark._
import org.apache.spark.network._
private[spark]
-class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
+class BlockMessageArray(var blockMessages: Seq[BlockMessage])
+ extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))
@@ -65,7 +66,8 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
buffer.position(buffer.position() + size)
}
val finishTime = System.currentTimeMillis
- logDebug("Converted block message array from buffer message in " + (finishTime - startTime) / 1000.0 + " s")
+ logDebug("Converted block message array from buffer message in " +
+ (finishTime - startTime) / 1000.0 + " s")
this.blockMessages = newBlockMessages
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1720007e4e..50a0cdb309 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -25,15 +25,15 @@ private[spark]
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
blocks: Map[BlockId, BlockStatus]) {
- def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+ def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
def memUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+ rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
- def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+ def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def diskUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+ rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def memRemaining : Long = maxMem - memUsed()
@@ -48,8 +48,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
extends Ordered[RDDInfo] {
override def toString = {
import Utils.bytesToString
- "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
- storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
+ ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
+ "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
+ numPartitions, bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
@@ -64,7 +65,8 @@ object StorageUtils {
/* Returns RDD-level information, compiled from a list of StorageStatus objects */
def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
- rddInfoFromBlockStatusList(storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
+ rddInfoFromBlockStatusList(
+ storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
}
/* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
@@ -91,7 +93,8 @@ object StorageUtils {
sc.persistentRdds.get(rddId).map { r =>
val rddName = Option(r.name).getOrElse(rddId.toString)
val rddStorageLevel = r.getStorageLevel
- RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+ RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size,
+ memSize, diskSize)
}
}.flatten.toArray
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 5573b3847b..b95c8f43b0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,14 +48,16 @@ private[spark] object UIUtils {
case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+ case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
+ </li>
case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
}
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+ type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{sc.appName} - {title}</title>
@@ -63,7 +65,8 @@ private[spark] object UIUtils {
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
- <a href={prependBaseUri("/")} class="brand"><img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
+ <a href={prependBaseUri("/")} class="brand">
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
<ul class="nav">
{jobs}
{storage}
@@ -93,7 +96,8 @@ private[spark] object UIUtils {
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+ type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{title}</title>
@@ -103,7 +107,8 @@ private[spark] object UIUtils {
<div class="row-fluid">
<div class="span12">
<h3 style="vertical-align: middle; display: inline-block;">
- <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} style="margin-right: 15px;" />
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
+ style="margin-right: 15px;" />
{title}
</h3>
</div>
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 6ba15187d9..f913ee461b 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -36,7 +36,8 @@ private[spark] object UIWorkloadGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
- println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ println(
+ "usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index a31a7e1d58..4e41acf023 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -51,9 +51,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
- val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
+ val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+ val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index ca5a28625b..6289f8744f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -43,7 +43,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
- val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+ val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse,
+ parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
val pools = listener.sc.getAllPools
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index cfeeccda41..9412a48330 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -60,7 +60,10 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
case None => 0
}
<tr>
- <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a></td>
+ <td>
+ <a href=
+ {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
+ {p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cfaf121895..08107a3f62 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -64,7 +64,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
-
+ // scalastyle:off
val summary =
<div>
<ul class="unstyled">
@@ -96,7 +96,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
</ul>
</div>
-
+ // scalastyle:on
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
Seq("Duration", "GC Time", "Result Ser Time") ++
@@ -105,7 +105,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
- val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+ val taskTable = listingTable(
+ taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -117,8 +118,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
else {
val serializationTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.resultSerializationTime.toDouble}
- val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
+ val serializationQuantiles =
+ "Result serialization time" +: Distribution(serializationTimes).
+ get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
@@ -225,7 +227,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
- val maybeShuffleWrite = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+ val maybeShuffleWrite =
+ metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
@@ -236,7 +239,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
- val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+ val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
+ .getOrElse("")
val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..01b6479179 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -59,7 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
</table>
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+ private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
+ : Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 1df6b87fb0..3eb0f081e4 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.Set
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import org.apache.spark.Logging
private[spark] object ClosureCleaner extends Logging {
@@ -159,8 +159,9 @@ private[spark] object ClosureCleaner extends Logging {
// other than to set its fields, so use its constructor
val cons = cls.getConstructors()(0)
val params = cons.getParameterTypes.map(createNullValue).toArray
- if (outer != null)
+ if (outer != null) {
params(0) = outer // First param is always outer object
+ }
return cons.newInstance(params: _*).asInstanceOf[AnyRef]
} else {
// Use reflection to instantiate object without calling constructor
@@ -179,7 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
}
}
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
+private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
+ extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
@@ -221,11 +223,12 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
val argTypes = Type.getArgumentTypes(desc)
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
- && argTypes(0).getInternalName == myName)
+ && argTypes(0).getInternalName == myName) {
output += Class.forName(
owner.replace('/', '.'),
false,
Thread.currentThread.getContextClassLoader)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index fcc1ca9502..b6a099825f 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -21,7 +21,10 @@ package org.apache.spark.util
* Wrapper around an iterator which calls a completion method after it successfully iterates
* through all the elements.
*/
-private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+private[spark]
+// scalastyle:off
+abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
+// scalastyle:on
def next() = sub.next()
def hasNext = {
val r = sub.hasNext
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index 33bf3562fe..ab738c4b86 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
import java.io.PrintStream
/**
- * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
+ * Util for getting some stats from a small sample of numeric values, with some handy
+ * summary functions.
*
* Entirely in memory, not intended as a good way to compute stats over large data sets.
*
@@ -68,10 +69,11 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
object Distribution {
def apply(data: Traversable[Double]): Option[Distribution] = {
- if (data.size > 0)
+ if (data.size > 0) {
Some(new Distribution(data))
- else
+ } else {
None
+ }
}
def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index b0febe906a..3868ab3631 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -67,7 +67,8 @@ private[spark] object MetadataCleanerType extends Enumeration {
type MetadataCleanerType = Value
- def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+ def systemProperty(which: MetadataCleanerType.MetadataCleanerType) =
+ "spark.cleaner.ttl." + which.toString
}
// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
index 8b4e7c104c..2110b3596e 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -21,7 +21,8 @@ import java.io.{Externalizable, ObjectOutput, ObjectInput}
import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
+ * serializable.
*/
private[spark]
class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3cf94892e9..5f86795183 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -224,24 +224,26 @@ private[spark] object SizeEstimator extends Logging {
}
private def primitiveSize(cls: Class[_]): Long = {
- if (cls == classOf[Byte])
+ if (cls == classOf[Byte]) {
BYTE_SIZE
- else if (cls == classOf[Boolean])
+ } else if (cls == classOf[Boolean]) {
BOOLEAN_SIZE
- else if (cls == classOf[Char])
+ } else if (cls == classOf[Char]) {
CHAR_SIZE
- else if (cls == classOf[Short])
+ } else if (cls == classOf[Short]) {
SHORT_SIZE
- else if (cls == classOf[Int])
+ } else if (cls == classOf[Int]) {
INT_SIZE
- else if (cls == classOf[Long])
+ } else if (cls == classOf[Long]) {
LONG_SIZE
- else if (cls == classOf[Float])
+ } else if (cls == classOf[Float]) {
FLOAT_SIZE
- else if (cls == classOf[Double])
+ } else if (cls == classOf[Double]) {
DOUBLE_SIZE
- else throw new IllegalArgumentException(
+ } else {
+ throw new IllegalArgumentException(
"Non-primitive class " + cls + " passed to primitiveSize()")
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 020d5edba9..5b0d2c3651 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
* numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+ * Welford and Chan's algorithms for running variance]].
*
* @constructor Initialize the StatCounter with the given values.
*/
@@ -70,7 +71,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
}
- this
+ this
}
}
@@ -91,10 +92,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
/** Return the variance of the values. */
def variance: Double = {
- if (n == 0)
+ if (n == 0) {
Double.NaN
- else
+ } else {
m2 / n
+ }
}
/**
@@ -102,10 +104,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
* by N-1 instead of N.
*/
def sampleVariance: Double = {
- if (n <= 1)
+ if (n <= 1) {
Double.NaN
- else
+ } else {
m2 / (n - 1)
+ }
}
/** Return the standard deviation of the values. */
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 861ad62f9f..c201d0a33f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -86,7 +86,8 @@ private[spark] object Utils extends Logging {
}
/** Serialize via nested stream using specific serializer */
- def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(
+ f: SerializationStream => Unit) = {
val osWrapper = ser.serializeStream(new OutputStream {
def write(b: Int) = os.write(b)
@@ -100,7 +101,8 @@ private[spark] object Utils extends Logging {
}
/** Deserialize via nested stream using specific serializer */
- def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+ def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(
+ f: DeserializationStream => Unit) = {
val isWrapper = ser.deserializeStream(new InputStream {
def read(): Int = is.read()
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 83fa0bf1e5..96da93d854 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -26,24 +26,27 @@ class Vector(val elements: Array[Double]) extends Serializable {
def apply(index: Int) = elements(index)
def + (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
Vector(length, i => this(i) + other(i))
}
def add(other: Vector) = this + other
def - (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
Vector(length, i => this(i) - other(i))
}
def subtract(other: Vector) = this - other
def dot(other: Vector): Double = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var ans = 0.0
var i = 0
while (i < length) {
@@ -60,10 +63,12 @@ class Vector(val elements: Array[Double]) extends Serializable {
* @return
*/
def plusDot(plus: Vector, other: Vector): Double = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
- if (length != plus.length)
+ }
+ if (length != plus.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var ans = 0.0
var i = 0
while (i < length) {
@@ -74,8 +79,9 @@ class Vector(val elements: Array[Double]) extends Serializable {
}
def += (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var i = 0
while (i < length) {
elements(i) += other(i)
@@ -131,7 +137,8 @@ object Vector {
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
* between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
*/
- def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
+ def random(length: Int, random: Random = new XORShiftRandom()) =
+ Vector(length, _ => random.nextDouble())
class Multiplier(num: Double) {
def * (vec: Vector) = vec * num
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 856eb772a1..c9cf512843 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -123,7 +123,7 @@ class BitSet(numBits: Int) extends Serializable {
override def hasNext: Boolean = ind >= 0
override def next() = {
val tmp = ind
- ind = nextSetBit(ind+1)
+ ind = nextSetBit(ind + 1)
tmp
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 7eb300d46e..59ba1e457c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -280,7 +280,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
/**
* Select a key with the minimum hash, then combine all values with the same key from all
- * input streams
+ * input streams.
*/
override def next(): (K, C) = {
// Select a key from the StreamBuffer that holds the lowest key hash
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 5ded5d0b6d..148c12e64d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -187,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
override def hasNext: Boolean = pos != INVALID_POS
override def next(): T = {
val tmp = getValue(pos)
- pos = nextPos(pos+1)
+ pos = nextPos(pos + 1)
tmp
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 88f1cef89b..c2d84a8e08 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -19,18 +19,21 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-
+// scalastyle:off
/**
- * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
+ * second.
* Usage: StatefulNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
+// scalastyle:on
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
@@ -50,8 +53,8 @@ object StatefulNetworkWordCount {
}
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
- System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+ val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey",
+ Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index a0094d460f..c6215fd0d7 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -23,20 +23,24 @@ import com.twitter.algebird.HyperLogLog._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data
+ * -mining/">
* blog post</a> and this
- * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
- * the cardinality of a data stream, i.e. the number of unique elements.
+ * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ * blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
+ * estimating the cardinality of a data stream, i.e. the number of unique elements.
* <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ * reduce operation.
*/
+// scalastyle:on
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 1) {
@@ -82,7 +86,8 @@ object TwitterAlgebirdHLL {
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+ println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+ ) * 100))
}
})
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index a2600989ca..0ac46c31c2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -36,6 +36,7 @@ object PageView extends Serializable {
}
}
+// scalastyle:off
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
@@ -44,7 +45,8 @@ object PageView extends Serializable {
*
* When running this, you may want to set the root logging level to ERROR in
* conf/log4j.properties to reduce the verbosity of the output.
- * */
+ */
+// scalastyle:on
object PageViewGenerator {
val pages = Map("http://foo.com/" -> .7,
"http://foo.com/news" -> 0.2,
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index bb44bc3d06..2b130fb30e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.examples.StreamingExamples
-
+// scalastyle:off
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
*
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.examples.StreamingExamples
* $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
* $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*/
+// scalastyle:on
object PageViewStream {
def main(args: Array[String]) {
if (args.length != 3) {
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index a2cd49c573..c2d9dcbfaa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
@@ -134,12 +135,15 @@ class KafkaReceiver[
}
}
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
- // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+ // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
- // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
- // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
+ // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+ // 'smallest'/'largest':
+ // scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ // scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 15a2daa008..5472d0cd04 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -113,7 +113,8 @@ object KafkaUtils {
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
}
/**
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a389e..6acba25f44 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
+ override def preStart() = ZeroMQExtension(context.system)
+ .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
def receive: Receive = {
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b47d786986..c989ec0f27 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -59,10 +59,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
def createStream[T](
@@ -84,10 +84,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel RDD storage level.
*/
def createStream[T](
@@ -108,10 +108,11 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might
+ * be deserializer of bytes) to translate from sequence of sequence of
+ * bytes, where sequence refer to a frame and sub sequence refer to its
+ * payload.
*/
def createStream[T](
jssc: JavaStreamingContext,
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fe03ae4a62..799a9dd1ee 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -66,7 +66,8 @@ class EdgeRDD[@specialized ED: ClassTag](
this
}
- private[graphx] def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
+ private[graphx] def mapEdgePartitions[ED2: ClassTag](
+ f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
: EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
@@ -97,8 +98,8 @@ class EdgeRDD[@specialized ED: ClassTag](
*
* @param other the EdgeRDD to join with
* @param f the join function applied to corresponding values of `this` and `other`
- * @return a new EdgeRDD containing only edges that appear in both `this` and `other`, with values
- * supplied by `f`
+ * @return a new EdgeRDD containing only edges that appear in both `this` and `other`,
+ * with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index eea95d38d5..65a1a8c68f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -171,8 +171,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
: Graph[VD, ED2]
/**
- * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
- * as well. If adjacent vertex values are not required, consider using `mapEdges` instead.
+ * Transforms each edge attribute using the map function, passing it the adjacent vertex
+ * attributes as well. If adjacent vertex values are not required,
+ * consider using `mapEdges` instead.
*
* @note This does not change the structure of the
* graph or modify the values of this graph. As a consequence
@@ -280,13 +281,13 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* be commutative and associative and is used to combine the output
* of the map phase
*
- * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
- * when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on edges with
- * destination in the active set. If the direction is `Out`, `mapFunc` will only be run on edges
- * originating from vertices in the active set. If the direction is `Either`, `mapFunc` will be
- * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc` will
- * be run on edges with *both* vertices in the active set. The active set must have the same index
- * as the graph's vertices.
+ * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
+ * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
+ * edges with destination in the active set. If the direction is `Out`,
+ * `mapFunc` will only be run on edges originating from vertices in the active set. If the
+ * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set
+ * . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
+ * active set. The active set must have the same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 929915362c..0470d74cf9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -57,8 +57,9 @@ object PartitionStrategy {
* </pre>
*
* The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
- * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
- * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+ * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
+ * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
+ * P6)` or the last
* row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
* replicated to at most `2 * sqrt(numParts)` machines.
*
@@ -66,11 +67,12 @@ object PartitionStrategy {
* balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
* vertex locations.
*
- * One of the limitations of this approach is that the number of machines must either be a perfect
- * square. We partially address this limitation by computing the machine assignment to the next
+ * One of the limitations of this approach is that the number of machines must either be a
+ * perfect square. We partially address this limitation by computing the machine assignment to
+ * the next
* largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
- * is used.
+ * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
+ * square is used.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index edd59bcf32..d6788d4d4b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -59,7 +59,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
- * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD.
+ * VertexRDD will be based on a different index and can no longer be quickly joined with this
+ * RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
@@ -101,7 +102,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/
- private[graphx] def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
+ private[graphx] def mapVertexPartitions[VD2: ClassTag](
+ f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
@@ -159,8 +161,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Left joins this RDD with another VertexRDD with the same index. This function will fail if both
- * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
+ * Left joins this RDD with another VertexRDD with the same index. This function will fail if
+ * both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
+ * each
* vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @tparam VD2 the attribute type of the other VertexRDD
@@ -187,8 +190,8 @@ class VertexRDD[@specialized VD: ClassTag](
* Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
* used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is
- * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
- * is picked arbitrarily.
+ * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates,
+ * the vertex is picked arbitrarily.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
@@ -238,14 +241,14 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
- * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
- * used.
+ * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation
+ * is used.
*
* @param other an RDD containing vertices to join. If there are multiple entries for the same
* vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries.
* @param f the join function applied to corresponding values of `this` and `other`
- * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
- * and `other`, with values supplied by `f`
+ * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both
+ * `this` and `other`, with values supplied by `f`
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index f914e0565c..24699dfdd3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -82,7 +82,7 @@ object Analytics extends Logging {
val pr = graph.pageRank(tol).vertices.cache()
- println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 7677641bfe..f841846c0e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -37,11 +37,7 @@ object GraphGenerators {
val RMATa = 0.45
val RMATb = 0.15
- val RMATc = 0.15
val RMATd = 0.25
-
- // Right now it just generates a bunch of edges where
- // the edge data is the weight (default 1)
/**
* Generate a graph whose vertex out degree is log normal.
*/
@@ -59,15 +55,20 @@ object GraphGenerators {
Graph(vertices, edges, 0)
}
+ // Right now it just generates a bunch of edges where
+ // the edge data is the weight (default 1)
+ val RMATc = 0.15
+
def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
val rand = new Random()
Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
/**
- * Randomly samples from a log normal distribution whose corresponding normal distribution has the
- * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`, `s` are
- * the mean, standard deviation of the lognormal distribution and `Z ~ N(0, 1)`. In this function,
+ * Randomly samples from a log normal distribution whose corresponding normal distribution has
+ * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`,
+ * `s` are the mean, standard deviation of the lognormal distribution and
+ * `Z ~ N(0, 1)`. In this function,
* `m = e^(mu+sigma^2/2)` and `s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]`.
*
* @param mu the mean of the normal distribution
@@ -76,7 +77,7 @@ object GraphGenerators {
*/
private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
val rand = new Random()
- val m = math.exp(mu+(sigma*sigma)/2.0)
+ val m = math.exp(mu + (sigma * sigma) / 2.0)
val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma))
// Z ~ N(0, 1)
var X: Double = maxVal
@@ -169,9 +170,9 @@ object GraphGenerators {
val newT = math.round(t.toFloat/2.0).toInt
pickQuadrant(RMATa, RMATb, RMATc, RMATd) match {
case 0 => chooseCell(x, y, newT)
- case 1 => chooseCell(x+newT, y, newT)
- case 2 => chooseCell(x, y+newT, newT)
- case 3 => chooseCell(x+newT, y+newT, newT)
+ case 1 => chooseCell(x + newT, y, newT)
+ case 2 => chooseCell(x, y + newT, newT)
+ case 3 => chooseCell(x + newT, y + newT, newT)
}
}
}
@@ -179,8 +180,8 @@ object GraphGenerators {
// TODO(crankshaw) turn result into an enum (or case class for pattern matching}
private def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
if (a + b + c + d != 1.0) {
- throw new IllegalArgumentException(
- "R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0")
+ throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a + b + c + d)
+ + ", should sum to 1.0")
}
val rand = new Random()
val result = rand.nextDouble()
@@ -212,8 +213,8 @@ object GraphGenerators {
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
- (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
- (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+ (if (r + 1 < rows) { Seq( (sub2ind(r, c), sub2ind(r + 1, c))) } else { Seq.empty }) ++
+ (if (c + 1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c + 1))) } else { Seq.empty })
}.map{ case (src, dst) => Edge(src, dst, 1.0) }
Graph(vertices, edges)
} // end of gridGraph
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index efc0eb9353..efe99a31be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -106,7 +106,8 @@ class PythonMLLibAPI extends Serializable {
bytes
}
- private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ private def trainRegressionModel(
+ trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index e476b53450..8803c4c1a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -105,7 +105,7 @@ object SVD {
cols.flatMap{ case (colind1, mval1) =>
cols.map{ case (colind2, mval2) =>
((colind1, colind2), mval1*mval2) } }
- }.reduceByKey(_+_)
+ }.reduceByKey(_ + _)
// Construct jblas A^T A locally
val ata = DoubleMatrix.zeros(n, n)
@@ -145,10 +145,10 @@ object SVD {
// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
- val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
- => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+ val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)))
+ => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_ + _)
.map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
- val retU = SparseMatrix(retUdata, m, sigma.length)
+ val retU = SparseMatrix(retUdata, m, sigma.length)
MatrixSVD(retU, retS, retV)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index b77364e08d..cd80134737 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -142,7 +142,7 @@ object GradientDescent extends Logging {
var regVal = 0.0
for (i <- 1 to numIterations) {
- val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
+ val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
case (y, features) =>
val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
val (grad, loss) = gradient.compute(featuresCol, y, weights)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index c5f64b1350..a990e0fb01 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -84,8 +84,9 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
* [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here.
*
* Essentially instead of finding the low-rank approximations to the rating matrix `R`,
- * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0
- * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user
+ * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
+ * r > 0 and 0 if r = 0. The ratings then act as 'confidence' values related to strength of
+ * indicated user
* preferences rather than explicit ratings given to items.
*/
class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
@@ -152,8 +153,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
- // Initialize user and product factors randomly, but use a deterministic seed for each partition
- // so that fault recovery works
+ // Initialize user and product factors randomly, but use a deterministic seed for each
+ // partition so that fault recovery works
val seedGen = new Random()
val seed1 = seedGen.nextInt()
val seed2 = seedGen.nextInt()
@@ -268,7 +269,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
// Sort them by product ID
val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
- def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = a._1 - b._1
+ def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int =
+ a._1 - b._1
}
Sorting.quickSort(groupedRatings)(ordering)
// Translate the user IDs to indices based on userIdToPos
@@ -369,7 +371,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val tempXtX = DoubleMatrix.zeros(triangleSize)
val fullXtX = DoubleMatrix.zeros(rank, rank)
- // Compute the XtX and Xy values for each user by adding products it rated in each product block
+ // Compute the XtX and Xy values for each user by adding products it rated in each product
+ // block
for (productBlock <- 0 until numBlocks) {
for (p <- 0 until blockFactors(productBlock).length) {
val x = new DoubleMatrix(blockFactors(productBlock)(p))
@@ -544,9 +547,8 @@ object ALS {
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
*/
- def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
- : MatrixFactorizationModel =
- {
+ def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
+ alpha: Double): MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 11a937e011..bb79f0cd73 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -21,6 +21,8 @@ import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
import scala.util.Properties
+import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
+
// For Sonatype publishing
//import com.jsuereth.pgp.sbtplugin.PgpKeys._
@@ -231,7 +233,7 @@ object SparkBuild extends Build {
publishMavenStyle in MavenCompile := true,
publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal),
publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn
- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings
+ ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++ ScalaStyleSettings
val slf4jVersion = "1.7.2"
diff --git a/project/build.properties b/project/build.properties
index 839f5fbb0c..4b52bb928a 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-sbt.version=0.12.4
+sbt.version=0.13.1
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4ba0e4280a..914f2e05a4 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,10 +1,10 @@
+scalaVersion := "2.10.3"
+
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
-resolvers += "Spray Repository" at "http://repo.spray.cc/"
-
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
@@ -15,4 +15,7 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6")
-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
+
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
+
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 4853be2617..0392a6051f 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -20,5 +20,5 @@ import sbt._
object SparkPluginDef extends Build {
lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
/* This is not published in a Maven repository, so we get it from GitHub directly */
- lazy val junitXmlListener = uri("https://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
+ lazy val junitXmlListener = uri("git://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016")
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 3e171849e3..e3bcf7f30a 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -39,20 +39,22 @@ extends ClassLoader(parent) {
// Hadoop FileSystem object for our URI, if it isn't using HTTP
var fileSystem: FileSystem = {
- if (uri.getScheme() == "http")
+ if (uri.getScheme() == "http") {
null
- else
+ } else {
FileSystem.get(uri, new Configuration())
+ }
}
override def findClass(name: String): Class[_] = {
try {
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
- if (fileSystem != null)
+ if (fileSystem != null) {
fileSystem.open(new Path(directory, pathInDirectory))
- else
+ } else {
new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ }
}
val bytes = readAndTransformClass(name, inputStream)
inputStream.close()
@@ -81,10 +83,11 @@ extends ClassLoader(parent) {
var done = false
while (!done) {
val num = in.read(bytes)
- if (num >= 0)
+ if (num >= 0) {
bos.write(bytes, 0, num)
- else
+ } else {
done = true
+ }
}
return bos.toByteArray
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index b2e1df173e..dcc139544e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Paul Phillips
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 87d94d51be..bc25b50a4e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Alexander Spoon
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index efe45240e9..3ebf288130 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Paul Phillips
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 59fdb0b37a..1d73d0b699 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Martin Odersky
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 64084209e8..8f61a5e835 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Paul Phillips
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
index 8865f82bc0..3159b70008 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Paul Phillips
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
index 60a4d7841e..946e710390 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Stepan Koltsov
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
index 382f8360a7..13cd2b7fa5 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
/* NSC -- new Scala compiler
* Copyright 2005-2013 LAMP/EPFL
* @author Martin Odersky
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 0000000000..7527232676
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,126 @@
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source before and after the section, with the following syntax: -->
+<!-- // scalastyle:off -->
+<!-- ... -->
+<!-- // naughty stuff -->
+<!-- ... -->
+<!-- // scalastyle:on -->
+
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+ <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <!-- <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxFileLength"><![CDATA[800]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+ <parameter name="tabSize"><![CDATA[2]]></parameter>
+ <parameter name="ignoreImports">true</parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="regex"><![CDATA[println]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxTypes"><![CDATA[30]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maximum"><![CDATA[10]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+ </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxLength"><![CDATA[50]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
+ <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+</scalastyle>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 54813934b8..6a45bc2f8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -47,7 +47,8 @@ object MasterFailureTest extends Logging {
def main(args: Array[String]) {
if (args.size < 2) {
println(
- "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> " +
+ "[<batch size in milliseconds>]")
System.exit(1)
}
val directory = args(0)
@@ -186,7 +187,8 @@ object MasterFailureTest extends Logging {
// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
- val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
+ val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil,
+ Map())
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
val operatedStream = operation(inputStream)
@@ -287,7 +289,7 @@ object MasterFailureTest extends Logging {
private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
- assert(expectedOutput(i) != expectedOutput(i+1),
+ assert(expectedOutput(i) != expectedOutput(i + 1),
"Expected output has consecutive duplicate sequence of values")
}
@@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) {
// Write the data to a local file and then move it to the target test directory
- val localFile = new File(localTestDir, (i+1).toString)
- val hadoopFile = new Path(testDir, (i+1).toString)
- val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
+ val localFile = new File(localTestDir, (i + 1).toString)
+ val hadoopFile = new Path(testDir, (i + 1).toString)
+ val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
var done = false
@@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
} catch {
case ioe: IOException => {
fs = testDir.getFileSystem(new Configuration())
- logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+ logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
+ ioe)
}
}
}
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 4886cd6ea8..420522433e 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -152,7 +152,8 @@ object JavaAPICompletenessChecker {
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
- ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream", tupleParams)
+ ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
+ tupleParams)
} else {
ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
parameters.map(applySubs))
@@ -175,7 +176,8 @@ object JavaAPICompletenessChecker {
ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
parameters.dropRight(1).map(applySubs))
} else {
- ParameterizedType("org.apache.spark.api.java.function.Function", parameters.map(applySubs))
+ ParameterizedType("org.apache.spark.api.java.function.Function",
+ parameters.map(applySubs))
}
case _ =>
ParameterizedType(renameSubstitutions.getOrElse(name, name),