path: root/streaming
author    Dongjoon Hyun <dongjoon@apache.org>    2016-04-12 00:43:28 -0700
committer Reynold Xin <rxin@databricks.com>    2016-04-12 00:43:28 -0700
commit    b0f5497e9520575e5082fa8ce8be5569f43abe74 (patch)
tree      efd349be7227cf20616712fc7376b7c2f11f6614 /streaming
parent    678b96e77bf77a64b8df14b19db5a3bb18febfe3 (diff)
[SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`
## What changes were proposed in this pull request?

According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and the [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we should enforce the following rule:

```
case: Always omit braces in case clauses.
```

This PR adds a new ScalaStyle rule, `OmitBracesInCase`, and enforces it across the code.

## How was this patch tested?

Pass the Jenkins tests (including Scala style checking).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12280 from dongjoon-hyun/SPARK-14508.
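To make the rule concrete, here is a minimal before/after sketch in Scala. The function and identifiers (`describe`, `describeClean`) are hypothetical illustrations, not code from this patch; the patch itself enforces the rule through Spark's ScalaStyle configuration.

```scala
// Hypothetical example illustrating the OmitBracesInCase rule.

// Before: the case body is wrapped in braces, which the new rule flags.
def describe(x: Option[Int]): String = x match {
  case Some(n) => {
    val doubled = n * 2
    s"value: $doubled"
  }
  case None => "empty"
}

// After: braces omitted. Everything between `=>` and the next `case`
// (or the closing brace of the match) already forms the case body,
// so the inner braces are redundant.
def describeClean(x: Option[Int]): String = x match {
  case Some(n) =>
    val doubled = n * 2
    s"value: $doubled"
  case None => "empty"
}
```

Both versions compile to the same code; the rule only removes visual noise, which is why this patch is net-negative in lines changed.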
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala               |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala      |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala          | 26
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala          |  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala               |  6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala             |  3
7 files changed, 15 insertions(+), 32 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c40beeff97..58842f9c2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -429,13 +429,12 @@ abstract class DStream[T: ClassTag] (
*/
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
- case Some(rdd) => {
+ case Some(rdd) =>
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
- }
case None => None
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 431c9dbe2c..e73837eb96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -109,10 +109,9 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
def restore() {
// Create RDDs from the checkpoint data
currentCheckpointFiles.foreach {
- case(time, file) => {
+ case(time, file) =>
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
- }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 7fba2e8ec0..36f50e04db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -333,14 +333,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
override def restore() {
hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
- case (t, f) => {
+ case (t, f) =>
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
- }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 0379957e58..28aed0ca45 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -65,14 +65,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the previous state RDD
getOrCompute(validTime - slideDuration) match {
- case Some(prevStateRDD) => { // If previous state RDD exists
-
+ case Some(prevStateRDD) => // If previous state RDD exists
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
- case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ case Some(parentRDD) => // If parent RDD exists, then compute as usual
computeUsingPreviousRDD(parentRDD, prevStateRDD)
- }
- case None => { // If parent RDD does not exist
+ case None => // If parent RDD does not exist
// Re-apply the update function to the old state RDD
val updateFuncLocal = updateFunc
@@ -82,17 +80,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
}
val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
- }
}
- }
-
- case None => { // If previous session RDD does not exist (first input data)
+ case None => // If previous session RDD does not exist (first input data)
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
- case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ case Some(parentRDD) => // If parent RDD exists, then compute as usual
initialRDD match {
- case None => {
+ case None =>
// Define the function for the mapPartition operation on grouped RDD;
// first map the grouped tuple to tuples of required type,
// and then apply the update function
@@ -105,18 +100,13 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
Some(sessionRDD)
- }
- case Some(initialStateRDD) => {
+ case Some(initialStateRDD) =>
computeUsingPreviousRDD(parentRDD, initialStateRDD)
- }
}
- }
- case None => { // If parent RDD does not exist, then nothing to do!
+ case None => // If parent RDD does not exist, then nothing to do!
// logDebug("Not generating state RDD (no previous state, no parent)")
None
- }
}
- }
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bd60059b18..cfcbdc7c38 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -538,10 +538,9 @@ class BasicOperationsSuite extends TestSuiteBase {
val stateObj = state.getOrElse(new StateObject)
values.sum match {
case 0 => stateObj.expireCounter += 1 // no new values
- case n => { // has new values, increment and reset expireCounter
+ case n => // has new values, increment and reset expireCounter
stateObj.counter += n
stateObj.expireCounter = 0
- }
}
stateObj.expireCounter match {
case 2 => None // seen twice with no new values, give it the boot
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index fbb25d4c59..bdbac64b9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -267,10 +267,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before first failure does not exist")
- }
}
// Run till a further time such that previous checkpoint files in the stream would be deleted
@@ -297,10 +296,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before seconds failure does not exist")
- }
}
ssc.stop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 29bee4adf2..60c8e70235 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -382,11 +382,10 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
fs.rename(tempHadoopFile, hadoopFile)
done = true
} catch {
- case ioe: IOException => {
+ case ioe: IOException =>
fs = testDir.getFileSystem(new Configuration())
logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
ioe)
- }
}
}
if (!done) {