diff options
author | Davies Liu <davies@databricks.com> | 2015-05-19 14:23:28 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-05-19 14:23:28 -0700 |
commit | 4de74d2602f6577c3c8458aa85377e89c19724ca (patch) | |
tree | f265842faf3e23527646f92b5efb5787042c79da /python/pyspark/sql/context.py | |
parent | c12dff9b82e4869f866a9b96ce0bf05503dd7dda (diff) | |
download | spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.gz spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.bz2 spark-4de74d2602f6577c3c8458aa85377e89c19724ca.zip |
[SPARK-7738] [SQL] [PySpark] add reader and writer API in Python
cc rxin, please take a quick look, I'm working on tests.
Author: Davies Liu <davies@databricks.com>
Closes #6238 from davies/readwrite and squashes the following commits:
c7200eb [Davies Liu] update tests
9cbf01b [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
f0c5a04 [Davies Liu] use sqlContext.read.load
5f68bc8 [Davies Liu] update tests
6437e9a [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
bcc6668 [Davies Liu] add reader and writer API in Python
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r-- | python/pyspark/sql/context.py | 28 |
1 file changed, 15 insertions, 13 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 9f26d13235..7543475014 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -31,6 +31,7 @@ from pyspark.serializers import AutoBatchedSerializer, PickleSerializer from pyspark.sql.types import Row, StringType, StructType, _verify_type, \ _infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter from pyspark.sql.dataframe import DataFrame +from pyspark.sql.readwriter import DataFrameReader try: import pandas @@ -457,19 +458,7 @@ class SQLContext(object): Optionally, a schema can be provided as the schema of the returned DataFrame. """ - if path is not None: - options["path"] = path - if source is None: - source = self.getConf("spark.sql.sources.default", - "org.apache.spark.sql.parquet") - if schema is None: - df = self._ssql_ctx.load(source, options) - else: - if not isinstance(schema, StructType): - raise TypeError("schema should be StructType") - scala_datatype = self._ssql_ctx.parseDataType(schema.json()) - df = self._ssql_ctx.load(source, scala_datatype, options) - return DataFrame(df, self) + return self.read.load(path, source, schema, **options) def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): @@ -567,6 +556,19 @@ class SQLContext(object): """Removes all cached tables from the in-memory cache. """ self._ssql_ctx.clearCache() + @property + def read(self): + """ + Returns a :class:`DataFrameReader` that can be used to read data + in as a :class:`DataFrame`. + + ::note: Experimental + + >>> sqlContext.read + <pyspark.sql.readwriter.DataFrameReader object at ...> + """ + return DataFrameReader(self) + class HiveContext(SQLContext): """A variant of Spark SQL that integrates with data stored in Hive. |