From 3e27940a19e7bab448f1af11d2065ecd1ec66197 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Thu, 14 Apr 2016 10:14:38 -0700 Subject: [SPARK-14630][BUILD][CORE][SQL][STREAMING] Code style: public abstract methods should have explicit return types ## What changes were proposed in this pull request? Currently many public abstract methods (in abstract classes as well as traits) don't declare return types explicitly, such as in [o.a.s.streaming.dstream.InputDStream](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala#L110): ```scala def start() // should be: def start(): Unit def stop() // should be: def stop(): Unit ``` These methods exist in core, sql, streaming; this PR fixes them. ## How was this patch tested? N/A ## Which piece of scala style rule led to the changes? the rule was added separately in https://github.com/apache/spark/pull/12396 Author: Liwei Lin Closes #12389 from lw-lin/public-abstract-methods. --- .../org/apache/spark/streaming/dstream/InputDStream.scala | 4 ++-- .../org/apache/spark/streaming/receiver/BlockGenerator.scala | 8 ++++---- .../apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 2 +- .../scala/org/apache/spark/streaming/receiver/Receiver.scala | 4 ++-- .../apache/spark/streaming/receiver/ReceiverSupervisor.scala | 10 +++++----- 5 files changed, 14 insertions(+), 14 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index dc88349db5..a3c125c306 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -107,8 +107,8 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) } /** Method called to start receiving data. Subclasses must implement this method. */ - def start() + def start(): Unit /** Method called to stop receiving data. Subclasses must implement this method. */ - def stop() + def stop(): Unit } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index e42bea6ec6..4592e015ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -37,7 +37,7 @@ private[streaming] trait BlockGeneratorListener { * that will be useful when a block is generated. Any long blocking operation in this callback * will hurt the throughput. */ - def onAddData(data: Any, metadata: Any) + def onAddData(data: Any, metadata: Any): Unit /** * Called when a new block of data is generated by the block generator. The block generation @@ -47,7 +47,7 @@ private[streaming] trait BlockGeneratorListener { * be useful when the block has been successfully stored. Any long blocking operation in this * callback will hurt the throughput. */ - def onGenerateBlock(blockId: StreamBlockId) + def onGenerateBlock(blockId: StreamBlockId): Unit /** * Called when a new block is ready to be pushed. Callers are supposed to store the block into @@ -55,13 +55,13 @@ private[streaming] trait BlockGeneratorListener { * thread, that is not synchronized with any other callbacks. Hence it is okay to do long * blocking operation in this callback. */ - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit /** * Called when an error has occurred in the BlockGenerator. Can be called form many places * so better to not do any long block operation in this callback. */ - def onError(message: String, throwable: Throwable) + def onError(message: String, throwable: Throwable): Unit } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 85350ff658..7aea1c9b64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -48,7 +48,7 @@ private[streaming] trait ReceivedBlockHandler { def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult /** Cleanup old blocks older than the given threshold time */ - def cleanupOldBlocks(threshTime: Long) + def cleanupOldBlocks(threshTime: Long): Unit } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 3376cd557d..5157ca62dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -99,13 +99,13 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()` * immediately, and then `onStart()` after a delay. */ - def onStart() + def onStart(): Unit /** * This method is called by the system when the receiver is stopped. All resources * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method. */ - def onStop() + def onStop(): Unit /** Override this to specify a preferred location (hostname). */ def preferredLocation: Option[String] = None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index e0fe8d2206..42fc84c19b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -70,28 +70,28 @@ private[streaming] abstract class ReceiverSupervisor( @volatile private[streaming] var receiverState = Initialized /** Push a single data item to backend data store. */ - def pushSingle(data: Any) + def pushSingle(data: Any): Unit /** Store the bytes of received data as a data block into Spark's memory. */ def pushBytes( bytes: ByteBuffer, optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId] - ) + ): Unit /** Store a iterator of received data as a data block into Spark's memory. */ def pushIterator( iterator: Iterator[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId] - ) + ): Unit /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ def pushArrayBuffer( arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId] - ) + ): Unit /** * Create a custom [[BlockGenerator]] that the receiver implementation can directly control @@ -103,7 +103,7 @@ private[streaming] abstract class ReceiverSupervisor( def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator /** Report errors. */ - def reportError(message: String, throwable: Throwable) + def reportError(message: String, throwable: Throwable): Unit /** * Called when supervisor is started. -- cgit v1.2.3