author     Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-01-16 21:09:06 -0800
commit     61b427d4b1c4934bd70ed4da844b64f0e9a377aa (patch)
tree       5068b31119fa7e2256422d4fdf18703ae64d7ab2 /python
parent     ee1c1f3a04dfe80843432e349f01178e47f02443 (diff)
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
After the following patches, the main (Scala) API is now usable for Java users directly.

https://github.com/apache/spark/pull/4056
https://github.com/apache/spark/pull/4054
https://github.com/apache/spark/pull/4049
https://github.com/apache/spark/pull/4030
https://github.com/apache/spark/pull/3965
https://github.com/apache/spark/pull/3958

Author: Reynold Xin <rxin@databricks.com>

Closes #4065 from rxin/sql-java-api and squashes the following commits:

b1fd860 [Reynold Xin] Fix Mima
6d86578 [Reynold Xin] Ok one more attempt in fixing Python...
e8f1455 [Reynold Xin] Fix Python again...
3e53f91 [Reynold Xin] Fixed Python.
83735da [Reynold Xin] Fix BigDecimal test.
e9f1de3 [Reynold Xin] Use scala BigDecimal.
500d2c4 [Reynold Xin] Fix Decimal.
ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory.
c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API.
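With the Java-specific wrappers gone, PySpark's internals talk to the Scala SchemaRDD directly over Py4J instead of round-tripping through toJavaSchemaRDD(). The same one-line simplification repeats below for applySchemaToPythonRDD, parquetFile, jsonFile, jsonRDD, sql, table, and limit. A minimal sketch of the new call path, assuming a SQLContext named sqlCtx and a registered table named table1 (both hypothetical):

    # Before: every factory method converted the result to the Java wrapper first.
    #   return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self)
    # After: the Py4J handle to the Scala SchemaRDD is wrapped as-is.
    #   return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
    srdd = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")
    srdd.collect()  # results still come back through the same Py4J/pickle path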
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql.py | 48
1 file changed, 12 insertions(+), 36 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index dcd3b60a60..1990323249 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1458,7 +1458,7 @@ class SQLContext(object):
jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def registerRDDAsTable(self, rdd, tableName):
"""Registers the given RDD as a temporary table in the catalog.
@@ -1487,7 +1487,7 @@ class SQLContext(object):
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
"""
- jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD()
+ jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
@@ -1549,7 +1549,7 @@ class SQLContext(object):
else:
scala_datatype = self._ssql_ctx.parseDataType(schema.json())
srdd = self._ssql_ctx.jsonFile(path, scala_datatype)
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
"""Loads an RDD storing one JSON object per string as a L{SchemaRDD}.
@@ -1619,7 +1619,7 @@ class SQLContext(object):
else:
scala_datatype = self._ssql_ctx.parseDataType(schema.json())
srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
- return SchemaRDD(srdd.toJavaSchemaRDD(), self)
+ return SchemaRDD(srdd, self)
def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.
@@ -1630,7 +1630,7 @@ class SQLContext(object):
>>> srdd2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
"""
- return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self)
+ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
def table(self, tableName):
"""Returns the specified table as a L{SchemaRDD}.
@@ -1641,7 +1641,7 @@ class SQLContext(object):
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
"""
- return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self)
+ return SchemaRDD(self._ssql_ctx.table(tableName), self)
def cacheTable(self, tableName):
"""Caches the specified table in-memory."""
@@ -1686,24 +1686,6 @@ class HiveContext(SQLContext):
def _get_hive_ctx(self):
return self._jvm.HiveContext(self._jsc.sc())
- def hiveql(self, hqlQuery):
- """
- DEPRECATED: Use sql()
- """
- warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" +
- "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
- DeprecationWarning)
- return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self)
-
- def hql(self, hqlQuery):
- """
- DEPRECATED: Use sql()
- """
- warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" +
- "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'",
- DeprecationWarning)
- return self.hiveql(hqlQuery)
-
class LocalHiveContext(HiveContext):
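The removed hiveql()/hql() helpers had been deprecated since sql() began parsing HiveQL by default, so migration is a method rename; per the old warning text, the parser dialect can still be switched via 'spark.sql.dialect'. A hedged before/after, assuming a HiveContext named hiveCtx and a table named src (both hypothetical):

    # Removed in this commit:
    #   rows = hiveCtx.hql("SELECT key, value FROM src")
    # Equivalent replacement:
    rows = hiveCtx.sql("SELECT key, value FROM src")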
@@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext):
return self._jvm.LocalHiveContext(self._jsc.sc())
-class TestHiveContext(HiveContext):
-
- def _get_hive_ctx(self):
- return self._jvm.TestHiveContext(self._jsc.sc())
-
-
def _create_row(fields, values):
row = Row(*values)
row.__FIELDS__ = fields
@@ -1846,7 +1822,7 @@ class SchemaRDD(RDD):
self.sql_ctx = sql_ctx
self._sc = sql_ctx._sc
clsName = jschema_rdd.getClass().getName()
- assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD"
+ assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD"
self._jschema_rdd = jschema_rdd
self._id = None
self.is_cached = False
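The constructor's sanity check now expects the Scala class rather than the Java wrapper. A sketch of what the Py4J class-name probe sees, assuming the fully qualified names of the era (org.apache.spark.sql.SchemaRDD versus the old org.apache.spark.sql.api.java.JavaSchemaRDD):

    cls_name = jschema_rdd.getClass().getName()
    # e.g. 'org.apache.spark.sql.SchemaRDD' after this commit
    assert cls_name.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD"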
@@ -1880,7 +1856,7 @@ class SchemaRDD(RDD):
>>> srdd.limit(0).collect()
[]
"""
- rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
+ rdd = self._jschema_rdd.baseSchemaRDD().limit(num)
return SchemaRDD(rdd, self.sql_ctx)
def toJSON(self, use_unicode=False):
@@ -2059,18 +2035,18 @@ class SchemaRDD(RDD):
def getCheckpointFile(self):
checkpointFile = self._jschema_rdd.getCheckpointFile()
- if checkpointFile.isPresent():
+ if checkpointFile.isDefined():
return checkpointFile.get()
def coalesce(self, numPartitions, shuffle=False):
- rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+ rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None)
return SchemaRDD(rdd, self.sql_ctx)
def distinct(self, numPartitions=None):
if numPartitions is None:
rdd = self._jschema_rdd.distinct()
else:
- rdd = self._jschema_rdd.distinct(numPartitions)
+ rdd = self._jschema_rdd.distinct(numPartitions, None)
return SchemaRDD(rdd, self.sql_ctx)
def intersection(self, other):
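Two Scala-versus-Java API details drive the less obvious edits in this hunk, and the repartition change below follows the same pattern. First, the Scala RDD methods take an implicit Ordering parameter that Py4J cannot fill in, so Python now passes None (null) for it explicitly. Second, the Scala side returns a Scala Option, whose presence test is isDefined(), where the old Java wrapper returned a Guava Optional with isPresent(). A rough sketch of the calling convention, assuming a SchemaRDD named srdd (hypothetical):

    # The Scala signature is roughly:
    #   def coalesce(numPartitions: Int, shuffle: Boolean = false)
    #               (implicit ord: Ordering[Row] = null)
    # Py4J sees the implicit as one more positional argument, hence the None.
    rdd = srdd._jschema_rdd.coalesce(4, False, None)

    # Scala Option vs. Guava Optional: same idea, different method name.
    opt = srdd._jschema_rdd.getCheckpointFile()
    path = opt.get() if opt.isDefined() else None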
@@ -2081,7 +2057,7 @@ class SchemaRDD(RDD):
raise ValueError("Can only intersect with another SchemaRDD")
def repartition(self, numPartitions):
- rdd = self._jschema_rdd.repartition(numPartitions)
+ rdd = self._jschema_rdd.repartition(numPartitions, None)
return SchemaRDD(rdd, self.sql_ctx)
def subtract(self, other, numPartitions=None):