diff options
author | Reynold Xin <rxin@databricks.com> | 2015-01-16 21:09:06 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-01-16 21:09:06 -0800 |
commit | 61b427d4b1c4934bd70ed4da844b64f0e9a377aa (patch) | |
tree | 5068b31119fa7e2256422d4fdf18703ae64d7ab2 /python | |
parent | ee1c1f3a04dfe80843432e349f01178e47f02443 (diff) | |
download | spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.tar.gz spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.tar.bz2 spark-61b427d4b1c4934bd70ed4da844b64f0e9a377aa.zip |
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
After the following patches, the main (Scala) API is now usable for Java users directly.
https://github.com/apache/spark/pull/4056
https://github.com/apache/spark/pull/4054
https://github.com/apache/spark/pull/4049
https://github.com/apache/spark/pull/4030
https://github.com/apache/spark/pull/3965
https://github.com/apache/spark/pull/3958
Author: Reynold Xin <rxin@databricks.com>
Closes #4065 from rxin/sql-java-api and squashes the following commits:
b1fd860 [Reynold Xin] Fix Mima
6d86578 [Reynold Xin] Ok one more attempt in fixing Python...
e8f1455 [Reynold Xin] Fix Python again...
3e53f91 [Reynold Xin] Fixed Python.
83735da [Reynold Xin] Fix BigDecimal test.
e9f1de3 [Reynold Xin] Use scala BigDecimal.
500d2c4 [Reynold Xin] Fix Decimal.
ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory.
c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql.py | 48 |
1 file changed, 12 insertions, 36 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index dcd3b60a60..1990323249 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1458,7 +1458,7 @@ class SQLContext(object): jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): """Registers the given RDD as a temporary table in the catalog. @@ -1487,7 +1487,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD() + jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) def jsonFile(self, path, schema=None, samplingRatio=1.0): @@ -1549,7 +1549,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonFile(path, scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): """Loads an RDD storing one JSON object per string as a L{SchemaRDD}. @@ -1619,7 +1619,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -1630,7 +1630,7 @@ class SQLContext(object): >>> srdd2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] """ - return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): """Returns the specified table as a L{SchemaRDD}. 
@@ -1641,7 +1641,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(self, tableName): """Caches the specified table in-memory.""" @@ -1686,24 +1686,6 @@ class HiveContext(SQLContext): def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) - def hiveql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self) - - def hql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return self.hiveql(hqlQuery) - class LocalHiveContext(HiveContext): @@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext): return self._jvm.LocalHiveContext(self._jsc.sc()) -class TestHiveContext(HiveContext): - - def _get_hive_ctx(self): - return self._jvm.TestHiveContext(self._jsc.sc()) - - def _create_row(fields, values): row = Row(*values) row.__FIELDS__ = fields @@ -1846,7 +1822,7 @@ class SchemaRDD(RDD): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc clsName = jschema_rdd.getClass().getName() - assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD" + assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD" self._jschema_rdd = jschema_rdd self._id = None self.is_cached = False @@ -1880,7 +1856,7 @@ class SchemaRDD(RDD): >>> srdd.limit(0).collect() [] """ - rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD() + rdd = self._jschema_rdd.baseSchemaRDD().limit(num) return SchemaRDD(rdd, 
self.sql_ctx) def toJSON(self, use_unicode=False): @@ -2059,18 +2035,18 @@ class SchemaRDD(RDD): def getCheckpointFile(self): checkpointFile = self._jschema_rdd.getCheckpointFile() - if checkpointFile.isPresent(): + if checkpointFile.isDefined(): return checkpointFile.get() def coalesce(self, numPartitions, shuffle=False): - rdd = self._jschema_rdd.coalesce(numPartitions, shuffle) + rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None) return SchemaRDD(rdd, self.sql_ctx) def distinct(self, numPartitions=None): if numPartitions is None: rdd = self._jschema_rdd.distinct() else: - rdd = self._jschema_rdd.distinct(numPartitions) + rdd = self._jschema_rdd.distinct(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def intersection(self, other): @@ -2081,7 +2057,7 @@ class SchemaRDD(RDD): raise ValueError("Can only intersect with another SchemaRDD") def repartition(self, numPartitions): - rdd = self._jschema_rdd.repartition(numPartitions) + rdd = self._jschema_rdd.repartition(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def subtract(self, other, numPartitions=None): |