Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 46
1 file changed, 27 insertions(+), 19 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3a664c4f5c..928739a416 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{InputStream, NotSerializableException}
+import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map
@@ -25,6 +26,7 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
+import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -43,7 +45,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -106,7 +108,7 @@ class StreamingContext private[streaming] (
* HDFS compatible filesystems
*/
def this(path: String, hadoopConf: Configuration) =
- this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
+ this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)
/**
* Recreate a StreamingContext from a checkpoint file.
@@ -122,17 +124,14 @@ class StreamingContext private[streaming] (
def this(path: String, sparkContext: SparkContext) = {
this(
sparkContext,
- CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
+ CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
null)
}
+ require(_sc != null || _cp != null,
+ "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
- if (_sc == null && _cp == null) {
- throw new Exception("Spark Streaming cannot be initialized with " +
- "both SparkContext and checkpoint as null")
- }
-
- private[streaming] val isCheckpointPresent = (_cp != null)
+ private[streaming] val isCheckpointPresent: Boolean = _cp != null
private[streaming] val sc: SparkContext = {
if (_sc != null) {
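Below, a minimal sketch of what the .orNull change means for callers, assuming the checkpoint directory is empty or unreadable (the path is hypothetical): CheckpointReader.read then yields None, so construction fails the require above with a descriptive IllegalArgumentException instead of the NoSuchElementException that Option.get used to throw.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.streaming.StreamingContext

    // Hypothetical checkpoint path; if nothing can be read from it, the require
    // above rejects the (null SparkContext, null Checkpoint) combination.
    val recovered = new StreamingContext("hdfs://nn:8020/checkpoints/my-app", new Configuration())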
@@ -201,6 +200,10 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null)
+ // Copy of thread-local properties from SparkContext. These properties will be set in all tasks
+ // submitted by this StreamingContext after start.
+ private[streaming] val savedProperties = new AtomicReference[Properties](new Properties)
+
private[streaming] def getStartSite(): CallSite = startSite.get()
private var shutdownHookRef: AnyRef = _
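A hedged usage sketch of the thread-local flow this field captures: properties set on the driver thread before start() are snapshotted into savedProperties and later applied to the jobs the scheduler submits ("streaming_pool" is a hypothetical pool name).

    // Set a thread-local Spark property before starting the streaming scheduler;
    // the snapshot taken in start() carries it into every generated job.
    ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "streaming_pool")
    ssc.start()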
@@ -213,8 +216,8 @@ class StreamingContext private[streaming] (
def sparkContext: SparkContext = sc
/**
- * Set each DStreams in this context to remember RDDs it generated in the last given duration.
- * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * Set each DStream in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
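A short, hedged example of the API this comment documents: keep generated RDDs for at least one minute so they can still be queried after their batch completes.

    import org.apache.spark.streaming.Minutes

    // Retain each DStream's RDDs for a minimum of one minute.
    ssc.remember(Minutes(1))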
@@ -282,13 +285,14 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
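A hedged usage sketch of the text socket stream documented above (host and port are placeholders):

    // Receive UTF-8, newline-delimited text from a TCP endpoint and split it into words.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))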
@@ -299,7 +303,7 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
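For the generic variant, a hedged sketch of a converter that mirrors the line-splitting behaviour of socketTextStream (the helper and endpoint are illustrative, not part of this change):

    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets
    import org.apache.spark.storage.StorageLevel

    // Turn the raw socket InputStream into an iterator of UTF-8 lines.
    def linesFrom(in: InputStream): Iterator[String] = {
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      Iterator.continually(reader.readLine()).takeWhile(_ != null)
    }

    val lines = ssc.socketStream("localhost", 9999, linesFrom, StorageLevel.MEMORY_AND_DISK_SER_2)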
@@ -496,9 +500,10 @@ class StreamingContext private[streaming] (
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
- /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
- * receiving system events related to streaming.
- */
+ /**
+ * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
def addStreamingListener(streamingListener: StreamingListener) {
scheduler.listenerBus.addListener(streamingListener)
}
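A hedged example of a listener registered through this method; it only prints batch processing delays (the logging itself is illustrative):

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    ssc.addStreamingListener(new StreamingListener {
      // Called by the listener bus after each batch finishes.
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        println(s"Processing delay: ${batchCompleted.batchInfo.processingDelay.getOrElse(-1L)} ms")
      }
    })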
@@ -528,11 +533,12 @@ class StreamingContext private[streaming] (
}
}
- if (Utils.isDynamicAllocationEnabled(sc.conf)) {
+ if (Utils.isDynamicAllocationEnabled(sc.conf) ||
+ ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
logWarning("Dynamic Allocation is enabled for this application. " +
"Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
"Write Ahead Log is not enabled for non-replayable sources like Flume. " +
- "See the programming guide for details on how to enable the Write Ahead Log")
+ "See the programming guide for details on how to enable the Write Ahead Log.")
}
}
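A hedged configuration sketch for the warning above: enable the receiver write ahead log and set a checkpoint directory so receiver-based sources survive executor removal (the checkpoint path and app name are hypothetical).

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("wal-enabled-app")                                 // illustrative app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // persist received blocks to the WAL
    // The WAL is written under the checkpoint directory, so one must be set.
    ssc.checkpoint("hdfs://nn:8020/checkpoints/wal-enabled-app")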
@@ -573,6 +579,8 @@ class StreamingContext private[streaming] (
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+ savedProperties.set(SerializationUtils.clone(
+ sparkContext.localProperties.get()).asInstanceOf[Properties])
scheduler.start()
}
state = StreamingContextState.ACTIVE
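A standalone sketch of why SerializationUtils.clone is used here rather than sharing the live Properties object: later mutations on the driver thread must not leak into jobs scheduled against the snapshot.

    import java.util.Properties
    import org.apache.commons.lang.SerializationUtils

    val live = new Properties()
    live.setProperty("spark.scheduler.pool", "pool_a")               // value at start()
    val snapshot = SerializationUtils.clone(live).asInstanceOf[Properties]
    live.setProperty("spark.scheduler.pool", "pool_b")               // changed afterwards
    assert(snapshot.getProperty("spark.scheduler.pool") == "pool_a") // snapshot is unaffected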
@@ -860,7 +868,7 @@ private class StreamingContextPythonHelper {
*/
def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
val checkpointOption = CheckpointReader.read(
- checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+ checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
checkpointOption.map(new StreamingContext(null, _, null))
}
}
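The helper above mirrors the public recovery entry point; a hedged sketch of that path, with a hypothetical checkpoint directory and batch interval:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs://nn:8020/checkpoints/my-app"  // hypothetical path
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
      // Only runs when no usable checkpoint exists.
      val fresh = new StreamingContext(new SparkConf().setAppName("recoverable"), Seconds(10))
      fresh.checkpoint(checkpointDir)
      fresh
    })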