author     Jeff Zhang <zjffdu@apache.org>      2017-03-09 11:44:34 -0800
committer  Holden Karau <holden@us.ibm.com>    2017-03-09 11:44:34 -0800
commit     cabe1df8606e7e5b9e6efb106045deb3f39f5f13 (patch)
tree       f46f3bd4a2d85abe2b1b12632dfd7b27f0da226e /python
parent     30b18e69361746b4d656474374d8b486bb48a19e (diff)
[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
Besides the issue in the Spark API, this also fixes two minor issues in PySpark:
- support read from multiple input paths for orc
- support read from multiple input paths for text

Author: Jeff Zhang <zjffdu@apache.org>

Closes #10307 from zjffdu/SPARK-12334.
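A minimal usage sketch of what this enables in PySpark (the `spark` session and the input paths below are hypothetical examples, not part of the patch):

    # Sketch only: assumes an active SparkSession bound to `spark` and example paths.
    # With this patch, DataFrameReader.orc accepts a single path or a list of paths.
    df = spark.read.orc(["/data/orc/2017-01", "/data/orc/2017-02"])

    # json() likewise documents list input; anything other than a string, list,
    # or RDD now raises TypeError("path can be only string, list or RDD").
    df_json = spark.read.json(["/data/json/day1", "/data/json/day2"])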
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/readwriter.py  14
-rw-r--r--  python/pyspark/sql/tests.py        5
2 files changed, 13 insertions, 6 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 45fb9b7591..4354345ebc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils):
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
timeZone=None, wholeFile=None):
"""
- Loads a JSON file and returns the results as a :class:`DataFrame`.
+ Loads JSON files and returns the results as a :class:`DataFrame`.
`JSON Lines <http://jsonlines.org/>`_(newline-delimited JSON) is supported by default.
For JSON (one record per file), set the `wholeFile` parameter to ``true``.
@@ -169,7 +169,7 @@ class DataFrameReader(OptionUtils):
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
- :param path: string represents path to the JSON dataset,
+ :param path: string represents path to the JSON dataset, or a list of paths,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param primitivesAsString: infers all primitive values as a string type. If None is set,
@@ -252,7 +252,7 @@ class DataFrameReader(OptionUtils):
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
return self._df(self._jreader.json(jrdd))
else:
- raise TypeError("path can be only string or RDD")
+ raise TypeError("path can be only string, list or RDD")
@since(1.4)
def table(self, tableName):
@@ -269,7 +269,7 @@ class DataFrameReader(OptionUtils):
@since(1.4)
def parquet(self, *paths):
- """Loads a Parquet file, returning the result as a :class:`DataFrame`.
+ """Loads Parquet files, returning the result as a :class:`DataFrame`.
You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
@@ -407,7 +407,7 @@ class DataFrameReader(OptionUtils):
@since(1.5)
def orc(self, path):
- """Loads an ORC file, returning the result as a :class:`DataFrame`.
+ """Loads ORC files, returning the result as a :class:`DataFrame`.
.. note:: Currently ORC support is only available together with Hive support.
@@ -415,7 +415,9 @@ class DataFrameReader(OptionUtils):
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
- return self._df(self._jreader.orc(path))
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1b873e9578..f0a9a0400e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -450,6 +450,11 @@ class SQLTests(ReusedPySparkTestCase):
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
self.assertEqual(ages_newlines.collect(), expected)
+ def test_read_multiple_orc_file(self):
+ df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
+ "python/test_support/sql/orc_partitioned/b=1/c=1"])
+ self.assertEqual(2, df.count())
+
def test_udf_with_input_file_name(self):
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType