 core/src/main/scala/spark/PairRDDFunctions.scala                          | 2 +-
 core/src/main/scala/spark/RDD.scala                                       | 6 +++---
 core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala        | 4 +++-
 core/src/main/scala/spark/rdd/EmptyRDD.scala                              | 4 ++--
 core/src/main/scala/spark/rdd/JdbcRDD.scala                               | 3 ++-
 core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala                   | 9 +++++----
 streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala  | 2 +-
 7 files changed, 17 insertions(+), 13 deletions(-)
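
This changeset is part of the Scala 2.10 migration: the deprecated ClassManifest is replaced with scala.reflect.ClassTag across core and streaming. As a minimal sketch (not part of the diff), a T: ClassTag context bound desugars to an extra implicit parameter that carries T's runtime class, which Spark needs in order to build typed arrays of RDD elements:

    import scala.reflect.ClassTag

    // `T: ClassTag` is sugar for an implicit ClassTag[T] parameter.
    def newTypedArray[T: ClassTag](n: Int): Array[T] = new Array[T](n)

    // The equivalent desugared form makes the implicit explicit:
    def newTypedArrayExplicit[T](n: Int)(implicit ct: ClassTag[T]): Array[T] =
      ct.newArray(n)
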
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0095b868a8..5cf162f23e 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -536,7 +536,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
+ path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
}
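
Note that the unchanged context line still calls fm.erasure. In Scala 2.10, ClassTag keeps erasure only as a deprecated alias for runtimeClass, so the code compiles, but runtimeClass is the idiomatic accessor. A sketch (the helper name is hypothetical, not from the commit) of recovering the runtime Class the way saveAsHadoopFile does:

    import scala.reflect.{ClassTag, classTag}

    // Hypothetical helper: recover the runtime Class of an OutputFormat
    // type parameter from its ClassTag, as saveAsHadoopFile does via fm.
    def outputFormatClass[F: ClassTag]: Class[F] =
      classTag[F].runtimeClass.asInstanceOf[Class[F]]
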
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3a454df10d..2ebfaadc46 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -500,18 +500,18 @@ abstract class RDD[T: ClassTag](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B]) => Iterator[V],
rdd2: RDD[B]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)

- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)

- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C],
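
A usage sketch for the two-RDD overload, assuming a SparkContext named sc (not part of the diff). Note that in this version of the API the function precedes the RDD arguments, and all zipped RDDs must have the same number of partitions:

    // Zip two 2-partition RDDs partition-wise and combine their iterators.
    val nums = sc.parallelize(1 to 4, 2)
    val words = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    val zipped = nums.zipPartitions(
      (is: Iterator[Int], ss: Iterator[String]) =>
        is.zip(ss).map { case (i, s) => s + i },
      words)
    // zipped contains "a1", "b2", "c3", "d4"
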
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
index 6044043add..4562884eb3 100644
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
@@ -1,5 +1,7 @@
package spark.api.java.function

+import scala.reflect.ClassTag
+
/**
* A function that takes two inputs and returns zero or more output records.
*/
@@ -7,5 +9,5 @@ abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
@throws(classOf[Exception])
def call(a: A, b:B) : java.lang.Iterable[C]

- def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
+ def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
}
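
A minimal sketch of a concrete subclass (not from the commit): unless a subclass overrides elementType, the element type falls back to the ClassTag.Any default defined above:

    // Anonymous subclass emitting one record per pair of inputs.
    val pairUp = new FlatMapFunction2[java.lang.Integer, java.lang.Integer, String] {
      def call(a: java.lang.Integer, b: java.lang.Integer): java.lang.Iterable[String] =
        java.util.Arrays.asList(a + "," + b)
    }
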
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
index e4dd3a7fa7..3b9899238c 100644
--- a/core/src/main/scala/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/spark/rdd/EmptyRDD.scala
@@ -1,12 +1,12 @@
package spark.rdd

import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-
+import scala.reflect.ClassTag

/**
* An RDD that is empty, i.e. has no element in it.
*/
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

override def getPartitions: Array[Partition] = Array.empty
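
A usage sketch, assuming a SparkContext named sc: with the ClassTag bound, the compiler materializes the tag implicitly at the call site.

    // An empty RDD of Ints: no partitions, no elements.
    val empty = new EmptyRDD[Int](sc)
    assert(empty.partitions.isEmpty)
    assert(empty.count() == 0)
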
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
index a50f407737..f570dd6d8b 100644
--- a/core/src/main/scala/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/spark/rdd/JdbcRDD.scala
@@ -4,6 +4,7 @@ import java.sql.{Connection, ResultSet}
import spark.{Logging, Partition, RDD, SparkContext, TaskContext}
import spark.util.NextIterator
+import scala.reflect.ClassTag

private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index = idx
}
@@ -28,7 +29,7 @@ private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
-class JdbcRDD[T: ClassManifest](
+class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
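
A usage sketch. The constructor parameters beyond those visible in the hunk (lowerBound, upperBound, numPartitions, mapRow) are assumed from the class as it ships in Spark, and the JDBC URL is illustrative. The query must contain two '?' placeholders, which each partition binds to its slice of the key range:

    val names = new JdbcRDD(
      sc,
      () => java.sql.DriverManager.getConnection("jdbc:h2:mem:testdb"),
      "SELECT name FROM users WHERE id >= ? AND id <= ?",
      1,    // lowerBound
      1000, // upperBound
      4,    // numPartitions
      (rs: java.sql.ResultSet) => rs.getString("name"))
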
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index b234428ab2..d3c2370885 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -2,6 +2,7 @@ package spark.rdd
import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
+import scala.reflect.ClassTag

private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -20,7 +21,7 @@ private[spark] class ZippedPartitionsPartition(
}
}

-abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[_]])
extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -67,7 +68,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
}

-class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
@@ -87,7 +88,7 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
}

class ZippedPartitionsRDD3
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -111,7 +112,7 @@ class ZippedPartitionsRDD3
}

class ZippedPartitionsRDD4
- [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+ [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
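
All four RDD classes propagate ClassTags because the RDD[V] superclass itself demands one for its element type. A sketch of the desugaring (class names are hypothetical):

    import scala.reflect.ClassTag

    // A context bound on a class is sugar for an implicit constructor parameter:
    abstract class BaseSugared[V: ClassTag]
    abstract class BaseExplicit[V](implicit vt: ClassTag[V])
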
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index e0f6351ef7..ba1312cbe8 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -95,7 +95,7 @@ class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
}

// Handles Kafka Messages
- private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
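
The receiver's Decoder bound stays on Manifest while the element type moves to ClassTag: a Manifest retains full type arguments, whereas a ClassTag erases to the runtime class alone. A small illustration (not from the commit):

    import scala.reflect.{ClassTag, classTag}

    // A ClassTag records only the erased runtime class ...
    classTag[List[Int]].runtimeClass // class scala.collection.immutable.List
    // ... while a Manifest still carries the type arguments.
    manifest[List[Int]]              // scala.collection.immutable.List[Int]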