author    hyukjinkwon <gurwls223@gmail.com>    2016-04-14 09:43:41 +0100
committer Sean Owen <sowen@cloudera.com>       2016-04-14 09:43:41 +0100
commit    6fc3dc8839eaed673c64ec87af6dfe24f8cebe0c (patch)
tree      db47cd619d84a7890ff1cacc78a44046ace85633 /streaming
parent    478af2f45595913c9b8f560d13e8d88447486f99 (diff)
[MINOR][SQL] Remove extra anonymous closure within functional transformations
## What changes were proposed in this pull request?

This PR removes the extra anonymous closure within functional transformations. For example:

```scala
.map(item => {
  ...
})
```

can simply be written as:

```scala
.map { item =>
  ...
}
```

## How was this patch tested?

Related unit tests and `sbt scalastyle`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12382 from HyukjinKwon/minor-extra-closers.
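For illustration only (not part of the patch), here is a minimal, self-contained Scala sketch of the two closure styles the PR contrasts; the object name and sample values are made up, and both forms produce the same result:

```scala
// Hypothetical example: compares the style removed by this PR with the
// brace-delimited function literal it prefers.
object ClosureStyleExample {
  def main(args: Array[String]): Unit = {
    val files = Seq("a.ckpt", "b.ckpt", "c.ckpt")

    // Style removed by this PR: parentheses wrapping an explicit anonymous block
    val before = files.map(file => {
      val upper = file.toUpperCase
      upper.length
    })

    // Style preferred by this PR: a plain brace-delimited function literal
    val after = files.map { file =>
      val upper = file.toUpperCase
      upper.length
    }

    assert(before == after)  // identical behavior, less syntactic noise
    println(after)
  }
}
```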
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala               | 8
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala     | 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 4
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5cc677d085..0395600954 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -247,10 +247,10 @@ class CheckpointWriter(
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
- allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
+ allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
logInfo("Deleting " + file)
fs.delete(file, true)
- })
+ }
}
// All done, print success
@@ -345,7 +345,7 @@ object CheckpointReader extends Logging {
// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
var readError: Exception = null
- checkpointFiles.foreach(file => {
+ checkpointFiles.foreach { file =>
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
@@ -358,7 +358,7 @@ object CheckpointReader extends Logging {
readError = e
logWarning("Error reading checkpoint from file " + file, e)
}
- })
+ }
// If none of checkpoint files could be read, then throw exception
if (!ignoreReadError) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 28aed0ca45..8efb09a8ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -48,11 +48,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
- val i = iterator.map(t => {
+ val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
- })
+ }
updateFuncLocal(i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3b33a979df..9aa2f0bbb9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -434,11 +434,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
- val receivers = receiverInputStreams.map(nis => {
+ val receivers = receiverInputStreams.map { nis =>
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
- })
+ }
runDummySparkJob()