author | Reynold Xin <rxin@databricks.com> | 2016-01-04 18:02:38 -0800
---|---|---
committer | Michael Armbrust <michael@databricks.com> | 2016-01-04 18:02:38 -0800
commit | 77ab49b8575d2ebd678065fa70b0343d532ab9c2 (patch) |
tree | f9c4a990499d1856494f787f8bfc095d68a69735 /python |
parent | fdfac22d08fc4fdc640843dd93a29e2ce4aee2ef (diff) |
[SPARK-12600][SQL] Remove deprecated methods in Spark SQL
Author: Reynold Xin <rxin@databricks.com>
Closes #10559 from rxin/remove-deprecated-sql.
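Every removed method already named its replacement in the deprecation warning it emitted. As a quick orientation before the diff, here is a hypothetical migration sketch (not part of the patch): it assumes a live 1.6-era `sc` and `sqlContext`, such as the ones the pyspark shell provides, and invented file names and data.

```python
# Hypothetical before/after pairs for the removed APIs; `sc` and
# `sqlContext` are assumed to exist (e.g. a pyspark shell session).
from pyspark.sql import Row

rdd = sc.parallelize(['{"name": "Alice", "age": 2}', '{"name": "Bob", "age": 5}'])

df = sqlContext.read.json(rdd)                     # was: sqlContext.jsonRDD(rdd)
df = sqlContext.read.json("people.json")           # was: sqlContext.jsonFile("people.json")
df.write.parquet("people.parquet")                 # was: df.saveAsParquetFile("people.parquet")
df2 = sqlContext.read.parquet("people.parquet")    # was: sqlContext.parquetFile("people.parquet")
df3 = sqlContext.read.load(path="people.parquet")  # was: sqlContext.load(path="people.parquet")
df[df.name.isin("Bob", "Mike")].collect()          # was: df[df.name.inSet("Bob", "Mike")]

rows = sc.parallelize([Row(name="Alice", age=2), Row(name="Bob", age=5)])
df4 = sqlContext.createDataFrame(rows)             # was: sqlContext.inferSchema(rows)
```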
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/__init__.py | 2
-rw-r--r-- | python/pyspark/sql/__init__.py | 2
-rw-r--r-- | python/pyspark/sql/column.py | 20
-rw-r--r-- | python/pyspark/sql/context.py | 111
-rw-r--r-- | python/pyspark/sql/dataframe.py | 48
-rw-r--r-- | python/pyspark/sql/functions.py | 24
-rw-r--r-- | python/pyspark/sql/readwriter.py | 20
-rw-r--r-- | python/pyspark/sql/tests.py | 32
8 files changed, 34 insertions, 225 deletions
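The camelCase function aliases (`rowNumber`, `denseRank`, `cumeDist`, `percentRank`, `monotonicallyIncreasingId`, `sparkPartitionId`) disappear as well; only the snake_case names survive, as the updated window tests at the bottom of the diff show. A hedged usage sketch of the surviving names (it assumes a DataFrame `df` with `key` and `value` columns and a context that supports window functions, i.e. HiveContext in Spark 1.6):

```python
# Sketch of the surviving snake_case functions; `df` is assumed to
# have "key" and "value" columns (mirroring the test setup below).
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("value").orderBy("key")
sel = df.select(df.value,
                F.row_number().over(w),           # was: F.rowNumber()
                F.dense_rank().over(w),           # was: F.denseRank()
                F.percent_rank().over(w),         # was: F.percentRank()
                F.cume_dist().over(w),            # was: F.cumeDist()
                F.monotonically_increasing_id(),  # was: F.monotonicallyIncreasingId()
                F.spark_partition_id())           # was: F.sparkPartitionId()
```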
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 8475dfb1c6..d530723ca9 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -65,7 +65,7 @@ def since(version):
 
 # for back compatibility
-from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
+from pyspark.sql import SQLContext, HiveContext, Row
 
 __all__ = [
     "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 98eaf52866..0b06c8339f 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -47,7 +47,7 @@ from __future__ import absolute_import
 from pyspark.sql.types import Row
 from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.column import Column
-from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions
+from pyspark.sql.dataframe import DataFrame, DataFrameNaFunctions, DataFrameStatFunctions
 from pyspark.sql.group import GroupedData
 from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
 from pyspark.sql.window import Window, WindowSpec
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 81fd4e7826..900def59d2 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -27,8 +27,7 @@ from pyspark.context import SparkContext
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.types import *
 
-__all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
-           "DataFrameStatFunctions"]
+__all__ = ["DataFrame", "Column", "DataFrameNaFunctions", "DataFrameStatFunctions"]
 
 
 def _create_column_from_literal(literal):
@@ -273,23 +272,6 @@ class Column(object):
     __getslice__ = substr
 
     @ignore_unicode_prefix
-    @since(1.3)
-    def inSet(self, *cols):
-        """
-        A boolean expression that is evaluated to true if the value of this
-        expression is contained by the evaluated values of the arguments.
-
-        >>> df[df.name.inSet("Bob", "Mike")].collect()
-        [Row(age=5, name=u'Bob')]
-        >>> df[df.age.inSet([1, 2, 3])].collect()
-        [Row(age=2, name=u'Alice')]
-
-        .. note:: Deprecated in 1.5, use :func:`Column.isin` instead.
-        """
-        warnings.warn("inSet is deprecated. Use isin() instead.")
-        return self.isin(*cols)
-
-    @ignore_unicode_prefix
     @since(1.5)
     def isin(self, *cols):
         """
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index ba6915a123..91e27cf16e 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -274,33 +274,6 @@ class SQLContext(object):
         schema = rdd.map(_infer_schema).reduce(_merge_type)
         return schema
 
-    @ignore_unicode_prefix
-    def inferSchema(self, rdd, samplingRatio=None):
-        """
-        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
-        """
-        warnings.warn("inferSchema is deprecated, please use createDataFrame instead.")
-
-        if isinstance(rdd, DataFrame):
-            raise TypeError("Cannot apply schema to DataFrame")
-
-        return self.createDataFrame(rdd, None, samplingRatio)
-
-    @ignore_unicode_prefix
-    def applySchema(self, rdd, schema):
-        """
-        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
- """ - warnings.warn("applySchema is deprecated, please use createDataFrame instead") - - if isinstance(rdd, DataFrame): - raise TypeError("Cannot apply schema to DataFrame") - - if not isinstance(schema, StructType): - raise TypeError("schema should be StructType, but got %s" % type(schema)) - - return self.createDataFrame(rdd, schema) - def _createFromRDD(self, rdd, schema, samplingRatio): """ Create an RDD for DataFrame from an existing RDD, returns the RDD and schema. @@ -450,90 +423,6 @@ class SQLContext(object): """ self._ssql_ctx.dropTempTable(tableName) - def parquetFile(self, *paths): - """Loads a Parquet file, returning the result as a :class:`DataFrame`. - - .. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead. - - >>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes - [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] - """ - warnings.warn("parquetFile is deprecated. Use read.parquet() instead.") - gateway = self._sc._gateway - jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths)) - for i in range(0, len(paths)): - jpaths[i] = paths[i] - jdf = self._ssql_ctx.parquetFile(jpaths) - return DataFrame(jdf, self) - - def jsonFile(self, path, schema=None, samplingRatio=1.0): - """Loads a text file storing one JSON object per line as a :class:`DataFrame`. - - .. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead. - - >>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes - [('age', 'bigint'), ('name', 'string')] - """ - warnings.warn("jsonFile is deprecated. Use read.json() instead.") - if schema is None: - df = self._ssql_ctx.jsonFile(path, samplingRatio) - else: - scala_datatype = self._ssql_ctx.parseDataType(schema.json()) - df = self._ssql_ctx.jsonFile(path, scala_datatype) - return DataFrame(df, self) - - @ignore_unicode_prefix - @since(1.0) - def jsonRDD(self, rdd, schema=None, samplingRatio=1.0): - """Loads an RDD storing one JSON object per string as a :class:`DataFrame`. - - If the schema is provided, applies the given schema to this JSON dataset. - Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema. - - >>> df1 = sqlContext.jsonRDD(json) - >>> df1.first() - Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None) - - >>> df2 = sqlContext.jsonRDD(json, df1.schema) - >>> df2.first() - Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None) - - >>> from pyspark.sql.types import * - >>> schema = StructType([ - ... StructField("field2", StringType()), - ... StructField("field3", - ... StructType([StructField("field5", ArrayType(IntegerType()))])) - ... ]) - >>> df3 = sqlContext.jsonRDD(json, schema) - >>> df3.first() - Row(field2=u'row1', field3=Row(field5=None)) - """ - - def func(iterator): - for x in iterator: - if not isinstance(x, basestring): - x = unicode(x) - if isinstance(x, unicode): - x = x.encode("utf-8") - yield x - keyed = rdd.mapPartitions(func) - keyed._bypass_serializer = True - jrdd = keyed._jrdd.map(self._jvm.BytesToString()) - if schema is None: - df = self._ssql_ctx.jsonRDD(jrdd.rdd(), samplingRatio) - else: - scala_datatype = self._ssql_ctx.parseDataType(schema.json()) - df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) - return DataFrame(df, self) - - def load(self, path=None, source=None, schema=None, **options): - """Returns the dataset in a data source as a :class:`DataFrame`. - - .. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead. 
- """ - warnings.warn("load is deprecated. Use read.load() instead.") - return self.read.load(path, source, schema, **options) - @since(1.3) def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): """Creates an external table based on the dataset in a data source. diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index ad621df910..a7bc288e38 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -36,7 +36,7 @@ from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column from pyspark.sql.readwriter import DataFrameWriter from pyspark.sql.types import * -__all__ = ["DataFrame", "SchemaRDD", "DataFrameNaFunctions", "DataFrameStatFunctions"] +__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"] class DataFrame(object): @@ -113,14 +113,6 @@ class DataFrame(object): rdd = self._jdf.toJSON() return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode)) - def saveAsParquetFile(self, path): - """Saves the contents as a Parquet file, preserving the schema. - - .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead. - """ - warnings.warn("saveAsParquetFile is deprecated. Use write.parquet() instead.") - self._jdf.saveAsParquetFile(path) - @since(1.3) def registerTempTable(self, name): """Registers this RDD as a temporary table using the given name. @@ -135,38 +127,6 @@ class DataFrame(object): """ self._jdf.registerTempTable(name) - def registerAsTable(self, name): - """ - .. note:: Deprecated in 1.4, use :func:`registerTempTable` instead. - """ - warnings.warn("Use registerTempTable instead of registerAsTable.") - self.registerTempTable(name) - - def insertInto(self, tableName, overwrite=False): - """Inserts the contents of this :class:`DataFrame` into the specified table. - - .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead. - """ - warnings.warn("insertInto is deprecated. Use write.insertInto() instead.") - self.write.insertInto(tableName, overwrite) - - def saveAsTable(self, tableName, source=None, mode="error", **options): - """Saves the contents of this :class:`DataFrame` to a data source as a table. - - .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead. - """ - warnings.warn("insertInto is deprecated. Use write.saveAsTable() instead.") - self.write.saveAsTable(tableName, source, mode, **options) - - @since(1.3) - def save(self, path=None, source=None, mode="error", **options): - """Saves the contents of the :class:`DataFrame` to a data source. - - .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead. - """ - warnings.warn("insertInto is deprecated. Use write.save() instead.") - return self.write.save(path, source, mode, **options) - @property @since(1.4) def write(self): @@ -1388,12 +1348,6 @@ class DataFrame(object): drop_duplicates = dropDuplicates -# Having SchemaRDD for backward compatibility (for docs) -class SchemaRDD(DataFrame): - """SchemaRDD is deprecated, please use :class:`DataFrame`. - """ - - def _to_scala_map(sc, jm): """ Convert a dict into a JVM Map. diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 25594d79c2..7c15e38458 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -149,12 +149,8 @@ _binary_mathfunctions = { } _window_functions = { - 'rowNumber': - """.. 
-        """.. note:: Deprecated in 1.6, use row_number instead.""",
     'row_number':
         """returns a sequential number starting at 1 within a window partition.""",
-    'denseRank':
-        """.. note:: Deprecated in 1.6, use dense_rank instead.""",
     'dense_rank':
         """returns the rank of rows within a window partition, without any gaps.
@@ -171,13 +167,9 @@ _window_functions = {
         place and that the next person came in third. This is equivalent to the RANK function in SQL.""",
-    'cumeDist':
-        """.. note:: Deprecated in 1.6, use cume_dist instead.""",
     'cume_dist':
         """returns the cumulative distribution of values within a window partition,
         i.e. the fraction of rows that are below the current row.""",
-    'percentRank':
-        """.. note:: Deprecated in 1.6, use percent_rank instead.""",
     'percent_rank':
         """returns the relative rank (i.e. percentile) of rows within a window partition.""",
 }
@@ -318,14 +310,6 @@ def isnull(col):
     return Column(sc._jvm.functions.isnull(_to_java_column(col)))
 
 
-@since(1.4)
-def monotonicallyIncreasingId():
-    """
-    .. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
-    """
-    return monotonically_increasing_id()
-
-
 @since(1.6)
 def monotonically_increasing_id():
     """A column that generates monotonically increasing 64-bit integers.
@@ -434,14 +418,6 @@ def shiftRightUnsigned(col, numBits):
     return Column(jc)
 
 
-@since(1.4)
-def sparkPartitionId():
-    """
-    .. note:: Deprecated in 1.6, use spark_partition_id instead.
-    """
-    return spark_partition_id()
-
-
 @since(1.6)
 def spark_partition_id():
     """A column for partition ID of the Spark task.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a2771daabe..0b20022b14 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -130,11 +130,9 @@ class DataFrameReader(object):
             self.schema(schema)
         self.options(**options)
         if path is not None:
-            if type(path) == list:
-                return self._df(
-                    self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
-            else:
-                return self._df(self._jreader.load(path))
+            if type(path) != list:
+                path = [path]
+            return self._df(self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
         else:
             return self._df(self._jreader.load())
 
@@ -179,7 +177,17 @@ class DataFrameReader(object):
         elif type(path) == list:
             return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
         elif isinstance(path, RDD):
-            return self._df(self._jreader.json(path._jrdd))
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
+            return self._df(self._jreader.json(jrdd))
         else:
             raise TypeError("path can be only string or RDD")
 
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 9ada96601a..e396cf41f2 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_basic_functions(self):
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         df.count()
         df.collect()
         df.schema
@@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase):
         df.collect()
 
     def test_apply_schema_to_row(self):
-        df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
+        df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
         df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
         self.assertEqual(df.collect(), df2.collect())
 
@@ -408,12 +408,12 @@ class SQLTests(ReusedPySparkTestCase):
         NestedRow = Row("f1", "f2")
         nestedRdd1 = self.sc.parallelize([NestedRow([1, 2], {"row1": 1.0}),
                                           NestedRow([2, 3], {"row2": 2.0})])
-        df = self.sqlCtx.inferSchema(nestedRdd1)
+        df = self.sqlCtx.createDataFrame(nestedRdd1)
         self.assertEqual(Row(f1=[1, 2], f2={u'row1': 1.0}), df.collect()[0])
 
         nestedRdd2 = self.sc.parallelize([NestedRow([[1, 2], [2, 3]], [1, 2]),
                                           NestedRow([[2, 3], [3, 4]], [2, 3])])
-        df = self.sqlCtx.inferSchema(nestedRdd2)
+        df = self.sqlCtx.createDataFrame(nestedRdd2)
         self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])
 
         from collections import namedtuple
@@ -421,7 +421,7 @@ class SQLTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
                                    CustomRow(field1=2, field2="row2"),
                                    CustomRow(field1=3, field2="row3")])
-        df = self.sqlCtx.inferSchema(rdd)
+        df = self.sqlCtx.createDataFrame(rdd)
         self.assertEqual(Row(field1=1, field2=u'row1'), df.first())
 
     def test_create_dataframe_from_objects(self):
@@ -581,14 +581,14 @@ class SQLTests(ReusedPySparkTestCase):
         df0 = self.sqlCtx.createDataFrame([row])
         output_dir = os.path.join(self.tempdir.name, "labeled_point")
         df0.write.parquet(output_dir)
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         point = df1.head().point
         self.assertEqual(point, ExamplePoint(1.0, 2.0))
 
         row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
         df0 = self.sqlCtx.createDataFrame([row])
         df0.write.parquet(output_dir, mode='overwrite')
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         point = df1.head().point
         self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))
 
@@ -763,7 +763,7 @@ class SQLTests(ReusedPySparkTestCase):
         defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
                                                     "org.apache.spark.sql.parquet")
         self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
-        actual = self.sqlCtx.load(path=tmpPath)
+        actual = self.sqlCtx.read.load(path=tmpPath)
         self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
         self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
 
@@ -796,7 +796,7 @@ class SQLTests(ReusedPySparkTestCase):
         defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
                                                     "org.apache.spark.sql.parquet")
         self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
-        actual = self.sqlCtx.load(path=tmpPath)
+        actual = self.sqlCtx.read.load(path=tmpPath)
         self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
         self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
 
@@ -805,7 +805,7 @@ class SQLTests(ReusedPySparkTestCase):
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         # render_doc() reproduces the help() exception without printing output
         pydoc.render_doc(df)
         pydoc.render_doc(df.foo)
@@ -853,8 +853,8 @@ class SQLTests(ReusedPySparkTestCase):
         # this saving as Parquet caused issues as well.
         output_dir = os.path.join(self.tempdir.name, "infer_long_type")
-        df.saveAsParquetFile(output_dir)
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df.write.parquet(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         self.assertEqual('a', df1.first().f1)
         self.assertEqual(100000000000000, df1.first().f2)
 
@@ -1205,9 +1205,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
                         F.max("key").over(w.rowsBetween(0, 1)),
                         F.min("key").over(w.rowsBetween(0, 1)),
                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.rowNumber().over(w),
+                        F.row_number().over(w),
                         F.rank().over(w),
-                        F.denseRank().over(w),
+                        F.dense_rank().over(w),
                         F.ntile(2).over(w))
         rs = sorted(sel.collect())
         expected = [
@@ -1227,9 +1227,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
                         F.max("key").over(w.rowsBetween(0, 1)),
                         F.min("key").over(w.rowsBetween(0, 1)),
                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.rowNumber().over(w),
+                        F.row_number().over(w),
                         F.rank().over(w),
-                        F.denseRank().over(w),
+                        F.dense_rank().over(w),
                         F.ntile(2).over(w))
         rs = sorted(sel.collect())
         expected = [
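One non-deletion in the patch: `jsonRDD`'s UTF-8 normalization moves into `DataFrameReader.json`, so `read.json` now accepts an RDD of JSON strings directly. For reference, a standalone copy of that per-partition step (the helper name is invented; the body matches the diff, with Python 2 `basestring`/`unicode` semantics):

```python
# Per-partition normalization as used by DataFrameReader.json on RDD
# input: coerce every element to a UTF-8 byte string before it is
# handed to the JVM. Python 2 only (basestring/unicode).
def to_utf8_lines(iterator):
    for x in iterator:
        if not isinstance(x, basestring):  # stringify non-string objects
            x = unicode(x)
        if isinstance(x, unicode):         # encode unicode to UTF-8 bytes
            x = x.encode("utf-8")
        yield x

# Usage: rdd.mapPartitions(to_utf8_lines) yields byte strings whether
# the input held str, unicode, or arbitrary objects.
```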