author     Liang-Chi Hsieh <simonh@tw.ibm.com>  2016-04-22 09:19:36 -0700
committer  Davies Liu <davies.liu@gmail.com>    2016-04-22 09:19:36 -0700
commit     056883e070bd258d193fd4d783ab608a19b86c36 (patch)
tree       7072659655b15b2ed60c809efcd7af5c2b002e88
parent     5bed13a872dc06d099c810cf4caa15b4f27a1e7c (diff)
[SPARK-13266] [SQL] None read/writer options were not translated to "null"
## What changes were proposed in this pull request?

In Python, the `option` and `options` methods of `DataFrameReader` and `DataFrameWriter` were sending the string "None" instead of `null` when passed `None`, making it impossible to send an actual `null`. This fixes that problem. This is based on #11305 from mathieulongtin.

## How was this patch tested?

Added a test to readwriter.py.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: mathieu longtin <mathieu.longtin@nuance.com>

Closes #12494 from viirya/py-df-none-option.
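For context, a minimal sketch of the root cause, in plain Python and independent of Spark: `str()` stringifies `None` into the literal string `"None"`, which Py4J then hands to the JVM as a real four-character string rather than a `null`. The patched `to_str` helper (mirrored below in the readwriter.py diff) lets `None` pass through untouched:

```python
# str() turns None into the literal string "None"; before this patch that
# string was forwarded to the JVM writer instead of a null.
print(str(None))   # -> 'None'

# The patched helper: booleans become lower-case strings for the JVM,
# None passes through unchanged so Py4J sends an actual null.
def to_str(value):
    if isinstance(value, bool):
        return str(value).lower()
    elif value is None:
        return value
    else:
        return str(value)

assert to_str(None) is None
assert to_str(True) == "true"
assert to_str(1) == "1"
```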
-rw-r--r--  python/pyspark/sql/readwriter.py                                                          9
-rw-r--r--  python/pyspark/sql/tests.py                                                               3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala   6
3 files changed, 14 insertions, 4 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 6c809d1139..e39cf1ae03 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -33,10 +33,13 @@ __all__ = ["DataFrameReader", "DataFrameWriter"]
def to_str(value):
"""
- A wrapper over str(), but convert bool values to lower case string
+ A wrapper over str(), but converts bool values to lower case strings.
+ If None is given, just returns None, instead of converting it to string "None".
"""
if isinstance(value, bool):
return str(value).lower()
+ elif value is None:
+ return value
else:
return str(value)
@@ -398,7 +401,7 @@ class DataFrameWriter(object):
def option(self, key, value):
"""Adds an output option for the underlying data source.
"""
- self._jwrite = self._jwrite.option(key, value)
+ self._jwrite = self._jwrite.option(key, to_str(value))
return self
@since(1.4)
@@ -406,7 +409,7 @@ class DataFrameWriter(object):
"""Adds output options for the underlying data source.
"""
for k in options:
- self._jwrite = self._jwrite.option(k, options[k])
+ self._jwrite = self._jwrite.option(k, to_str(options[k]))
return self
@since(1.4)
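An illustrative use of the fixed writer API (a sketch; `df` and `csvpath` are assumed, as in the regression test below, and `csvpath2` is hypothetical). Before the patch, `None` became the string "None" and the CSV writer raised "quote cannot be more than one character"; now the JVM receives `null` and falls back to the option's default:

```python
# Passing None is now forwarded to the data source as null, so this call
# no longer fails on a bogus 4-character quote string.
df.write.option('quote', None).format('csv').save(csvpath)

# The plural form goes through the same to_str conversion:
df.write.format('csv').options(quote=None).save(csvpath2)
```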
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3b1b2948e9..42e283073f 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -859,6 +859,9 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
+ csvpath = os.path.join(tempfile.mkdtemp(), 'data')
+ df.write.option('quote', None).format('csv').save(csvpath)
+
shutil.rmtree(tmpPath)
def test_save_and_load_builder(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 7b9d3b605a..80a0ad7856 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -29,6 +29,7 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
+ case Some(null) => default
case Some(value) if value.length == 0 => '\u0000'
case Some(value) if value.length == 1 => value.charAt(0)
case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
@@ -39,6 +40,7 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
val paramValue = parameters.get(paramName)
paramValue match {
case None => default
+ case Some(null) => default
case Some(value) => try {
value.toInt
} catch {
@@ -50,7 +52,9 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
private def getBool(paramName: String, default: Boolean = false): Boolean = {
val param = parameters.getOrElse(paramName, default.toString)
- if (param.toLowerCase == "true") {
+ if (param == null) {
+ default
+ } else if (param.toLowerCase == "true") {
true
} else if (param.toLowerCase == "false") {
false
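A Python model of why CSVOptions needs the extra `Some(null)` cases (illustrative only, not the shipped code): Scala's `Map.get` distinguishes an absent key (`None`) from a key mapped to `null` (`Some(null)`), and after this patch both must yield the default. Python's `dict.get` collapses the two into one `None`, which makes the intended behavior easy to see:

```python
# Python model of the patched getChar: "absent" and "present but null"
# both fall back to the default, matching the new `case Some(null)` branch.
def get_char(parameters, param_name, default):
    value = parameters.get(param_name)
    if value is None:            # covers Scala's None and Some(null)
        return default
    if len(value) == 0:
        return '\u0000'
    if len(value) == 1:
        return value[0]
    raise RuntimeError(param_name + " cannot be more than one character")

assert get_char({}, 'quote', '"') == '"'                # absent key
assert get_char({'quote': None}, 'quote', '"') == '"'   # explicit null
assert get_char({'quote': "'"}, 'quote', '"') == "'"    # normal value
```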