diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 3a664c4f5c..928739a416 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{InputStream, NotSerializableException} +import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map @@ -25,6 +26,7 @@ import scala.collection.mutable.Queue import scala.reflect.ClassTag import scala.util.control.NonFatal +import org.apache.commons.lang.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} @@ -43,7 +45,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} +import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -106,7 +108,7 @@ class StreamingContext private[streaming] ( * HDFS compatible filesystems */ def this(path: String, hadoopConf: Configuration) = - this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null) + this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null) /** * Recreate a StreamingContext from a checkpoint file. @@ -122,17 +124,14 @@ class StreamingContext private[streaming] ( def this(path: String, sparkContext: SparkContext) = { this( sparkContext, - CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get, + CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull, null) } + require(_sc != null || _cp != null, + "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null") - if (_sc == null && _cp == null) { - throw new Exception("Spark Streaming cannot be initialized with " + - "both SparkContext and checkpoint as null") - } - - private[streaming] val isCheckpointPresent = (_cp != null) + private[streaming] val isCheckpointPresent: Boolean = _cp != null private[streaming] val sc: SparkContext = { if (_sc != null) { @@ -201,6 +200,10 @@ class StreamingContext private[streaming] ( private val startSite = new AtomicReference[CallSite](null) + // Copy of thread-local properties from SparkContext. These properties will be set in all tasks + // submitted by this StreamingContext after start. + private[streaming] val savedProperties = new AtomicReference[Properties](new Properties) + private[streaming] def getStartSite(): CallSite = startSite.get() private var shutdownHookRef: AnyRef = _ @@ -213,8 +216,8 @@ class StreamingContext private[streaming] ( def sparkContext: SparkContext = sc /** - * Set each DStreams in this context to remember RDDs it generated in the last given duration. - * DStreams remember RDDs only for a limited duration of time and releases them for garbage + * Set each DStream in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of time and release them for garbage * collection. This method allows the developer to specify how long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). * @param duration Minimum duration that each DStream should remember its RDDs @@ -282,13 +285,14 @@ class StreamingContext private[streaming] ( } /** - * Create a input stream from TCP source hostname:port. Data is received using + * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @see [[socketStream]] */ def socketTextStream( hostname: String, @@ -299,7 +303,7 @@ class StreamingContext private[streaming] ( } /** - * Create a input stream from TCP source hostname:port. Data is received using + * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes it interpreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data @@ -496,9 +500,10 @@ class StreamingContext private[streaming] ( new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } - /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for - * receiving system events related to streaming. - */ + /** + * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for + * receiving system events related to streaming. + */ def addStreamingListener(streamingListener: StreamingListener) { scheduler.listenerBus.addListener(streamingListener) } @@ -528,11 +533,12 @@ class StreamingContext private[streaming] ( } } - if (Utils.isDynamicAllocationEnabled(sc.conf)) { + if (Utils.isDynamicAllocationEnabled(sc.conf) || + ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) { logWarning("Dynamic Allocation is enabled for this application. " + "Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " + "Write Ahead Log is not enabled for non-replayable sources like Flume. " + - "See the programming guide for details on how to enable the Write Ahead Log") + "See the programming guide for details on how to enable the Write Ahead Log.") } } @@ -573,6 +579,8 @@ class StreamingContext private[streaming] ( sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") + savedProperties.set(SerializationUtils.clone( + sparkContext.localProperties.get()).asInstanceOf[Properties]) scheduler.start() } state = StreamingContextState.ACTIVE @@ -860,7 +868,7 @@ private class StreamingContextPythonHelper { */ def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = { val checkpointOption = CheckpointReader.read( - checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false) + checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false) checkpointOption.map(new StreamingContext(null, _, null)) } } |