Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 58842f9c2f..583f5a48d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
- this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
}

/**
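The hunk above only switches reduce to placeholder syntax; behavior is unchanged. As a minimal usage sketch (nums is an illustrative DStream[Int], not part of this patch):

import org.apache.spark.streaming.dstream.DStream

// reduce folds each batch of the stream into a single-element RDD using
// an associative, commutative function; here, per-batch sums.
def batchSums(nums: DStream[Int]): DStream[Int] =
  nums.reduce(_ + _)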
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
- this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+ this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}

/**
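countByValue likewise keeps its contract: one (value, count) pair per distinct element per batch. A sketch with an illustrative words stream:

import org.apache.spark.streaming.dstream.DStream

// countByValue maps each element to (x, 1L) and reduces by key, exactly
// as the implementation above shows; numPartitions defaults to the
// context's default parallelism.
def wordCounts(words: DStream[String]): DStream[(String, Long)] =
  words.countByValue()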
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
- foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
+ foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}

/**
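In foreachRDD only the unused Time parameter becomes a placeholder; it remains the generic output operator. A sketch with an illustrative events stream:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// The function runs on the driver once per batch interval; calling an
// action such as count() triggers the distributed computation.
def logBatchSizes(events: DStream[String]): Unit =
  events.foreachRDD((rdd: RDD[String]) => println(s"batch size: ${rdd.count()}"))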
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
- transform((r: RDD[T], t: Time) => cleanedF(r))
+ transform((r: RDD[T], _: Time) => cleanedF(r))
}

/**
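transform is also unchanged in behavior: it is the escape hatch for applying an arbitrary RDD-to-RDD function per batch, as in this sketch (distinctPerBatch is illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Deduplicate each batch using an RDD operation that DStream does not
// expose directly.
def distinctPerBatch(events: DStream[String]): DStream[String] =
  events.transform((rdd: RDD[String]) => rdd.distinct())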
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
- this.map(x => (1, x))
+ this.map((1, _))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
}
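This is the invertible form of reduceByWindow: with invReduceFunc the window is maintained incrementally, adding entering batches and subtracting leaving ones. A sketch (nums and the durations are illustrative; checkpointing must be enabled for this variant):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// 30s window recomputed every 10s; _ - _ undoes _ + _ for batches that
// slide out of the window, so each step touches only the delta.
def windowedSums(nums: DStream[Int]): DStream[Int] =
  nums.reduceByWindow(_ + _, _ - _, Seconds(30), Seconds(10))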
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
- this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ this.map((_, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
windowDuration,
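countByValueAndWindow builds on the same incremental machinery, as the implementation above shows. An illustrative sketch:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Distinct-value counts over a sliding 30s window, updated every 10s;
// like the invertible reduce above, this requires checkpointing.
def windowedWordCounts(words: DStream[String]): DStream[(String, Long)] =
  words.countByValueAndWindow(Seconds(30), Seconds(10))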
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
logInfo(s"Slicing from $fromTime to $toTime" +
s" (aligned to $alignedFromTime and $alignedToTime)")
- alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
if (time >= zeroTime) getOrCompute(time) else None
- })
+ }
}

/**
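The final hunk is inside the slicing logic that backs the public slice API, which returns all RDDs generated between two points in time. A sketch (rddsBetween is an illustrative wrapper):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Returns the generated RDDs whose batch times fall between from and to,
// aligned to the stream's slide duration as computed above.
def rddsBetween[T](stream: DStream[T], from: Time, to: Time): Seq[RDD[T]] =
  stream.slice(from, to)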