-rw-r--r--  python/pyspark/rdd.py    6
-rw-r--r--  python/pyspark/sql.py    9
-rw-r--r--  python/pyspark/tests.py  9
3 files changed, 20 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index aa90297855..266090e3ae 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2075,6 +2075,7 @@ class PipelinedRDD(RDD):
         self.ctx = prev.ctx
         self.prev = prev
         self._jrdd_val = None
+        self._id = None
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None
@@ -2105,6 +2106,11 @@ class PipelinedRDD(RDD):
         self._jrdd_val = python_rdd.asJavaRDD()
         return self._jrdd_val
 
+    def id(self):
+        if self._id is None:
+            self._id = self._jrdd.id()
+        return self._id
+
     def _is_pipelinable(self):
         return not (self.is_cached or self.is_checkpointed)
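
The PipelinedRDD change is a small lazy-memoization fix: the id starts out as None in __init__ and is resolved from the underlying Java RDD only on the first id() call, then cached. Since the _jrdd property shown above builds the Java-side RDD lazily, deferring the lookup avoids forcing that construction just to learn the id. The standalone sketch below illustrates the same pattern outside Spark; JavaRDDStub and LazyIdSketch are made-up names for illustration, not part of the patch.

class JavaRDDStub(object):
    """Hypothetical stand-in for the JVM-side RDD handle."""
    _next_id = 0

    def __init__(self):
        JavaRDDStub._next_id += 1
        self._assigned = JavaRDDStub._next_id

    def id(self):
        return self._assigned


class LazyIdSketch(object):
    """Mimics the memoization added to PipelinedRDD.id()."""

    def __init__(self):
        self._id = None              # not resolved until first use
        self._jrdd = JavaRDDStub()   # Java-side handle (built lazily in the real class)

    def id(self):
        # Resolve once from the Java-side handle, then serve the cached value.
        if self._id is None:
            self._id = self._jrdd.id()
        return self._id


sketch = LazyIdSketch()
assert sketch.id() == sketch.id()    # computed once, stable thereafter
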
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 97a51b9f8a..004d4937cb 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1525,7 +1525,7 @@ class SchemaRDD(RDD):
         self.sql_ctx = sql_ctx
         self._sc = sql_ctx._sc
         self._jschema_rdd = jschema_rdd
-
+        self._id = None
         self.is_cached = False
         self.is_checkpointed = False
         self.ctx = self.sql_ctx._sc
@@ -1543,9 +1543,10 @@ class SchemaRDD(RDD):
             self._lazy_jrdd = self._jschema_rdd.javaToPython()
         return self._lazy_jrdd
 
-    @property
-    def _id(self):
-        return self._jrdd.id()
+    def id(self):
+        if self._id is None:
+            self._id = self._jrdd.id()
+        return self._id
 
     def saveAsParquetFile(self, path):
         """Save the contents as a Parquet file, preserving the schema.
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3e74799e82..2ade15b35a 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -281,6 +281,15 @@ class TestAddFile(PySparkTestCase):
 
 class TestRDDFunctions(PySparkTestCase):
 
+    def test_id(self):
+        rdd = self.sc.parallelize(range(10))
+        id = rdd.id()
+        self.assertEqual(id, rdd.id())
+        rdd2 = rdd.map(str).filter(bool)
+        id2 = rdd2.id()
+        self.assertEqual(id + 1, id2)
+        self.assertEqual(id2, rdd2.id())
+
     def test_failed_sparkcontext_creation(self):
         # Regression test for SPARK-1550
         self.sc.stop()
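
For reference, the behavior test_id exercises can also be reproduced interactively. The sketch below is not part of the patch; it assumes a fresh SparkContext named sc, for example from the pyspark shell, and the sequential-id relation only holds when no other RDDs are created in between, which is what the test relies on in its clean fixture.

# Assumes a fresh SparkContext `sc` (illustrative only, not part of the patch).
rdd = sc.parallelize(range(10))
first = rdd.id()
assert first == rdd.id()             # id() is stable across calls

rdd2 = rdd.map(str).filter(bool)     # yields a PipelinedRDD
second = rdd2.id()
assert second == rdd2.id()           # cached after the first resolution
# In a clean context the child RDD gets the next id, the `id + 1` the test asserts.
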