author     jerryshao <saisai.shao@intel.com>    2015-02-17 10:45:18 +0000
committer  Sean Owen <sowen@cloudera.com>       2015-02-17 10:45:18 +0000
commit     a65766bf0244a41b793b9dc5fbdd2882664ad00e (patch)
tree       d66bbc9b35b6844a5f46618ff56ca0dec5f5125f /streaming
parent     c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f (diff)
[SPARK-5826][Streaming] Fix Configuration not serializable problem
Author: jerryshao <saisai.shao@intel.com>

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem
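In short: Hadoop's Configuration implements Writable but not java.io.Serializable, so a FileInputDStream holding a raw Configuration fails with a NotSerializableException when the DStream graph is serialized (for example, during checkpointing). The patch below wraps the optional Configuration in org.apache.spark.SerializableWritable, which serializes through the Writable write()/readFields() methods instead. A minimal, self-contained sketch of the same pattern (the app setup and "demo.key" are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SparkConf, SparkContext, SerializableWritable}

    object SerializableConfSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("serializable-conf-sketch").setMaster("local[2]"))

        // Configuration implements Writable but not java.io.Serializable,
        // so capturing it directly in a task closure fails at serialization time.
        val hadoopConf = new Configuration()
        hadoopConf.set("demo.key", "demo-value")

        // SerializableWritable delegates serialization to the wrapped object's
        // Writable write()/readFields() methods.
        val wrapped = new SerializableWritable(hadoopConf)

        val out = sc.parallelize(1 to 3).map { i =>
          // .value recovers the Configuration on the executor side.
          s"$i -> " + wrapped.value.get("demo.key")
        }.collect()

        out.foreach(println)
        sc.stop()
      }
    }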
Diffstat (limited to 'streaming')
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88527..4f7db41abe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming.dstream
import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SerializableWritable
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {
+ private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file =>{
- val rdd = conf match {
+ val rdd = serializableConfOpt.map(_.value) match {
case Some(config) => context.sparkContext.newAPIHadoopFile(
file,
fm.runtimeClass.asInstanceOf[Class[F]],
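Design note: the patch wraps the optional Configuration once, at construction time (serializableConfOpt), rather than on every call to filesToRDD. The wrapper, not the raw Configuration, is what gets captured when the DStream is checkpointed, and _.value re-materializes a Configuration whenever a batch's RDDs are built. Marking the field @transient instead would drop the user-supplied settings across checkpoint recovery, which is presumably why the squashed history shows the transient keyword being removed.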