diff options
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r-- | python/pyspark/sql/context.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 49f016a9cf..882c0f98ea 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -21,6 +21,7 @@ from array import array from itertools import imap from py4j.protocol import Py4JError +from py4j.java_collections import MapConverter from pyspark.rdd import _prepare_for_python_RDD from pyspark.serializers import AutoBatchedSerializer, PickleSerializer @@ -87,6 +88,18 @@ class SQLContext(object): self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc()) return self._scala_SQLContext + def setConf(self, key, value): + """Sets the given Spark SQL configuration property. + """ + self._ssql_ctx.setConf(key, value) + + def getConf(self, key, defaultValue): + """Returns the value of Spark SQL configuration property for the given key. + + If the key is not set, returns defaultValue. + """ + return self._ssql_ctx.getConf(key, defaultValue) + def registerFunction(self, name, f, returnType=StringType()): """Registers a lambda function as a UDF so it can be used in SQL statements. @@ -455,6 +468,61 @@ class SQLContext(object): df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) return DataFrame(df, self) + def load(self, path=None, source=None, schema=None, **options): + """Returns the dataset in a data source as a DataFrame. + + The data source is specified by the `source` and a set of `options`. + If `source` is not specified, the default data source configured by + spark.sql.sources.default will be used. + + Optionally, a schema can be provided as the schema of the returned DataFrame. + """ + if path is not None: + options["path"] = path + if source is None: + source = self.getConf("spark.sql.sources.default", + "org.apache.spark.sql.parquet") + joptions = MapConverter().convert(options, + self._sc._gateway._gateway_client) + if schema is None: + df = self._ssql_ctx.load(source, joptions) + else: + if not isinstance(schema, StructType): + raise TypeError("schema should be StructType") + scala_datatype = self._ssql_ctx.parseDataType(schema.json()) + df = self._ssql_ctx.load(source, scala_datatype, joptions) + return DataFrame(df, self) + + def createExternalTable(self, tableName, path=None, source=None, + schema=None, **options): + """Creates an external table based on the dataset in a data source. + + It returns the DataFrame associated with the external table. + + The data source is specified by the `source` and a set of `options`. + If `source` is not specified, the default data source configured by + spark.sql.sources.default will be used. + + Optionally, a schema can be provided as the schema of the returned DataFrame and + created external table. + """ + if path is not None: + options["path"] = path + if source is None: + source = self.getConf("spark.sql.sources.default", + "org.apache.spark.sql.parquet") + joptions = MapConverter().convert(options, + self._sc._gateway._gateway_client) + if schema is None: + df = self._ssql_ctx.createExternalTable(tableName, source, joptions) + else: + if not isinstance(schema, StructType): + raise TypeError("schema should be StructType") + scala_datatype = self._ssql_ctx.parseDataType(schema.json()) + df = self._ssql_ctx.createExternalTable(tableName, source, scala_datatype, + joptions) + return DataFrame(df, self) + def sql(self, sqlQuery): """Return a L{DataFrame} representing the result of the given query. |