author     hyukjinkwon <gurwls223@gmail.com>   2016-04-16 14:56:23 +0100
committer  Sean Owen <sowen@cloudera.com>      2016-04-16 14:56:23 +0100
commit     9f678e97549b19d6d979b22fa4079094ce9fb2c0 (patch)
tree       978e18e46d294ba336f4657c6d627d146680f915 /streaming/src
parent     527c780bb0d6cb074128448da00cb330e9049385 (diff)
[MINOR] Remove inappropriate type notation and extra anonymous closure within functional transformations
## What changes were proposed in this pull request?

This PR removes

- Inappropriate type notations. For example, from

  ```scala
  words.foreachRDD { (rdd: RDD[String], time: Time) =>
  ...
  ```

  to

  ```scala
  words.foreachRDD { (rdd, time) =>
  ...
  ```

- Extra anonymous closures within functional transformations. For example,

  ```scala
  .map(item => {
    ...
  })
  ```

  which can simply be written as below:

  ```scala
  .map { item =>
    ...
  }
  ```

and corrects some obvious style nits.

## How was this patch tested?

This was tested after adding rules in `scalastyle-config.xml`, which ended up not catching every case perfectly. The rules applied were as below:

- For the first correction,

  ```xml
  <check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">(?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+s*=>\s*\{[^\}]+\}\s*\)</parameter></parameters>
  </check>
  ```

  ```xml
  <check customId="NoExtraClosure" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]([^\n>,]+=>)?\s*\{([^()]|(?R))*\}^[,]</parameter></parameters>
  </check>
  ```

- For the second correction,

  ```xml
  <check customId="TypeNotation" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\.[a-zA-Z_][a-zA-Z0-9]*\s*[\{|\(]\s*\([^):]*:R))*\}^[,]</parameter></parameters>
  </check>
  ```

**Those rules were not added.**

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12413 from HyukjinKwon/SPARK-style.
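For readers who want to see the two refactorings in isolation, below is a minimal, hedged sketch using plain Scala collections rather than Spark (so it runs without a cluster); the object and value names are invented for illustration and do not appear in the commit.

```scala
object StyleSketch {
  def main(args: Array[String]): Unit = {
    val words = Seq("apache", "spark", "streaming")

    // Extra anonymous closure within a functional transformation,
    // the style this commit removes:
    val before = words.map(word => {
      (word, word.length)
    })

    // The same transformation with the braces folded into the call
    // and the parameter type left to inference:
    val after = words.map { word =>
      (word, word.length)
    }

    // Placeholder syntax, as in the `this.map((_, 1L))` change below;
    // `(_, 1L)` expands to `x => (x, 1L)`:
    val counts = words.map((_, 1L))

    assert(before == after)
    println(counts) // List((apache,1), (spark,1), (streaming,1))
  }
}
```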
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 16
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 58842f9c2f..583f5a48d1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
- this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
}
/**
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
- this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+ this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
/**
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
- foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
+ foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
/**
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
- transform((r: RDD[T], t: Time) => cleanedF(r))
+ transform((r: RDD[T], _: Time) => cleanedF(r))
}
/**
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
- this.map(x => (1, x))
+ this.map((1, _))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
}
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
- this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ this.map((_, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
windowDuration,
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
logInfo(s"Slicing from $fromTime to $toTime" +
s" (aligned to $alignedFromTime and $alignedToTime)")
- alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
if (time >= zeroTime) getOrCompute(time) else None
- })
+ }
}
/**
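The `foreachRDD` and `transform` hunks above swap a named-but-unused `t: Time` parameter for `_: Time`. A self-contained sketch of that idiom follows; `forEachBatch` and its signature are hypothetical stand-ins for Spark's two-argument `foreachRDD` overload, invented here for illustration.

```scala
object UnusedParamSketch {
  // Hypothetical stand-in for Spark's (RDD[T], Time) => Unit overload of
  // foreachRDD; not Spark's API, just enough shape to show the idiom.
  def forEachBatch[T](batches: Seq[(Seq[T], Long)])(f: (Seq[T], Long) => Unit): Unit =
    batches.foreach { case (batch, time) => f(batch, time) }

  def main(args: Array[String]): Unit = {
    val batches = Seq((Seq("a", "b"), 0L), (Seq("c"), 1000L))

    // Before: `time` is bound even though the body never uses it.
    forEachBatch(batches)((batch: Seq[String], time: Long) => println(batch.size))

    // After: `_` documents that the second argument is intentionally ignored.
    forEachBatch(batches)((batch: Seq[String], _: Long) => println(batch.size))
  }
}
```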