author    Kan Zhang <kzhang@apache.org>    2014-05-07 09:41:31 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-05-07 09:41:31 -0700
commit    967635a2425a769b932eea0984fe697d6721cab0
tree      4375459b9bef590cc05e9470926fe273921851dc /python/pyspark
parent    3eb53bd59e828275471d41730e6de601a887416d
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <kzhang@apache.org>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
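
As a rough illustration (not part of this commit): the practical effect is that set-like operations on a SchemaRDD no longer fall back to the plain RDD implementation and drop the schema; the result stays a SchemaRDD, so SQL-side calls such as registerAsTable() remain available on it. A minimal sketch, assuming the Spark 1.0-era PySpark SQL API (SQLContext.inferSchema, SchemaRDD.registerAsTable):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext("local", "schema-rdd-demo")
    sqlCtx = SQLContext(sc)

    # Build a SchemaRDD from an RDD of dicts (Spark 1.0-era API).
    rdd = sc.parallelize([{"name": "Alice", "age": 1},
                          {"name": "Alice", "age": 1}])
    srdd = sqlCtx.inferSchema(rdd)

    # With this patch, distinct() returns a SchemaRDD rather than a plain RDD,
    # so the schema survives and the result can still be registered as a table.
    deduped = srdd.distinct()
    deduped.registerAsTable("people")
    print(sqlCtx.sql("SELECT name FROM people").collect())
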
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/sql.py | 29 +
1 file changed, 29 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a62031db5..6789d7002b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -360,6 +360,35 @@ class SchemaRDD(RDD):
         else:
             return None
+    def coalesce(self, numPartitions, shuffle=False):
+        rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def distinct(self):
+        rdd = self._jschema_rdd.distinct()
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def intersection(self, other):
+        if (other.__class__ is SchemaRDD):
+            rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only intersect with another SchemaRDD")
+
+    def repartition(self, numPartitions):
+        rdd = self._jschema_rdd.repartition(numPartitions)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def subtract(self, other, numPartitions=None):
+        if (other.__class__ is SchemaRDD):
+            if numPartitions is None:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+            else:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only subtract another SchemaRDD")
+
 def _test():
     import doctest
     from pyspark.context import SparkContext
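
A hypothetical usage sketch of the new type checks in intersection() and subtract() (again assuming the Spark 1.0-era PySpark SQL API; the variable names are illustrative only, not from the patch):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext("local", "set-ops-demo")
    sqlCtx = SQLContext(sc)

    left = sqlCtx.inferSchema(sc.parallelize([{"name": "Alice"}, {"name": "Bob"}]))
    right = sqlCtx.inferSchema(sc.parallelize([{"name": "Bob"}]))

    common = left.intersection(right)    # stays a SchemaRDD, schema preserved
    remaining = left.subtract(right, 2)  # SchemaRDD with 2 output partitions

    # Anything that is not a SchemaRDD is rejected up front with a ValueError.
    try:
        left.subtract(sc.parallelize([1, 2, 3]))
    except ValueError as e:
        print(e)  # Can only subtract another SchemaRDD
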