aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/tests.py
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-03-02 15:26:34 -0800
committerReynold Xin <rxin@databricks.com>2016-03-02 15:26:34 -0800
commit4dd24811d9035c52c5965fca2fc6431aac6963fc (patch)
tree49771acf8775633de5fe38cf9b38c929b70c99c6 /python/pyspark/sql/tests.py
parente2780ce8252ded93a695125c0a745d8b93193cca (diff)
downloadspark-4dd24811d9035c52c5965fca2fc6431aac6963fc.tar.gz
spark-4dd24811d9035c52c5965fca2fc6431aac6963fc.tar.bz2
spark-4dd24811d9035c52c5965fca2fc6431aac6963fc.zip
[SPARK-13594][SQL] remove typed operations(e.g. map, flatMap) from python DataFrame
## What changes were proposed in this pull request? Remove `map`, `flatMap`, `mapPartitions` from python DataFrame, to prepare for Dataset API in the future. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #11445 from cloud-fan/python-clean.
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--python/pyspark/sql/tests.py24
1 file changed, 12 insertions, 12 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 90fd769691..b5b848e1db 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -346,7 +346,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_apply_schema_to_row(self):
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
- df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
+ df2 = self.sqlCtx.createDataFrame(df.rdd.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())
rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
@@ -382,15 +382,15 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(1, row.l[0].a)
self.assertEqual("2", row.d["key"].d)
- l = df.map(lambda x: x.l).first()
+ l = df.rdd.map(lambda x: x.l).first()
self.assertEqual(1, len(l))
self.assertEqual('s', l[0].b)
- d = df.map(lambda x: x.d).first()
+ d = df.rdd.map(lambda x: x.d).first()
self.assertEqual(1, len(d))
self.assertEqual(1.0, d["key"].c)
- row = df.map(lambda x: x.d["key"]).first()
+ row = df.rdd.map(lambda x: x.d["key"]).first()
self.assertEqual(1.0, row.c)
self.assertEqual("2", row.d)
@@ -399,16 +399,16 @@ class SQLTests(ReusedPySparkTestCase):
Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
rdd = self.sc.parallelize(d)
df = self.sqlCtx.createDataFrame(rdd)
- self.assertEqual([], df.map(lambda r: r.l).first())
- self.assertEqual([None, ""], df.map(lambda r: r.s).collect())
+ self.assertEqual([], df.rdd.map(lambda r: r.l).first())
+ self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
df.registerTempTable("test")
result = self.sqlCtx.sql("SELECT l[0].a from test where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
self.assertEqual(df.schema, df2.schema)
- self.assertEqual({}, df2.map(lambda r: r.d).first())
- self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
+ self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
+ self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
df2.registerTempTable("test2")
result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
@@ -462,8 +462,8 @@ class SQLTests(ReusedPySparkTestCase):
StructField("list1", ArrayType(ByteType(), False), False),
StructField("null1", DoubleType(), True)])
df = self.sqlCtx.createDataFrame(rdd, schema)
- results = df.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
- x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
+ results = df.rdd.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1,
+ x.date1, x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
r = (127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1),
datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
self.assertEqual(r, results.first())
@@ -570,7 +570,7 @@ class SQLTests(ReusedPySparkTestCase):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
@@ -578,7 +578,7 @@ class SQLTests(ReusedPySparkTestCase):
row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())