about | summary | refs | log | tree | commit | diff
path: root/streaming/src
diff options
context:
space:
mode:
author: Liwei Lin <lwlin7@gmail.com>  2016-04-14 10:14:38 -0700
committer: Reynold Xin <rxin@databricks.com>  2016-04-14 10:14:38 -0700
commit: 3e27940a19e7bab448f1af11d2065ecd1ec66197 (patch)
tree: 76981c9be102eb396cb9be433b52143b18fd2005 /streaming/src
parent: de2ad52855aee3c60bbc4642afb180d6fe62173b (diff)
download: spark-3e27940a19e7bab448f1af11d2065ecd1ec66197.tar.gz
download: spark-3e27940a19e7bab448f1af11d2065ecd1ec66197.tar.bz2
download: spark-3e27940a19e7bab448f1af11d2065ecd1ec66197.zip
[SPARK-14630][BUILD][CORE][SQL][STREAMING] Code style: public abstract methods should have explicit return types
## What changes were proposed in this pull request? Currently many public abstract methods (in abstract classes as well as traits) don't declare return types explicitly, such as in [o.a.s.streaming.dstream.InputDStream](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala#L110): ```scala def start() // should be: def start(): Unit def stop() // should be: def stop(): Unit ``` These methods exist in core, sql, streaming; this PR fixes them. ## How was this patch tested? N/A ## Which piece of scala style rule led to the changes? the rule was added separately in https://github.com/apache/spark/pull/12396 Author: Liwei Lin <lwlin7@gmail.com> Closes #12389 from lw-lin/public-abstract-methods.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala          | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala       | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala             | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala   | 10
5 files changed, 14 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index dc88349db5..a3c125c306 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -107,8 +107,8 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
}
/** Method called to start receiving data. Subclasses must implement this method. */
- def start()
+ def start(): Unit
/** Method called to stop receiving data. Subclasses must implement this method. */
- def stop()
+ def stop(): Unit
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index e42bea6ec6..4592e015ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -37,7 +37,7 @@ private[streaming] trait BlockGeneratorListener {
* that will be useful when a block is generated. Any long blocking operation in this callback
* will hurt the throughput.
*/
- def onAddData(data: Any, metadata: Any)
+ def onAddData(data: Any, metadata: Any): Unit
/**
* Called when a new block of data is generated by the block generator. The block generation
@@ -47,7 +47,7 @@ private[streaming] trait BlockGeneratorListener {
* be useful when the block has been successfully stored. Any long blocking operation in this
* callback will hurt the throughput.
*/
- def onGenerateBlock(blockId: StreamBlockId)
+ def onGenerateBlock(blockId: StreamBlockId): Unit
/**
* Called when a new block is ready to be pushed. Callers are supposed to store the block into
@@ -55,13 +55,13 @@ private[streaming] trait BlockGeneratorListener {
* thread, that is not synchronized with any other callbacks. Hence it is okay to do long
* blocking operation in this callback.
*/
- def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
/**
* Called when an error has occurred in the BlockGenerator. Can be called form many places
* so better to not do any long block operation in this callback.
*/
- def onError(message: String, throwable: Throwable)
+ def onError(message: String, throwable: Throwable): Unit
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 85350ff658..7aea1c9b64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -48,7 +48,7 @@ private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
/** Cleanup old blocks older than the given threshold time */
- def cleanupOldBlocks(threshTime: Long)
+ def cleanupOldBlocks(threshTime: Long): Unit
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 3376cd557d..5157ca62dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -99,13 +99,13 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
* immediately, and then `onStart()` after a delay.
*/
- def onStart()
+ def onStart(): Unit
/**
* This method is called by the system when the receiver is stopped. All resources
* (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
*/
- def onStop()
+ def onStop(): Unit
/** Override this to specify a preferred location (hostname). */
def preferredLocation: Option[String] = None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index e0fe8d2206..42fc84c19b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -70,28 +70,28 @@ private[streaming] abstract class ReceiverSupervisor(
@volatile private[streaming] var receiverState = Initialized
/** Push a single data item to backend data store. */
- def pushSingle(data: Any)
+ def pushSingle(data: Any): Unit
/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/** Store a iterator of received data as a data block into Spark's memory. */
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/**
* Create a custom [[BlockGenerator]] that the receiver implementation can directly control
@@ -103,7 +103,7 @@ private[streaming] abstract class ReceiverSupervisor(
def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator
/** Report errors. */
- def reportError(message: String, throwable: Throwable)
+ def reportError(message: String, throwable: Throwable): Unit
/**
* Called when supervisor is started.