author      Sandy Ryza <sandy@cloudera.com>          2015-06-18 19:36:05 -0700
committer   Josh Rosen <joshrosen@databricks.com>    2015-06-18 19:36:05 -0700
commit      43f50decdd20fafc55913c56ffa30f56040090e4 (patch)
tree        da3cf219841826b9d1bc6c5870994f6d1bfe7d32 /streaming
parent      dc413138995b45a7a957acae007dc11622110310 (diff)
[SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations
Author: Sandy Ryza <sandy@cloudera.com>

Closes #6679 from sryza/sandy-spark-8135 and squashes the following commits:

c5554ff [Sandy Ryza] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
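The patch replaces the generic SerializableWritable wrapper with a dedicated SerializableConfiguration that skips loading Hadoop's default resources when the Configuration is rebuilt on deserialization. As a rough illustration of the idea (a minimal sketch, not the exact org.apache.spark.util.SerializableConfiguration source, which also adds Spark's own I/O error handling), such a wrapper can look like this:

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

// Sketch of a serializable wrapper for a Hadoop Configuration. The key
// detail behind SPARK-8135 is `new Configuration(false)`: the rebuilt conf
// does not re-read *-default.xml / *-site.xml; it only receives the entries
// that were explicitly serialized.
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)                   // Configuration is a Hadoop Writable
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)   // false = don't load default resources
    value.readFields(in)               // restore only the serialized entries
  }
}

Reconstructing the conf this way avoids the cost of parsing the default XML resources that a plain `new Configuration()` would incur every time a task deserializes the wrapper.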
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala        | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala    | 7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala       | 9
4 files changed, 14 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6c1fab5674..86a8e2beff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,10 +26,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.{SparkConf, SerializableWritable}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -78,7 +77,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {
- private val serializableConfOpt = conf.map(new SerializableWritable(_))
+ private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
/**
* Minimum duration of remembering the information of selected files. Defaults to 60 seconds.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c66df..71bec96d46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.StreamingContext.rddToFileName
+import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf}
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -688,7 +689,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
- val serializableConf = new SerializableWritable(conf)
+ val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
@@ -721,7 +722,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
conf: Configuration = ssc.sparkContext.hadoopConfiguration
): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
- val serializableConf = new SerializableWritable(conf)
+ val serializableConf = new SerializableConfiguration(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(
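For the old-API save path above, the same idea applies to JobConf. A minimal sketch of a SerializableJobConf-style wrapper, assuming it mirrors the Configuration wrapper (the actual org.apache.spark.util.SerializableJobConf may differ in details):

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.mapred.JobConf

// Same pattern as the Configuration wrapper, specialized to the old-API JobConf.
class SerializableJobConf(@transient var value: JobConf) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new JobConf(false)   // skip loading default resources
    value.readFields(in)
  }
}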
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ffce6a4c3c..31ce8e1ec1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,12 +23,11 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import org.apache.commons.io.FileUtils
-
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util._
+import org.apache.spark.util.SerializableConfiguration
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -94,7 +93,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
- private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
+ private val broadcastedHadoopConf = new SerializableConfiguration(hadoopConfig)
override def isValid(): Boolean = true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f1504b09c9..e6cdbec11e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,10 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.language.existentials
import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
+import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
+ StopReceiver}
+import org.apache.spark.util.SerializableConfiguration
/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
@@ -294,7 +296,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
val checkpointDirOption = Option(ssc.checkpointDir)
- val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
+ val serializableHadoopConf =
+ new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
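End to end, the pattern applied in each of these files is the same: wrap the Hadoop configuration once on the driver, let the serialized closure (or RDD) carry the wrapper, and unwrap it on the executor. A hypothetical usage sketch, assuming a SparkContext `sc` and the wrapper class sketched above (the key looked up is purely illustrative, not from this patch):

val wrappedConf = new SerializableConfiguration(sc.hadoopConfiguration)

val defaults = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
  // Deserialized on the executor without re-reading *-default.xml / *-site.xml
  val conf = wrappedConf.value
  iter.map(i => (i, conf.get("fs.defaultFS")))
}.collect()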