diff options
Diffstat (limited to 'core/src/main/scala/org')
18 files changed, 40 insertions, 41 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a1af63fa4a..5ceac28fe7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -81,7 +81,7 @@ class SparkEnv private[spark] ( // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // down, but let's call it anyway in case it gets fixed in a later release // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. - //actorSystem.awaitTermination() + // actorSystem.awaitTermination() } private[spark] diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 3cd7121376..2595c15104 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -167,7 +167,7 @@ extends Logging { private var initialized = false private var conf: SparkConf = null def initialize(_isDriver: Boolean, conf: SparkConf) { - TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests + TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests synchronized { if (!initialized) { initialized = true diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index a73b459c3c..9a7a113c95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors! // This is unfortunate, but for now we just comment it out. workerActorSystems.foreach(_.shutdown()) - //workerActorSystems.foreach(_.awaitTermination()) + // workerActorSystems.foreach(_.awaitTermination()) masterActorSystems.foreach(_.shutdown()) - //masterActorSystems.foreach(_.awaitTermination()) + // masterActorSystems.foreach(_.awaitTermination()) masterActorSystems.clear() workerActorSystems.clear() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index a730fe1f59..4433a2ec29 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] */ private[spark] trait LeaderElectionAgent extends Actor { - //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. + // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. val masterActor: ActorRef } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 13e2e29242..aecb069e42 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -275,7 +275,6 @@ private[spark] class Executor( // have left some weird state around depending on when the exception was thrown, but on // the other hand, maybe we could detect that when future tasks fail and exit then. logError("Exception in task ID " + taskId, t) - //System.exit(1) } } finally { // TODO: Unregister shuffle memory only for ResultTask diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 6883a54494..3e3e18c353 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi } def initialize() { - //Add default properties in case there's no properties file + // Add default properties in case there's no properties file setDefaultProperties(properties) // If spark.metrics.conf is not set, try to get file in class path diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 8fd9c2b87d..16bd00fd18 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, channel.socket.setTcpNoDelay(true) channel.socket.setReuseAddress(true) channel.socket.setKeepAlive(true) - /*channel.socket.setReceiveBufferSize(32768) */ + /* channel.socket.setReceiveBufferSize(32768) */ @volatile private var closed = false var onCloseCallback: Connection => Unit = null @@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, private class Outbox { val messages = new Queue[Message]() - val defaultChunkSize = 65536 //32768 //16384 + val defaultChunkSize = 65536 var nextMessageToBeUsed = 0 def addMessage(message: Message) { messages.synchronized{ - /*messages += message*/ + /* messages += message*/ messages.enqueue(message) logDebug("Added [" + message + "] to outbox for sending to " + "[" + getRemoteConnectionManagerId() + "]") @@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, def getChunk(): Option[MessageChunk] = { messages.synchronized { while (!messages.isEmpty) { - /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ - /*val message = messages(nextMessageToBeUsed)*/ + /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ + /* val message = messages(nextMessageToBeUsed)*/ val message = messages.dequeue val chunk = message.getChunkForSending(defaultChunkSize) if (chunk.isDefined) { @@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, val currentBuffers = new ArrayBuffer[ByteBuffer]() - /*channel.socket.setSendBufferSize(256 * 1024)*/ + /* channel.socket.setSendBufferSize(256 * 1024)*/ override def getRemoteAddress() = address @@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } case None => { // changeConnectionKeyInterest(0) - /*key.interestOps(0)*/ + /* key.interestOps(0)*/ return false } } @@ -540,10 +540,10 @@ private[spark] class ReceivingConnection( return false } - /*logDebug("Read " + bytesRead + " bytes for the buffer")*/ + /* logDebug("Read " + bytesRead + " bytes for the buffer")*/ if (currentChunk.buffer.remaining == 0) { - /*println("Filled buffer at " + System.currentTimeMillis)*/ + /* println("Filled buffer at " + System.currentTimeMillis)*/ val bufferMessage = inbox.getMessageForChunk(currentChunk).get if (bufferMessage.isCompletelyReceived) { bufferMessage.flip diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index a75130cba2..2682f9d0ed 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } } handleMessageExecutor.execute(runnable) - /*handleMessage(connection, message)*/ + /* handleMessage(connection, message)*/ } private def handleClientAuthentication( @@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logTrace("Sending Security [" + message + "] to [" + connManagerId + "]") val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection()) - //send security message until going connection has been authenticated + // send security message until going connection has been authenticated connection.send(message) wakeupSelector() @@ -859,14 +859,14 @@ private[spark] object ConnectionManager { None }) - /*testSequentialSending(manager)*/ - /*System.gc()*/ + /* testSequentialSending(manager)*/ + /* System.gc()*/ - /*testParallelSending(manager)*/ - /*System.gc()*/ + /* testParallelSending(manager)*/ + /* System.gc()*/ - /*testParallelDecreasingSending(manager)*/ - /*System.gc()*/ + /* testParallelDecreasingSending(manager)*/ + /* System.gc()*/ testContinuousSending(manager) System.gc() @@ -948,7 +948,7 @@ private[spark] object ConnectionManager { val ms = finishTime - startTime val tput = mb * 1000.0 / ms println("--------------------------") - /*println("Started at " + startTime + ", finished at " + finishTime) */ + /* println("Started at " + startTime + ", finished at " + finishTime) */ println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)") println("--------------------------") println() diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala index 35f64134b0..e5745d7daa 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala @@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{ val slaves = slavesFile.mkString.split("\n") slavesFile.close() - /*println("Slaves")*/ - /*slaves.foreach(println)*/ + /* println("Slaves")*/ + /* slaves.foreach(println)*/ val tasknum = if (args.length > 2) args(2).toInt else slaves.length val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 val count = if (args.length > 4) args(4).toInt else 3 diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala index 3c09a713c6..17fd931c9f 100644 --- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala @@ -27,7 +27,7 @@ private[spark] object ReceiverTest { println("Started connection manager with id = " + manager.id) manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { - /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/ + /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/ val buffer = ByteBuffer.wrap("response".getBytes) Some(Message.createBufferMessage(buffer, msg.id)) }) diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index aac2c24a46..905eddfbb9 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -50,7 +50,7 @@ private[spark] object SenderTest { (0 until count).foreach(i => { val dataMessage = Message.createBufferMessage(buffer.duplicate) val startTime = System.currentTimeMillis - /*println("Started timer at " + startTime)*/ + /* println("Started timer at " + startTime)*/ val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) .map { response => val buffer = response.asInstanceOf[BufferMessage].buffers(0) diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala index f9082ffb91..4164e81d3a 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala @@ -32,7 +32,7 @@ private[spark] class FileHeader ( buf.writeInt(fileLen) buf.writeInt(blockId.name.length) blockId.name.foreach((x: Char) => buf.writeByte(x)) - //padding the rest of header + // padding the rest of header if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) { buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes) } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 77c558ac46..4fce47e1ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -753,7 +753,7 @@ class DAGScheduler( val properties = if (stageIdToActiveJob.contains(jobId)) { stageIdToActiveJob(stage.jobId).properties } else { - //this stage will be assigned to "default" pool + // this stage will be assigned to "default" pool null } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 990e01a3e7..7bfc30b420 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A properties += ((key, value)) } } - //TODO (prashant) send conf instead of properties + // TODO (prashant) send conf instead of properties driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index bcfc39146a..2fbbda5b76 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -284,7 +284,7 @@ object BlockFetcherIterator { } } catch { case x: InterruptedException => logInfo("Copier Interrupted") - //case _ => throw new SparkException("Exception Throw in Shuffle Copier") + // case _ => throw new SparkException("Exception Throw in Shuffle Copier") } } } diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index a8d20ee332..cdbbc65292 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging { accessedFields(cls) = Set[String]() for (cls <- func.getClass :: innerClasses) getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0) - //logInfo("accessedFields: " + accessedFields) + // logInfo("accessedFields: " + accessedFields) val inInterpreter = { try { @@ -139,13 +139,13 @@ private[spark] object ClosureCleaner extends Logging { val field = cls.getDeclaredField(fieldName) field.setAccessible(true) val value = field.get(obj) - //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value); + // logInfo("1: Setting " + fieldName + " on " + cls + " to " + value); field.set(outer, value) } } if (outer != null) { - //logInfo("2: Setting $outer on " + func.getClass + " to " + outer); + // logInfo("2: Setting $outer on " + func.getClass + " to " + outer); val field = func.getClass.getDeclaredField("$outer") field.setAccessible(true) field.set(func, outer) @@ -153,7 +153,7 @@ private[spark] object ClosureCleaner extends Logging { } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { - //logInfo("Creating a " + cls + " with outer = " + outer) + // logInfo("Creating a " + cls + " with outer = " + outer) if (!inInterpreter) { // This is a bona fide closure class, whose constructor has no effects // other than to set its fields, so use its constructor @@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging { val newCtor = rf.newConstructorForSerialization(cls, parentCtor) val obj = newCtor.newInstance().asInstanceOf[AnyRef] if (outer != null) { - //logInfo("3: Setting $outer on " + cls + " to " + outer); + // logInfo("3: Setting $outer on " + cls + " to " + outer); val field = cls.getDeclaredField("$outer") field.setAccessible(true) field.set(obj, outer) diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index c539d2f708..4188a869c1 100644 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl( if (isFatalError(cause) && !settings.JvmExitOnFatalError) { log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name) - //shutdown() //TODO make it configurable + // shutdown() //TODO make it configurable } else { fallbackHandler.uncaughtException(thread, cause) } diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala index 2c1a6f8fd0..a898824cff 100644 --- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala +++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala @@ -24,8 +24,8 @@ package org.apache.spark.util * @param _1 Element 1 of this MutablePair * @param _2 Element 2 of this MutablePair */ -case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1, - @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2] +case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1, + @specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2] (var _1: T1, var _2: T2) extends Product2[T1, T2] { |