commit 3a6807fdf07b0e73d76502a6bd91ad979fde8b61
tree   27bbc22fcdad14ef406a095bbb1f53e67aa8505e
parent 1429e0a2b562469146b6fa06051c85a00092e5b8
author    Jeff Zhang <zjffdu@apache.org>      2015-11-18 08:18:54 -0800
committer Davies Liu <davies.liu@gmail.com>   2015-11-18 08:18:54 -0800
[SPARK-11804] [PYSPARK] Exception raised when using JDBC predicates option in PySpark

Author: Jeff Zhang <zjffdu@apache.org>

Closes #9791 from zjffdu/SPARK-11804.
Diffstat (limited to 'python')
 python/pyspark/sql/readwriter.py | 10 +++++-----
 python/pyspark/sql/utils.py      | 13 +++++++++++++
 2 files changed, 18 insertions(+), 5 deletions(-)
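
The failure this patch fixes is hit through the predicates argument of DataFrameReader.jdbc: before the change, passing a list of predicate strings raised an exception in the Python-to-Java list conversion that this diff replaces. A minimal sketch of the affected call; the connection URL, table name, predicates, and credentials below are hypothetical placeholders, not values from the patch:

    # Hypothetical values throughout; each predicate string defines one partition.
    df = sqlContext.read.jdbc(
        url="jdbc:postgresql://localhost/testdb",
        table="people",
        predicates=["age < 30", "age >= 30"],
        properties={"user": "test", "password": "secret"})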
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7b8ddb9feb..e8f0d7ec77 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -26,6 +26,7 @@ from pyspark import RDD, since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
+from pyspark.sql import utils
 
 
 __all__ = ["DataFrameReader", "DataFrameWriter"]
@@ -131,9 +132,7 @@ class DataFrameReader(object):
             if type(path) == list:
                 paths = path
                 gateway = self._sqlContext._sc._gateway
-                jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
-                for i in range(0, len(paths)):
-                    jpaths[i] = paths[i]
+                jpaths = utils.toJArray(gateway, gateway.jvm.java.lang.String, paths)
                 return self._df(self._jreader.load(jpaths))
             else:
                 return self._df(self._jreader.load(path))
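
The branch above is the list-of-paths case of load(); it now delegates the array conversion to the shared helper. A minimal usage sketch, with hypothetical file names:

    # Hypothetical parquet paths; both are read into a single DataFrame.
    df = sqlContext.read.load(["data/part1.parquet", "data/part2.parquet"],
                              format="parquet")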
@@ -269,8 +268,9 @@ class DataFrameReader(object):
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
                                                int(numPartitions), jprop))
         if predicates is not None:
-            arr = self._sqlContext._sc._jvm.PythonUtils.toArray(predicates)
-            return self._df(self._jreader.jdbc(url, table, arr, jprop))
+            gateway = self._sqlContext._sc._gateway
+            jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
+            return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
         return self._df(self._jreader.jdbc(url, table, jprop))
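
Design note on the hunk above: rather than reaching into the JVM-side PythonUtils.toArray helper, the predicates list is now converted on the Python side by the same utils.toJArray routine used in load(), so both call sites build the java.lang.String array identically.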
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index c4fda8bd3b..b0a0373372 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -71,3 +71,16 @@ def install_exception_handler():
     patched = capture_sql_exception(original)
     # only patch the one used in py4j.java_gateway (call Java API)
     py4j.java_gateway.get_return_value = patched
+
+
+def toJArray(gateway, jtype, arr):
+    """
+    Convert a Python list into a Java array of the given element type.
+
+    :param gateway: Py4J gateway object
+    :param jtype: Java type of the array elements
+    :param arr: Python list to convert
+    """
+    jarr = gateway.new_array(jtype, len(arr))
+    for i in range(0, len(arr)):
+        jarr[i] = arr[i]
+    return jarr
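
A minimal sketch of exercising the new helper against a live gateway; the SparkContext variable sc and the sample list are illustrative, not part of the patch:

    from pyspark.sql import utils

    # Assumes an active SparkContext bound to sc; its gateway exposes the JVM.
    gateway = sc._gateway
    # Build a java.lang.String[] from a Python list.
    jarr = utils.toJArray(gateway, gateway.jvm.java.lang.String, ["a", "b", "c"])
    # Py4J converts the elements back to Python strings on access.
    assert jarr[0] == "a"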