Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py | 32 ++++++++++++++++----------------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 9ada96601a..e396cf41f2 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_basic_functions(self):
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         df.count()
         df.collect()
         df.schema
@@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase):
         df.collect()
 
     def test_apply_schema_to_row(self):
-        df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
+        df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
         df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
         self.assertEqual(df.collect(), df2.collect())
 
@@ -408,12 +408,12 @@ class SQLTests(ReusedPySparkTestCase):
         NestedRow = Row("f1", "f2")
         nestedRdd1 = self.sc.parallelize([NestedRow([1, 2], {"row1": 1.0}),
                                           NestedRow([2, 3], {"row2": 2.0})])
-        df = self.sqlCtx.inferSchema(nestedRdd1)
+        df = self.sqlCtx.createDataFrame(nestedRdd1)
         self.assertEqual(Row(f1=[1, 2], f2={u'row1': 1.0}), df.collect()[0])
 
         nestedRdd2 = self.sc.parallelize([NestedRow([[1, 2], [2, 3]], [1, 2]),
                                           NestedRow([[2, 3], [3, 4]], [2, 3])])
-        df = self.sqlCtx.inferSchema(nestedRdd2)
+        df = self.sqlCtx.createDataFrame(nestedRdd2)
         self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])
 
         from collections import namedtuple
@@ -421,7 +421,7 @@ class SQLTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
                                    CustomRow(field1=2, field2="row2"),
                                    CustomRow(field1=3, field2="row3")])
-        df = self.sqlCtx.inferSchema(rdd)
+        df = self.sqlCtx.createDataFrame(rdd)
         self.assertEqual(Row(field1=1, field2=u'row1'), df.first())
 
     def test_create_dataframe_from_objects(self):
@@ -581,14 +581,14 @@ class SQLTests(ReusedPySparkTestCase):
         df0 = self.sqlCtx.createDataFrame([row])
         output_dir = os.path.join(self.tempdir.name, "labeled_point")
         df0.write.parquet(output_dir)
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         point = df1.head().point
         self.assertEqual(point, ExamplePoint(1.0, 2.0))
 
         row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
         df0 = self.sqlCtx.createDataFrame([row])
         df0.write.parquet(output_dir, mode='overwrite')
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         point = df1.head().point
         self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))
 
@@ -763,7 +763,7 @@ class SQLTests(ReusedPySparkTestCase):
         defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
                                                     "org.apache.spark.sql.parquet")
         self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
-        actual = self.sqlCtx.load(path=tmpPath)
+        actual = self.sqlCtx.read.load(path=tmpPath)
         self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
         self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
 
@@ -796,7 +796,7 @@ class SQLTests(ReusedPySparkTestCase):
         defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
                                                     "org.apache.spark.sql.parquet")
         self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
-        actual = self.sqlCtx.load(path=tmpPath)
+        actual = self.sqlCtx.read.load(path=tmpPath)
         self.assertEqual(sorted(df.collect()), sorted(actual.collect()))
         self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
 
@@ -805,7 +805,7 @@ class SQLTests(ReusedPySparkTestCase):
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         # render_doc() reproduces the help() exception without printing output
         pydoc.render_doc(df)
         pydoc.render_doc(df.foo)
@@ -853,8 +853,8 @@ class SQLTests(ReusedPySparkTestCase):
 
         # this saving as Parquet caused issues as well.
         output_dir = os.path.join(self.tempdir.name, "infer_long_type")
-        df.saveAsParquetFile(output_dir)
-        df1 = self.sqlCtx.parquetFile(output_dir)
+        df.write.parquet(output_dir)
+        df1 = self.sqlCtx.read.parquet(output_dir)
         self.assertEqual('a', df1.first().f1)
         self.assertEqual(100000000000000, df1.first().f2)
 
@@ -1205,9 +1205,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
                         F.max("key").over(w.rowsBetween(0, 1)),
                         F.min("key").over(w.rowsBetween(0, 1)),
                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.rowNumber().over(w),
+                        F.row_number().over(w),
                         F.rank().over(w),
-                        F.denseRank().over(w),
+                        F.dense_rank().over(w),
                         F.ntile(2).over(w))
         rs = sorted(sel.collect())
         expected = [
@@ -1227,9 +1227,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
                         F.max("key").over(w.rowsBetween(0, 1)),
                         F.min("key").over(w.rowsBetween(0, 1)),
                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
-                        F.rowNumber().over(w),
+                        F.row_number().over(w),
                         F.rank().over(w),
-                        F.denseRank().over(w),
+                        F.dense_rank().over(w),
                         F.ntile(2).over(w))
         rs = sorted(sel.collect())
         expected = [
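
Every hunk above applies the same migration: the deprecated Spark 1.x SQLContext shortcuts (jsonRDD, inferSchema, load, parquetFile, saveAsParquetFile) move to the DataFrameReader/DataFrameWriter entry points sqlCtx.read and df.write, and the camelCase window functions rowNumber/denseRank become the snake_case row_number/dense_rank. The snippet below is a minimal standalone sketch of those replacements, not part of the patch itself; the SparkContext setup, app name, and /tmp output path are illustrative assumptions, and on releases predating native window functions the final lines may additionally require a HiveContext, as the tests above use.

# Sketch of the new-style calls; assumed names are marked as such.
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext, Window
from pyspark.sql import functions as F

sc = SparkContext("local[2]", "api-migration-sketch")   # hypothetical app name
sqlCtx = SQLContext(sc)

# Old: sqlCtx.jsonRDD(rdd); new: the DataFrameReader at sqlCtx.read.
rdd = sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = sqlCtx.read.json(rdd)

# Old: sqlCtx.inferSchema(rows); new: sqlCtx.createDataFrame(rows).
rows = sc.parallelize([Row(foo="bar"), Row(foo="baz")])
df2 = sqlCtx.createDataFrame(rows)

# Old: df.saveAsParquetFile(d) / sqlCtx.parquetFile(d) / sqlCtx.load(path=d);
# new: the writer at df.write and the reader at sqlCtx.read.
out = "/tmp/api-migration-sketch"                       # hypothetical path
df.write.parquet(out, mode="overwrite")
df3 = sqlCtx.read.parquet(out)
df4 = sqlCtx.read.load(path=out, format="parquet")

# Old: F.rowNumber() / F.denseRank(); new: snake_case names.
w = Window.partitionBy("foo").orderBy("foo")
df3.select(df3.foo,
           F.row_number().over(w),
           F.rank().over(w),
           F.dense_rank().over(w)).show()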