From 61b427d4b1c4934bd70ed4da844b64f0e9a377aa Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 16 Jan 2015 21:09:06 -0800 Subject: [SPARK-5193][SQL] Remove Spark SQL Java-specific API. After the following patches, the main (Scala) API is now usable for Java users directly. https://github.com/apache/spark/pull/4056 https://github.com/apache/spark/pull/4054 https://github.com/apache/spark/pull/4049 https://github.com/apache/spark/pull/4030 https://github.com/apache/spark/pull/3965 https://github.com/apache/spark/pull/3958 Author: Reynold Xin Closes #4065 from rxin/sql-java-api and squashes the following commits: b1fd860 [Reynold Xin] Fix Mima 6d86578 [Reynold Xin] Ok one more attempt in fixing Python... e8f1455 [Reynold Xin] Fix Python again... 3e53f91 [Reynold Xin] Fixed Python. 83735da [Reynold Xin] Fix BigDecimal test. e9f1de3 [Reynold Xin] Use scala BigDecimal. 500d2c4 [Reynold Xin] Fix Decimal. ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory. c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API. --- python/pyspark/sql.py | 48 ++++++++++++------------------------------------ 1 file changed, 12 insertions(+), 36 deletions(-) (limited to 'python') diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index dcd3b60a60..1990323249 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1458,7 +1458,7 @@ class SQLContext(object): jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): """Registers the given RDD as a temporary table in the catalog. @@ -1487,7 +1487,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD() + jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) def jsonFile(self, path, schema=None, samplingRatio=1.0): @@ -1549,7 +1549,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonFile(path, scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): """Loads an RDD storing one JSON object per string as a L{SchemaRDD}. @@ -1619,7 +1619,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -1630,7 +1630,7 @@ class SQLContext(object): >>> srdd2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] """ - return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): """Returns the specified table as a L{SchemaRDD}. @@ -1641,7 +1641,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(self, tableName): """Caches the specified table in-memory.""" @@ -1686,24 +1686,6 @@ class HiveContext(SQLContext): def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) - def hiveql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self) - - def hql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return self.hiveql(hqlQuery) - class LocalHiveContext(HiveContext): @@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext): return self._jvm.LocalHiveContext(self._jsc.sc()) -class TestHiveContext(HiveContext): - - def _get_hive_ctx(self): - return self._jvm.TestHiveContext(self._jsc.sc()) - - def _create_row(fields, values): row = Row(*values) row.__FIELDS__ = fields @@ -1846,7 +1822,7 @@ class SchemaRDD(RDD): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc clsName = jschema_rdd.getClass().getName() - assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD" + assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD" self._jschema_rdd = jschema_rdd self._id = None self.is_cached = False @@ -1880,7 +1856,7 @@ class SchemaRDD(RDD): >>> srdd.limit(0).collect() [] """ - rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD() + rdd = self._jschema_rdd.baseSchemaRDD().limit(num) return SchemaRDD(rdd, self.sql_ctx) def toJSON(self, use_unicode=False): @@ -2059,18 +2035,18 @@ class SchemaRDD(RDD): def getCheckpointFile(self): checkpointFile = self._jschema_rdd.getCheckpointFile() - if checkpointFile.isPresent(): + if checkpointFile.isDefined(): return checkpointFile.get() def coalesce(self, numPartitions, shuffle=False): - rdd = self._jschema_rdd.coalesce(numPartitions, shuffle) + rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None) return SchemaRDD(rdd, self.sql_ctx) def distinct(self, numPartitions=None): if numPartitions is None: rdd = self._jschema_rdd.distinct() else: - rdd = self._jschema_rdd.distinct(numPartitions) + rdd = self._jschema_rdd.distinct(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def intersection(self, other): @@ -2081,7 +2057,7 @@ class SchemaRDD(RDD): raise ValueError("Can only intersect with another SchemaRDD") def repartition(self, numPartitions): - rdd = self._jschema_rdd.repartition(numPartitions) + rdd = self._jschema_rdd.repartition(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def subtract(self, other, numPartitions=None): -- cgit v1.2.3