| author | Dongjoon Hyun <dongjoon@apache.org> | 2016-06-30 12:03:54 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-06-30 12:03:54 -0700 |
| commit | 46395db80e3304e3f3a1ebdc8aadb8f2819b48b4 | |
| tree | 88b3c5cc5e5241f0e2b687445a29f88a2aca2c6b /python | |
| parent | fdf9f94f8c8861a00cd8415073f842b857c397f7 | |
[SPARK-16289][SQL] Implement posexplode table generating function
## What changes were proposed in this pull request?
This PR implements the `posexplode` table generating function. Currently, the master branch raises the following exception for a `map` argument, which differs from Hive's behavior.
**Before**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... posexplode() takes an array as a parameter; line 1 pos 7
```
**After**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
+---+---+-----+
|pos|key|value|
+---+---+-----+
|  0|  a|    1|
|  1|  b|    2|
+---+---+-----+
```
For an `array` argument, the result after this change is the same as before.
```scala
scala> sql("select posexplode(array(1, 2, 3))").show
+---+---+
|pos|col|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+
```
## How was this patch tested?
Passes the Jenkins tests with the newly added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13971 from dongjoon-hyun/SPARK-16289.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/functions.py | 21 |
1 file changed, 21 insertions, 0 deletions
```diff
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 15cefc8cf1..7a7345170c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1637,6 +1637,27 @@ def explode(col):
     return Column(jc)
 
 
+@since(2.1)
+def posexplode(col):
+    """Returns a new row for each element with position in the given array or map.
+
+    >>> from pyspark.sql import Row
+    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+    >>> eDF.select(posexplode(eDF.intlist)).collect()
+    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
+
+    >>> eDF.select(posexplode(eDF.mapfield)).show()
+    +---+---+-----+
+    |pos|key|value|
+    +---+---+-----+
+    |  0|  a|    b|
+    +---+---+-----+
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.posexplode(_to_java_column(col))
+    return Column(jc)
+
+
 @ignore_unicode_prefix
 @since(1.6)
 def get_json_object(col, path):
```
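As with `explode`, the columns produced by the generator can be renamed via multi-name aliasing, a pattern the existing `explode` docstring in this file already uses. A short sketch; the alias names here are illustrative, not part of the patch:
```python
from pyspark.sql import Row
from pyspark.sql.functions import posexplode

# Assumes `spark` is an active SparkSession.
df = spark.createDataFrame([Row(mapfield={"a": 1, "b": 2})])

# Rename the generator's output columns (pos, key, value) in one call.
df.select(posexplode(df.mapfield).alias("idx", "k", "v")).show()
```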