author	Liwei Lin <lwlin7@gmail.com>	2016-04-14 10:14:38 -0700
committer	Reynold Xin <rxin@databricks.com>	2016-04-14 10:14:38 -0700
commit	3e27940a19e7bab448f1af11d2065ecd1ec66197 (patch)
tree	76981c9be102eb396cb9be433b52143b18fd2005
parent	de2ad52855aee3c60bbc4642afb180d6fe62173b (diff)
[SPARK-14630][BUILD][CORE][SQL][STREAMING] Code style: public abstract methods should have explicit return types
## What changes were proposed in this pull request?

Currently many public abstract methods (in abstract classes as well as traits) don't declare return types explicitly, such as in [o.a.s.streaming.dstream.InputDStream](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala#L110):

```scala
def start() // should be: def start(): Unit
def stop()  // should be: def stop(): Unit
```

These methods exist in core, sql, streaming; this PR fixes them.

## How was this patch tested?

N/A

## Which piece of scala style rule led to the changes?

The rule was added separately in https://github.com/apache/spark/pull/12396

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12389 from lw-lin/public-abstract-methods.
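For context, the minimal sketch below (not part of this patch; the trait and class names are hypothetical) shows what the rule targets: with Scala's procedure syntax the `Unit` return type of an abstract method is only inferred, whereas declaring `: Unit` explicitly states the contract at the declaration site of the public API.

```scala
// Hypothetical example illustrating the style rule this patch enforces;
// it is not part of the Spark patch itself.
trait DataSink {
  // Before: procedure syntax, the Unit return type is only inferred.
  //   def open()
  //   def write(record: String)

  // After: explicit return types, as the new scalastyle rule requires.
  def open(): Unit
  def write(record: String): Unit
}

// Implementations compile the same either way; the explicit ": Unit"
// simply documents the contract on the abstract declaration.
class ConsoleSink extends DataSink {
  override def open(): Unit = println("opened")
  override def write(record: String): Unit = println(record)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val sink: DataSink = new ConsoleSink
    sink.open()
    sink.write("hello")
  }
}
```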
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/FutureAction.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/network/BlockTransferService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala10
27 files changed, 50 insertions(+), 49 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 8fc657c5eb..76692ccec8 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -278,9 +278,9 @@ private object ContextCleaner {
* Listener class used for testing when any item has been cleaned by the Cleaner class.
*/
private[spark] trait CleanerListener {
- def rddCleaned(rddId: Int)
- def shuffleCleaned(shuffleId: Int)
- def broadcastCleaned(broadcastId: Long)
- def accumCleaned(accId: Long)
- def checkpointCleaned(rddId: Long)
+ def rddCleaned(rddId: Int): Unit
+ def shuffleCleaned(shuffleId: Int): Unit
+ def broadcastCleaned(broadcastId: Long): Unit
+ def accumCleaned(accId: Long): Unit
+ def checkpointCleaned(rddId: Long): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index ce11772a6d..339266a5d4 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -41,7 +41,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Cancels the execution of this action.
*/
- def cancel()
+ def cancel(): Unit
/**
* Blocks until this action completes.
@@ -65,7 +65,7 @@ trait FutureAction[T] extends Future[T] {
* When this action is completed, either through an exception, or a value, applies the provided
* function.
*/
- def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
+ def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit
/**
* Returns whether the action has already been completed with a value or an exception.
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index e584952a9a..94506a0cbb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
/** An application death is an unrecoverable failure condition. */
def dead(reason: String): Unit
- def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
+ def executorAdded(
+ fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index 70f21fbe0d..52e2854961 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -32,8 +32,8 @@ trait LeaderElectionAgent {
@DeveloperApi
trait LeaderElectable {
- def electedLeader()
- def revokedLeadership()
+ def electedLeader(): Unit
+ def revokedLeadership(): Unit
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index dddf2be57e..b30bc821b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -40,12 +40,12 @@ abstract class PersistenceEngine {
* Defines how the object is serialized and persisted. Implementation will
* depend on the store used.
*/
- def persist(name: String, obj: Object)
+ def persist(name: String, obj: Object): Unit
/**
* Defines how the object referred by its name is removed from the store.
*/
- def unpersist(name: String)
+ def unpersist(name: String): Unit
/**
* Gives all objects, matching a prefix. This defines how objects are
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 9c6bc5c62f..aad2e91b25 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -218,7 +218,7 @@ private[deploy] class DriverRunner(
}
private[deploy] trait Sleeper {
- def sleep(seconds: Int)
+ def sleep(seconds: Int): Unit
}
// Needed because ProcessBuilder is a final class and cannot be mocked
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
index e07cb31cbe..7153323d01 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -25,6 +25,6 @@ import org.apache.spark.TaskState.TaskState
* A pluggable interface used by the Executor to send updates to the cluster scheduler.
*/
private[spark] trait ExecutorBackend {
- def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+ def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index e43e3a2de2..09ce012e4e 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -36,7 +36,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
* local blocks or put local blocks.
*/
- def init(blockDataManager: BlockDataManager)
+ def init(blockDataManager: BlockDataManager): Unit
/**
* Tear down the transfer service.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
index 50c2b9acd6..e0f7c8f021 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
@@ -23,6 +23,6 @@ package org.apache.spark.scheduler
* job fails (and no further taskSucceeded events will happen).
*/
private[spark] trait JobListener {
- def taskSucceeded(index: Int, result: Any)
- def jobFailed(exception: Exception)
+ def taskSucceeded(index: Int, result: Any): Unit
+ def jobFailed(exception: Exception): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 5baebe8c1f..100ed76ecb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -34,9 +34,9 @@ import org.apache.spark.util.Utils
private[spark] trait SchedulableBuilder {
def rootPool: Pool
- def buildPools()
+ def buildPools(): Unit
- def addTaskSetManager(manager: Schedulable, properties: Properties)
+ def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 8477a66b39..647d44a0f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -51,7 +51,7 @@ private[spark] trait TaskScheduler {
def submitTasks(taskSet: TaskSet): Unit
// Cancel a stage.
- def cancelTasks(stageId: Int, interruptThread: Boolean)
+ def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 3d090a4353..918ae376f6 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -357,7 +357,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
* serialization.
*/
trait KryoRegistrator {
- def registerClasses(kryo: Kryo)
+ def registerClasses(kryo: Kryo): Unit
}
private[serializer] object KryoSerializer {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 6cd7d69518..be1e84a2ba 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -35,7 +35,7 @@ private[spark] trait ShuffleWriterGroup {
val writers: Array[DiskBlockObjectWriter]
/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
- def releaseWriters(success: Boolean)
+ def releaseWriters(success: Boolean): Unit
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 3939b111b5..2b0bc32cf6 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -129,7 +129,7 @@ private[spark] abstract class WebUI(
}
/** Initialize all components of the server. */
- def initialize()
+ def initialize(): Unit
/** Bind to the HTTP server behind this web interface. */
def bind() {
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
index b34880d3a7..6e80db2f51 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
@@ -32,10 +32,10 @@ private[spark] trait RollingPolicy {
def shouldRollover(bytesToBeWritten: Long): Boolean
/** Notify that rollover has occurred */
- def rolledOver()
+ def rolledOver(): Unit
/** Notify that bytes have been written */
- def bytesWritten(bytes: Long)
+ def bytesWritten(bytes: Long): Unit
/** Get the desired name of the rollover file */
def generateRolledOverFileSuffix(): String
diff --git a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
index 70f3dd62b9..41f28f6e51 100644
--- a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
@@ -26,5 +26,5 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
trait Pseudorandom {
/** Set random seed. */
- def setSeed(seed: Long)
+ def setSeed(seed: Long): Unit
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 4615c55d67..61ca7272df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -62,7 +62,7 @@ import org.apache.spark.sql.types._
abstract class MutableValue extends Serializable {
var isNull: Boolean = true
def boxed: Any
- def update(v: Any)
+ def update(v: Any): Unit
def copy(): MutableValue
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index be6b2530ef..93a8278528 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -164,7 +164,7 @@ trait BaseGenericInternalRow extends InternalRow {
abstract class MutableRow extends InternalRow {
def setNullAt(i: Int): Unit
- def update(i: Int, value: Any)
+ def update(i: Int, value: Any): Unit
// default implementation (slow)
def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 78664baa56..7cde04b626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -38,7 +38,7 @@ private[columnar] trait ColumnAccessor {
def hasNext: Boolean
- def extractTo(row: MutableRow, ordinal: Int)
+ def extractTo(row: MutableRow, ordinal: Int): Unit
protected def underlyingBuffer: ByteBuffer
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala
index 9a173367f4..d30655e0c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala
@@ -28,12 +28,12 @@ private[columnar] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
- def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
+ def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false): Unit
/**
* Appends `row(ordinal)` to the column builder.
*/
- def appendFrom(row: InternalRow, ordinal: Int)
+ def appendFrom(row: InternalRow, ordinal: Int): Unit
/**
* Column statistics information
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index cc5327e0e2..9521506325 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -50,7 +50,7 @@ trait StateStore {
def get(key: UnsafeRow): Option[UnsafeRow]
/** Put a new value for a key. */
- def put(key: UnsafeRow, value: UnsafeRow)
+ def put(key: UnsafeRow, value: UnsafeRow): Unit
/**
* Remove keys that match the following condition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
index bf78be9d9f..ba1facf11b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
@@ -37,7 +37,7 @@ abstract class ContinuousQueryListener {
* `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
* don't block this method as it will block your query.
*/
- def onQueryStarted(queryStarted: QueryStarted)
+ def onQueryStarted(queryStarted: QueryStarted): Unit
/**
* Called when there is some status update (ingestion rate updated, etc.)
@@ -47,10 +47,10 @@ abstract class ContinuousQueryListener {
* may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
* is terminated when you are processing [[QueryProgress]].
*/
- def onQueryProgress(queryProgress: QueryProgress)
+ def onQueryProgress(queryProgress: QueryProgress): Unit
/** Called when a query is stopped, with or without error */
- def onQueryTerminated(queryTerminated: QueryTerminated)
+ def onQueryTerminated(queryTerminated: QueryTerminated): Unit
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index dc88349db5..a3c125c306 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -107,8 +107,8 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
}
/** Method called to start receiving data. Subclasses must implement this method. */
- def start()
+ def start(): Unit
/** Method called to stop receiving data. Subclasses must implement this method. */
- def stop()
+ def stop(): Unit
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index e42bea6ec6..4592e015ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -37,7 +37,7 @@ private[streaming] trait BlockGeneratorListener {
* that will be useful when a block is generated. Any long blocking operation in this callback
* will hurt the throughput.
*/
- def onAddData(data: Any, metadata: Any)
+ def onAddData(data: Any, metadata: Any): Unit
/**
* Called when a new block of data is generated by the block generator. The block generation
@@ -47,7 +47,7 @@ private[streaming] trait BlockGeneratorListener {
* be useful when the block has been successfully stored. Any long blocking operation in this
* callback will hurt the throughput.
*/
- def onGenerateBlock(blockId: StreamBlockId)
+ def onGenerateBlock(blockId: StreamBlockId): Unit
/**
* Called when a new block is ready to be pushed. Callers are supposed to store the block into
@@ -55,13 +55,13 @@ private[streaming] trait BlockGeneratorListener {
* thread, that is not synchronized with any other callbacks. Hence it is okay to do long
* blocking operation in this callback.
*/
- def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
/**
* Called when an error has occurred in the BlockGenerator. Can be called form many places
* so better to not do any long block operation in this callback.
*/
- def onError(message: String, throwable: Throwable)
+ def onError(message: String, throwable: Throwable): Unit
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 85350ff658..7aea1c9b64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -48,7 +48,7 @@ private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
/** Cleanup old blocks older than the given threshold time */
- def cleanupOldBlocks(threshTime: Long)
+ def cleanupOldBlocks(threshTime: Long): Unit
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 3376cd557d..5157ca62dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -99,13 +99,13 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
* immediately, and then `onStart()` after a delay.
*/
- def onStart()
+ def onStart(): Unit
/**
* This method is called by the system when the receiver is stopped. All resources
* (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
*/
- def onStop()
+ def onStop(): Unit
/** Override this to specify a preferred location (hostname). */
def preferredLocation: Option[String] = None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index e0fe8d2206..42fc84c19b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -70,28 +70,28 @@ private[streaming] abstract class ReceiverSupervisor(
@volatile private[streaming] var receiverState = Initialized
/** Push a single data item to backend data store. */
- def pushSingle(data: Any)
+ def pushSingle(data: Any): Unit
/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/** Store a iterator of received data as a data block into Spark's memory. */
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
- )
+ ): Unit
/**
* Create a custom [[BlockGenerator]] that the receiver implementation can directly control
@@ -103,7 +103,7 @@ private[streaming] abstract class ReceiverSupervisor(
def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator
/** Report errors. */
- def reportError(message: String, throwable: Throwable)
+ def reportError(message: String, throwable: Throwable): Unit
/**
* Called when supervisor is started.