diff options
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 2 |
2 files changed, 2 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 5cf162f23e..ee09c2085b 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -537,7 +537,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag]( */ def saveAsHadoopFile[F <: OutputFormat[K, V]]( path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec) + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec) } /** diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 8b23642e4a..47274f41a5 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -85,7 +85,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( } // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]] + val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]] val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) // Start the messages handler for each partition |