aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--python/pyspark/rdd.py4
-rw-r--r--python/pyspark/tests.py9
3 files changed, 12 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d6b0988641..d87783efd2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
-private[spark] class PythonRDD[T: ClassTag](
- parent: RDD[T],
+private[spark] class PythonRDD(
+ parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 113a082e16..b84d976114 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1687,7 +1687,6 @@ class PipelinedRDD(RDD):
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
- class_tag = self._prev_jrdd.classTag()
env = MapConverter().convert(self.ctx.environment,
self.ctx._gateway._gateway_client)
includes = ListConverter().convert(self.ctx._python_includes,
@@ -1696,8 +1695,7 @@ class PipelinedRDD(RDD):
bytearray(pickled_command),
env, includes, self.preservesPartitioning,
self.ctx.pythonExec,
- broadcast_vars, self.ctx._javaAccumulator,
- class_tag)
+ broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index a92abbf371..8ba51461d1 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -226,6 +226,15 @@ class TestRDDFunctions(PySparkTestCase):
cart = rdd1.cartesian(rdd2)
result = cart.map(lambda (x, y): x + y).collect()
+ def test_transforming_pickle_file(self):
+ # Regression test for SPARK-2601
+ data = self.sc.parallelize(["Hello", "World!"])
+ tempFile = tempfile.NamedTemporaryFile(delete=True)
+ tempFile.close()
+ data.saveAsPickleFile(tempFile.name)
+ pickled_file = self.sc.pickleFile(tempFile.name)
+ pickled_file.map(lambda x: x).collect()
+
def test_cartesian_on_textfile(self):
# Regression test for
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")