aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py4
-rw-r--r--python/pyspark/tests.py9
2 files changed, 10 insertions, 3 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 113a082e16..b84d976114 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1687,7 +1687,6 @@ class PipelinedRDD(RDD):
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
- class_tag = self._prev_jrdd.classTag()
env = MapConverter().convert(self.ctx.environment,
self.ctx._gateway._gateway_client)
includes = ListConverter().convert(self.ctx._python_includes,
@@ -1696,8 +1695,7 @@ class PipelinedRDD(RDD):
bytearray(pickled_command),
env, includes, self.preservesPartitioning,
self.ctx.pythonExec,
- broadcast_vars, self.ctx._javaAccumulator,
- class_tag)
+ broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index a92abbf371..8ba51461d1 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -226,6 +226,15 @@ class TestRDDFunctions(PySparkTestCase):
cart = rdd1.cartesian(rdd2)
result = cart.map(lambda (x, y): x + y).collect()
+ def test_transforming_pickle_file(self):
+ # Regression test for SPARK-2601
+ data = self.sc.parallelize(["Hello", "World!"])
+ tempFile = tempfile.NamedTemporaryFile(delete=True)
+ tempFile.close()
+ data.saveAsPickleFile(tempFile.name)
+ pickled_file = self.sc.pickleFile(tempFile.name)
+ pickled_file.map(lambda x: x).collect()
+
def test_cartesian_on_textfile(self):
# Regression test for
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")