diff options
author | Jeff Zhang <zjffdu@apache.org> | 2015-11-18 08:18:54 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-11-18 08:18:54 -0800 |
commit | 3a6807fdf07b0e73d76502a6bd91ad979fde8b61 (patch) | |
tree | 27bbc22fcdad14ef406a095bbb1f53e67aa8505e /python/pyspark | |
parent | 1429e0a2b562469146b6fa06051c85a00092e5b8 (diff) | |
download | spark-3a6807fdf07b0e73d76502a6bd91ad979fde8b61.tar.gz spark-3a6807fdf07b0e73d76502a6bd91ad979fde8b61.tar.bz2 spark-3a6807fdf07b0e73d76502a6bd91ad979fde8b61.zip |
[SPARK-11804] [PYSPARK] Exception raise when using Jdbc predicates option in PySpark
Author: Jeff Zhang <zjffdu@apache.org>
Closes #9791 from zjffdu/SPARK-11804.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 10 | ||||
-rw-r--r-- | python/pyspark/sql/utils.py | 13 |
2 files changed, 18 insertions, 5 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7b8ddb9feb..e8f0d7ec77 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -26,6 +26,7 @@ from pyspark import RDD, since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
+from pyspark.sql import utils
 
 __all__ = ["DataFrameReader", "DataFrameWriter"]
 
@@ -131,9 +132,7 @@ class DataFrameReader(object):
         if type(path) == list:
             paths = path
             gateway = self._sqlContext._sc._gateway
-            jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
-            for i in range(0, len(paths)):
-                jpaths[i] = paths[i]
+            jpaths = utils.toJArray(gateway, gateway.jvm.java.lang.String, paths)
             return self._df(self._jreader.load(jpaths))
         else:
             return self._df(self._jreader.load(path))
@@ -269,8 +268,9 @@ class DataFrameReader(object):
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                                int(upperBound), int(numPartitions), jprop))
         if predicates is not None:
-            arr = self._sqlContext._sc._jvm.PythonUtils.toArray(predicates)
-            return self._df(self._jreader.jdbc(url, table, arr, jprop))
+            gateway = self._sqlContext._sc._gateway
+            jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
+            return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
         return self._df(self._jreader.jdbc(url, table, jprop))
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index c4fda8bd3b..b0a0373372 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -71,3 +71,16 @@ def install_exception_handler():
     patched = capture_sql_exception(original)
     # only patch the one used in in py4j.java_gateway (call Java API)
     py4j.java_gateway.get_return_value = patched
+
+
+def toJArray(gateway, jtype, arr):
+    """
+    Convert python list to java type array
+    :param gateway: Py4j Gateway
+    :param jtype: java type of element in array
+    :param arr: python type list
+    """
+    jarr = gateway.new_array(jtype, len(arr))
+    for i in range(0, len(arr)):
+        jarr[i] = arr[i]
+    return jarr