author     Reynold Xin <rxin@databricks.com>          2016-01-04 18:02:38 -0800
committer  Michael Armbrust <michael@databricks.com>  2016-01-04 18:02:38 -0800
commit     77ab49b8575d2ebd678065fa70b0343d532ab9c2 (patch)
tree       f9c4a990499d1856494f787f8bfc095d68a69735 /python
parent     fdfac22d08fc4fdc640843dd93a29e2ce4aee2ef (diff)
[SPARK-12600][SQL] Remove deprecated methods in Spark SQL
Author: Reynold Xin <rxin@databricks.com>

Closes #10559 from rxin/remove-deprecated-sql.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/__init__.py         2
-rw-r--r--  python/pyspark/sql/__init__.py     2
-rw-r--r--  python/pyspark/sql/column.py      20
-rw-r--r--  python/pyspark/sql/context.py    111
-rw-r--r--  python/pyspark/sql/dataframe.py   48
-rw-r--r--  python/pyspark/sql/functions.py   24
-rw-r--r--  python/pyspark/sql/readwriter.py  20
-rw-r--r--  python/pyspark/sql/tests.py       32
8 files changed, 34 insertions(+), 225 deletions(-)
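In practical terms, PySpark code that still relies on any of the shims deleted below has to move to the reader/writer accessors and createDataFrame. A minimal before/after sketch, assuming the sc and sqlContext objects provided by the PySpark shell, the test_support data shipped with Spark, and a hypothetical /tmp output path:

    people = sqlContext.read.json('python/test_support/sql/people.json')  # was sqlContext.jsonFile(...)
    people.write.parquet('/tmp/people.parquet')                           # was people.saveAsParquetFile(...)
    round_trip = sqlContext.read.parquet('/tmp/people.parquet')           # was sqlContext.parquetFile(...)

The per-file hunks below follow the same pattern: every removed method already had a snake_case or read/write replacement documented in its deprecation note.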
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 8475dfb1c6..d530723ca9 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -65,7 +65,7 @@ def since(version):
# for back compatibility
-from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
+from pyspark.sql import SQLContext, HiveContext, Row
__all__ = [
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 98eaf52866..0b06c8339f 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -47,7 +47,7 @@ from __future__ import absolute_import
from pyspark.sql.types import Row
from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.column import Column
-from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions
+from pyspark.sql.dataframe import DataFrame, DataFrameNaFunctions, DataFrameStatFunctions
from pyspark.sql.group import GroupedData
from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
from pyspark.sql.window import Window, WindowSpec
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 81fd4e7826..900def59d2 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -27,8 +27,7 @@ from pyspark.context import SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.types import *
-__all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
- "DataFrameStatFunctions"]
+__all__ = ["DataFrame", "Column", "DataFrameNaFunctions", "DataFrameStatFunctions"]
def _create_column_from_literal(literal):
@@ -273,23 +272,6 @@ class Column(object):
__getslice__ = substr
@ignore_unicode_prefix
- @since(1.3)
- def inSet(self, *cols):
- """
- A boolean expression that is evaluated to true if the value of this
- expression is contained by the evaluated values of the arguments.
-
- >>> df[df.name.inSet("Bob", "Mike")].collect()
- [Row(age=5, name=u'Bob')]
- >>> df[df.age.inSet([1, 2, 3])].collect()
- [Row(age=2, name=u'Alice')]
-
- .. note:: Deprecated in 1.5, use :func:`Column.isin` instead.
- """
- warnings.warn("inSet is deprecated. Use isin() instead.")
- return self.isin(*cols)
-
- @ignore_unicode_prefix
@since(1.5)
def isin(self, *cols):
"""
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index ba6915a123..91e27cf16e 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -274,33 +274,6 @@ class SQLContext(object):
schema = rdd.map(_infer_schema).reduce(_merge_type)
return schema
- @ignore_unicode_prefix
- def inferSchema(self, rdd, samplingRatio=None):
- """
- .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
- """
- warnings.warn("inferSchema is deprecated, please use createDataFrame instead.")
-
- if isinstance(rdd, DataFrame):
- raise TypeError("Cannot apply schema to DataFrame")
-
- return self.createDataFrame(rdd, None, samplingRatio)
-
- @ignore_unicode_prefix
- def applySchema(self, rdd, schema):
- """
- .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
- """
- warnings.warn("applySchema is deprecated, please use createDataFrame instead")
-
- if isinstance(rdd, DataFrame):
- raise TypeError("Cannot apply schema to DataFrame")
-
- if not isinstance(schema, StructType):
- raise TypeError("schema should be StructType, but got %s" % type(schema))
-
- return self.createDataFrame(rdd, schema)
-
def _createFromRDD(self, rdd, schema, samplingRatio):
"""
Create an RDD for DataFrame from an existing RDD, returns the RDD and schema.
@@ -450,90 +423,6 @@ class SQLContext(object):
"""
self._ssql_ctx.dropTempTable(tableName)
- def parquetFile(self, *paths):
- """Loads a Parquet file, returning the result as a :class:`DataFrame`.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead.
-
- >>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes
- [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
- """
- warnings.warn("parquetFile is deprecated. Use read.parquet() instead.")
- gateway = self._sc._gateway
- jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
- for i in range(0, len(paths)):
- jpaths[i] = paths[i]
- jdf = self._ssql_ctx.parquetFile(jpaths)
- return DataFrame(jdf, self)
-
- def jsonFile(self, path, schema=None, samplingRatio=1.0):
- """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead.
-
- >>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes
- [('age', 'bigint'), ('name', 'string')]
- """
- warnings.warn("jsonFile is deprecated. Use read.json() instead.")
- if schema is None:
- df = self._ssql_ctx.jsonFile(path, samplingRatio)
- else:
- scala_datatype = self._ssql_ctx.parseDataType(schema.json())
- df = self._ssql_ctx.jsonFile(path, scala_datatype)
- return DataFrame(df, self)
-
- @ignore_unicode_prefix
- @since(1.0)
- def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
- """Loads an RDD storing one JSON object per string as a :class:`DataFrame`.
-
- If the schema is provided, applies the given schema to this JSON dataset.
- Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
-
- >>> df1 = sqlContext.jsonRDD(json)
- >>> df1.first()
- Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
-
- >>> df2 = sqlContext.jsonRDD(json, df1.schema)
- >>> df2.first()
- Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
-
- >>> from pyspark.sql.types import *
- >>> schema = StructType([
- ... StructField("field2", StringType()),
- ... StructField("field3",
- ... StructType([StructField("field5", ArrayType(IntegerType()))]))
- ... ])
- >>> df3 = sqlContext.jsonRDD(json, schema)
- >>> df3.first()
- Row(field2=u'row1', field3=Row(field5=None))
- """
-
- def func(iterator):
- for x in iterator:
- if not isinstance(x, basestring):
- x = unicode(x)
- if isinstance(x, unicode):
- x = x.encode("utf-8")
- yield x
- keyed = rdd.mapPartitions(func)
- keyed._bypass_serializer = True
- jrdd = keyed._jrdd.map(self._jvm.BytesToString())
- if schema is None:
- df = self._ssql_ctx.jsonRDD(jrdd.rdd(), samplingRatio)
- else:
- scala_datatype = self._ssql_ctx.parseDataType(schema.json())
- df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
- return DataFrame(df, self)
-
- def load(self, path=None, source=None, schema=None, **options):
- """Returns the dataset in a data source as a :class:`DataFrame`.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead.
- """
- warnings.warn("load is deprecated. Use read.load() instead.")
- return self.read.load(path, source, schema, **options)
-
@since(1.3)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index ad621df910..a7bc288e38 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -36,7 +36,7 @@ from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
from pyspark.sql.readwriter import DataFrameWriter
from pyspark.sql.types import *
-__all__ = ["DataFrame", "SchemaRDD", "DataFrameNaFunctions", "DataFrameStatFunctions"]
+__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
class DataFrame(object):
@@ -113,14 +113,6 @@ class DataFrame(object):
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
- def saveAsParquetFile(self, path):
- """Saves the contents as a Parquet file, preserving the schema.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead.
- """
- warnings.warn("saveAsParquetFile is deprecated. Use write.parquet() instead.")
- self._jdf.saveAsParquetFile(path)
-
@since(1.3)
def registerTempTable(self, name):
"""Registers this RDD as a temporary table using the given name.
@@ -135,38 +127,6 @@ class DataFrame(object):
"""
self._jdf.registerTempTable(name)
- def registerAsTable(self, name):
- """
- .. note:: Deprecated in 1.4, use :func:`registerTempTable` instead.
- """
- warnings.warn("Use registerTempTable instead of registerAsTable.")
- self.registerTempTable(name)
-
- def insertInto(self, tableName, overwrite=False):
- """Inserts the contents of this :class:`DataFrame` into the specified table.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead.
- """
- warnings.warn("insertInto is deprecated. Use write.insertInto() instead.")
- self.write.insertInto(tableName, overwrite)
-
- def saveAsTable(self, tableName, source=None, mode="error", **options):
- """Saves the contents of this :class:`DataFrame` to a data source as a table.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead.
- """
- warnings.warn("insertInto is deprecated. Use write.saveAsTable() instead.")
- self.write.saveAsTable(tableName, source, mode, **options)
-
- @since(1.3)
- def save(self, path=None, source=None, mode="error", **options):
- """Saves the contents of the :class:`DataFrame` to a data source.
-
- .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead.
- """
- warnings.warn("insertInto is deprecated. Use write.save() instead.")
- return self.write.save(path, source, mode, **options)
-
@property
@since(1.4)
def write(self):
@@ -1388,12 +1348,6 @@ class DataFrame(object):
drop_duplicates = dropDuplicates
-# Having SchemaRDD for backward compatibility (for docs)
-class SchemaRDD(DataFrame):
- """SchemaRDD is deprecated, please use :class:`DataFrame`.
- """
-
-
def _to_scala_map(sc, jm):
"""
Convert a dict into a JVM Map.
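On the DataFrame side, the removed save/insert helpers map one-for-one onto the DataFrameWriter exposed as df.write, and registerAsTable becomes registerTempTable. A brief sketch, with a hypothetical table name and /tmp output path:

    # was: df.saveAsParquetFile('/tmp/df.parquet')
    df.write.parquet('/tmp/df.parquet')

    # was: df.registerAsTable("people")
    df.registerTempTable("people")

    # was: df.insertInto("people", overwrite=False)
    df.write.insertInto("people", overwrite=False)

    # was: df.saveAsTable("people_copy", source=None, mode="error")
    df.write.saveAsTable("people_copy")

    # was: df.save(path, source, mode); write.save() takes format= rather than source=
    df.write.save('/tmp/df_json', format='json', mode='overwrite')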
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 25594d79c2..7c15e38458 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -149,12 +149,8 @@ _binary_mathfunctions = {
}
_window_functions = {
- 'rowNumber':
- """.. note:: Deprecated in 1.6, use row_number instead.""",
'row_number':
"""returns a sequential number starting at 1 within a window partition.""",
- 'denseRank':
- """.. note:: Deprecated in 1.6, use dense_rank instead.""",
'dense_rank':
"""returns the rank of rows within a window partition, without any gaps.
@@ -171,13 +167,9 @@ _window_functions = {
place and that the next person came in third.
This is equivalent to the RANK function in SQL.""",
- 'cumeDist':
- """.. note:: Deprecated in 1.6, use cume_dist instead.""",
'cume_dist':
"""returns the cumulative distribution of values within a window partition,
i.e. the fraction of rows that are below the current row.""",
- 'percentRank':
- """.. note:: Deprecated in 1.6, use percent_rank instead.""",
'percent_rank':
"""returns the relative rank (i.e. percentile) of rows within a window partition.""",
}
@@ -318,14 +310,6 @@ def isnull(col):
return Column(sc._jvm.functions.isnull(_to_java_column(col)))
-@since(1.4)
-def monotonicallyIncreasingId():
- """
- .. note:: Deprecated in 1.6, use monotonically_increasing_id instead.
- """
- return monotonically_increasing_id()
-
-
@since(1.6)
def monotonically_increasing_id():
"""A column that generates monotonically increasing 64-bit integers.
@@ -434,14 +418,6 @@ def shiftRightUnsigned(col, numBits):
return Column(jc)
-@since(1.4)
-def sparkPartitionId():
- """
- .. note:: Deprecated in 1.6, use spark_partition_id instead.
- """
- return spark_partition_id()
-
-
@since(1.6)
def spark_partition_id():
"""A column for partition ID of the Spark task.
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a2771daabe..0b20022b14 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -130,11 +130,9 @@ class DataFrameReader(object):
self.schema(schema)
self.options(**options)
if path is not None:
- if type(path) == list:
- return self._df(
- self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
- else:
- return self._df(self._jreader.load(path))
+ if type(path) != list:
+ path = [path]
+ return self._df(self._jreader.load(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
else:
return self._df(self._jreader.load())
@@ -179,7 +177,17 @@ class DataFrameReader(object):
elif type(path) == list:
return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
elif isinstance(path, RDD):
- return self._df(self._jreader.json(path._jrdd))
+ def func(iterator):
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ if isinstance(x, unicode):
+ x = x.encode("utf-8")
+ yield x
+ keyed = path.mapPartitions(func)
+ keyed._bypass_serializer = True
+ jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
+ return self._df(self._jreader.json(jrdd))
else:
raise TypeError("path can be only string or RDD")
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 9ada96601a..e396cf41f2 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
- df = self.sqlCtx.jsonRDD(rdd)
+ df = self.sqlCtx.read.json(rdd)
df.count()
df.collect()
df.schema
@@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase):
df.collect()
def test_apply_schema_to_row(self):
- df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
+ df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())
@@ -408,12 +408,12 @@ class SQLTests(ReusedPySparkTestCase):
NestedRow = Row("f1", "f2")
nestedRdd1 = self.sc.parallelize([NestedRow([1, 2], {"row1": 1.0}),
NestedRow([2, 3], {"row2": 2.0})])
- df = self.sqlCtx.inferSchema(nestedRdd1)
+ df = self.sqlCtx.createDataFrame(nestedRdd1)
self.assertEqual(Row(f1=[1, 2], f2={u'row1': 1.0}), df.collect()[0])
nestedRdd2 = self.sc.parallelize([NestedRow([[1, 2], [2, 3]], [1, 2]),
NestedRow([[2, 3], [3, 4]], [2, 3])])
- df = self.sqlCtx.inferSchema(nestedRdd2)
+ df = self.sqlCtx.createDataFrame(nestedRdd2)
self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])
from collections import namedtuple
@@ -421,7 +421,7 @@ class SQLTests(ReusedPySparkTestCase):
rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
CustomRow(field1=2, field2="row2"),
CustomRow(field1=3, field2="row3")])
- df = self.sqlCtx.inferSchema(rdd)
+ df = self.sqlCtx.createDataFrame(rdd)
self.assertEqual(Row(field1=1, field2=u'row1'), df.first())
def test_create_dataframe_from_objects(self):
@@ -581,14 +581,14 @@ class SQLTests(ReusedPySparkTestCase):
df0 = self.sqlCtx.createDataFrame([row])
output_dir = os.path.join(self.tempdir.name, "labeled_point")
df0.write.parquet(output_dir)
- df1 = self.sqlCtx.parquetFile(output_dir)
+ df1 = self.sqlCtx.read.parquet(output_dir)
point = df1.head().point
self.assertEqual(point, ExamplePoint(1.0, 2.0))
row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df0 = self.sqlCtx.createDataFrame([row])
df0.write.parquet(output_dir, mode='overwrite')
- df1 = self.sqlCtx.parquetFile(output_dir)
+ df1 = self.sqlCtx.read.parquet(output_dir)
point = df1.head().point
self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))
@@ -763,7 +763,7 @@ class SQLTests(ReusedPySparkTestCase):
defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
"org.apache.spark.sql.parquet")
self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
- actual = self.sqlCtx.load(path=tmpPath)
+ actual = self.sqlCtx.read.load(path=tmpPath)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
@@ -796,7 +796,7 @@ class SQLTests(ReusedPySparkTestCase):
defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
"org.apache.spark.sql.parquet")
self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
- actual = self.sqlCtx.load(path=tmpPath)
+ actual = self.sqlCtx.read.load(path=tmpPath)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
@@ -805,7 +805,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_help_command(self):
# Regression test for SPARK-5464
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
- df = self.sqlCtx.jsonRDD(rdd)
+ df = self.sqlCtx.read.json(rdd)
# render_doc() reproduces the help() exception without printing output
pydoc.render_doc(df)
pydoc.render_doc(df.foo)
@@ -853,8 +853,8 @@ class SQLTests(ReusedPySparkTestCase):
# this saving as Parquet caused issues as well.
output_dir = os.path.join(self.tempdir.name, "infer_long_type")
- df.saveAsParquetFile(output_dir)
- df1 = self.sqlCtx.parquetFile(output_dir)
+ df.write.parquet(output_dir)
+ df1 = self.sqlCtx.read.parquet(output_dir)
self.assertEqual('a', df1.first().f1)
self.assertEqual(100000000000000, df1.first().f2)
@@ -1205,9 +1205,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
F.max("key").over(w.rowsBetween(0, 1)),
F.min("key").over(w.rowsBetween(0, 1)),
F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
- F.rowNumber().over(w),
+ F.row_number().over(w),
F.rank().over(w),
- F.denseRank().over(w),
+ F.dense_rank().over(w),
F.ntile(2).over(w))
rs = sorted(sel.collect())
expected = [
@@ -1227,9 +1227,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
F.max("key").over(w.rowsBetween(0, 1)),
F.min("key").over(w.rowsBetween(0, 1)),
F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
- F.rowNumber().over(w),
+ F.row_number().over(w),
F.rank().over(w),
- F.denseRank().over(w),
+ F.dense_rank().over(w),
F.ntile(2).over(w))
rs = sorted(sel.collect())
expected = [