From 9f678e97549b19d6d979b22fa4079094ce9fb2c0 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 16 Apr 2016 14:56:23 +0100
Subject: [MINOR] Remove inappropriate type notation and extra anonymous
 closure within functional transformations

## What changes were proposed in this pull request?

This PR removes

- Inappropriate type notations. For example, from

  ```scala
  words.foreachRDD { (rdd: RDD[String], time: Time) =>
  ...
  ```

  to

  ```scala
  words.foreachRDD { (rdd, time) =>
  ...
  ```

- Extra anonymous closures within functional transformations. For example,

  ```scala
  .map(item => {
    ...
  })
  ```

  which can be written more simply as:

  ```scala
  .map { item =>
    ...
  }
  ```

and corrects some obvious style nits.

## How was this patch tested?

This was tested by adding rules to `scalastyle-config.xml`, although they did not end up catching every case perfectly. The rules applied were as below:

- For the first correction,

  ```xml
  (?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+s*=>\s*\{[^\}]+\}\s*\)
  ```

  ```xml
  \.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]([^\n>,]+=>)?\s*\{([^()]|(?R))*\}^[,]
  ```

- For the second correction,

  ```xml
  \.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]\s*\([^):]*:R))*\}^[,]
  ```

**Those rules were not added.**

Author: hyukjinkwon

Closes #12413 from HyukjinKwon/SPARK-style.
---
 core/src/main/scala/org/apache/spark/TaskEndReason.scala     |  4 +---
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala        | 10 +++++-----
 .../apache/spark/deploy/master/ui/ApplicationPage.scala      |  5 ++---
 .../org/apache/spark/deploy/worker/DriverRunner.scala        |  7 +++++--
 .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala       |  8 ++++----
 core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala       |  4 ++--
 .../org/apache/spark/rdd/ParallelCollectionRDD.scala         |  9 ++++-----
 .../spark/examples/mllib/StreamingTestExample.scala          |  4 ++--
 .../examples/streaming/RecoverableNetworkWordCount.scala     |  8 +++-----
 .../spark/examples/streaming/SqlNetworkWordCount.scala       |  2 +-
 .../spark/mllib/api/python/Word2VecModelWrapper.scala        |  4 +++-
 .../mllib/evaluation/BinaryClassificationMetrics.scala       |  3 +--
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala        |  6 +++---
 .../spark/sql/catalyst/expressions/Projection.scala          |  4 +---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala      | 10 ++++------
 .../spark/sql/execution/joins/ShuffledHashJoin.scala         |  4 +---
 .../apache/spark/sql/execution/stat/StatFunctions.scala      |  2 +-
 .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala      |  8 +++-----
 .../org/apache/spark/streaming/dstream/DStream.scala         | 16 ++++++++--------
 19 files changed, 54 insertions(+), 64 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 83af226bfd..7487cfe9c5 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -149,9 +149,7 @@ case class ExceptionFailure(
     this(e, accumUpdates, preserveCause = true)
   }
 
-  def exception: Option[Throwable] = exceptionWrapper.flatMap {
-    (w: ThrowableSerializationWrapper) => Option(w.exception)
-  }
+  def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
 
   override def toErrorString: String =
     if (fullStackTrace == null) {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 4212027122..6f3b8faf03 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
-    new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+    new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
   }
 
   /**
@@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
     def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
   }
 
   /**
@@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
   }
 
   /**
@@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
-      .map(x => x.doubleValue()))
+      .map(_.doubleValue()))
   }
 
   /**
@@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f.call(x.asJava)))
+    rdd.foreachPartition(x => f.call(x.asJava))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 1b18cf0ded..96274958d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -35,9 +35,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
+    val app = state.activeApps.find(_.id == appId)
+      .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
       val msg = <div class="row-fluid">No running application with ID {appId}</div>
       return UIUtils.basicSparkPage(msg, "Not Found")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index aad2e91b25..f4376dedea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -68,7 +68,10 @@ private[deploy] class DriverRunner(
   private var clock: Clock = new SystemClock()
 
   private var sleeper = new Sleeper {
-    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
+    def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ =>
+      Thread.sleep(1000)
+      !killed
+    }
   }
 
   /** Starts a thread to run and manage the driver. */
@@ -116,7 +119,7 @@ private[deploy] class DriverRunner(
   /** Terminate this driver (or prevent it from ever starting if not yet started) */
   private[worker] def kill() {
     synchronized {
-      process.foreach(p => p.destroy())
+      process.foreach(_.destroy())
       killed = true
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 90d9735cb3..35665ab7c0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -190,11 +190,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
 
   // initializes/resets to start iterating from the beginning
   def resetIterator(): Iterator[(String, Partition)] = {
-    val iterators = (0 to 2).map( x =>
-      prev.partitions.iterator.flatMap(p => {
+    val iterators = (0 to 2).map { x =>
+      prev.partitions.iterator.flatMap { p =>
         if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
-      } )
-    )
+      }
+    }
     iterators.reduceLeft((x, y) => x ++ y)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 526138093d..5426bf80ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -65,11 +65,11 @@ class JdbcRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
     val length = BigInt(1) + upperBound - lowerBound
-    (0 until numPartitions).map(i => {
+    (0 until numPartitions).map { i =>
       val start = lowerBound + ((i * length) / numPartitions)
       val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
       new JdbcPartition(i, start.toLong, end.toLong)
-    }).toArray
+    }.toArray
   }
 
   override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index bb84e4af15..34a1c112cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -129,7 +129,7 @@ private object ParallelCollectionRDD {
     }
     seq match {
       case r: Range =>
-        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
           // If the range is inclusive, use inclusive range for the last slice
           if (r.isInclusive && index == numSlices - 1) {
             new Range.Inclusive(r.start + start * r.step, r.end, r.step)
@@ -137,7 +137,7 @@ private object ParallelCollectionRDD {
           }
           else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
-        }).toSeq.asInstanceOf[Seq[Seq[T]]]
+        }.toSeq.asInstanceOf[Seq[Seq[T]]]
       case nr: NumericRange[_] =>
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
@@ -150,10 +150,9 @@ private object ParallelCollectionRDD {
         slices
       case _ =>
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        positions(array.length, numSlices).map({
-          case (start, end) =>
+        positions(array.length, numSlices).map { case (start, end) =>
           array.slice(start, end).toSeq
-        }).toSeq
+        }.toSeq
     }
   }
 }
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
index 49f5df3944..ae4dee24c6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
@@ -59,10 +59,10 @@ object StreamingTestExample {
 
     val conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample")
     val ssc = new StreamingContext(conf, batchDuration)
-    ssc.checkpoint({
+    ssc.checkpoint {
       val dir = Utils.createTempDir()
       dir.toString
-    })
+    }
 
     // $example on$
     val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index bb2af9cd72..aa762b27dc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -115,8 +115,8 @@ object RecoverableNetworkWordCount {
     // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
+    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
+    wordCounts.foreachRDD { (rdd, time) =>
       // Get or register the blacklist Broadcast
       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
       // Get or register the droppedWordsCounter Accumulator
@@ -158,9 +158,7 @@ object RecoverableNetworkWordCount {
     }
     val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
     val ssc = StreamingContext.getOrCreate(checkpointDirectory,
-      () => {
-        createContext(ip, port, outputPath, checkpointDirectory)
-      })
+      () => createContext(ip, port, outputPath, checkpointDirectory))
     ssc.start()
     ssc.awaitTermination()
   }
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
index 918e124065..ad6a89e320 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
 
     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD { (rdd: RDD[String], time: Time) =>
+    words.foreachRDD { (rdd, time) =>
       // Get the singleton instance of SQLContext
       val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
       import sqlContext.implicits._
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
index 05273c3434..4b4ed2291d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
@@ -56,7 +56,9 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
   }
 
   def getVectors: JMap[String, JList[Float]] = {
-    model.getVectors.map({case (k, v) => (k, v.toList.asJava)}).asJava
+    model.getVectors.map { case (k, v) =>
+      (k, v.toList.asJava)
+    }.asJava
   }
 
   def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 0a7a45b4f4..92cd7f22dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -189,8 +189,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
       Iterator(agg)
     }.collect()
     val partitionwiseCumulativeCounts =
-      agg.scanLeft(new BinaryLabelCounter())(
-        (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
+      agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
     val totalCount = partitionwiseCumulativeCounts.last
     logInfo(s"Total counts: $totalCount")
     val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 37ff6ab6f6..6591559426 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -172,8 +172,8 @@ class Analyzer(
   private def assignAliases(exprs: Seq[NamedExpression]) = {
     exprs.zipWithIndex.map {
       case (expr, i) =>
-        expr transformUp {
-          case u @ UnresolvedAlias(child, optionalAliasName) => child match {
+        expr.transformUp { case u @ UnresolvedAlias(child, optionalAliasName) =>
+          child match {
             case ne: NamedExpression => ne
             case e if !e.resolved => u
             case g: Generator => MultiAlias(g, Nil)
@@ -215,7 +215,7 @@ class Analyzer(
    * represented as the bit masks.
    */
   def bitmasks(r: Rollup): Seq[Int] = {
-    Seq.tabulate(r.groupByExprs.length + 1)(idx => {(1 << idx) - 1})
+    Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
   }
 
   /*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 354311c5e7..27ad8e4cf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -168,9 +168,7 @@ object FromUnsafeProjection {
    * Returns an UnsafeProjection for given Array of DataTypes.
    */
   def apply(fields: Seq[DataType]): Projection = {
-    create(fields.zipWithIndex.map(x => {
-      new BoundReference(x._2, x._1, true)
-    }))
+    create(fields.zipWithIndex.map(x => new BoundReference(x._2, x._1, true)))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6c8f8f40dd..c46bdfb2b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -314,11 +314,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       assert(children.nonEmpty)
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
       val newFirstChild = Filter(deterministic, children.head)
-      val newOtherChildren = children.tail.map {
-        child => {
-          val rewrites = buildRewrites(children.head, child)
-          Filter(pushToRight(deterministic, rewrites), child)
-        }
+      val newOtherChildren = children.tail.map { child =>
+        val rewrites = buildRewrites(children.head, child)
+        Filter(pushToRight(deterministic, rewrites), child)
       }
 
       Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))
@@ -360,7 +358,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
       val newOutput = e.output.filter(a.references.contains(_))
       val newProjects = e.projections.map { proj =>
-        proj.zip(e.output).filter { case (e, a) =>
+        proj.zip(e.output).filter { case (_, a) =>
           newOutput.contains(a)
         }.unzip._1
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index 0c3e3c3fc1..f021f3758c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -60,9 +60,7 @@ case class ShuffledHashJoin(
     val context = TaskContext.get()
     val relation = HashedRelation(iter, buildKeys,
       taskMemoryManager = context.taskMemoryManager())
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener((t: TaskContext) =>
-      relation.close()
-    )
+    context.addTaskCompletionListener(_ => relation.close())
     relation
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index d603f63a08..9afbd0e994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -431,7 +431,7 @@ private[sql] object StatFunctions extends Logging {
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
       val countsRow = new GenericMutableRow(columnSize + 1)
-      rows.foreach { (row: Row) =>
+      rows.foreach { row =>
         // row.get(0) is column 1
         // row.get(1) is column 2
         // row.get(2) is the frequency
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 784b018353..5aab4132bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -82,7 +82,7 @@ private[hive] case class HiveSimpleUDF(
 
   // TODO: Finish input output types.
   override def eval(input: InternalRow): Any = {
-    val inputs = wrap(children.map(c => c.eval(input)), arguments, cached, inputDataTypes)
+    val inputs = wrap(children.map(_.eval(input)), arguments, cached, inputDataTypes)
     val ret = FunctionRegistry.invoke(
       method,
       function,
@@ -152,10 +152,8 @@ private[hive] case class HiveGenericUDF(
     var i = 0
     while (i < children.length) {
       val idx = i
-      deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(
-        () => {
-          children(idx).eval(input)
-        })
+      deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
+        .set(() => children(idx).eval(input))
       i += 1
     }
     unwrap(function.evaluate(deferredObjects), returnInspector)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 58842f9c2f..583f5a48d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
    * of this DStream.
    */
   def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+    this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
   }
 
   /**
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
    */
   def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+    this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
   }
 
   /**
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
+    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
   }
 
   /**
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
-    transform((r: RDD[T], t: Time) => cleanedF(r))
+    transform((r: RDD[T], _: Time) => cleanedF(r))
   }
 
   /**
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = ssc.withScope {
-    this.map(x => (1, x))
+    this.map((1, _))
         .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
         .map(_._2)
   }
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
       numPartitions: Int = ssc.sc.defaultParallelism)
       (implicit ord: Ordering[T] = null)
       : DStream[(T, Long)] = ssc.withScope {
-    this.map(x => (x, 1L)).reduceByKeyAndWindow(
+    this.map((_, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
       windowDuration,
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
     logInfo(s"Slicing from $fromTime to $toTime" +
       s" (aligned to $alignedFromTime and $alignedToTime)")
 
-    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+    alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
       if (time >= zeroTime) getOrCompute(time) else None
-    })
+    }
   }
 
   /**
-- 
cgit v1.2.3
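
The two corrections in this patch are purely syntactic. As a minimal standalone sketch (not part of the patch itself; plain Scala collections stand in for RDDs and DStreams, and the object and value names are invented for illustration), each before/after pair below evaluates to the same result:

```scala
object StyleExample {
  def main(args: Array[String]): Unit = {
    val nums = Seq(1, 2, 3)

    // Before: an extra anonymous closure, i.e. a brace block wrapped in parentheses.
    val doubledOld = nums.map(n => {
      n * 2
    })

    // After: the brace block itself is the function literal passed to map.
    val doubledNew = nums.map { n =>
      n * 2
    }

    // Before: a parameter type annotation the compiler can already infer.
    val pairsOld = nums.map((n: Int) => (n, 1L))

    // After: the annotation is dropped and type inference fills it in.
    val pairsNew = nums.map(n => (n, 1L))

    assert(doubledOld == doubledNew)
    assert(pairsOld == pairsNew)
    println("both spellings are equivalent")
  }
}
```

Because the rewrite only changes how each function literal is spelled, never which function is passed, the patch carries no behavioral risk, which is what makes it safe to apply mechanically across nineteen files.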