diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-01-13 22:43:28 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-13 22:43:28 -0800 |
commit | 962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7 (patch) | |
tree | fa7174220efa51f56287d32bc82a379508ee4c17 /python | |
parent | e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 (diff) | |
download | spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.gz spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.tar.bz2 spark-962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7.zip |
[SPARK-12756][SQL] use hash expression in Exchange
This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one.
This PR also fixes the tests that are broken by the new hash behaviour in shuffle.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #10703 from cloud-fan/use-hash-expr-in-shuffle.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 26 | ||||
-rw-r--r-- | python/pyspark/sql/group.py | 6 |
2 files changed, 16 insertions, 16 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a7bc288e38..90a6b5d9c0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -403,10 +403,10 @@ class DataFrame(object): +---+-----+ |age| name| +---+-----+ - | 2|Alice| - | 2|Alice| | 5| Bob| | 5| Bob| + | 2|Alice| + | 2|Alice| +---+-----+ >>> data = data.repartition(7, "age") >>> data.show() @@ -552,7 +552,7 @@ class DataFrame(object): >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect() - [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)] + [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)] """ assert isinstance(alias, basestring), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx) @@ -573,14 +573,14 @@ class DataFrame(object): One of `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() - [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] + [Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> df.join(df2, 'name', 'outer').select('name', 'height').collect() - [Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] + [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() - [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] + [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.join(df2, 'name').select(df.name, df2.height).collect() [Row(name=u'Bob', height=85)] @@ -880,9 +880,9 @@ class DataFrame(object): >>> df.groupBy().avg().collect() [Row(avg(age)=3.5)] - >>> df.groupBy('name').agg({'age': 'mean'}).collect() + >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect()) [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] - >>> df.groupBy(df.name).avg().collect() + >>> sorted(df.groupBy(df.name).avg().collect()) [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] >>> df.groupBy(['name', df.age]).count().collect() [Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)] @@ -901,11 +901,11 @@ class DataFrame(object): +-----+----+-----+ | name| age|count| +-----+----+-----+ - |Alice|null| 1| + |Alice| 2| 1| | Bob| 5| 1| | Bob|null| 1| | null|null| 2| - |Alice| 2| 1| + |Alice|null| 1| +-----+----+-----+ """ jgd = self._jdf.rollup(self._jcols(*cols)) @@ -923,12 +923,12 @@ class DataFrame(object): | name| age|count| +-----+----+-----+ | null| 2| 1| - |Alice|null| 1| + |Alice| 2| 1| | Bob| 5| 1| - | Bob|null| 1| | null| 5| 1| + | Bob|null| 1| | null|null| 2| - |Alice| 2| 1| + |Alice|null| 1| +-----+----+-----+ """ jgd = self._jdf.cube(self._jcols(*cols)) diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py index 9ca303a974..ee734cb439 100644 --- a/python/pyspark/sql/group.py +++ b/python/pyspark/sql/group.py @@ -74,11 +74,11 @@ class GroupedData(object): or a list of :class:`Column`. >>> gdf = df.groupBy(df.name) - >>> gdf.agg({"*": "count"}).collect() + >>> sorted(gdf.agg({"*": "count"}).collect()) [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)] >>> from pyspark.sql import functions as F - >>> gdf.agg(F.min(df.age)).collect() + >>> sorted(gdf.agg(F.min(df.age)).collect()) [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)] """ assert exprs, "exprs should not be empty" @@ -96,7 +96,7 @@ class GroupedData(object): def count(self): """Counts the number of records for each group. - >>> df.groupBy(df.age).count().collect() + >>> sorted(df.groupBy(df.age).count().collect()) [Row(age=2, count=1), Row(age=5, count=1)] """ |