diff options
author | Jacek Laskowski <jacek@japila.pl> | 2016-01-23 12:14:16 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-23 12:14:16 -0800 |
commit | cfdcef70ddd25484f1cb1791e529210d602c2283 (patch) | |
tree | 3b8bc3a8d8781b1eefb901c078af7683d53e0ad4 /streaming/src | |
parent | 423783a08bb8730852973aca19603e444d15040d (diff) | |
download | spark-cfdcef70ddd25484f1cb1791e529210d602c2283.tar.gz spark-cfdcef70ddd25484f1cb1791e529210d602c2283.tar.bz2 spark-cfdcef70ddd25484f1cb1791e529210d602c2283.zip |
[STREAMING][MINOR] Scaladoc + logs
Found while doing code review
Author: Jacek Laskowski <jacek@japila.pl>
Closes #10878 from jaceklaskowski/streaming-scaladoc-logs-tiny-fixes.
Diffstat (limited to 'streaming/src')
4 files changed, 5 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala index f1114c1e5a..66f646d7dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala @@ -30,9 +30,8 @@ import org.apache.spark.util.ClosureCleaner * `mapWithState` operation of a * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) or a * [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). - * Use the [[org.apache.spark.streaming.StateSpec StateSpec.apply()]] or - * [[org.apache.spark.streaming.StateSpec StateSpec.create()]] to create instances of - * this class. + * Use [[org.apache.spark.streaming.StateSpec.function() StateSpec.function]] factory methods + * to create instances of this class. * * Example in Scala: * {{{ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala index 36ff9c7e61..ed08191f41 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala @@ -90,7 +90,7 @@ private[streaming] class MapWithStateDStreamImpl[ } /** - * A DStream that allows per-key state to be maintains, and arbitrary records to be generated + * A DStream that allows per-key state to be maintained, and arbitrary records to be generated * based on updates to the state. This is the main DStream that implements the `mapWithState` * operation on DStreams. * diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 60b5c838e9..5f1c671c3c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -166,7 +166,7 @@ private[streaming] class ReceivedBlockTracker( def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized { require(cleanupThreshTime.milliseconds < clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq - logInfo("Deleting batches " + timesToCleanup) + logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}") if (writeToLog(BatchCleanupEvent(timesToCleanup))) { timeToAllocatedBlocks --= timesToCleanup writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 4908be0536..cacd430cf3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -91,7 +91,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized { val batchUIData = BatchUIData(batchStarted.batchInfo) - runningBatchUIData(batchStarted.batchInfo.batchTime) = BatchUIData(batchStarted.batchInfo) + runningBatchUIData(batchStarted.batchInfo.batchTime) = batchUIData waitingBatchUIData.remove(batchStarted.batchInfo.batchTime) totalReceivedRecords += batchUIData.numRecords |