author:    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-13 23:57:27 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com>  2014-01-13 23:57:27 -0800
commit:    f8e239e058953e8db88e784439cfd9eca446e606 (patch)
tree:      d71125aba8f9eda10280cfed70f84556ae593dd2 /streaming
parent:    4e497db8f3826cf5142b2165a08d02c6f3c2cd90 (diff)
parent:    fdaabdc67387524ffb84354f87985f48bd31cf60 (diff)
Merge remote-tracking branch 'apache/master' into filestream-fix
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
Diffstat (limited to 'streaming')
7 files changed, 186 insertions(+), 33 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1ec4492bca..a493a8279f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
   }
 
-
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream. However, the reduction is done incrementally
@@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   }
 
   /**
-   * Enable periodic checkpointing of RDDs of this DStream
+   * Enable periodic checkpointing of RDDs of this DStream.
    * @param interval Time interval after which generated RDD will be checkpointed
    */
   def checkpoint(interval: Duration) = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 299628ce9f..844316a1c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -343,6 +343,10 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def clearMetadata(time: Time) {
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
+    if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+      logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+      oldRDDs.values.foreach(_.unpersist(false))
+    }
     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearMetadata(time))
@@ -763,9 +767,11 @@ abstract class DStream[T: ClassTag] (
   }
 
   /**
-   * Register this DStream as an output DStream.
+   * Register this streaming as an output stream. This would ensure that RDDs of this
+   * DStream will be generated.
    */
-  private[streaming] def register() {
+  private[streaming] def register(): DStream[T] = {
     ssc.graph.addOutputStream(this)
+    this
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 2da4127f47..38bad5ac80 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
           }
         }
       case None =>
-        logInfo("Nothing to delete")
+        logDebug("Nothing to delete")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
new file mode 100644
index 0000000000..b9c0596378
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.annotation.tailrec
+
+import java.io.OutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
+  val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+  val CHUNK_SIZE = 8192
+  var lastSyncTime = System.nanoTime
+  var bytesWrittenSinceSync: Long = 0
+
+  override def write(b: Int) {
+    waitToWrite(1)
+    out.write(b)
+  }
+
+  override def write(bytes: Array[Byte]) {
+    write(bytes, 0, bytes.length)
+  }
+
+  @tailrec
+  override final def write(bytes: Array[Byte], offset: Int, length: Int) {
+    val writeSize = math.min(length - offset, CHUNK_SIZE)
+    if (writeSize > 0) {
+      waitToWrite(writeSize)
+      out.write(bytes, offset, writeSize)
+      write(bytes, offset + writeSize, length)
+    }
+  }
+
+  override def flush() {
+    out.flush()
+  }
+
+  override def close() {
+    out.close()
+  }
+
+  @tailrec
+  private def waitToWrite(numBytes: Int) {
+    val now = System.nanoTime
+    val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
+    val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
+    if (rate < bytesPerSec) {
+      // It's okay to write; just update some variables and return
+      bytesWrittenSinceSync += numBytes
+      if (now > lastSyncTime + SYNC_INTERVAL) {
+        // Sync interval has passed; let's resync
+        lastSyncTime = now
+        bytesWrittenSinceSync = numBytes
+      }
+    } else {
+      // Calculate how much time we should sleep to bring ourselves to the desired rate.
+      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+      if (sleepTime > 0) Thread.sleep(sleepTime)
+      waitToWrite(numBytes)
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6585d494a6..463617a713 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.streaming.util
 
-import java.nio.ByteBuffer
-import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.io.IOException
 import java.net.ServerSocket
-import org.apache.spark.{SparkConf, Logging}
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import java.nio.ByteBuffer
+
 import scala.io.Source
-import java.io.IOException
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.IntParam
 
 /**
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index cb53555f5c..bcb0c28bf0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
 import util.ManualClock
 
 import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
@@ -394,40 +396,31 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
-  test("forgetting of RDDs - map and window operations") {
-    assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+  val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
-    val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+  test("rdd cleanup - map and window") {
     val rememberDuration = Seconds(3)
-
-    assert(input.size === 10, "Number of inputs have changed")
-
     def operation(s: DStream[Int]): DStream[(Int, Int)] = {
       s.map(x => (x % 10, 1))
        .window(Seconds(2), Seconds(1))
        .window(Seconds(4), Seconds(2))
     }
 
-    val ssc = setupStreams(input, operation _)
-    ssc.remember(rememberDuration)
-    runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
-    val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
-    val windowedStream1 = windowedStream2.dependencies.head
+    val operatedStream = runCleanupTest(conf, operation _,
+      numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+    val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+    val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
     val mappedStream = windowedStream1.dependencies.head
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    assert(clock.time === Seconds(10).milliseconds)
-
-    // IDEALLY
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
-    // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3,
+    // Checkpoint remember durations
+    assert(windowedStream2.rememberDuration === rememberDuration)
+    assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+    assert(mappedStream.rememberDuration ===
+      rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
 
-    // IN THIS TEST
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
+    // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
     // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
+    // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
 
     // WindowedStream2
     assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -444,4 +437,37 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(mappedStream.generatedRDDs.contains(Time(2000)))
     assert(!mappedStream.generatedRDDs.contains(Time(1000)))
   }
+
+  test("rdd cleanup - updateStateByKey") {
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+    }
+    val stateStream = runCleanupTest(
+      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+    assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+    assert(stateStream.generatedRDDs.contains(Time(10000)))
+    assert(!stateStream.generatedRDDs.contains(Time(4000)))
+  }
+
+  /** Test cleanup of RDDs in DStream metadata */
+  def runCleanupTest[T: ClassTag](
+      conf2: SparkConf,
+      operation: DStream[Int] => DStream[T],
+      numExpectedOutput: Int = cleanupTestInput.size,
+      rememberDuration: Duration = null
+    ): DStream[T] = {
+
+    // Setup the stream computation
+    assert(batchDuration === Seconds(1),
+      "Batch duration has changed from 1 second, check cleanup tests")
+    val ssc = setupStreams(cleanupTestInput, operation)
+    val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+    if (rememberDuration != null) ssc.remember(rememberDuration)
+    val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    assert(clock.time === Seconds(10).milliseconds)
+    assert(output.size === numExpectedOutput)
+    operatedStream
+  }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000000..15f13d5b19
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+  private def benchmark[U](f: => U): Long = {
+    val start = System.nanoTime
+    f
+    System.nanoTime - start
+  }
+
+  test("write") {
+    val underlying = new ByteArrayOutputStream
+    val data = "X" * 41000
+    val stream = new RateLimitedOutputStream(underlying, 10000)
+    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+    assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
+    assert(underlying.toString("UTF-8") == data)
+  }
+}
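A note on the DStream.clearMetadata change above: when `spark.streaming.unpersist` is set to true, RDDs that fall out of a DStream's `rememberDuration` are explicitly unpersisted as their metadata is cleared, rather than lingering in the cache until LRU eviction; the flag defaults to false in this commit. A minimal sketch of opting in from a driver program (the master, app name, and batch interval here are illustrative, not part of the commit):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative driver setup: turn on eager cleanup of old RDDs.
// With the flag left false (the default), clearMetadata only drops the
// RDD references from the DStream and leaves eviction to the BlockManager.
val conf = new SparkConf()
  .setMaster("local[2]")                      // illustrative
  .setAppName("UnpersistSketch")              // illustrative
  .set("spark.streaming.unpersist", "true")   // the flag added in this commit

val ssc = new StreamingContext(conf, Seconds(1))
```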
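The new `RateLimitedOutputStream` throttles an underlying `OutputStream` to roughly `bytesPerSec`: writes are split into `CHUNK_SIZE` (8192-byte) chunks, and before each chunk `waitToWrite` sleeps whenever the rate observed since the last 10-second sync point exceeds the target, following the Kafka `Throttler` approach cited in the code. A usage sketch based only on the constructor and test shown in this diff (the 41000-byte payload mirrors the new suite, where 41 KB at 10 KB/s takes about 4 seconds):

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.streaming.util.RateLimitedOutputStream

// Wrap any OutputStream; writes beyond the target rate block the caller.
val underlying = new ByteArrayOutputStream
val throttled = new RateLimitedOutputStream(underlying, 10000) // ~10 KB/s

val payload = ("X" * 41000).getBytes("UTF-8")
throttled.write(payload) // returns after roughly 4 seconds
throttled.flush()
```

One caveat visible in the implementation: `waitToWrite` computes both the elapsed time and the sleep in whole seconds (`SECONDS.convert` plus integer division by `bytesPerSec`), so the limiter is coarse at sub-second granularity.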