diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql.py | 48 |
1 file changed, 12 insertions, 36 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index dcd3b60a60..1990323249 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1458,7 +1458,7 @@ class SQLContext(object): jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): """Registers the given RDD as a temporary table in the catalog. @@ -1487,7 +1487,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - jschema_rdd = self._ssql_ctx.parquetFile(path).toJavaSchemaRDD() + jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) def jsonFile(self, path, schema=None, samplingRatio=1.0): @@ -1549,7 +1549,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonFile(path, scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): """Loads an RDD storing one JSON object per string as a L{SchemaRDD}. @@ -1619,7 +1619,7 @@ class SQLContext(object): else: scala_datatype = self._ssql_ctx.parseDataType(schema.json()) srdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) - return SchemaRDD(srdd.toJavaSchemaRDD(), self) + return SchemaRDD(srdd, self) def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -1630,7 +1630,7 @@ class SQLContext(object): >>> srdd2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] """ - return SchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): """Returns the specified table as a L{SchemaRDD}. 
@@ -1641,7 +1641,7 @@ class SQLContext(object): >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True """ - return SchemaRDD(self._ssql_ctx.table(tableName).toJavaSchemaRDD(), self) + return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(self, tableName): """Caches the specified table in-memory.""" @@ -1686,24 +1686,6 @@ class HiveContext(SQLContext): def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) - def hiveql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hiveql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery).toJavaSchemaRDD(), self) - - def hql(self, hqlQuery): - """ - DEPRECATED: Use sql() - """ - warnings.warn("hql() is deprecated as the sql function now parses using HiveQL by" + - "default. The SQL dialect for parsing can be set using 'spark.sql.dialect'", - DeprecationWarning) - return self.hiveql(hqlQuery) - class LocalHiveContext(HiveContext): @@ -1716,12 +1698,6 @@ class LocalHiveContext(HiveContext): return self._jvm.LocalHiveContext(self._jsc.sc()) -class TestHiveContext(HiveContext): - - def _get_hive_ctx(self): - return self._jvm.TestHiveContext(self._jsc.sc()) - - def _create_row(fields, values): row = Row(*values) row.__FIELDS__ = fields @@ -1846,7 +1822,7 @@ class SchemaRDD(RDD): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc clsName = jschema_rdd.getClass().getName() - assert clsName.endswith("JavaSchemaRDD"), "jschema_rdd must be JavaSchemaRDD" + assert clsName.endswith("SchemaRDD"), "jschema_rdd must be SchemaRDD" self._jschema_rdd = jschema_rdd self._id = None self.is_cached = False @@ -1880,7 +1856,7 @@ class SchemaRDD(RDD): >>> srdd.limit(0).collect() [] """ - rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD() + rdd = self._jschema_rdd.baseSchemaRDD().limit(num) return SchemaRDD(rdd, 
self.sql_ctx) def toJSON(self, use_unicode=False): @@ -2059,18 +2035,18 @@ class SchemaRDD(RDD): def getCheckpointFile(self): checkpointFile = self._jschema_rdd.getCheckpointFile() - if checkpointFile.isPresent(): + if checkpointFile.isDefined(): return checkpointFile.get() def coalesce(self, numPartitions, shuffle=False): - rdd = self._jschema_rdd.coalesce(numPartitions, shuffle) + rdd = self._jschema_rdd.coalesce(numPartitions, shuffle, None) return SchemaRDD(rdd, self.sql_ctx) def distinct(self, numPartitions=None): if numPartitions is None: rdd = self._jschema_rdd.distinct() else: - rdd = self._jschema_rdd.distinct(numPartitions) + rdd = self._jschema_rdd.distinct(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def intersection(self, other): @@ -2081,7 +2057,7 @@ class SchemaRDD(RDD): raise ValueError("Can only intersect with another SchemaRDD") def repartition(self, numPartitions): - rdd = self._jschema_rdd.repartition(numPartitions) + rdd = self._jschema_rdd.repartition(numPartitions, None) return SchemaRDD(rdd, self.sql_ctx) def subtract(self, other, numPartitions=None): |