path: root/python/pyspark/sql/readwriter.py
author     Davies Liu <davies@databricks.com>    2015-05-19 14:23:28 -0700
committer  Reynold Xin <rxin@databricks.com>     2015-05-19 14:23:28 -0700
commit     4de74d2602f6577c3c8458aa85377e89c19724ca (patch)
tree       f265842faf3e23527646f92b5efb5787042c79da /python/pyspark/sql/readwriter.py
parent     c12dff9b82e4869f866a9b96ce0bf05503dd7dda (diff)
download   spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.gz
           spark-4de74d2602f6577c3c8458aa85377e89c19724ca.tar.bz2
           spark-4de74d2602f6577c3c8458aa85377e89c19724ca.zip
[SPARK-7738] [SQL] [PySpark] add reader and writer API in Python
cc rxin, please take a quick look, I'm working on tests.

Author: Davies Liu <davies@databricks.com>

Closes #6238 from davies/readwrite and squashes the following commits:

c7200eb [Davies Liu] update tests
9cbf01b [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
f0c5a04 [Davies Liu] use sqlContext.read.load
5f68bc8 [Davies Liu] update tests
6437e9a [Davies Liu] Merge branch 'master' of github.com:apache/spark into readwrite
bcc6668 [Davies Liu] add reader amd writer API in Python
Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--    python/pyspark/sql/readwriter.py    338
1 files changed, 338 insertions, 0 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
new file mode 100644
index 0000000000..e2b27fb587
--- /dev/null
+++ b/python/pyspark/sql/readwriter.py
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import JavaClass
+
+from pyspark.sql.column import _to_seq
+from pyspark.sql.types import *
+
+__all__ = ["DataFrameReader", "DataFrameWriter"]
+
+
+class DataFrameReader(object):
+ """
+ Interface used to load a :class:`DataFrame` from external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`SQLContext.read`
+ to access this.
+
+ .. note:: Experimental
+ """
+
+ def __init__(self, sqlContext):
+ self._jreader = sqlContext._ssql_ctx.read()
+ self._sqlContext = sqlContext
+
+ def _df(self, jdf):
+ from pyspark.sql.dataframe import DataFrame
+ return DataFrame(jdf, self._sqlContext)
+
+ def load(self, path=None, format=None, schema=None, **options):
+ """Loads data from a data source and returns it as a :class`DataFrame`.
+
+ :param path: optional string for file-system backed data sources.
+ :param format: optional string for format of the data source. Defaults to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+ """
+ jreader = self._jreader
+ if format is not None:
+ jreader = jreader.format(format)
+ if schema is not None:
+ if not isinstance(schema, StructType):
+ raise TypeError("schema should be StructType")
+ jschema = self._sqlContext._ssql_ctx.parseDataType(schema.json())
+ jreader = jreader.schema(jschema)
+ for k in options:
+ jreader = jreader.option(k, options[k])
+ if path is not None:
+ return self._df(jreader.load(path))
+ else:
+ return self._df(jreader.load())
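
To make the new API concrete, here is a rough usage sketch of load() driven through sqlContext.read, as the class docstring and the commit message describe. It is not part of the patch: the SparkContext/SQLContext setup mirrors _test() at the bottom of this file, and the paths, format and field name are illustrative placeholders.

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, StringType

    sc = SparkContext("local[4]", "readwriter-sketch")   # same local setup as _test() below
    sqlContext = SQLContext(sc)

    # Path, format and schema are made-up examples.
    schema = StructType([StructField("name", StringType())])
    people = sqlContext.read.load("/tmp/people.json", format="json", schema=schema)
    other = sqlContext.read.load("/tmp/data.parquet")    # no format given: default source is used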
+
+ def json(self, path, schema=None):
+ """
+ Loads a JSON file (one object per line) and returns the result as
+ a :class:`DataFrame`.
+
+ If the ``schema`` parameter is not specified, this function goes
+ through the input once to determine the input schema.
+
+ :param path: string, path to the JSON dataset.
+ :param schema: an optional :class:`StructType` for the input schema.
+
+ >>> import tempfile, shutil
+ >>> jsonFile = tempfile.mkdtemp()
+ >>> shutil.rmtree(jsonFile)
+ >>> with open(jsonFile, 'w') as f:
+ ... f.writelines(jsonStrings)
+ >>> df1 = sqlContext.read.json(jsonFile)
+ >>> df1.printSchema()
+ root
+ |-- field1: long (nullable = true)
+ |-- field2: string (nullable = true)
+ |-- field3: struct (nullable = true)
+ | |-- field4: long (nullable = true)
+
+ >>> from pyspark.sql.types import *
+ >>> schema = StructType([
+ ... StructField("field2", StringType()),
+ ... StructField("field3",
+ ... StructType([StructField("field5", ArrayType(IntegerType()))]))])
+ >>> df2 = sqlContext.read.json(jsonFile, schema)
+ >>> df2.printSchema()
+ root
+ |-- field2: string (nullable = true)
+ |-- field3: struct (nullable = true)
+ | |-- field5: array (nullable = true)
+ | | |-- element: integer (containsNull = true)
+ """
+ if schema is None:
+ jdf = self._jreader.json(path)
+ else:
+ jschema = self._sqlContext._ssql_ctx.parseDataType(schema.json())
+ jdf = self._jreader.schema(jschema).json(path)
+ return self._df(jdf)
+
+ def table(self, tableName):
+ """Returns the specified table as a :class:`DataFrame`.
+
+ >>> sqlContext.registerDataFrameAsTable(df, "table1")
+ >>> df2 = sqlContext.read.table("table1")
+ >>> sorted(df.collect()) == sorted(df2.collect())
+ True
+ """
+ return self._df(self._jreader.table(tableName))
+
+ def parquet(self, *path):
+ """Loads a Parquet file, returning the result as a :class:`DataFrame`.
+
+ >>> import tempfile, shutil
+ >>> parquetFile = tempfile.mkdtemp()
+ >>> shutil.rmtree(parquetFile)
+ >>> df.saveAsParquetFile(parquetFile)
+ >>> df2 = sqlContext.read.parquet(parquetFile)
+ >>> sorted(df.collect()) == sorted(df2.collect())
+ True
+ """
+ return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
+
+ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
+ predicates=None, properties={}):
+ """
+ Constructs a :class:`DataFrame` representing the database table named ``table``,
+ accessible via the JDBC URL ``url`` and connection ``properties``.
+
+ If the ``column`` parameter is given, the table is partitioned on that column and
+ read in parallel, based on ``lowerBound``, ``upperBound`` and ``numPartitions``.
+
+ The ``predicates`` parameter gives a list of expressions suitable for inclusion
+ in WHERE clauses; each one defines one partition of the :class:`DataFrame`.
+
+ .. note:: Don't create too many partitions in parallel on a large cluster;
+ otherwise Spark might crash your external database systems.
+
+ :param url: a JDBC URL
+ :param table: name of table
+ :param column: the name of the column used to partition the table
+ :param lowerBound: the lower bound of the partition column
+ :param upperBound: the upper bound of the partition column
+ :param numPartitions: the number of partitions
+ :param predicates: a list of expressions, one per partition
+ :param properties: JDBC database connection arguments, a dictionary of arbitrary
+ string tag/value pairs. Normally at least a "user" and "password"
+ property should be included.
+ :return: a :class:`DataFrame`
+ """
+ jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
+ for k in properties:
+ jprop.setProperty(k, properties[k])
+ if column is not None:
+ if numPartitions is None:
+ numPartitions = self._sqlContext._sc.defaultParallelism
+ return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
+ int(numPartitions), jprop))
+ if predicates is not None:
+ arr = self._sqlContext._sc._jvm.PythonUtils.toArray(predicates)
+ return self._df(self._jreader.jdbc(url, table, arr, jprop))
+ return self._df(self._jreader.jdbc(url, table, jprop))
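
A rough sketch of the two jdbc() read paths described above, reusing the sqlContext from the earlier sketch. The URL, table name and credentials are placeholders, and an appropriate JDBC driver is assumed to be on the classpath.

    props = {"user": "sa", "password": "secret"}      # placeholder credentials
    url = "jdbc:postgresql://dbhost/test"             # placeholder JDBC URL

    # Whole-table read, single partition.
    people = sqlContext.read.jdbc(url, "people", properties=props)

    # Partitioned read on a numeric column, as described in the docstring above.
    people_par = sqlContext.read.jdbc(url, "people", column="id",
                                      lowerBound=1, upperBound=100000,
                                      numPartitions=4, properties=props)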
+
+
+class DataFrameWriter(object):
+ """
+ Interface used to write a :class:`DataFrame` to external storage systems
+ (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.write`
+ to access this.
+
+ .. note:: Experimental
+ """
+ def __init__(self, df):
+ self._df = df
+ self._sqlContext = df.sql_ctx
+ self._jwrite = df._jdf.write()
+
+ def save(self, path=None, format=None, mode="error", **options):
+ """
+ Saves the contents of the :class:`DataFrame` to a data source.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ Additionally, mode is used to specify the behavior of the save operation when
+ data already exists in the data source. There are four modes:
+
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
+
+ :param path: the path in a Hadoop supported file system
+ :param format: the format used to save
+ :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ :param options: all other string options
+ """
+ jwrite = self._jwrite.mode(mode)
+ if format is not None:
+ jwrite = jwrite.format(format)
+ for k in options:
+ jwrite = jwrite.option(k, options[k])
+ if path is None:
+ jwrite.save()
+ else:
+ jwrite.save(path)
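
A minimal sketch of save(), assuming a DataFrame named df like the one built in _test() below; the output paths are placeholders.

    # Default source (parquet, unless spark.sql.sources.default says otherwise).
    df.write.save("/tmp/people_default")

    # Explicit format and overwrite mode.
    df.write.save("/tmp/people_json", format="json", mode="overwrite")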
+
+ def saveAsTable(self, name, format=None, mode="error", **options):
+ """
+ Saves the contents of this :class:`DataFrame` to a data source as a table.
+
+ The data source is specified by the ``format`` and a set of ``options``.
+ If ``format`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ Additionally, mode is used to specify the behavior of the saveAsTable operation when
+ the table already exists in the data source. There are four modes:
+
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
+
+ :param name: the table name
+ :param format: the format used to save
+ :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ :param options: all other string options
+ """
+ jwrite = self._jwrite.mode(mode)
+ if format is not None:
+ jwrite = jwrite.format(format)
+ for k in options:
+ jwrite = jwrite.option(k, options[k])
+ return jwrite.saveAsTable(name)
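
Likewise for saveAsTable(), with a placeholder table name and the df from _test() below:

    # "ignore" turns this into a no-op if a table named "people_saved" already exists.
    df.write.saveAsTable("people_saved", format="parquet", mode="ignore")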
+
+ def json(self, path, mode="error"):
+ """
+ Saves the content of the :class:`DataFrame` in JSON format at the
+ specified path.
+
+ Additionally, mode is used to specify the behavior of the save operation when
+ data already exists in the data source. There are four modes:
+
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
+
+ :param path: the path in any Hadoop supported file system
+ :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ """
+ return self._jwrite.mode(mode).json(path)
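
A one-line sketch of the JSON writer under the same assumptions (df from _test(), placeholder path):

    df.write.json("/tmp/people_as_json", mode="overwrite")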
+
+ def parquet(self, path, mode="error"):
+ """
+ Saves the content of the :class:`DataFrame` in Parquet format at the
+ specified path.
+
+ Additionally, mode is used to specify the behavior of the save operation when
+ data already exists in the data source. There are four modes:
+
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
+
+ :param path: the path in any Hadoop supported file system
+ :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ """
+ return self._jwrite.mode(mode).parquet(path)
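
And its Parquet counterpart, same assumptions:

    df.write.parquet("/tmp/people_as_parquet", mode="append")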
+
+ def jdbc(self, url, table, mode="error", properties={}):
+ """
+ Saves the content of the :class:`DataFrame` to an external database table
+ via JDBC.
+
+ If the table already exists in the external database, the behavior of this
+ function depends on the save mode specified by the `mode` parameter
+ (defaults to throwing an exception). There are four modes:
+
+ * `append`: Append contents of this :class:`DataFrame` to existing data.
+ * `overwrite`: Overwrite existing data.
+ * `error`: Throw an exception if data already exists.
+ * `ignore`: Silently ignore this operation if data already exists.
+
+ :param url: a JDBC URL of the form `jdbc:subprotocol:subname`
+ :param table: Name of the table in the external database.
+ :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+ :param properties: JDBC database connection arguments, a dictionary of
+ arbitrary string tag/value pairs. Normally at least a
+ "user" and "password" property should be included.
+ """
+ jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
+ for k in properties:
+ jprop.setProperty(k, properties[k])
+ self._jwrite.mode(mode).jdbc(url, table, jprop)
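
Finally, a sketch of the JDBC writer; the URL, table and credentials are placeholders, a JDBC driver is assumed to be available, and df is the DataFrame from _test() below. "append" adds rows to an existing table instead of failing.

    df.write.jdbc("jdbc:postgresql://dbhost/test", "people_copy", mode="append",
                  properties={"user": "sa", "password": "secret"})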
+
+
+def _test():
+ import doctest
+ from pyspark.context import SparkContext
+ from pyspark.sql import Row, SQLContext
+ import pyspark.sql.readwriter
+ globs = pyspark.sql.readwriter.__dict__.copy()
+ sc = SparkContext('local[4]', 'PythonTest')
+ globs['sc'] = sc
+ globs['sqlContext'] = SQLContext(sc)
+ globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
+ .toDF(StructType([StructField('age', IntegerType()),
+ StructField('name', StringType())]))
+ jsonStrings = [
+ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+ '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
+ '"field6":[{"field7": "row2"}]}',
+ '{"field1" : null, "field2": "row3", '
+ '"field3":{"field4":33, "field5": []}}'
+ ]
+ globs['jsonStrings'] = jsonStrings
+ (failure_count, test_count) = doctest.testmod(
+ pyspark.sql.readwriter, globs=globs,
+ optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()