author     Prashant Sharma <prashant.s@imaginea.com>    2014-02-09 22:17:52 -0800
committer  Reynold Xin <rxin@apache.org>                2014-02-09 22:17:52 -0800
commit     919bd7f669c61500eee7231298d9880b320eb6f3 (patch)
tree       5cdcf197aef425b6be47b676f8d7fe3d1e2e8c34
parent     2182aa3c55737a90e0ff200eede7146b440801a3 (diff)
download   spark-919bd7f669c61500eee7231298d9880b320eb6f3.tar.gz
           spark-919bd7f669c61500eee7231298d9880b320eb6f3.tar.bz2
           spark-919bd7f669c61500eee7231298d9880b320eb6f3.zip
Merge pull request #567 from ScrapCodes/style2.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2

Continuation of PR #557. With this, all scala style errors are fixed across the code base!!

The reason for creating a separate PR was to not interrupt an already reviewed and ready-to-merge PR. Hope this gets reviewed soon and merged too.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #567 and squashes the following commits:

3b1ec30 [Prashant Sharma] scala style fixes
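Nearly all of the hunks below are mechanical fixes for the new scalastyle checks, most often a line-length limit (100 characters, judging from where the wrapped lines break). As a minimal, hypothetical Scala sketch of that dominant pattern (the names below are invented for illustration and do not appear in the patch):

object LineLengthExample {
  // Before the style pass, declarations like the commented one ran past the column limit:
  //   def lookup(table: String, keyColumn: String, keyValue: String, timeoutMs: Long, retries: Int): Option[String]
  // The fix wraps the parameter list and indents the continuation line:
  def lookup(table: String, keyColumn: String, keyValue: String, timeoutMs: Long,
      retries: Int): Option[String] = {
    None // placeholder body; the real methods in this patch keep their logic unchanged
  }
}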
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala  20
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala  13
-rw-r--r--  core/src/main/scala/org/apache/spark/network/SenderTest.scala  17
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala  9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala  18
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/JettyUtils.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala  7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala  19
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala  14
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala  92
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala  7
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala  6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LogQuery.scala  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala  15
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala  17
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala  22
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala  7
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala  21
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala  20
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala  5
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala  3
-rw-r--r--  project/project/SparkPluginBuild.scala  4
-rw-r--r--  scalastyle-config.xml  19
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala  12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Interval.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala  12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala  24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala  7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala  45
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala  5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala  21
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala  5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala  8
63 files changed, 356 insertions, 254 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 281216612f..dd3eed8aff 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -33,7 +33,8 @@ object Bagel extends Logging {
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
* this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
- * given vertex into one message before sending (which often involves network I/O).
+ * given vertex into one message before sending (which often involves network
+ * I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 15a0d24fd9..b38af2497d 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,7 +32,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
- storageLevel: StorageLevel): Iterator[T] = {
+ storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 2f2cbd182c..1f20aa3dfa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -33,8 +33,7 @@ private[spark] trait AppClientListener {
/** Dead means that we couldn't find any Masters to connect to, and have given up. */
def dead(): Unit
- def executorAdded(
- fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
+ def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 82bf655212..0bb9a9a937 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -166,8 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}
- case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress)
- => {
+ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
+ {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -176,7 +176,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
- sender, workerWebUiPort, publicAddress)
+ sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 64ecf22399..04f9a22a25 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -123,7 +123,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>
<div>
- {if (hasDrivers) {
+ {
+ if (hasDrivers) {
<div class="row-fluid">
<div class="span12">
<h4> Completed Drivers </h4>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index f411eb9cec..2ceccc703d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -50,7 +50,7 @@ object CommandUtils extends Logging {
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
- .map(Utils.splitCommandString).getOrElse(Nil)
+ .map(Utils.splitCommandString).getOrElse(Nil)
val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 24d0a7deb5..a78d6ac70f 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -74,8 +74,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
- private val connectionsByKey = new HashMap[SelectionKey, Connection]
- with SynchronizedMap[SelectionKey, Connection]
+ private val connectionsByKey =
+ new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
with SynchronizedMap[ConnectionManagerId, SendingConnection]
private val messageStatuses = new HashMap[Int, MessageStatus]
@@ -445,10 +445,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
assert (sendingConnectionManagerId == remoteConnectionManagerId)
messageStatuses.synchronized {
- for (s <- messageStatuses.values if
- s.connectionManagerId == sendingConnectionManagerId) {
- logInfo("Notifying " + s)
- s.synchronized {
+ for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
+ logInfo("Notifying " + s)
+ s.synchronized {
s.attempted = true
s.acked = false
s.markDone()
@@ -574,7 +573,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
val promise = Promise[Option[Message]]
val status = new MessageStatus(
message, connectionManagerId, s => promise.success(s.ackMessage))
- messageStatuses.synchronized {
+ messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
sendMessage(connectionManagerId, message)
@@ -684,8 +683,11 @@ private[spark] object ConnectionManager {
println("--------------------------")
val size = 10 * 1024 * 1024
val count = 10
- val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(
- Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
+ val buffers = Array.tabulate(count) { i =>
+ val bufferLen = size * (i + 1)
+ val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
+ ByteBuffer.allocate(bufferLen).put(bufferContent)
+ }
buffers.foreach(_.flip)
val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 820045aa21..8e5c5296cb 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -77,12 +77,13 @@ private[spark] object ConnectionManagerTest extends Logging{
buffer.flip
val startTime = System.currentTimeMillis
- val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId =>
- {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
- connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
- })
+ val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId =>
+ {
+ val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+ logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
+ connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
+ }
+ }
val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 9e03956ba0..162d49bf61 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -52,20 +52,19 @@ private[spark] object SenderTest {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
- val responseStr =
- manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
- case Some(response) =>
- val buffer = response.asInstanceOf[BufferMessage].buffers(0)
- new String(buffer.array)
- case None => "none"
- }
+ val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
+ .map { response =>
+ val buffer = response.asInstanceOf[BufferMessage].buffers(0)
+ new String(buffer.array)
+ }.getOrElse("none")
+
val finishTime = System.currentTimeMillis
val mb = size / 1024.0 / 1024.0
val ms = finishTime - startTime
// val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms
// * 1000.0) + " MB/s"
- val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms *
- 1000.0).toInt + "MB/s) | Response = " + responseStr
+ val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +
+ (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
println(resultStr)
})
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 42e1ef8375..dc345b2df0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -199,8 +199,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
def next(): (String, Partition) = {
if (it.hasNext) {
it.next()
- }
- else {
+ } else {
it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
it.next()
}
@@ -291,9 +290,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
- if (prefPart == None) {
- // if no preferred locations, just use basic power of two
- return minPowerOfTwo
+ if (prefPart.isEmpty) {
+ // if no preferred locations, just use basic power of two
+ return minPowerOfTwo
}
val prefPartActual = prefPart.get
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 56c7777600..f270c1ac21 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -39,8 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
override def equals(other: Any): Boolean = other match {
- case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
- this.slice == that.slice)
+ case that: ParallelCollectionPartition[_] =>
+ this.rddId == that.rddId && this.slice == that.slice
case _ => false
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 0544f81f1c..77b1682b3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -36,8 +36,8 @@ private[spark] object ResultTask {
val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
- def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
- : Array[Byte] = {
+ def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
+ {
synchronized {
val old = serializedInfoCache.get(stageId).orNull
if (old != null) {
@@ -56,8 +56,8 @@ private[spark] object ResultTask {
}
}
- def deserializeInfo(stageId: Int, bytes: Array[Byte])
- : (RDD[_], (TaskContext, Iterator[_]) => _) = {
+ def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
+ {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d25f0a6354..129153c732 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -100,13 +100,13 @@ class StatsReportListener extends SparkListener with Logging {
//shuffle write
showBytesDistribution("shuffle bytes written:",
- (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+ (_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))
//fetch & io
showMillisDistribution("fetch wait time:",
- (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+ (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
showBytesDistribution("remote bytes read:",
- (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+ (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
//runtime breakdown
@@ -152,8 +152,8 @@ private[spark] object StatsReportListener extends Logging {
logInfo("\t" + quantiles.mkString("\t"))
}
- def showDistribution(heading: String,
- dOpt: Option[Distribution], formatNumber: Double => String) {
+ def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String)
+ {
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}
@@ -162,9 +162,11 @@ private[spark] object StatsReportListener extends Logging {
showDistribution(heading, dOpt, f _)
}
- def showDistribution(heading:String, format: String,
- getMetric: (TaskInfo,TaskMetrics) => Option[Double])
- (implicit stage: SparkListenerStageCompleted) {
+ def showDistribution(
+ heading: String,
+ format: String,
+ getMetric: (TaskInfo, TaskMetrics) => Option[Double])
+ (implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 542deb98c1..780a3a15dd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
} else {
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 5ded9ab359..dc62b1efaa 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -26,7 +26,7 @@ import org.apache.spark.network._
private[spark]
class BlockMessageArray(var blockMessages: Seq[BlockMessage])
- extends Seq[BlockMessage] with Logging {
+ extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index b3deb41e76..ade8ba1323 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -94,12 +94,14 @@ private[spark] object JettyUtils extends Logging {
}
/**
- * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied handlers.
+ * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied
+ * handlers.
*
* If the desired port number is contented, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
- def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
+ def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int)
+ = {
val handlersToRegister = handlers.map { case(path, handler) =>
val contextHandler = new ContextHandler(path)
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index b95c8f43b0..547a194d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,8 +48,8 @@ private[spark] object UIUtils {
case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
- </li>
+ case Executors =>
+ <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
}
@@ -66,7 +66,8 @@ private[spark] object UIUtils {
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri("/")} class="brand">
- <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
+ </a>
<ul class="nav">
{jobs}
{storage}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 9412a48330..22bc97ada1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -61,9 +61,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
}
<tr>
<td>
- <a href=
- {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
- {p.name}</a></td>
+ <a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a>
+ </td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 08107a3f62..b6e98942ab 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -223,28 +223,27 @@ private[spark] class StagePage(parent: JobProgressUI) {
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
- val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
+ val maybeShuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => s.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
- val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
+ val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
val maybeShuffleWrite =
- metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+ metrics.flatMap{m => m.shuffleWriteMetrics}.map(s => s.shuffleBytesWritten)
val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
- val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
+ val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
- val maybeWriteTime = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleWriteTime}
+ val maybeWriteTime = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => s.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
+ val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
- val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+ val maybeMemoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled)
val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
- val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
- .getOrElse("")
+ val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
- val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+ val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
<tr>
<td>{info.index}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 01b6479179..999a94fc2d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -59,8 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
</table>
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
- : Seq[Node] = {
+ private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+ {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 3eb0f081e4..c0c057be8d 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -180,8 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
}
}
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
- extends ClassVisitor(ASM4) {
+private[spark]
+class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 5f86795183..17c6481c18 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -241,7 +241,7 @@ private[spark] object SizeEstimator extends Logging {
} else if (cls == classOf[Double]) {
DOUBLE_SIZE
} else {
- throw new IllegalArgumentException(
+ throw new IllegalArgumentException(
"Non-primitive class " + cls + " passed to primitiveSize()")
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 0097dade19..4d2f45df85 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -22,19 +22,21 @@ import org.apache.spark.SparkContext
object BroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]")
+ System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo]" +
+ " [blockSize]")
System.exit(1)
- }
-
+ }
+
val bcName = if (args.length > 3) args(3) else "Http"
val blockSize = if (args.length > 4) args(4) else "4096"
- System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
+ System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
+ "BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
@@ -42,7 +44,7 @@ object BroadcastTest {
for (i <- 0 until arr1.length) {
arr1(i) = i
}
-
+
for (i <- 0 until 3) {
println("Iteration " + i)
println("===========")
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 33bf7151a7..3e3a3b2d50 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -146,68 +146,68 @@ assume Words keys as utf8;
set Words['3musk001']['book'] = 'The Three Musketeers';
set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
- be in as perfect a state of revolution as if the Huguenots had just made
- a second La Rochelle of it. Many citizens, seeing the women flying
- toward the High Street, leaving their children crying at the open doors,
- hastened to don the cuirass, and supporting their somewhat uncertain
- courage with a musket or a partisan, directed their steps toward the
- hostelry of the Jolly Miller, before which was gathered, increasing
- every minute, a compact group, vociferous and full of curiosity.';
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
set Words['3musk002']['book'] = 'The Three Musketeers';
set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
some city or other registering in its archives an event of this kind. There were
- nobles, who made war against each other; there was the king, who made
- war against the cardinal; there was Spain, which made war against the
- king. Then, in addition to these concealed or public, secret or open
- wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
- who made war upon everybody. The citizens always took up arms readily
- against thieves, wolves or scoundrels, often against nobles or
- Huguenots, sometimes against the king, but never against cardinal or
- Spain. It resulted, then, from this habit that on the said first Monday
- of April, 1625, the citizens, on hearing the clamor, and seeing neither
- the red-and-yellow standard nor the livery of the Duc de Richelieu,
- rushed toward the hostel of the Jolly Miller. When arrived there, the
- cause of the hubbub was apparent to all';
+ nobles, who made war against each other; there was the king, who made
+ war against the cardinal; there was Spain, which made war against the
+ king. Then, in addition to these concealed or public, secret or open
+ wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+ who made war upon everybody. The citizens always took up arms readily
+ against thieves, wolves or scoundrels, often against nobles or
+ Huguenots, sometimes against the king, but never against cardinal or
+ Spain. It resulted, then, from this habit that on the said first Monday
+ of April, 1625, the citizens, on hearing the clamor, and seeing neither
+ the red-and-yellow standard nor the livery of the Duc de Richelieu,
+ rushed toward the hostel of the Jolly Miller. When arrived there, the
+ cause of the hubbub was apparent to all';
set Words['3musk003']['book'] = 'The Three Musketeers';
set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
large the sum may be; but you ought also to endeavor to perfect yourself in
- the exercises becoming a gentleman. I will write a letter today to the
- Director of the Royal Academy, and tomorrow he will admit you without
- any expense to yourself. Do not refuse this little service. Our
- best-born and richest gentlemen sometimes solicit it without being able
- to obtain it. You will learn horsemanship, swordsmanship in all its
- branches, and dancing. You will make some desirable acquaintances; and
- from time to time you can call upon me, just to tell me how you are
- getting on, and to say whether I can be of further service to you.';
+ the exercises becoming a gentleman. I will write a letter today to the
+ Director of the Royal Academy, and tomorrow he will admit you without
+ any expense to yourself. Do not refuse this little service. Our
+ best-born and richest gentlemen sometimes solicit it without being able
+ to obtain it. You will learn horsemanship, swordsmanship in all its
+ branches, and dancing. You will make some desirable acquaintances; and
+ from time to time you can call upon me, just to tell me how you are
+ getting on, and to say whether I can be of further service to you.';
set Words['thelostworld001']['book'] = 'The Lost World';
set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
against the red curtain. How beautiful she was! And yet how aloof! We had been
- friends, quite good friends; but never could I get beyond the same
- comradeship which I might have established with one of my
- fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
- and perfectly unsexual. My instincts are all against a woman being too
- frank and at her ease with me. It is no compliment to a man. Where
- the real sex feeling begins, timidity and distrust are its companions,
- heritage from old wicked days when love and violence went often hand in
- hand. The bent head, the averted eye, the faltering voice, the wincing
- figure--these, and not the unshrinking gaze and frank reply, are the
- true signals of passion. Even in my short life I had learned as much
- as that--or had inherited it in that race memory which we call instinct.';
+ friends, quite good friends; but never could I get beyond the same
+ comradeship which I might have established with one of my
+ fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+ and perfectly unsexual. My instincts are all against a woman being too
+ frank and at her ease with me. It is no compliment to a man. Where
+ the real sex feeling begins, timidity and distrust are its companions,
+ heritage from old wicked days when love and violence went often hand in
+ hand. The bent head, the averted eye, the faltering voice, the wincing
+ figure--these, and not the unshrinking gaze and frank reply, are the
+ true signals of passion. Even in my short life I had learned as much
+ as that--or had inherited it in that race memory which we call instinct.';
set Words['thelostworld002']['book'] = 'The Lost World';
set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
- the real boss; but he lived in the rarefied atmosphere of some Olympian
- height from which he could distinguish nothing smaller than an
- international crisis or a split in the Cabinet. Sometimes we saw him
- passing in lonely majesty to his inner sanctum, with his eyes staring
- vaguely and his mind hovering over the Balkans or the Persian Gulf. He
- was above and beyond us. But McArdle was his first lieutenant, and it
- was he that we knew. The old man nodded as I entered the room, and he
- pushed his spectacles far up on his bald forehead.';
+ the real boss; but he lived in the rarefied atmosphere of some Olympian
+ height from which he could distinguish nothing smaller than an
+ international crisis or a split in the Cabinet. Sometimes we saw him
+ passing in lonely majesty to his inner sanctum, with his eyes staring
+ vaguely and his mind hovering over the Balkans or the Persian Gulf. He
+ was above and beyond us. But McArdle was his first lieutenant, and it
+ was he that we knew. The old man nodded as I entered the room, and he
+ pushed his spectacles far up on his bald forehead.';
*/
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index b3eb611dd2..fdb976dfc6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -29,8 +29,9 @@ object ExceptionHandlingTest {
val sc = new SparkContext(args(0), "ExceptionHandlingTest",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
- if (math.random > 0.75)
+ if (math.random > 0.75) {
throw new Exception("Testing exception handling")
+ }
}
System.exit(0)
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 39752fdd0e..36534e5935 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
object GroupByTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.err.println(
+ "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
@@ -35,7 +36,7 @@ object GroupByTest {
val sc = new SparkContext(args(0), "GroupBy Test",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-
+
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
@@ -48,7 +49,7 @@ object GroupByTest {
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count
-
+
println(pairs1.groupByKey(numReducers).count)
System.exit(0)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 9ab5f5a486..737c444139 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
- DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index a730464ea1..3895675b3b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -80,7 +80,11 @@ object LocalKMeans {
var mappings = closest.groupBy[Int] (x => x._1)
- var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+ var pointStats = mappings.map { pair =>
+ pair._2.reduceLeft [(Int, (Vector, Int))] {
+ case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
+ }
+ }
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 544c782469..fcaba6bb4f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -50,10 +50,10 @@ object LogQuery {
val dataSet =
if (args.length == 2) sc.textFile(args(1))
else sc.parallelize(exampleApacheLogs)
-
+ // scalastyle:off
val apacheLogRegex =
"""^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
-
+ // scalastyle:on
/** Tracks the total query count and number of aggregate bytes for a particular group. */
class Stats(val count: Int, val numBytes: Int) extends Serializable {
def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index 31c6d108f3..966478fe4a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
object SkewedGroupByTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.err.println(
+ "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 39819064ed..cf1fc3e808 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -56,7 +56,8 @@ object SparkHdfsLR {
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
- Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+ ))
val lines = sc.textFile(inputPath)
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index cfafbaf23e..b97cb8fb02 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -43,16 +43,18 @@ class PageRankUtils extends Serializable {
val terminate = superstep >= 10
val outbox: Array[PRMessage] =
- if (!terminate)
- self.outEdges.map(targetId =>
- new PRMessage(targetId, newValue / self.outEdges.size))
- else
+ if (!terminate) {
+ self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size))
+ } else {
Array[PRMessage]()
+ }
(new PRVertex(newValue, self.outEdges, !terminate), outbox)
}
- def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
+ def computeNoCombiner(numVertices: Long, epsilon: Double)
+ (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
+ : (PRVertex, Array[PRMessage]) =
computeWithCombiner(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
case None => None
@@ -81,7 +83,8 @@ class PRVertex() extends Vertex with Serializable {
}
override def toString(): String = {
- "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
+ "PRVertex(value=%f, outEdges.length=%d, active=%s)"
+ .format(value, outEdges.length, active.toString)
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 4c0de46964..25bd55ca88 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -33,7 +33,8 @@ import scala.xml.{XML,NodeSeq}
object WikipediaPageRank {
def main(args: Array[String]) {
if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
+ System.err.println(
+ "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
System.exit(-1)
}
val sparkConf = new SparkConf()
@@ -61,24 +62,26 @@ object WikipediaPageRank {
val fields = line.split("\t")
val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
val links =
- if (body == "\\N")
+ if (body == "\\N") {
NodeSeq.Empty
- else
+ } else {
try {
XML.loadString(body) \\ "link" \ "target"
} catch {
case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
NodeSeq.Empty
}
+ }
val outEdges = links.map(link => new String(link.text)).toArray
val id = new String(title)
(id, new PRVertex(1.0 / numVertices, outEdges))
})
- if (usePartitioner)
+ if (usePartitioner) {
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
- else
+ } else {
vertices = vertices.cache
+ }
println("Done parsing input file.")
// Do the computation
@@ -92,7 +95,7 @@ object WikipediaPageRank {
utils.computeWithCombiner(numVertices, epsilon))
// Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
+ System.err.println("Articles with PageRank >= " + threshold + ":")
val top =
(result
.filter { case (id, vertex) => vertex.value >= threshold }
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 2cf273a702..27afa6b642 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -31,7 +31,8 @@ import org.apache.spark.rdd.RDD
object WikipediaPageRankStandalone {
def main(args: Array[String]) {
if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
+ System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " +
+ "<numIterations> <host> <usePartitioner>")
System.exit(-1)
}
val sparkConf = new SparkConf()
@@ -51,10 +52,11 @@ object WikipediaPageRankStandalone {
val input = sc.textFile(inputFile)
val partitioner = new HashPartitioner(sc.defaultParallelism)
val links =
- if (usePartitioner)
+ if (usePartitioner) {
input.map(parseArticle _).partitionBy(partitioner).cache()
- else
+ } else {
input.map(parseArticle _).cache()
+ }
val n = links.count()
val defaultRank = 1.0 / n
val a = 0.15
@@ -62,10 +64,11 @@ object WikipediaPageRankStandalone {
// Do the computation
val startTime = System.currentTimeMillis
val ranks =
- pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
+ pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner,
+ sc.defaultParallelism)
// Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
+ System.err.println("Articles with PageRank >= " + threshold + ":")
val top =
(ranks
.filter { case (id, rank) => rank >= threshold }
@@ -75,7 +78,7 @@ object WikipediaPageRankStandalone {
val time = (System.currentTimeMillis - startTime) / 1000.0
println("Completed %d iterations in %f seconds: %f seconds per iteration"
- .format(numIterations, time, time / numIterations))
+ .format(numIterations, time, time / numIterations))
System.exit(0)
}
@@ -84,16 +87,17 @@ object WikipediaPageRankStandalone {
val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
val id = new String(title)
val links =
- if (body == "\\N")
+ if (body == "\\N") {
NodeSeq.Empty
- else
+ } else {
try {
XML.loadString(body) \\ "link" \ "target"
} catch {
case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+ System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
NodeSeq.Empty
}
+ }
val outEdges = links.map(link => new String(link.text)).toArray
(id, outEdges)
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index bc0d1633f1..3d7b390724 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -132,7 +132,7 @@ object FeederActor {
* To run this example locally, you may run Feeder Actor as
* `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ * `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index d9cb7326bb..6bccd1d884 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.util.RawTextHelper._
+// scalastyle:off
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
@@ -38,6 +39,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
* Example:
* `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
+// scalastyle:on
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 5) {
@@ -56,7 +58,7 @@ object KafkaWordCount {
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l))
+ val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index eb61caf8c8..0a68ac84c2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -64,6 +64,7 @@ object MQTTPublisher {
}
}
+// scalastyle:off
/**
* A sample wordcount with MqttStream stream
*
@@ -71,7 +72,8 @@ object MQTTPublisher {
* Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
* In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
* Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
- * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
+ * Example Java code for Mqtt Publisher and Subscriber can be found here
+ * https://bitbucket.org/mkjinesh/mqttclient
* Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
@@ -81,6 +83,7 @@ object MQTTPublisher {
* and run the example as
* `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
+// scalastyle:on
object MQTTWordCount {
def main(args: Array[String]) {
@@ -93,7 +96,7 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
- val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+ val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
StreamingContext.jarOfClass(this.getClass))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 5656d487a5..d4c4d86b34 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
+// scalastyle:off
/**
* Counts words in text encoded with UTF8 received from the network every second.
*
@@ -33,6 +34,7 @@ import org.apache.spark.storage.StorageLevel
* and then run the example
* `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
+// scalastyle:on
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index aa82bf3c6b..56d10a964b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -30,8 +30,8 @@ import java.nio.charset.Charset
*
* Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- * <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
* <output-file> file to which the word counts will be appended
*
* In local mode, <master> should be 'local[n]' with n > 1
@@ -54,11 +54,13 @@ import java.nio.charset.Charset
*
* To run this example in a local standalone cluster with automatic driver recovery,
*
- * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ * <path-to-examples-jar> \
* org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
* localhost 9999 ~/checkpoint ~/out`
*
- * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ * <path-to-examples-jar> would typically be
+ * <spark-dir>/examples/target/scala-XX/spark-examples....jar
*
* Refer to the online documentation for more details.
*/
@@ -96,11 +98,12 @@ object RecoverableNetworkWordCount {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
- |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
- | <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- | <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- | <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
- | <output-file> file to which the word counts will be appended
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ | <output-file> <master> is the Spark master URL. In local mode, <master> should be
+ | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark
+ | Streaming would connect to receive data. <checkpoint-directory> directory to
+ | HDFS-compatible file system which checkpoint data <output-file> file to which the
+ | word counts will be appended
|
|In local mode, <master> should be 'local[n]' with n > 1
|Both <checkpoint-directory> and <output-file> must be absolute paths
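The usage message reflowed above relies on Scala's margin convention: each continuation line starts with `|`, and the string is normally passed through `stripMargin` so source indentation does not leak into the output. A small sketch of that idiom, with placeholder text:

// Minimal sketch of the triple-quoted string + stripMargin idiom used for such usage messages;
// the '|' markers survive reflowing because stripMargin strips everything up to and including them.
object UsageMessageSketch {
  def main(args: Array[String]) {
    val usage =
      """
        |Usage: SomeTool <master> <hostname> <port>
        |  <master> is the Spark master URL; in local mode use 'local[n]' with n > 1
        |  <hostname> and <port> describe the TCP server to connect to
      """.stripMargin
    System.err.println(usage)
  }
}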
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index bbd44948b6..8a654f8fad 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -34,15 +34,19 @@ import org.apache.spark.streaming.twitter._
* the same approach could be used for computing popular topics for example.
* <p>
* <p>
- * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
- * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
- * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
- * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
- * percentage of the overall total count.
+ * <a href=
+ * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc), that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ * count.
* <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ * reduce operation.
*/
+// scalastyle:on
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
if (args.length < 1) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c6215fd0d7..45771d7050 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -29,8 +29,7 @@ import org.apache.spark.streaming.twitter._
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
- * This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data
- * -mining/">
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* blog post</a> and this
* <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
* blog post</a>
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 85b4ce5e81..35be7ffa1e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -53,11 +53,13 @@ object SimpleZeroMQPublisher {
}
}
+// scalastyle:off
/**
* A sample wordcount with ZeroMQStream stream
*
* To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
+ * (http://www.zeromq.org/intro:get-the-software)
*
* Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
@@ -68,6 +70,7 @@ object SimpleZeroMQPublisher {
* and run the example as
* `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
+// scalastyle:on
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 799a9dd1ee..f2296a865e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -67,8 +67,7 @@ class EdgeRDD[@specialized ED: ClassTag](
}
private[graphx] def mapEdgePartitions[ED2: ClassTag](
- f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
- : EdgeRDD[ED2] = {
+ f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 0392a6051f..a88a5e1453 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -20,5 +20,7 @@ import sbt._
object SparkPluginDef extends Build {
lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
/* This is not published in a Maven repository, so we get it from GitHub directly */
- lazy val junitXmlListener = uri("git://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016")
+ lazy val junitXmlListener = uri(
+ "https://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016"
+ )
}
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7527232676..ee968c53b3 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -1,4 +1,21 @@
-<!-- If you wish to turn off checking for a section of code, you can put a comment in the source before and after the section, with the following syntax: -->
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source
+ before and after the section, with the following syntax: -->
<!-- // scalastyle:off -->
<!-- ... -->
<!-- // naughty stuff -->
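The comment added here documents the same toggle applied throughout the streaming examples above: disable checking around a block that legitimately needs long lines, then re-enable it. A minimal Scala sketch of the pattern, with a placeholder object name and URL:

// scalastyle:off
/**
 * Anything between the two markers is exempt from checking, which is how the examples above keep
 * scaladoc lines that must carry a single long URL, e.g.
 * http://example.org/a/deliberately/long/placeholder/url/that/would/otherwise/break/the/line-length/rule
 */
// scalastyle:on
object StyleToggleSketch {
  def main(args: Array[String]) {
    println("checking is back on from here")
  }
}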
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 4d778dc4d4..baf80fe2a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,7 +128,8 @@ class CheckpointWriter(
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
- logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
+ logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
+ + "'")
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
@@ -167,11 +168,13 @@ class CheckpointWriter(
return
} catch {
case ioe: IOException =>
- logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
+ logWarning("Error in attempt " + attempts + " of writing checkpoint to "
+ + checkpointFile, ioe)
reset()
}
}
- logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'")
+ logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
+ + checkpointFile + "'")
}
}
@@ -220,7 +223,8 @@ class CheckpointWriter(
private[streaming]
object CheckpointReader extends Logging {
- def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
+ def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
+ {
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0683113bd0..fde46705d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
- //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+ // " is very low")
assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 04c994c136..16479a0127 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -33,7 +33,8 @@ class Interval(val beginTime: Time, val endTime: Time) {
def < (that: Interval): Boolean = {
if (this.duration != that.duration) {
- throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
+ throw new Exception("Comparing two intervals with different durations [" + this + ", "
+ + that + "]")
}
this.endTime < that.endTime
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64fe204cdf..7aa7ead29b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -78,8 +78,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
+ * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
dstream.countByWindow(windowDuration, slideDuration)
@@ -87,8 +87,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD contains the count of distinct elements in
- * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
- * Spark's default number of partitions.
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -103,8 +103,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD contains the count of distinct elements in
- * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
- * partitions.
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 62cfa0a229..4dcd0e4c51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
- * partitioning of each RDD.
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
dstream.reduceByKey(func, partitioner)
@@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
- * information.
+ * combineByKey for RDDs. Please refer to combineByKey in
+ * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
- * information.
+ * combineByKey for RDDs. Please refer to combineByKey in
+ * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
@@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @param filterFunc function to filter expired key-value pairs;
* only pairs that satisfy the function are retained
* set this to null if you do not want to filter
@@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @tparam S State type
*/
def updateStateByKey[S](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 921b56143a..2268160dcc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
* @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
- * file system or an HDFS, HTTP, HTTPS, or FTP URL.
+ * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the
+ * local file system or an HDFS, HTTP, HTTPS, or FTP URL.
*/
def this(
master: String,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 906a16e508..903e3f3c9b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
override def toString() = {
- "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
+ "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
+ currentCheckpointFiles.mkString("\n") + "\n]"
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 27303390d9..226844c228 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
} else {
// Time is valid, but check if it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
- logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+ logWarning("isTimeValid called with " + time + " where as last valid time is " +
+ lastValidTime)
}
lastValidTime = time
true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index ce153f065d..0dc6704603 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
+ extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
/**
@@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}
/**
- * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into
- * appropriately named blocks at regular intervals. This class starts two threads,
+ * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
+ * them into appropriately named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch as a block,
* the other to push the blocks into the block manager.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb9df2f48e..f3c58aede0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration}
* these functions.
*/
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-extends Serializable {
+ extends Serializable {
private[streaming] def ssc = self.ssc
- private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
+ = {
new HashPartitioner(numPartitions)
}
@@ -63,8 +64,8 @@ extends Serializable {
}
/**
- * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
- * is used to control the partitioning of each RDD.
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -94,8 +95,8 @@ extends Serializable {
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
- * partitioning of each RDD.
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -113,7 +114,8 @@ extends Serializable {
mergeCombiner: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): DStream[(K, C)] = {
- new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+ mapSideCombine)
}
/**
@@ -138,7 +140,8 @@ extends Serializable {
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+ {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -170,7 +173,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -239,7 +243,8 @@ extends Serializable {
slideDuration: Duration,
numPartitions: Int
): DStream[(K, V)] = {
- reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+ defaultPartitioner(numPartitions))
}
/**
@@ -315,7 +320,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @param filterFunc Optional function to filter expired key-value pairs;
* only pairs that satisfy the function are retained
*/
@@ -373,7 +379,8 @@ extends Serializable {
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
@@ -395,7 +402,8 @@ extends Serializable {
* this function may generate a different tuple with a different key
* than the input key. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream
* @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
* @tparam S State type
*/
@@ -438,7 +446,8 @@ extends Serializable {
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
+ : DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -566,7 +575,8 @@ extends Serializable {
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -580,7 +590,7 @@ extends Serializable {
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf
- ) {
+ ) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
@@ -596,7 +606,8 @@ extends Serializable {
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
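The updateStateByKey scaladoc reflowed above describes supplying an explicit Partitioner alongside the state update function. A minimal sketch of that call, assuming a socket source and an illustrative running-count update function:

// Minimal sketch of updateStateByKey with an explicit partitioner, as described in the scaladoc
// reflowed above; the socket source and running-count logic are illustrative only.
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulCountSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "StatefulCountSketch", Seconds(2))
    ssc.checkpoint("checkpoint")                       // stateful DStreams require a checkpoint dir
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))            // running count per key
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey[Int](updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism))
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}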
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 7a6b1ea35e..ca0a8ae478 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val invReduceF = invReduceFunc
val currentTime = validTime
- val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+ val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
+ currentTime)
val previousWindow = currentWindow - slideDuration
logDebug("Window time = " + windowDuration)
@@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
+ partitioner)
//val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 4ecba03ab5..57429a1532 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
+ + validTime)
})
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 6301772468..24289b714f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag](
_slideDuration: Duration)
extends DStream[T](parent.ssc) {
- if (!_windowDuration.isMultipleOf(parent.slideDuration))
+ if (!_windowDuration.isMultipleOf(parent.slideDuration)) {
throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
"must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ }
- if (!_slideDuration.isMultipleOf(parent.slideDuration))
+ if (!_slideDuration.isMultipleOf(parent.slideDuration)) {
throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
"must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ }
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index b5f11d3440..c7306248b1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)))
- private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
+ private lazy val checkpointWriter =
+ if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 0d9733fa69..e4fa163f2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.util.AkkaUtils
private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
+ extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+ extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
+ extends NetworkInputTrackerMessage
/**
* This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
if (!networkInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+ actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
+ "NetworkInputTracker")
receiverExecutor.start()
logInfo("NetworkInputTracker started")
}
@@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
- logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+ logInfo("Registered receiver for network stream " + streamId + " from "
+ + sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
@@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
})
// Right now, we only honor preferences if all receivers have them
- val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+ val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
+ .reduce(_ && _)
// Create the parallel collection of receivers to distribute them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
- val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+ val receiversWithPreferences =
+ receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
}
else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 3063cf10a3..18811fc2b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
private[spark] class StreamingListenerBus() extends Logging {
- private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+ private val listeners = new ArrayBuffer[StreamingListener]()
+ with SynchronizedBuffer[StreamingListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6a45bc2f8a..2bb616cfb8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
}
}
}
- if (!done)
+ if (!done) {
logError("Could not generate file " + hadoopFile)
- else
+ } else {
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ }
Thread.sleep(interval)
localFile.delete()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 179fd75939..2b8cdb72b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
}
} else {
// Calculate how much time we should sleep to bring ourselves to the desired rate.
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
- val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+ // Based on throttler in Kafka
+ // scalastyle:off
+ // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+ // scalastyle:on
+ val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs),
+ SECONDS)
if (sleepTime > 0) Thread.sleep(sleepTime)
waitToWrite(numBytes)
}
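The expression wrapped above is the throttling step: bytes written since the last sync divided by the allowed bytes per second gives the time the writes should have taken, and subtracting the seconds actually elapsed gives the deficit to sleep off, converted to milliseconds. A small sketch of that arithmetic with illustrative numbers in place of the class's real fields:

// Minimal sketch of the throttling arithmetic wrapped above, with illustrative constants
// rather than RateLimitedOutputStream's real state.
import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

object ThrottleSketch {
  def main(args: Array[String]) {
    val bytesPerSec = 1 << 20                 // allowed rate: 1 MiB per second
    val bytesWrittenSinceSync = 10L << 20     // 10 MiB written since the last sync point
    val elapsedSecs = 4L                      // wall-clock seconds that have actually passed
    // 10 MiB at 1 MiB/s should take 10 s; only 4 s have elapsed, so sleep for the ~6 s deficit.
    val sleepTime = MILLISECONDS.convert(bytesWrittenSinceSync / bytesPerSec - elapsedSecs, SECONDS)
    if (sleepTime > 0) Thread.sleep(sleepTime)
  }
}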