diff options
author | Kan Zhang <kzhang@apache.org> | 2014-05-07 09:41:31 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-07 09:41:31 -0700 |
commit | 967635a2425a769b932eea0984fe697d6721cab0 (patch) | |
tree | 4375459b9bef590cc05e9470926fe273921851dc /python | |
parent | 3eb53bd59e828275471d41730e6de601a887416d (diff) | |
download | spark-967635a2425a769b932eea0984fe697d6721cab0.tar.gz spark-967635a2425a769b932eea0984fe697d6721cab0.tar.bz2 spark-967635a2425a769b932eea0984fe697d6721cab0.zip |
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations...
... that do not change schema
Author: Kan Zhang <kzhang@apache.org>
Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:
111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql.py | 29 |
1 file changed, 29 insertions, 0 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 1a62031db5..6789d7002b 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -360,6 +360,35 @@ class SchemaRDD(RDD): else: return None + def coalesce(self, numPartitions, shuffle=False): + rdd = self._jschema_rdd.coalesce(numPartitions, shuffle) + return SchemaRDD(rdd, self.sql_ctx) + + def distinct(self): + rdd = self._jschema_rdd.distinct() + return SchemaRDD(rdd, self.sql_ctx) + + def intersection(self, other): + if (other.__class__ is SchemaRDD): + rdd = self._jschema_rdd.intersection(other._jschema_rdd) + return SchemaRDD(rdd, self.sql_ctx) + else: + raise ValueError("Can only intersect with another SchemaRDD") + + def repartition(self, numPartitions): + rdd = self._jschema_rdd.repartition(numPartitions) + return SchemaRDD(rdd, self.sql_ctx) + + def subtract(self, other, numPartitions=None): + if (other.__class__ is SchemaRDD): + if numPartitions is None: + rdd = self._jschema_rdd.subtract(other._jschema_rdd) + else: + rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions) + return SchemaRDD(rdd, self.sql_ctx) + else: + raise ValueError("Can only subtract another SchemaRDD") + def _test(): import doctest from pyspark.context import SparkContext |