-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/Connection.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ReceiverTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/SenderTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MutablePair.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalALS.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkALS.scala | 1
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala | 2
-rw-r--r--  project/SparkBuild.scala | 6
-rw-r--r--  project/plugins.sbt | 1
-rw-r--r--  project/project/SparkPluginBuild.scala | 44
-rw-r--r--  project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala | 56
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala | 4
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala | 2
-rw-r--r--  scalastyle-config.xml | 1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 1
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 4
-rw-r--r--  yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala | 2
55 files changed, 180 insertions(+), 88 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a1af63fa4a..5ceac28fe7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -81,7 +81,7 @@ class SparkEnv private[spark] (
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- //actorSystem.awaitTermination()
+ // actorSystem.awaitTermination()
}
private[spark]
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 3cd7121376..2595c15104 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -167,7 +167,7 @@ extends Logging {
private var initialized = false
private var conf: SparkConf = null
def initialize(_isDriver: Boolean, conf: SparkConf) {
- TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+ TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index a73b459c3c..9a7a113c95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
- //workerActorSystems.foreach(_.awaitTermination())
+ // workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
- //masterActorSystems.foreach(_.awaitTermination())
+ // masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index a730fe1f59..4433a2ec29 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
private[spark] trait LeaderElectionAgent extends Actor {
- //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
+ // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
val masterActor: ActorRef
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 13e2e29242..aecb069e42 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -275,7 +275,6 @@ private[spark] class Executor(
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
- //System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 6883a54494..3e3e18c353 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
}
def initialize() {
- //Add default properties in case there's no properties file
+ // Add default properties in case there's no properties file
setDefaultProperties(properties)
// If spark.metrics.conf is not set, try to get file in class path
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 8fd9c2b87d..16bd00fd18 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setTcpNoDelay(true)
channel.socket.setReuseAddress(true)
channel.socket.setKeepAlive(true)
- /*channel.socket.setReceiveBufferSize(32768) */
+ /* channel.socket.setReceiveBufferSize(32768) */
@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
@@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
private class Outbox {
val messages = new Queue[Message]()
- val defaultChunkSize = 65536 //32768 //16384
+ val defaultChunkSize = 65536
var nextMessageToBeUsed = 0
def addMessage(message: Message) {
messages.synchronized{
- /*messages += message*/
+ /* messages += message*/
messages.enqueue(message)
logDebug("Added [" + message + "] to outbox for sending to " +
"[" + getRemoteConnectionManagerId() + "]")
@@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
def getChunk(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
- /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
- /*val message = messages(nextMessageToBeUsed)*/
+ /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
+ /* val message = messages(nextMessageToBeUsed)*/
val message = messages.dequeue
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
@@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
val currentBuffers = new ArrayBuffer[ByteBuffer]()
- /*channel.socket.setSendBufferSize(256 * 1024)*/
+ /* channel.socket.setSendBufferSize(256 * 1024)*/
override def getRemoteAddress() = address
@@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
case None => {
// changeConnectionKeyInterest(0)
- /*key.interestOps(0)*/
+ /* key.interestOps(0)*/
return false
}
}
@@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
return false
}
- /*logDebug("Read " + bytesRead + " bytes for the buffer")*/
+ /* logDebug("Read " + bytesRead + " bytes for the buffer")*/
if (currentChunk.buffer.remaining == 0) {
- /*println("Filled buffer at " + System.currentTimeMillis)*/
+ /* println("Filled buffer at " + System.currentTimeMillis)*/
val bufferMessage = inbox.getMessageForChunk(currentChunk).get
if (bufferMessage.isCompletelyReceived) {
bufferMessage.flip
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index a75130cba2..2682f9d0ed 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
handleMessageExecutor.execute(runnable)
- /*handleMessage(connection, message)*/
+ /* handleMessage(connection, message)*/
}
private def handleClientAuthentication(
@@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())
- //send security message until going connection has been authenticated
+ // send security message until going connection has been authenticated
connection.send(message)
wakeupSelector()
@@ -859,14 +859,14 @@ private[spark] object ConnectionManager {
None
})
- /*testSequentialSending(manager)*/
- /*System.gc()*/
+ /* testSequentialSending(manager)*/
+ /* System.gc()*/
- /*testParallelSending(manager)*/
- /*System.gc()*/
+ /* testParallelSending(manager)*/
+ /* System.gc()*/
- /*testParallelDecreasingSending(manager)*/
- /*System.gc()*/
+ /* testParallelDecreasingSending(manager)*/
+ /* System.gc()*/
testContinuousSending(manager)
System.gc()
@@ -948,7 +948,7 @@ private[spark] object ConnectionManager {
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
println("--------------------------")
- /*println("Started at " + startTime + ", finished at " + finishTime) */
+ /* println("Started at " + startTime + ", finished at " + finishTime) */
println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 35f64134b0..e5745d7daa 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val slaves = slavesFile.mkString.split("\n")
slavesFile.close()
- /*println("Slaves")*/
- /*slaves.foreach(println)*/
+ /* println("Slaves")*/
+ /* slaves.foreach(println)*/
val tasknum = if (args.length > 2) args(2).toInt else slaves.length
val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 3c09a713c6..17fd931c9f 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
println("Started connection manager with id = " + manager.id)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
+ /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index aac2c24a46..905eddfbb9 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
- /*println("Started timer at " + startTime)*/
+ /* println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index f9082ffb91..4164e81d3a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -32,7 +32,7 @@ private[spark] class FileHeader (
buf.writeInt(fileLen)
buf.writeInt(blockId.name.length)
blockId.name.foreach((x: Char) => buf.writeByte(x))
- //padding the rest of header
+ // padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77c558ac46..4fce47e1ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -753,7 +753,7 @@ class DAGScheduler(
val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
} else {
- //this stage will be assigned to "default" pool
+ // this stage will be assigned to "default" pool
null
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 990e01a3e7..7bfc30b420 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
properties += ((key, value))
}
}
- //TODO (prashant) send conf instead of properties
+ // TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index bcfc39146a..2fbbda5b76 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -284,7 +284,7 @@ object BlockFetcherIterator {
}
} catch {
case x: InterruptedException => logInfo("Copier Interrupted")
- //case _ => throw new SparkException("Exception Throw in Shuffle Copier")
+ // case _ => throw new SparkException("Exception Throw in Shuffle Copier")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index a8d20ee332..cdbbc65292 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging {
accessedFields(cls) = Set[String]()
for (cls <- func.getClass :: innerClasses)
getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
- //logInfo("accessedFields: " + accessedFields)
+ // logInfo("accessedFields: " + accessedFields)
val inInterpreter = {
try {
@@ -139,13 +139,13 @@ private[spark] object ClosureCleaner extends Logging {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
- //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
+ // logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
field.set(outer, value)
}
}
if (outer != null) {
- //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
+ // logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
field.set(func, outer)
@@ -153,7 +153,7 @@ private[spark] object ClosureCleaner extends Logging {
}
private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
- //logInfo("Creating a " + cls + " with outer = " + outer)
+ // logInfo("Creating a " + cls + " with outer = " + outer)
if (!inInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
@@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging {
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
if (outer != null) {
- //logInfo("3: Setting $outer on " + cls + " to " + outer);
+ // logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
field.setAccessible(true)
field.set(obj, outer)
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index c539d2f708..4188a869c1 100644
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl(
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
"ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
- //shutdown() //TODO make it configurable
+ // shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
}
diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
index 2c1a6f8fd0..a898824cff 100644
--- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -24,8 +24,8 @@ package org.apache.spark.util
* @param _1 Element 1 of this MutablePair
* @param _2 Element 2 of this MutablePair
*/
-case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
- @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1,
+ @specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2]
(var _1: T1, var _2: T2)
extends Product2[T1, T2]
{
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6c73ea6949..4e7c34e6d1 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
test ("add value to collection accumulators") {
val maxI = 1000
- for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
test ("value not readable in tasks") {
val maxI = 1000
- for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
test ("localValue readable in tasks") {
val maxI = 1000
- for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2e29f20f0..d2555b7c05 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -432,7 +432,6 @@ object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
- //println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 996db70809..7c30626a0c 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -146,7 +146,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
// We can't catch all usages of arrays, since they might occur inside other collections:
- //assert(fails { arrPairs.distinct() })
+ // assert(fails { arrPairs.distinct() })
assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a25ce35736..7c843772bc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -111,7 +111,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
- //just to make sure some of the tasks take a noticeable amount of time
+ // just to make sure some of the tasks take a noticeable amount of time
val w = {i:Int =>
if (i == 0)
Thread.sleep(100)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eb8f591560..616214fb5e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -39,7 +39,7 @@ class UtilsSuite extends FunSuite {
}
test("copyStream") {
- //input array initialization
+ // input array initialization
val bytes = Array.ofDim[Byte](9000)
Random.nextBytes(bytes)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index c8ecbb8e41..0095cb8425 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -53,7 +53,6 @@ object LocalALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
- //println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 73b0e216ca..1fdb324b89 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -61,7 +61,7 @@ object SimpleSkewedGroupByTest {
println("RESULT: " + pairs1.groupByKey(numReducers).count)
// Print how many keys each reducer got (for debugging)
- //println("RESULT: " + pairs1.groupByKey(numReducers)
+ // println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
// .collectAsMap)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index ce4b3c8451..f59ab7e7cc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -54,7 +54,6 @@ object SparkALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
- //println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index cf1fc3e808..e698b9bf37 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -34,8 +34,6 @@ object SparkHdfsLR {
case class DataPoint(x: Vector, y: Double)
def parsePoint(line: String): DataPoint = {
- //val nums = line.split(' ').map(_.toDouble)
- //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
val tok = new java.util.StringTokenizer(line, " ")
var y = tok.nextToken.toDouble
var x = new Array[Double](D)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 62d3a52615..a22e64ca3c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -168,7 +168,7 @@ object ActorWordCount {
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
- //compute wordcount
+ // compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 35be7ffa1e..35f8f885f8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -88,7 +88,7 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
- //For this stream, a zeroMQ publisher should be running.
+ // For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 6acba25f44..a538c38dc4 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -44,7 +44,7 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
case m: ZMQMessage =>
logDebug("Received message for:" + m.frame(0))
- //We ignore first frame for processing as it is the topic
+ // We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index fea43c3b2b..dfc6a80158 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -27,12 +27,12 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
/**
* The source vertex attribute
*/
- var srcAttr: VD = _ //nullValue[VD]
+ var srcAttr: VD = _ // nullValue[VD]
/**
* The destination vertex attribute
*/
- var dstAttr: VD = _ //nullValue[VD]
+ var dstAttr: VD = _ // nullValue[VD]
/**
* Set the edge properties of this triplet.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 43ac11d895..c2b510a31e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -190,9 +190,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- //////////////////////////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods
- //////////////////////////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index fe6fe76def..bebe3740bc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -45,7 +45,7 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
* @param data value to send
*/
private[graphx]
-class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
+class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T](
@transient var partition: PartitionID,
var data: T)
extends Product2[PartitionID, T] with Serializable {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 34a145e018..2f2c524df6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -298,7 +298,6 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization
s.write(v.toInt)
}
- //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
override def flush(): Unit = s.flush()
@@ -391,7 +390,6 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
(s.read() & 0xFF)
}
- //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
override def close(): Unit = s.close()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index 014a7335f8..087b1156f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -65,7 +65,7 @@ private[graphx] object BytecodeUtils {
val finder = new MethodInvocationFinder(c.getName, m)
getClassReader(c).accept(finder, 0)
for (classMethod <- finder.methodsInvoked) {
- //println(classMethod)
+ // println(classMethod)
if (classMethod._1 == targetClass && classMethod._2 == targetMethod) {
return true
} else if (!seen.contains(classMethod)) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index f841846c0e..a3c8de3f90 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -123,7 +123,7 @@ object GraphGenerators {
* the dimensions of the adjacency matrix
*/
private def addEdge(numVertices: Int): Edge[Int] = {
- //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
+ // val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
val v = math.round(numVertices.toFloat/2.0).toInt
val (src, dst) = chooseCell(v, v, v)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9e269e6551..2549bc9710 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -17,7 +17,7 @@
import sbt._
import sbt.Classpaths.publishTask
-import Keys._
+import sbt.Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
import scala.util.Properties
@@ -27,7 +27,7 @@ import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
import scala.collection.JavaConversions._
// For Sonatype publishing
-//import com.jsuereth.pgp.sbtplugin.PgpKeys._
+// import com.jsuereth.pgp.sbtplugin.PgpKeys._
object SparkBuild extends Build {
val SPARK_VERSION = "1.0.0-SNAPSHOT"
@@ -200,7 +200,7 @@ object SparkBuild extends Build {
publishMavenStyle := true,
- //useGpg in Global := true,
+ // useGpg in Global := true,
pomExtra := (
<parent>
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4ff6f67af4..5aa8a1ec24 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -22,3 +22,4 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0")
+
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
new file mode 100644
index 0000000000..43361aa2b4
--- /dev/null
+++ b/project/project/SparkPluginBuild.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import sbt.Keys._
+
+/**
+ * This plugin project is there to define new scala style rules for spark. This is
+ * a plugin project so that this gets compiled first and is put on the classpath and
+ * becomes available for scalastyle sbt plugin.
+ */
+object SparkPluginDef extends Build {
+ lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle)
+ lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
+ val sparkVersion = "1.0.0-SNAPSHOT"
+ // There is actually no need to publish this artifact.
+ def styleSettings = Defaults.defaultSettings ++ Seq (
+ name := "spark-style",
+ organization := "org.apache.spark",
+ version := sparkVersion,
+ scalaVersion := "2.10.3",
+ scalacOptions := Seq("-unchecked", "-deprecation"),
+ libraryDependencies ++= Dependencies.scalaStyle,
+ sbtPlugin := true
+ )
+
+ object Dependencies {
+ val scalaStyle = Seq("org.scalastyle" %% "scalastyle" % "0.4.0")
+ }
+}
diff --git a/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala
new file mode 100644
index 0000000000..2f3c1a1828
--- /dev/null
+++ b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.scalastyle
+
+import java.util.regex.Pattern
+
+import org.scalastyle.{PositionError, ScalariformChecker, ScalastyleError}
+import scalariform.lexer.{MultiLineComment, ScalaDocComment, SingleLineComment, Token}
+import scalariform.parser.CompilationUnit
+
+class SparkSpaceAfterCommentStartChecker extends ScalariformChecker {
+ val errorKey: String = "insert.a.single.space.after.comment.start"
+
+ private def multiLineCommentRegex(comment: Token) =
+ Pattern.compile( """/\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches()
+
+ private def scalaDocPatternRegex(comment: Token) =
+ Pattern.compile( """/\*\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches()
+
+ private def singleLineCommentRegex(comment: Token): Boolean =
+ comment.text.trim.matches( """//\S+.*""") && !comment.text.trim.matches( """///+""")
+
+ override def verify(ast: CompilationUnit): List[ScalastyleError] = {
+ ast.tokens
+ .filter(hasComment)
+ .map {
+ _.associatedWhitespaceAndComments.comments.map {
+ case x: SingleLineComment if singleLineCommentRegex(x.token) => Some(x.token.offset)
+ case x: MultiLineComment if multiLineCommentRegex(x.token) => Some(x.token.offset)
+ case x: ScalaDocComment if scalaDocPatternRegex(x.token) => Some(x.token.offset)
+ case _ => None
+ }.flatten
+ }.flatten.map(PositionError(_))
+ }
+
+
+ private def hasComment(x: Token) =
+ x.associatedWhitespaceAndComments != null && !x.associatedWhitespaceAndComments.comments.isEmpty
+
+}
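Editor's note (not part of the patch): a minimal illustration of how the new SparkSpaceAfterCommentStartChecker above classifies comments, derived from its singleLineCommentRegex, multiLineCommentRegex and scalaDocPatternRegex:

    //flagged: no space after the "//" comment start
    // accepted: a single space follows the comment start
    // accepted: runs of slashes ("///+") used as divider lines are exempt
    ///////////////////////////////////////////
    /*flagged block comment*/
    /* accepted block comment */
    /**Flagged ScalaDoc summary. */
    /** Accepted ScalaDoc summary. */

Running the scalastyle sbt task (provided by the scalastyle-sbt-plugin already declared in project/plugins.sbt) would then report the flagged forms under the error key insert.a.single.space.after.comment.start, once the check is enabled in scalastyle-config.xml as done later in this patch.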
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index ee972887fe..bf73800388 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -124,8 +124,8 @@ extends ClassVisitor(ASM4, cv) {
mv.visitVarInsn(ALOAD, 0) // load this
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
mv.visitVarInsn(ALOAD, 0) // load this
- //val classType = className.replace('.', '/')
- //mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
+ // val classType = className.replace('.', '/')
+ // mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
mv.visitInsn(RETURN)
mv.visitMaxs(-1, -1) // stack size and local vars will be auto-computed
mv.visitEnd()
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 90a96ad383..fa2f1a88c4 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -834,7 +834,7 @@ import org.apache.spark.util.Utils
}
((pos, msg)) :: loop(filtered)
}
- //PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose.
+ // PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose.
// val warnings = loop(run.allConditionalWarnings flatMap (_.warnings))
// if (warnings.nonEmpty)
// mostRecentWarnings = warnings
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index ee968c53b3..76ba1ecca3 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -140,4 +140,5 @@
<!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
<check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+ <check level="error" class="org.apache.spark.scalastyle.SparkSpaceAfterCommentStartChecker" enabled="true"></check>
</scalastyle>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index f4b61381f9..b70ec897e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -662,7 +662,7 @@ object HiveQl {
// worth the number of hacks that will be required to implement it. Namely, we need to add
// some sort of mapped star expansion that would expand all child output row to be similarly
// named output expressions where some aggregate expression has been applied (i.e. First).
- ??? /// Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult)
+ ??? // Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult)
case Token(allJoinTokens(joinToken),
relation1 ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ca53113446..0da5eb754c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -94,7 +94,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
- //logDebug("Table input: %s".format(tablePath))
+ // logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index fde46705d8..d3339063cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
- //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+ // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
// " is very low")
assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0dc6704603..72ad0bae75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -128,7 +128,6 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
} catch {
case ie: InterruptedException =>
logInfo("Receiving thread interrupted")
- //println("Receiving thread interrupted")
case e: Exception =>
stopOnError(e)
}
@@ -142,7 +141,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
def stop() {
receivingThread.interrupt()
onStop()
- //TODO: terminate the actor
+ // TODO: terminate the actor
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index ca0a8ae478..b334d68bf9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -78,7 +78,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
override def checkpoint(interval: Duration): DStream[(K, V)] = {
super.checkpoint(interval)
- //reducedStream.checkpoint(interval)
+ // reducedStream.checkpoint(interval)
this
}
@@ -128,7 +128,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
// Cogroup the reduced RDDs and merge the reduced values
val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 9d8889b655..5f7d3ba26c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -64,7 +64,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
- //logDebug("Generating state RDD for time " + validTime)
Some(stateRDD)
}
case None => { // If parent RDD does not exist
@@ -97,11 +96,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val groupedRDD = parentRDD.groupByKey(partitioner)
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
- //logDebug("Generating state RDD for time " + validTime + " (first)")
+ // logDebug("Generating state RDD for time " + validTime + " (first)")
Some(sessionRDD)
}
case None => { // If parent RDD does not exist, then nothing to do!
- //logDebug("Not generating state RDD (no previous state, no parent)")
+ // logDebug("Not generating state RDD (no previous state, no parent)")
None
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index e4fa163f2e..cad68e248a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -126,7 +126,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receiverInfo -= streamId
logError("De-registered receiver for network stream " + streamId
+ " with message " + msg)
- //TODO: Do something about the corresponding NetworkInputDStream
+ // TODO: Do something about the corresponding NetworkInputDStream
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0784e562ac..25739956cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -252,7 +252,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Create files and advance manual clock to process them
- //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 74e73ebb34..7df206241b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -154,7 +154,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
- StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
+ // Had to pass the local value of port to prevent from closing over entire scope
+ StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 57e5761cba..6568003bf1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -139,7 +139,6 @@ trait ClientBase extends Logging {
} else if (srcHost != null && dstHost == null) {
return false
}
- //check for ports
if (srcUri.getPort() != dstUri.getPort()) {
false
} else {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 68cda0f1c9..9b7f1fca96 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -157,7 +157,7 @@ class ClientDistributedCacheManager() extends Logging {
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
- //the leaf level file should be readable by others
+ // the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false
}
@@ -177,7 +177,7 @@ class ClientDistributedCacheManager() extends Logging {
statCache: Map[URI, FileStatus]): Boolean = {
var current = path
while (current != null) {
- //the subdirs in the path should have execute permissions for others
+ // the subdirs in the path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false
}
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 458df4fa3c..80b57d1355 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -99,7 +99,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
- //add another one and verify both there and order correct
+ // add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing2"))
val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")