From 3b2e22e2c343ce4615f31c3d94f9af568ea0ea42 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 9 Jan 2014 13:27:40 -0800
Subject: Revert changes to examples/.../PageRankUtils.scala

Reverts to 04d83fc37f9eef89c20331c85291a0a169f75e6d:examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala.
---
 .../main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'examples')

diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index 8dd7fb40e8..cfafbaf23e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -31,16 +31,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
 import com.esotericsoftware.kryo._
 
 class PageRankUtils extends Serializable {
-  def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)(
+  def computeWithCombiner(numVertices: Long, epsilon: Double)(
     self: PRVertex, messageSum: Option[Double], superstep: Int
   ): (PRVertex, Array[PRMessage]) = {
     val newValue = messageSum match {
       case Some(msgSum) if msgSum != 0 =>
-        0.15 + 0.85 * msgSum
+        0.15 / numVertices + 0.85 * msgSum
       case _ => self.value
     }
 
-    val terminate = superstep >= terminateSteps
+    val terminate = superstep >= 10
 
     val outbox: Array[PRMessage] =
       if (!terminate)
--
cgit v1.2.3


From 93a65e5fde64ffed3dbd2a050c1007e077ecd004 Mon Sep 17 00:00:00 2001
From: Henry Saputra
Date: Sun, 12 Jan 2014 10:30:04 -0800
Subject: Remove simple redundant return statement for Scala methods/functions:

-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
---
 .../main/scala/org/apache/spark/Accumulators.scala | 2 +-
 .../main/scala/org/apache/spark/CacheManager.scala | 4 +--
 .../scala/org/apache/spark/HttpFileServer.scala | 6 ++---
 core/src/main/scala/org/apache/spark/Logging.scala | 2 +-
 .../scala/org/apache/spark/MapOutputTracker.scala | 4 +--
 .../main/scala/org/apache/spark/Partitioner.scala | 4 +--
 .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++---
 .../scala/org/apache/spark/SparkHadoopWriter.scala | 13 +++++-----
 .../org/apache/spark/api/python/PythonRDD.scala | 2 +-
 .../apache/spark/broadcast/TorrentBroadcast.scala | 6 ++---
 .../org/apache/spark/network/BufferMessage.scala | 2 +-
 .../org/apache/spark/network/Connection.scala | 6 ++---
 .../scala/org/apache/spark/network/Message.scala | 6 ++---
 .../apache/spark/network/netty/ShuffleSender.scala | 2 +-
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 +--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++---
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +--
 .../apache/spark/scheduler/InputFormatInfo.scala | 8 +++---
 .../scala/org/apache/spark/scheduler/Pool.scala | 8 +++---
 .../spark/scheduler/SchedulingAlgorithm.scala | 11 ++++----
 .../apache/spark/scheduler/SparkListenerBus.scala | 2 +-
 .../scala/org/apache/spark/scheduler/Stage.scala | 2 +-
 .../org/apache/spark/scheduler/TaskResult.scala | 2 +-
 .../apache/spark/scheduler/TaskResultGetter.scala | 2 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 14 +++++------
 .../mesos/CoarseMesosSchedulerBackend.scala | 2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala | 6 ++---
.../org/apache/spark/storage/BlockManager.scala | 2 +- .../apache/spark/storage/BlockManagerWorker.scala | 14 +++++------ .../org/apache/spark/storage/BlockMessage.scala | 2 +- .../apache/spark/storage/BlockMessageArray.scala | 2 +- .../org/apache/spark/storage/MemoryStore.scala | 2 +- .../org/apache/spark/storage/StorageLevel.scala | 2 +- .../org/apache/spark/util/AppendOnlyMap.scala | 2 +- .../org/apache/spark/util/ClosureCleaner.scala | 10 ++++---- .../org/apache/spark/util/SizeEstimator.scala | 10 ++++---- .../main/scala/org/apache/spark/util/Utils.scala | 19 +++++++------- .../main/scala/org/apache/spark/util/Vector.scala | 12 ++++----- .../spark/scheduler/ClusterSchedulerSuite.scala | 9 ++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../apache/spark/scheduler/JobLoggerSuite.scala | 2 +- .../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++------ .../scala/org/apache/spark/examples/LocalALS.scala | 8 +++--- .../org/apache/spark/examples/LocalFileLR.scala | 2 +- .../org/apache/spark/examples/LocalKMeans.scala | 2 +- .../scala/org/apache/spark/examples/SparkALS.scala | 6 ++--- .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- .../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----- .../examples/clickstream/PageViewGenerator.scala | 2 +- .../spark/mllib/api/python/PythonMLLibAPI.scala | 29 +++++++++++----------- .../org/apache/spark/streaming/Checkpoint.scala | 2 +- .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../spark/streaming/dstream/StateDStream.scala | 8 +++--- .../streaming/scheduler/StreamingListenerBus.scala | 2 +- .../org/apache/spark/streaming/util/Clock.scala | 4 +-- .../spark/streaming/util/RawTextHelper.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 9 ++++--- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 8 +++--- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 7 +++--- .../yarn/ClientDistributedCacheManager.scala | 10 ++++---- .../yarn/ClientDistributedCacheManagerSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 5 ++-- 63 files changed, 187 insertions(+), 186 deletions(-) (limited to 'examples') diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 5f73d234aa..e89ac28b8e 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -218,7 +218,7 @@ private object Accumulators { def newId: Long = synchronized { lastId += 1 - return lastId + lastId } def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 519ecde50a..8e5dd8a850 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { blockManager.get(key) match { case Some(values) => // Partition is already materialized, so just return its values - return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => // Mark the split as loading (unless someone else marks it first) @@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val elements = new ArrayBuffer[Any] elements ++= computedValues blockManager.put(key, elements, 
storageLevel, tellMaster = true) - return elements.iterator.asInstanceOf[Iterator[T]] + elements.iterator.asInstanceOf[Iterator[T]] } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index ad1ee20045..a885898ad4 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging { def addFile(file: File) : String = { addFileToDir(file, fileDir) - return serverUri + "/files/" + file.getName + serverUri + "/files/" + file.getName } def addJar(file: File) : String = { addFileToDir(file, jarDir) - return serverUri + "/jars/" + file.getName + serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { Files.copy(file, new File(dir, file.getName)) - return dir + "/" + file.getName + dir + "/" + file.getName } } diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 4a34989e50..9063cae87e 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -41,7 +41,7 @@ trait Logging { } log_ = LoggerFactory.getLogger(className) } - return log_ + log_ } // Log methods that take only a String diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 77b8ca1cce..57bdf22b9c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } } - else{ + else { throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing all output locations for shuffle " + shuffleId)) } @@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { - // Cache a serialized version of the output statuses for each shuffle to send them out faster + // Cache a serialized version of the output statuses for each shuffle to send them out faster return private var cacheEpoch = epoch private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]] diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9b043f06dd..fc0a749882 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -53,9 +53,9 @@ object Partitioner { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { - return new HashPartitioner(rdd.context.defaultParallelism) + new HashPartitioner(rdd.context.defaultParallelism) } else { - return new HashPartitioner(bySize.head.partitions.size) + new HashPartitioner(bySize.head.partitions.size) } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f91392b351..3d82bfc019 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -756,8 +756,11 @@ class SparkContext( private[spark] 
def getCallSite(): String = { val callSite = getLocalProperty("externalCallSite") - if (callSite == null) return Utils.formatSparkCallSite - callSite + if (callSite == null) { + Utils.formatSparkCallSite + } else { + callSite + } } /** @@ -907,7 +910,7 @@ class SparkContext( */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) - return f + f } /** @@ -919,7 +922,7 @@ class SparkContext( val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) - fs.getFileStatus(path).getPath().toString + fs.getFileStatus(path).getPath.toString } } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 618d95015f..bba873a0b6 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf) format = conf.value.getOutputFormat() .asInstanceOf[OutputFormat[AnyRef,AnyRef]] } - return format + format } private def getOutputCommitter(): OutputCommitter = { if (committer == null) { committer = conf.value.getOutputCommitter } - return committer + committer } private def getJobContext(): JobContext = { if (jobContext == null) { jobContext = newJobContext(conf.value, jID.value) } - return jobContext + jobContext } private def getTaskContext(): TaskAttemptContext = { if (taskContext == null) { taskContext = newTaskAttemptContext(conf.value, taID.value) } - return taskContext + taskContext } private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { @@ -182,7 +182,7 @@ object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) - return new JobID(jobtrackerID, id) + new JobID(jobtrackerID, id) } def createPathFromString(path: String, conf: JobConf): Path = { @@ -194,7 +194,6 @@ object SparkHadoopWriter { if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath = outputPath.makeQualified(fs) - return outputPath + outputPath.makeQualified(fs) } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 40c519b5bd..8830de7273 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) - return new Iterator[Array[Byte]] { + new Iterator[Array[Byte]] { def next(): Array[Byte] = { val obj = _nextObj if (hasNext) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index fdf92eca4f..1d295c62bc 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -203,16 +203,16 @@ extends Logging { } bais.close() - var tInfo = TorrentInfo(retVal, blockNum, byteArray.length) + val tInfo = TorrentInfo(retVal, blockNum, byteArray.length) tInfo.hasBlocks = blockNum - return tInfo + tInfo } def unBlockifyObject[T](arrayOfBlocks: 
Array[TorrentBlock], totalBytes: Int, totalBlocks: Int): T = { - var retByteArray = new Array[Byte](totalBytes) + val retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala index f736bb3713..fb4c65909a 100644 --- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala @@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: throw new Exception("Max chunk size is " + maxChunkSize) } - if (size == 0 && gotChunkForSendingOnce == false) { + if (size == 0 && !gotChunkForSendingOnce) { val newChunk = new MessageChunk( new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null) gotChunkForSendingOnce = true diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 95cb0206ac..cba8477ed5 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - return true + true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) @@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } // should not happen - to keep scala compiler happy - return true + true } // This is a hack to determine if remote socket was closed or not. 
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S } } // should not happen - to keep scala compiler happy - return true + true } def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback} diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala index f2ecc6d439..2612884bdb 100644 --- a/core/src/main/scala/org/apache/spark/network/Message.scala +++ b/core/src/main/scala/org/apache/spark/network/Message.scala @@ -61,7 +61,7 @@ private[spark] object Message { if (dataBuffers.exists(_ == null)) { throw new Exception("Attempting to create buffer message with null buffer") } - return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) + new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) } def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage = @@ -69,9 +69,9 @@ private[spark] object Message { def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = { if (dataBuffer == null) { - return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) + createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) } else { - return createBufferMessage(Array(dataBuffer), ackId) + createBufferMessage(Array(dataBuffer), ackId) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 546d921067..44204a8c46 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -64,7 +64,7 @@ private[spark] object ShuffleSender { val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val file = new File(subDir, blockId.name) - return new FileSegment(file, 0, file.length()) + new FileSegment(file, 0, file.length()) } } val sender = new ShuffleSender(port, pResovler) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 98da35763b..a5394a28e0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val prefPartActual = prefPart.get if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows - return minPowerOfTwo // prefer balance over locality + minPowerOfTwo // prefer balance over locality else { - return prefPartActual // prefer locality over balance + prefPartActual // prefer locality over balance } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 53f77a38f5..20db7db5ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -99,11 +99,11 @@ class HadoopRDD[K, V]( val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. 
- return conf.asInstanceOf[JobConf] + conf.asInstanceOf[JobConf] } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. - return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). @@ -111,7 +111,7 @@ class HadoopRDD[K, V]( val newJobConf = new JobConf(broadcastedConf.value.value) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - return newJobConf + newJobConf } } @@ -127,7 +127,7 @@ class HadoopRDD[K, V]( newInputFormat.asInstanceOf[Configurable].setConf(conf) } HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) - return newInputFormat + newInputFormat } override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 1dbbe39898..d4f396afb5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val lines = Source.fromInputStream(proc.getInputStream).getLines - return new Iterator[String] { + new Iterator[String] { def next() = lines.next() def hasNext = { if (lines.hasNext) { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f9dc12eee3..edd4f381db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag]( val entry = iter.next() m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue) } - return m1 + m1 } val myResult = mapPartitions(countPartition).reduce(mergeMaps) myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map @@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag]( partsScanned += numPartsToTry } - return buf.toArray + buf.toArray } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 90eb8a747f..cc10cc0849 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split) } - return retval.toSet + retval.toSet } // This method does not expect failures, since validate has already passed ... 
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem) ) - return retval.toSet + retval.toSet } private def findPreferredLocations(): Set[SplitInfo] = { logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz) if (mapreduceInputFormat) { - return prefLocsFromMapreduceInputFormat() + prefLocsFromMapreduceInputFormat() } else { assert(mapredInputFormat) - return prefLocsFromMapredInputFormat() + prefLocsFromMapredInputFormat() } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 1791242215..4bc13c23d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -75,12 +75,12 @@ private[spark] class Pool( return schedulableNameToSchedulable(schedulableName) } for (schedulable <- schedulableQueue) { - var sched = schedulable.getSchedulableByName(schedulableName) + val sched = schedulable.getSchedulableByName(schedulableName) if (sched != null) { return sched } } - return null + null } override def executorLost(executorId: String, host: String) { @@ -92,7 +92,7 @@ private[spark] class Pool( for (schedulable <- schedulableQueue) { shouldRevive |= schedulable.checkSpeculatableTasks() } - return shouldRevive + shouldRevive } override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { @@ -101,7 +101,7 @@ private[spark] class Pool( for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() } - return sortedTaskSetQueue + sortedTaskSetQueue } def increaseRunningTasks(taskNum: Int) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index 3418640b8c..5e62c8468f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { res = math.signum(stageId1 - stageId2) } if (res < 0) { - return true + true } else { - return false + false } } } @@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble - var res:Boolean = true var compare:Int = 0 if (s1Needy && !s2Needy) { @@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { } if (compare < 0) { - return true + true } else if (compare > 0) { - return false + false } else { - return s1.name < s2.name + s1.name < s2.name } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e7defd768b..e551c11f72 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -88,6 +88,6 @@ private[spark] class SparkListenerBus() extends Logging { * add overhead in the general case. 
*/ Thread.sleep(10) } - return true + true } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 7cb3fe46e5..c60e9896de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -96,7 +96,7 @@ private[spark] class Stage( def newAttemptId(): Int = { val id = nextAttemptId nextAttemptId += 1 - return id + id } val name = callSite.getOrElse(rdd.origin) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index e80cc6b0f6..9d3e615826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long def value(): T = { val resultSer = SparkEnv.get.serializer.newInstance() - return resultSer.deserialize(valueBytes) + resultSer.deserialize(valueBytes) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index c52d6175d2..35e9544718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { - return sparkEnv.closureSerializer.newInstance() + sparkEnv.closureSerializer.newInstance() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a10e5397ad..fc0ee07089 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -228,7 +228,7 @@ private[spark] class TaskSetManager( return Some(index) } } - return None + None } /** Check whether a task is currently running an attempt on a given host */ @@ -291,7 +291,7 @@ private[spark] class TaskSetManager( } } - return None + None } /** @@ -332,7 +332,7 @@ private[spark] class TaskSetManager( } // Finally, if all else has failed, find a speculative task - return findSpeculativeTask(execId, host, locality) + findSpeculativeTask(execId, host, locality) } /** @@ -387,7 +387,7 @@ private[spark] class TaskSetManager( case _ => } } - return None + None } /** @@ -584,7 +584,7 @@ private[spark] class TaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def addSchedulable(schedulable: Schedulable) {} @@ -594,7 +594,7 @@ private[spark] class TaskSetManager( override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this) sortedTaskSetQueue += this - return sortedTaskSetQueue + sortedTaskSetQueue } /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */ @@ -669,7 +669,7 @@ private[spark] class TaskSetManager( } } } - return foundTasks + foundTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index e16d60c54c..c27049bdb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend( .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } - return command.build() + command.build() } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b428c82a48..49781485d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend( // Serialize the map as an array of (String, String) pairs execArgs = Utils.serialize(props.toArray) } - return execArgs + execArgs } private def setClassLoader(): ClassLoader = { val oldClassLoader = Thread.currentThread.getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) - return oldClassLoader + oldClassLoader } private def restoreClassLoader(oldClassLoader: ClassLoader) { @@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(1).build()) .build() - return MesosTaskInfo.newBuilder() + MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(createExecutorInfo(slaveId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c56e2ca2df..a716b1d577 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -412,7 +412,7 @@ private[spark] class BlockManager( logDebug("The value of block " + blockId + " is null") } logDebug("Block " + blockId + " not found") - return None + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 21f003609b..a36abe0670 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -42,7 +42,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage) logDebug("Parsed as a block message array") val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get) - return Some(new BlockMessageArray(responseMessages).toBufferMessage) + Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) return None @@ -50,7 +50,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } case otherMessage: Any => { logError("Unknown type message received: " + otherMessage) - return None + None } } } @@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val 
blockManager: BlockManager) extends val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) - return None + None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) @@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends if (buffer == null) { return None } - return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) + Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) } - case _ => return None + case _ => None } } @@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs) + " and got buffer " + buffer) - return buffer + buffer } } @@ -132,6 +132,6 @@ private[spark] object BlockManagerWorker extends Logging { } case None => logDebug("No response message received"); return null } - return null + null } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala index 80dcb5a207..fbafcf79d2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala @@ -154,7 +154,7 @@ private[spark] class BlockMessage() { println() */ val finishTime = System.currentTimeMillis - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } override def toString: String = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index a06f50a0ac..59329361f3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM println() println() */ - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 05f676c6e2..27f057b9f2 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return false } } - return true + true } override def contains(blockId: BlockId): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index b5596dffd3..0f84810d6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -74,7 +74,7 @@ class StorageLevel private( if (deserialized_) { ret |= 1 } - return ret + ret } override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala index 8bb4ee3bfa..edfa58b2d9 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala @@ -67,7 +67,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi i += 1 } } - return null.asInstanceOf[V] + null.asInstanceOf[V] } /** 
Set the value for a key */ diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 7108595e3e..1df6b87fb0 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging { return f.getType :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } // Get a list of the outer objects for a given closure object. @@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging { return f.get(obj) :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } private def getInnerClasses(obj: AnyRef): List[Class[_]] = { @@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(obj, outer) } - return obj + obj } } } @@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging { private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { if (op == GETFIELD) { for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { @@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { val argTypes = Type.getArgumentTypes(desc) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index bddb3bb735..3cf94892e9 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging { val bean = ManagementFactory.newPlatformMXBeanProxy(server, hotSpotMBeanName, hotSpotMBeanClass) // TODO: We could use reflection on the VMOption returned ? 
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") + getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") } catch { case e: Exception => { // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB @@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging { def dequeue(): AnyRef = { val elem = stack.last stack.trimEnd(1) - return elem + elem } } @@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging { while (!state.isFinished) { visitSingleObject(state.dequeue(), state) } - return state.size + state.size } private def visitSingleObject(obj: AnyRef, state: SearchState) { @@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging { // Create and cache a new ClassInfo val newInfo = new ClassInfo(shellSize, pointerFields) classInfos.put(cls, newInfo) - return newInfo + newInfo } private def alignSize(size: Long): Long = { val rem = size % ALIGN_SIZE - return if (rem == 0) size else (size + ALIGN_SIZE - rem) + if (rem == 0) size else (size + ALIGN_SIZE - rem) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f1253100b..f80ed290ab 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -49,14 +49,14 @@ private[spark] object Utils extends Logging { val oos = new ObjectOutputStream(bos) oos.writeObject(o) oos.close() - return bos.toByteArray + bos.toByteArray } /** Deserialize an object using Java serialization */ def deserialize[T](bytes: Array[Byte]): T = { val bis = new ByteArrayInputStream(bytes) val ois = new ObjectInputStream(bis) - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize an object using Java serialization and the given ClassLoader */ @@ -66,7 +66,7 @@ private[spark] object Utils extends Logging { override def resolveClass(desc: ObjectStreamClass) = Class.forName(desc.getName, false, loader) } - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ @@ -144,7 +144,7 @@ private[spark] object Utils extends Logging { i += 1 } } - return buf + buf } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() @@ -428,7 +428,7 @@ private[spark] object Utils extends Logging { def parseHostPort(hostPort: String): (String, Int) = { { // Check cache first. - var cached = hostPortParseResults.get(hostPort) + val cached = hostPortParseResults.get(hostPort) if (cached != null) return cached } @@ -731,7 +731,7 @@ private[spark] object Utils extends Logging { } catch { case ise: IllegalStateException => return true } - return false + false } def isSpace(c: Char): Boolean = { @@ -748,7 +748,7 @@ private[spark] object Utils extends Logging { var inWord = false var inSingleQuote = false var inDoubleQuote = false - var curWord = new StringBuilder + val curWord = new StringBuilder def endWord() { buf += curWord.toString curWord.clear() @@ -794,7 +794,7 @@ private[spark] object Utils extends Logging { if (inWord || inDoubleQuote || inSingleQuote) { endWord() } - return buf + buf } /* Calculates 'x' modulo 'mod', takes to consideration sign of x, @@ -822,8 +822,7 @@ private[spark] object Utils extends Logging { /** Returns a copy of the system properties that is thread-safe to iterator over. 
*/ def getSystemProperties(): Map[String, String] = { - return System.getProperties().clone() - .asInstanceOf[java.util.Properties].toMap[String, String] + System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String] } /** diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index fe710c58ac..094edcde7e 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -25,7 +25,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def + (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) + other(i)) + Vector(length, i => this(i) + other(i)) } def add(other: Vector) = this + other @@ -33,7 +33,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def - (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) - other(i)) + Vector(length, i => this(i) - other(i)) } def subtract(other: Vector) = this - other @@ -47,7 +47,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += this(i) * other(i) i += 1 } - return ans + ans } /** @@ -67,7 +67,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) + plus(i)) * other(i) i += 1 } - return ans + ans } def += (other: Vector): Vector = { @@ -102,7 +102,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) - other(i)) * (this(i) - other(i)) i += 1 } - return ans + ans } def dist(other: Vector): Double = math.sqrt(squaredDist(other)) @@ -117,7 +117,7 @@ object Vector { def apply(length: Int, initializer: Int => Double): Vector = { val elements: Array[Double] = Array.tabulate(length)(initializer) - return new Vector(elements) + new Vector(elements) } def zeros(length: Int) = new Vector(new Array[Double](length)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala index 7bf2020fe3..235d31709a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala @@ -64,7 +64,7 @@ class FakeTaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def executorLost(executorId: String, host: String): Unit = { @@ -79,13 +79,14 @@ class FakeTaskSetManager( { if (tasksSuccessful + runningTasks < numTasks) { increaseRunningTasks(1) - return Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + } else { + None } - return None } override def checkSpeculatableTasks(): Boolean = { - return true + true } def taskFinished() { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2aa259daf3..14f89d50b7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont locations: Seq[Seq[String]] = Nil ): MyRDD = { val maxPartition = numPartitions - 1 - return new 
MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 5cc48ee00a..3880e68725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -47,7 +47,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers dependencies: List[Dependency[_]] ): MyRDD = { val maxPartition = numPartitions - 1 - return new MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 0ed366fb70..de4871d043 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -61,8 +61,8 @@ class NonSerializable {} object TestObject { def run(): Int = { var nonSer = new NonSerializable - var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + val x = 5 + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -76,7 +76,7 @@ class TestClass extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess { def run(): Int = { var nonSer2 = new NonSerializable var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -115,7 +115,7 @@ object TestObjectWithNesting { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) var y = 1 for (i <- 1 to 4) { @@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) for (i <- 1 to 4) { var nonSer2 = new NonSerializable diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 83db8b9e26..c8ecbb8e41 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -43,7 +43,7 @@ object LocalALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -56,7 +56,7 @@ object LocalALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -80,7 +80,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], @@ -104,7 +104,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index fb130ea198..9ab5f5a486 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -28,7 +28,7 @@ object LocalFileLR { def parsePoint(line: String): DataPoint = { val nums = line.split(' ').map(_.toDouble) - return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index f90ea35cd4..a730464ea1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -55,7 +55,7 @@ object LocalKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index 30c86d83e6..17bafc2218 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -44,7 +44,7 @@ object SparkALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -57,7 +57,7 @@ object SparkALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -83,7 +83,7 @@ object SparkALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 
ff72532db1..39819064ed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -43,7 +43,7 @@ object SparkHdfsLR { while (i < D) { x(i) = tok.nextToken.toDouble; i += 1 } - return DataPoint(new Vector(x), y) + DataPoint(new Vector(x), y) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 8c99025eaa..9fe2465235 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -30,7 +30,7 @@ object SparkKMeans { val rand = new Random(42) def parseVector(line: String): Vector = { - return new Vector(line.split(' ').map(_.toDouble)) + new Vector(line.split(' ').map(_.toDouble)) } def closestPoint(p: Vector, centers: Array[Vector]): Int = { @@ -46,7 +46,7 @@ object SparkKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { @@ -61,15 +61,15 @@ object SparkKMeans { val K = args(2).toInt val convergeDist = args(3).toDouble - var kPoints = data.takeSample(false, K, 42).toArray + val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} + val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() + val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() tempDist = 0.0 for (i <- 0 until K) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 4fe57de4a4..a2600989ca 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -65,7 +65,7 @@ object PageViewGenerator { return item } } - return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 + inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 } def getNextClickEvent() : String = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 2d8623392e..c972a71349 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -48,7 +48,7 @@ class PythonMLLibAPI extends Serializable { val db = bb.asDoubleBuffer() val ans = new Array[Double](length.toInt) db.get(ans) - return ans + ans } private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = { @@ -60,7 +60,7 @@ class PythonMLLibAPI extends Serializable { bb.putLong(len) val db = bb.asDoubleBuffer() db.put(doubles) - return bytes + bytes } private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = { @@ -86,7 +86,7 @@ class PythonMLLibAPI extends Serializable { ans(i) = new Array[Double](cols.toInt) 
db.get(ans(i)) } - return ans + ans } private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = { @@ -102,11 +102,10 @@ class PythonMLLibAPI extends Serializable { bb.putLong(rows) bb.putLong(cols) val db = bb.asDoubleBuffer() - var i = 0 for (i <- 0 until rows) { db.put(doubles(i)) } - return bytes + bytes } private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel, @@ -121,7 +120,7 @@ class PythonMLLibAPI extends Serializable { val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleVector(model.weights)) ret.add(model.intercept: java.lang.Double) - return ret + ret } /** @@ -130,7 +129,7 @@ class PythonMLLibAPI extends Serializable { def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LinearRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -142,7 +141,7 @@ class PythonMLLibAPI extends Serializable { def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LassoWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -154,7 +153,7 @@ class PythonMLLibAPI extends Serializable { def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -166,7 +165,7 @@ class PythonMLLibAPI extends Serializable { def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => SVMWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -178,7 +177,7 @@ class PythonMLLibAPI extends Serializable { def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LogisticRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -194,7 +193,7 @@ class PythonMLLibAPI extends Serializable { val model = KMeans.train(data, k, maxIterations, runs, initializationMode) val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleMatrix(model.clusterCenters)) - return ret + ret } /** Unpack a Rating object from an array of 
bytes */ @@ -204,7 +203,7 @@ class PythonMLLibAPI extends Serializable { val user = bb.getInt() val product = bb.getInt() val rating = bb.getDouble() - return new Rating(user, product, rating) + new Rating(user, product, rating) } /** Unpack a tuple of Ints from an array of bytes */ @@ -245,7 +244,7 @@ class PythonMLLibAPI extends Serializable { def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.train(ratings, rank, iterations, lambda, blocks) + ALS.train(ratings, rank, iterations, lambda, blocks) } /** @@ -257,6 +256,6 @@ class PythonMLLibAPI extends Serializable { def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) + ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index ca0115f90e..ebfb8dba8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -203,6 +203,6 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade } catch { case e: Exception => } - return super.resolveClass(desc) + super.resolveClass(desc) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fb9eda8996..a7ba2339c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -219,7 +219,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas reset() return false } - return true + true } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index e0ff3ccba4..b34ba7b9b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -65,7 +65,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime) - return Some(stateRDD) + Some(stateRDD) } case None => { // If parent RDD does not exist @@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFuncLocal(i) } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) - return Some(stateRDD) + Some(stateRDD) } } } @@ -98,11 +98,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime + " (first)") - return Some(sessionRDD) + Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! 
//logDebug("Not generating state RDD (no previous state, no parent)") - return None + None } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 110a20f282..73dc52023a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,6 +76,6 @@ private[spark] class StreamingListenerBus() extends Logging { * add overhead in the general case. */ Thread.sleep(10) } - return true + true } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index f67bb2f6ac..c3a849d276 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -66,7 +66,7 @@ class SystemClock() extends Clock { } Thread.sleep(sleepTime) } - return -1 + -1 } } @@ -96,6 +96,6 @@ class ManualClock() extends Clock { this.wait(100) } } - return currentTime() + currentTime() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 4e6ce6eabd..5b6c048a39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -90,7 +90,7 @@ object RawTextHelper { } } } - return taken.toIterator + taken.toIterator } /** diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 23781ea35c..e1fe09e3e2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -158,7 +158,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - return appContext + appContext } /** See if two file systems are the same or not. */ @@ -191,9 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. 
*/ @@ -299,7 +300,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } UserGroupInformation.getCurrentUser().addCredentials(credentials) - return localResources + localResources } def setupLaunchEnv( diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac..0138d7ade1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -125,7 +125,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val containerId = ConverterUtils.toContainerId(containerIdString) val appAttemptId = containerId.getApplicationAttemptId() logInfo("ApplicationAttemptId: " + appAttemptId) - return appAttemptId + appAttemptId } private def registerWithResourceManager(): AMRMProtocol = { @@ -133,7 +133,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)) logInfo("Connecting to ResourceManager at " + rmAddress) - return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] + rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { @@ -147,7 +147,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar appMasterRequest.setRpcPort(0) // What do we provide here ? Might make sense to expose something sensible later ? appMasterRequest.setTrackingUrl("") - return resourceManager.registerApplicationMaster(appMasterRequest) + resourceManager.registerApplicationMaster(appMasterRequest) } private def waitForSparkMaster() { @@ -220,7 +220,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar t.setDaemon(true) t.start() logInfo("Started progress reporter thread - sleep time : " + sleepTime) - return t + t } private def sendProgress() { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 132630e5ef..d32cdcc879 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -195,7 +195,7 @@ class WorkerRunnable( } logInfo("Prepared Local resources " + localResources) - return localResources + localResources } def prepareEnvironment: HashMap[String, String] = { @@ -207,7 +207,7 @@ class WorkerRunnable( Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - return env + env } def connectToCM: ContainerManager = { @@ -226,8 +226,7 @@ class WorkerRunnable( val proxy = user .doAs(new PrivilegedExceptionAction[ContainerManager] { def run: ContainerManager = { - return rpc.getProxy(classOf[ContainerManager], - cmAddress, conf).asInstanceOf[ContainerManager] + rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } }) proxy diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 5f159b073f..535abbfb7f 100644 --- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -143,7 +143,7 @@ class ClientDistributedCacheManager() extends Logging { if (isPublic(conf, uri, statCache)) { return LocalResourceVisibility.PUBLIC } - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } /** @@ -161,7 +161,7 @@ class ClientDistributedCacheManager() extends Logging { if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false } - return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) + ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) } /** @@ -183,7 +183,7 @@ class ClientDistributedCacheManager() extends Logging { } current = current.getParent() } - return true + true } /** @@ -203,7 +203,7 @@ class ClientDistributedCacheManager() extends Logging { if (otherAction.implies(action)) { return true } - return false + false } /** @@ -223,6 +223,6 @@ class ClientDistributedCacheManager() extends Logging { statCache.put(uri, newStat) newStat } - return stat + stat } } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index 2941356bc5..458df4fa3c 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -42,7 +42,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { class MockClientDistributedCacheManager extends ClientDistributedCacheManager { override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): LocalResourceVisibility = { - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index be323d7783..efeee31acd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -205,9 +205,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. */ -- cgit v1.2.3 From f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 Jan 2014 10:50:14 -0800 Subject: Rename DStream.foreach to DStream.foreachRDD `foreachRDD` makes it clear that the granularity of this operator is per-RDD. As it stands, `foreach` is inconsistent with `map`, `filter`, and the other DStream operators which get pushed down to individual records within each RDD.
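As a minimal illustrative sketch (not part of the patch, and assuming a StreamingContext named ssc and a socket source on localhost:9999, both invented for the example), the contrast reads like this: map and filter still act on individual records inside each batch, while foreachRDD receives the whole RDD produced for each batch.

import org.apache.spark.streaming.StreamingContext

object ForeachRDDSketch {
  // Illustrative only: `ssc`, the host, and the port are assumed, not taken from the patch.
  def attach(ssc: StreamingContext): Unit = {
    val lines = ssc.socketTextStream("localhost", 9999)

    // map and filter are pushed down to individual records within each batch's RDD.
    val errors = lines.filter(_.contains("ERROR")).map(_.trim)

    // foreachRDD (formerly foreach) hands the entire RDD generated for each batch
    // to the user function, e.g. to emit a per-batch summary.
    errors.foreachRDD(rdd => println("errors in this batch: " + rdd.count()))
  }
}

The two-argument overload, foreachRDD((rdd, time) => ...), behaves the same way but also passes the batch Time, mirroring the JFunction2[R, Time, Void] variant changed in the Java API below.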
--- docs/streaming-programming-guide.md | 4 ++-- .../org/apache/spark/streaming/examples/RawNetworkGrep.scala | 2 +- .../apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterPopularTags.scala | 4 ++-- .../spark/streaming/examples/clickstream/PageViewStream.scala | 2 +- .../src/main/scala/org/apache/spark/streaming/DStream.scala | 10 +++++----- .../org/apache/spark/streaming/PairDStreamFunctions.scala | 4 ++-- .../org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 8 ++++---- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- 10 files changed, 22 insertions(+), 22 deletions(-) (limited to 'examples') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1c9ece6270..3273817c78 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -175,7 +175,7 @@ When an output operator is called, it triggers the computation of a stream. Curr <tr><th>Operator</th><th>Meaning</th></tr> - <td> <b>foreach</b>(<i>func</i>) </td> + <td> <b>foreachRDD</b>(<i>func</i>) </td> <td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td> @@ -375,7 +375,7 @@ There are two failure behaviors based on which input sources are used. 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure. 1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost from the the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. -Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. +Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
## Failure of the Driver Node A system that is required to operate 24/7 needs to be able tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting a HDFS directory for checkpointing using `ssc.checkpoint()` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala index 3d08d86567..99b79c3949 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -58,7 +58,7 @@ object RawNetworkGrep { val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = ssc.union(rawStreams) - union.filter(_.contains("the")).count().foreach(r => + union.filter(_.contains("the")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) ssc.start() } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 80b5a98b14..483c4d3118 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -81,7 +81,7 @@ object TwitterAlgebirdCMS { val exactTopUsers = users.map(id => (id, 1)) .reduceByKey((a, b) => a + b) - approxTopUsers.foreach(rdd => { + approxTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() val partialTopK = partial.heavyHitters.map(id => @@ -96,7 +96,7 @@ object TwitterAlgebirdCMS { } }) - exactTopUsers.foreach(rdd => { + exactTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partialMap = rdd.collect().toMap val partialTopK = rdd.map( diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index cb2f2c51a0..94c2bf29ac 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -67,7 +67,7 @@ object TwitterAlgebirdHLL { val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - approxUsers.foreach(rdd => { + approxUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() globalHll += partial @@ -76,7 +76,7 @@ object TwitterAlgebirdHLL { } }) - exactUsers.foreach(rdd => { + exactUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() userSet ++= partial diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 16c10feaba..8a70d4a978 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -56,13 +56,13 @@ object TwitterPopularTags { // Print popular hashtags - 
topCounts60.foreach(rdd => { + topCounts60.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) - topCounts10.foreach(rdd => { + topCounts10.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index da6b67bcce..bb44bc3d06 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -91,7 +91,7 @@ object PageViewStream { case "popularUsersSeen" => // Look for users in our existing dataset and print it out if we have a match pageViews.map(view => (view.userID, 1)) - .foreach((rdd, time) => rdd.join(userList) + .foreachRDD((rdd, time) => rdd.join(userList) .map(_._2._2) .take(10) .foreach(u => println("Saw user %s at time %s".format(u, time)))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index b98f4a5101..93d57db494 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -487,15 +487,15 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: RDD[T] => Unit) { - this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + def foreachRDD(foreachFunc: RDD[T] => Unit) { + this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ - def foreach(foreachFunc: (RDD[T], Time) => Unit) { + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) } @@ -719,7 +719,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } /** @@ -732,7 +732,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } def register() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 56dbcbda23..69d80c3711 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -582,7 +582,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } /** @@ -612,7 +612,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64f38ce1c0..4b5d5ece52 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -244,16 +244,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: JFunction[R, Void]) { - dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) + def foreachRDD(foreachFunc: JFunction[R, Void]) { + dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd))) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ - def foreach(foreachFunc: JFunction2[R, Time, Void]) { - dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) + def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) { + dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ee6b433d1f..9a187ce031 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase { val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) - stream.foreach(_ => {}) // Dummy output stream + stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) def getInputFromSlice(fromMillis: Long, toMillis: Long) = { -- cgit v1.2.3 From 8ca97739741152cce30adfce80aee4462b5a04f2 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 13 Jan 2014 12:17:58 -0800 Subject: Add LiveJournalPageRank example --- .../examples/graphx/LiveJournalPageRank.scala | 49 ++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala (limited to 'examples') diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala new file mode 100644 index 0000000000..d58fddff2b --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.graphx + +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib.Analytics + +/** + * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from + * http://snap.stanford.edu/data/soc-LiveJournal1.html. + */ +object LiveJournalPageRank { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: LiveJournalPageRank \n" + + " [--tol=]\n" + + " The tolerance allowed at convergence (smaller => more accurate). Default is " + + "0.001.\n" + + " [--output=]\n" + + " If specified, the file to write the ranks to.\n" + + " [--numEPart=]\n" + + " The number of partitions for the graph's edge RDD. 
Default is 4.\n" + + " [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " + + "CanonicalRandomVertexCut]\n" + + " The way edges are assigned to edge partitions. Default is RandomVertexCut.") + System.exit(-1) + } + + Analytics.main(args.patch(1, List("pagerank"), 0)) + } +} -- cgit v1.2.3
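As a small illustrative aside (not part of the commit), the args.patch(1, List("pagerank"), 0) call above simply splices the task type "pagerank" into the argument array at index 1, replacing nothing, before delegating to Analytics.main. A self-contained sketch with invented argument values:

object PatchArgsSketch {
  def main(args: Array[String]): Unit = {
    // Invented arguments purely for illustration.
    val cliArgs = Array("local[4]", "soc-LiveJournal1.txt", "--tol=0.001")
    // patch inserts the single element "pagerank" at index 1 and drops zero existing elements.
    val forwarded = cliArgs.patch(1, List("pagerank"), 0)
    println(forwarded.mkString(" ")) // local[4] pagerank soc-LiveJournal1.txt --tol=0.001
  }
}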