aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2015-07-30 17:44:20 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-30 17:44:20 -0700
commit9307f5653d19a6a2fda355a675ca9ea97e35611b (patch)
treef830d04b1ea9d8e6e95df79d767a1aa382134838 /streaming
parent3c66ff727d4b47220e1ff363cea215189ed64f36 (diff)
downloadspark-9307f5653d19a6a2fda355a675ca9ea97e35611b.tar.gz
spark-9307f5653d19a6a2fda355a675ca9ea97e35611b.tar.bz2
spark-9307f5653d19a6a2fda355a675ca9ea97e35611b.zip
[SPARK-9472] [STREAMING] consistent hadoop configuration, streaming only
Author: cody koeninger <cody@koeninger.org> Closes #7772 from koeninger/streaming-hadoop-config and squashes the following commits: 5267284 [cody koeninger] [SPARK-4229][Streaming] consistent hadoop configuration, streaming only
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala3
4 files changed, 9 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 65d4e933bf..2780d5b6ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, SparkConf, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
@@ -100,7 +101,7 @@ object Checkpoint extends Logging {
}
val path = new Path(checkpointDir)
- val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
+ val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 92438f1b1f..177e710ace 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.serializer.SerializationDebugger
@@ -110,7 +111,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(path, new Configuration)
+ def this(path: String) = this(path, SparkHadoopUtil.get.conf)
/**
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
@@ -803,7 +804,7 @@ object StreamingContext extends Logging {
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
- hadoopConf: Configuration = new Configuration(),
+ hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
ACTIVATION_LOCK.synchronized {
@@ -828,7 +829,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
- hadoopConf: Configuration = new Configuration(),
+ hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 959ac9c177..26383e4201 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -788,7 +788,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F],
- conf: Configuration = new Configuration) {
+ conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 40deb6d7ea..35cc3ce5cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
@@ -136,7 +137,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
- def this(path: String) = this(new StreamingContext(path, new Configuration))
+ def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
/**
* Re-creates a JavaStreamingContext from a checkpoint file.