diff options
author | jerryshao <saisai.shao@intel.com> | 2015-02-17 10:45:18 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-02-17 10:45:18 +0000 |
commit | a65766bf0244a41b793b9dc5fbdd2882664ad00e (patch) | |
tree | d66bbc9b35b6844a5f46618ff56ca0dec5f5125f | |
parent | c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f (diff) | |
download | spark-a65766bf0244a41b793b9dc5fbdd2882664ad00e.tar.gz spark-a65766bf0244a41b793b9dc5fbdd2882664ad00e.tar.bz2 spark-a65766bf0244a41b793b9dc5fbdd2882664ad00e.zip |
[SPARK-5826][Streaming] Fix Configuration not serializable problem
Author: jerryshao <saisai.shao@intel.com>
Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:
7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88527..4f7db41abe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = conf match {
+      val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
           fm.runtimeClass.asInstanceOf[Class[F]],