aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2017-01-18 23:06:44 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-18 23:06:44 +0800
commitd06172b88e61c0f79e3dea5703a17c6ae590f248 (patch)
tree40c5b92279f7db0a28bc9ae11e268b4818cbfa63 /python
parentf85f29608de801d7cacc779a77c8edaed8124acf (diff)
downloadspark-d06172b88e61c0f79e3dea5703a17c6ae590f248.tar.gz
spark-d06172b88e61c0f79e3dea5703a17c6ae590f248.tar.bz2
spark-d06172b88e61c0f79e3dea5703a17c6ae590f248.zip
[SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD
## What changes were proposed in this pull request? For some datasources which are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDF. The method to reproduce it is, running the following codes with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`: from pyspark.sql.functions import udf,input_file_name from pyspark.sql.types import StringType from pyspark.sql import SparkSession def filename(path): return path session = SparkSession.builder.appName('APP').getOrCreate() session.udf.register('sameText', filename) sameText = udf(filename, StringType()) df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file')) df.select('file').show() # works df.select(sameText(df['file'])).show() # returns empty content The issue is because in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins consuming. `InputFileBlockHolder` will record this info into thread local variable. When running Python UDF in batch, we set up another thread to consume the iterator from child plan's output rdd, so we can't read the info back in another thread. To fix this, we have to set the info in `InputFileBlockHolder` after the iterator begins consuming. So the info can be read in correct thread. ## How was this patch tested? Manual test with above example codes for spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`. Added pyspark test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #16585 from viirya/fix-inputfileblock-hadooprdd.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/tests.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a8250281da..73a5df65e0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -435,6 +435,30 @@ class SQLTests(ReusedPySparkTestCase):
row = self.spark.read.json(filePath).select(sourceFile(input_file_name())).first()
self.assertTrue(row[0].find("people1.json") != -1)
+ def test_udf_with_input_file_name_for_hadooprdd(self):
+ from pyspark.sql.functions import udf, input_file_name
+ from pyspark.sql.types import StringType
+
+ def filename(path):
+ return path
+
+ sameText = udf(filename, StringType())
+
+ rdd = self.sc.textFile('python/test_support/sql/people.json')
+ df = self.spark.read.json(rdd).select(input_file_name().alias('file'))
+ row = df.select(sameText(df['file'])).first()
+ self.assertTrue(row[0].find("people.json") != -1)
+
+ rdd2 = self.sc.newAPIHadoopFile(
+ 'python/test_support/sql/people.json',
+ 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
+ 'org.apache.hadoop.io.LongWritable',
+ 'org.apache.hadoop.io.Text')
+
+ df2 = self.spark.read.json(rdd2).select(input_file_name().alias('file'))
+ row2 = df2.select(sameText(df2['file'])).first()
+ self.assertTrue(row2[0].find("people.json") != -1)
+
def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.spark.read.json(rdd)