Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                        |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala                     |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                      |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               | 108
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala                           |  30
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala       |   1
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala       |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala      |  13
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala      |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala        |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala |   1
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala     |   5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala           |   1
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala           |   1
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala        |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/Clock.scala                     |   8
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala            |   1
17 files changed, 135 insertions, 52 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 292ad3b9f9..beba9cfd4f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -189,7 +189,7 @@ abstract class DStream[T: ClassManifest] (
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index fda7264a27..3b910538e0 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -14,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
try {
val timeTaken = job.run()
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index aeb7c3eb0e..eb40affe6d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -22,7 +22,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ef73049a81..7256e41af9 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -15,7 +15,6 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
@@ -101,14 +100,27 @@ class StreamingContext private (
protected[streaming] var receiverJobThread: Thread = null
protected[streaming] var scheduler: Scheduler = null
+ /**
+ * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
+ * collection. This method allows the developer to specify how long to remember the RDDs (
+ * if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
def remember(duration: Time) {
graph.remember(duration)
}
- def checkpoint(dir: String, interval: Time = null) {
- if (dir != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
- checkpointDir = dir
+ /**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Time = null) {
+ if (directory != null) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+ checkpointDir = directory
checkpointInterval = interval
} else {
checkpointDir = null
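A usage sketch of the two settings documented above, assuming the (master, appName, batchDuration) constructor; the master string, application name, and HDFS path are hypothetical.

    import spark.streaming.{StreamingContext, Seconds, Minutes}

    val ssc = new StreamingContext("local[2]", "example", Seconds(1))
    ssc.remember(Minutes(5))                                         // keep generated RDDs for at least 5 minutes
    ssc.checkpoint("hdfs://namenode:9000/checkpoints", Seconds(10))  // checkpoint the DStream graph every 10 seconds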
@@ -122,9 +134,8 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
- /**
+ /**
* Create an input stream that pulls messages from a Kafka Broker.
- *
* @param hostname ZooKeeper hostname.
* @param port ZooKeeper port.
* @param groupId The group id for this consumer.
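A hedged sketch of how the documented parameters might be used; the method name and exact signature are assumed for this revision, and the ZooKeeper host, group id, and topic map are hypothetical.

    val kafkaLines = ssc.kafkaStream[String]("zk-host", 2181, "my-consumer-group", Map("events" -> 1))
    kafkaLines.print()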
@@ -147,6 +158,15 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Create an input stream from network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
def networkTextStream(
hostname: String,
port: Int,
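A minimal usage sketch, e.g. paired with "nc -lk 9999" on the sending side:

    val lines = ssc.networkTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).print()   // prints a few of the words received in each batch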
@@ -155,6 +175,16 @@ class StreamingContext private (
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
+ /**
+ * Create an input stream from network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as objects using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
def networkStream[T: ClassManifest](
hostname: String,
port: Int,
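A hedged sketch of supplying a custom converter; the converter's type (java.io.InputStream => Iterator[T]) is assumed from the SocketReceiver.bytesToLines helper used by networkTextStream above, and the converter itself is hypothetical.

    import java.io.InputStream
    import spark.storage.StorageLevel

    // Hypothetical converter: treat each received text line as an integer.
    def linesAsInts(in: InputStream): Iterator[Int] =
      scala.io.Source.fromInputStream(in).getLines().map(_.trim.toInt)

    val ints = ssc.networkStream[Int]("localhost", 9999, linesAsInts, StorageLevel.MEMORY_AND_DISK_SER_2)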
@@ -166,16 +196,32 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
inputStream
}
-
+ /**
+ * Create an input stream from network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
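A short sketch of the Flume receiver documented above; the host and port are hypothetical and must match the Avro sink configured in Flume.

    val flumeEvents = ssc.flumeStream("localhost", 4141)
    flumeEvents.map(_ => 1L).reduce(_ + _).print()   // a rough per-batch count of received events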
@@ -188,7 +234,11 @@ class StreamingContext private (
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
- * for new files and executes the necessary processing on them.
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
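Spelled out with explicit key, value, and input-format types, this is essentially what textFileStream (below) does; the HDFS directory is hypothetical.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val texts = ssc
      .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:9000/incoming")
      .map(_._2.toString)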
@@ -200,13 +250,23 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat).
+ * @param directory HDFS directory to monitor for new files
+ */
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
/**
* Creates an input stream from a queue of RDDs. In each batch,
- * it will process either one or all of the RDDs returned by the queue
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD that is returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
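A hedged sketch of feeding pre-built RDDs through the queue; it assumes the underlying SparkContext is reachable as ssc.sc, as it is inside this file.

    import scala.collection.mutable.Queue
    import spark.RDD

    val rddQueue = new Queue[RDD[Int]]()
    val queued = ssc.queueStream(rddQueue, true, null)   // one RDD per batch, no default RDD
    queued.print()
    rddQueue += ssc.sc.makeRDD(1 to 100)                 // push a batch of data into the stream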
@@ -218,13 +278,9 @@ class StreamingContext private (
inputStream
}
- def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
- val queue = new Queue[RDD[T]]
- val inputStream = queueStream(queue, true, null)
- queue ++= array
- inputStream
- }
-
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same interval.
+ */
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
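For example (hostnames and port are hypothetical):

    val s1 = ssc.networkTextStream("host1", 9999)
    val s2 = ssc.networkTextStream("host2", 9999)
    val merged = ssc.union(Seq(s1, s2))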
@@ -256,7 +312,7 @@ class StreamingContext private (
}
/**
- * This function starts the execution of the streams.
+ * Starts the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointInterval == null && graph != null) {
@@ -284,7 +340,7 @@ class StreamingContext private (
}
/**
- * This function stops the execution of the streams.
+ * Stops the execution of the streams.
*/
def stop() {
try {
@@ -302,6 +358,10 @@ class StreamingContext private (
object StreamingContext {
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ new PairDStreamFunctions[K, V](stream)
+ }
+
protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
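Because the implicit conversion lives in the StreamingContext companion object, key-value operations become available by importing StreamingContext._, as sketched here (host and port hypothetical):

    import spark.streaming.StreamingContext._   // brings toPairDStreamFunctions into scope

    val words  = ssc.networkTextStream("localhost", 9999).flatMap(_.split(" "))
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)   // reduceByKey comes from PairDStreamFunctions
    counts.print()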
@@ -312,10 +372,6 @@ object StreamingContext {
new SparkContext(master, frameworkName)
}
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
- new PairDStreamFunctions[K, V](stream)
- }
-
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
time.milliseconds.toString
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 2976e5e87b..3c6fd5d967 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,16 +1,18 @@
package spark.streaming
/**
- * This class is simple wrapper class that represents time in UTC.
- * @param millis Time in UTC long
+ * This is a simple class that represents time. Internally, it represents time as UTC.
+ * The recommended way to create instances of Time is to use helper objects
+ * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
+ * @param millis Time in UTC.
*/
case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
-
+
def <= (that: Time): Boolean = (this.millis <= that.millis)
-
+
def > (that: Time): Boolean = (this.millis > that.millis)
def >= (that: Time): Boolean = (this.millis >= that.millis)
@@ -45,23 +47,33 @@ case class Time(private val millis: Long) {
def milliseconds: Long = millis
}
-object Time {
+private[streaming] object Time {
val zero = Time(0)
implicit def toTime(long: Long) = Time(long)
-
- implicit def toLong(time: Time) = time.milliseconds
}
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of milliseconds.
+ */
object Milliseconds {
def apply(milliseconds: Long) = Time(milliseconds)
}
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of seconds.
+ */
object Seconds {
def apply(seconds: Long) = Time(seconds * 1000)
-}
+}
-object Minutes {
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of minutes.
+ */
+object Minutes {
def apply(minutes: Long) = Time(minutes * 60000)
}
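A small sketch of the helper objects and the comparison operators defined on Time:

    import spark.streaming.{Milliseconds, Seconds, Minutes}

    val batch  = Seconds(2)          // Time(2000)
    val window = Minutes(1)          // Time(60000)
    val slide  = Milliseconds(500)   // Time(500)

    assert(Seconds(2).milliseconds == 2000L)
    assert(Minutes(1) > Seconds(30))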
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 2e427dadf7..bc23d423d3 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -4,6 +4,7 @@ import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
import spark.streaming.{Time, DStream}
+private[streaming]
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
partitioner: Partitioner
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 8cdaff467b..cf72095324 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -10,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.HashSet
-
+private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 7e988cadf4..ff73225e0f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -17,6 +17,7 @@ import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+private[streaming]
class FlumeInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -93,6 +94,7 @@ private[streaming] object SparkFlumeEvent {
}
/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
@@ -108,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
/** A NetworkReceiver which listens for events using the
* Flume Avro interface.*/
+private[streaming]
class FlumeReceiver(
- streamId: Int,
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+ streamId: Int,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
lazy val dataHandler = new DataHandler(this, storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index a46721af2f..175c75bcb9 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -21,10 +21,12 @@ import scala.collection.JavaConversions._
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that is sent to the Master
+private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
+private[streaming]
+case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
@@ -39,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
* By default the value is pulled from ZooKeeper.
* @param storageLevel RDD storage level.
*/
+private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -98,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
}
}
+private[streaming]
class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 996cc7dea8..aa2f31cea8 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -17,6 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue
* data into Spark Streaming, though it requires the sender to batch data and serialize it
* in the format that the system is configured with.
*/
+private[streaming]
class RawInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -29,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
}
}
+private[streaming]
class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 2686de14d2..d289ed2a3f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -11,6 +11,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Interval, Time, DStream}
+private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index af5b73ae8d..cbe4372299 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -6,6 +6,7 @@ import spark.storage.StorageLevel
import java.io._
import java.net.Socket
+private[streaming]
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -19,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
}
}
-
+private[streaming]
class SocketReceiver[T: ClassManifest](
streamId: Int,
host: String,
@@ -50,7 +51,7 @@ class SocketReceiver[T: ClassManifest](
}
-
+private[streaming]
object SocketReceiver {
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index 6e190b5564..175b3060c1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -6,6 +6,7 @@ import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming.{Time, DStream}
+private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
index f1efb2ae72..3bf4c2ecea 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -5,6 +5,7 @@ import spark.RDD
import collection.mutable.ArrayBuffer
import spark.rdd.UnionRDD
+private[streaming]
class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index 4b2621c497..7718794cbf 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -5,7 +5,7 @@ import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import spark.streaming.{Interval, Time, DStream}
-
+private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
_windowTime: Time,
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
index ed087e4ea8..974651f9f6 100644
--- a/streaming/src/main/scala/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -1,13 +1,12 @@
package spark.streaming.util
-import spark.streaming._
-
-trait Clock {
+private[streaming]
+trait Clock {
def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
-
+private[streaming]
class SystemClock() extends Clock {
val minPollTime = 25L
@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
}
}
+private[streaming]
class ManualClock() extends Clock {
var time = 0L
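The Scheduler (see the Scheduler.scala hunk above) reads the spark.streaming.clock property to pick its Clock implementation, so tests can substitute the manual clock; a hedged sketch:

    // Must be set before the StreamingContext (and thus the Scheduler) is started;
    // a test can then advance the ManualClock explicitly instead of waiting on wall-clock time.
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")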
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index dc55fd902b..2e7f4169c9 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,5 +1,6 @@
package spark.streaming.util
+private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L