diff options
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r-- | python/pyspark/sql/tests.py | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9ada96601a..e396cf41f2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase): def test_basic_functions(self): rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) - df = self.sqlCtx.jsonRDD(rdd) + df = self.sqlCtx.read.json(rdd) df.count() df.collect() df.schema @@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase): df.collect() def test_apply_schema_to_row(self): - df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""])) + df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""])) df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema) self.assertEqual(df.collect(), df2.collect()) @@ -408,12 +408,12 @@ class SQLTests(ReusedPySparkTestCase): NestedRow = Row("f1", "f2") nestedRdd1 = self.sc.parallelize([NestedRow([1, 2], {"row1": 1.0}), NestedRow([2, 3], {"row2": 2.0})]) - df = self.sqlCtx.inferSchema(nestedRdd1) + df = self.sqlCtx.createDataFrame(nestedRdd1) self.assertEqual(Row(f1=[1, 2], f2={u'row1': 1.0}), df.collect()[0]) nestedRdd2 = self.sc.parallelize([NestedRow([[1, 2], [2, 3]], [1, 2]), NestedRow([[2, 3], [3, 4]], [2, 3])]) - df = self.sqlCtx.inferSchema(nestedRdd2) + df = self.sqlCtx.createDataFrame(nestedRdd2) self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0]) from collections import namedtuple @@ -421,7 +421,7 @@ class SQLTests(ReusedPySparkTestCase): rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"), CustomRow(field1=2, field2="row2"), CustomRow(field1=3, field2="row3")]) - df = self.sqlCtx.inferSchema(rdd) + df = self.sqlCtx.createDataFrame(rdd) self.assertEqual(Row(field1=1, field2=u'row1'), df.first()) def test_create_dataframe_from_objects(self): @@ -581,14 +581,14 @@ class SQLTests(ReusedPySparkTestCase): df0 = self.sqlCtx.createDataFrame([row]) output_dir = os.path.join(self.tempdir.name, "labeled_point") df0.write.parquet(output_dir) - df1 = self.sqlCtx.parquetFile(output_dir) + df1 = self.sqlCtx.read.parquet(output_dir) point = df1.head().point self.assertEqual(point, ExamplePoint(1.0, 2.0)) row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0)) df0 = self.sqlCtx.createDataFrame([row]) df0.write.parquet(output_dir, mode='overwrite') - df1 = self.sqlCtx.parquetFile(output_dir) + df1 = self.sqlCtx.read.parquet(output_dir) point = df1.head().point self.assertEqual(point, PythonOnlyPoint(1.0, 2.0)) @@ -763,7 +763,7 @@ class SQLTests(ReusedPySparkTestCase): defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json") - actual = self.sqlCtx.load(path=tmpPath) + actual = self.sqlCtx.read.load(path=tmpPath) self.assertEqual(sorted(df.collect()), sorted(actual.collect())) self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName) @@ -796,7 +796,7 @@ class SQLTests(ReusedPySparkTestCase): defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet") self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json") - actual = self.sqlCtx.load(path=tmpPath) + actual = self.sqlCtx.read.load(path=tmpPath) self.assertEqual(sorted(df.collect()), sorted(actual.collect())) self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName) @@ -805,7 +805,7 @@ class SQLTests(ReusedPySparkTestCase): def test_help_command(self): # Regression test for SPARK-5464 rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) - df = self.sqlCtx.jsonRDD(rdd) + df = self.sqlCtx.read.json(rdd) # render_doc() reproduces the help() exception without printing output pydoc.render_doc(df) pydoc.render_doc(df.foo) @@ -853,8 +853,8 @@ class SQLTests(ReusedPySparkTestCase): # this saving as Parquet caused issues as well. output_dir = os.path.join(self.tempdir.name, "infer_long_type") - df.saveAsParquetFile(output_dir) - df1 = self.sqlCtx.parquetFile(output_dir) + df.write.parquet(output_dir) + df1 = self.sqlCtx.read.parquet(output_dir) self.assertEqual('a', df1.first().f1) self.assertEqual(100000000000000, df1.first().f2) @@ -1205,9 +1205,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): F.max("key").over(w.rowsBetween(0, 1)), F.min("key").over(w.rowsBetween(0, 1)), F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))), - F.rowNumber().over(w), + F.row_number().over(w), F.rank().over(w), - F.denseRank().over(w), + F.dense_rank().over(w), F.ntile(2).over(w)) rs = sorted(sel.collect()) expected = [ @@ -1227,9 +1227,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): F.max("key").over(w.rowsBetween(0, 1)), F.min("key").over(w.rowsBetween(0, 1)), F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))), - F.rowNumber().over(w), + F.row_number().over(w), F.rank().over(w), - F.denseRank().over(w), + F.dense_rank().over(w), F.ntile(2).over(w)) rs = sorted(sel.collect()) expected = [ |