From 4dd24811d9035c52c5965fca2fc6431aac6963fc Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 2 Mar 2016 15:26:34 -0800
Subject: [SPARK-13594][SQL] remove typed operations(e.g. map, flatMap) from python DataFrame

## What changes were proposed in this pull request?

Remove `map`, `flatMap`, `mapPartitions` from python DataFrame, to prepare for Dataset API in the future.

## How was this patch tested?

existing tests

Author: Wenchen Fan

Closes #11445 from cloud-fan/python-clean.
---
 python/pyspark/sql/context.py   |  2 +-
 python/pyspark/sql/dataframe.py | 42 ++---------------------------------------
 python/pyspark/sql/functions.py |  4 ++--
 python/pyspark/sql/tests.py     | 24 +++++++++++------------
 4 files changed, 17 insertions(+), 55 deletions(-)

diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 87e32c04ea..8e324169d8 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -94,7 +94,7 @@ class SQLContext(object):
         ...     'from allTypes where b and i > 0').collect()
         [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
             dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
-        >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
+        >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
         [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]
         """
         self._sc = sparkContext
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 76fbb0c9aa..99d665fafe 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -267,44 +267,6 @@ class DataFrame(object):
                 self._jdf, num)
         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
 
-    @ignore_unicode_prefix
-    @since(1.3)
-    def map(self, f):
-        """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
-
-        This is a shorthand for ``df.rdd.map()``.
-
-        >>> df.map(lambda p: p.name).collect()
-        [u'Alice', u'Bob']
-        """
-        return self.rdd.map(f)
-
-    @ignore_unicode_prefix
-    @since(1.3)
-    def flatMap(self, f):
-        """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
-        and then flattening the results.
-
-        This is a shorthand for ``df.rdd.flatMap()``.
-
-        >>> df.flatMap(lambda p: p.name).collect()
-        [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
-        """
-        return self.rdd.flatMap(f)
-
-    @since(1.3)
-    def mapPartitions(self, f, preservesPartitioning=False):
-        """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
-
-        This is a shorthand for ``df.rdd.mapPartitions()``.
-
-        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
-        >>> def f(iterator): yield 1
-        >>> rdd.mapPartitions(f).sum()
-        4
-        """
-        return self.rdd.mapPartitions(f, preservesPartitioning)
-
     @since(1.3)
     def foreach(self, f):
         """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
@@ -315,7 +277,7 @@ class DataFrame(object):
         ...     print(person.name)
         >>> df.foreach(f)
         """
-        return self.rdd.foreach(f)
+        self.rdd.foreach(f)
 
     @since(1.3)
     def foreachPartition(self, f):
@@ -328,7 +290,7 @@ class DataFrame(object):
         ...         print(person.name)
        >>> df.foreachPartition(f)
         """
-        return self.rdd.foreachPartition(f)
+        self.rdd.foreachPartition(f)
 
     @since(1.3)
     def cache(self):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b30cc6799e..92e724fef4 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -616,10 +616,10 @@ def log(arg1, arg2=None):
 
     If there is only one argument, then this takes the natural logarithm of the argument.
 
-    >>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
+    >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
     ['0.30102', '0.69897']
 
-    >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
+    >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
     ['0.69314', '1.60943']
     """
     sc = SparkContext._active_spark_context
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 90fd769691..b5b848e1db 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -346,7 +346,7 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_apply_schema_to_row(self):
         df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
-        df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
+        df2 = self.sqlCtx.createDataFrame(df.rdd.map(lambda x: x), df.schema)
         self.assertEqual(df.collect(), df2.collect())
 
         rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
@@ -382,15 +382,15 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(1, row.l[0].a)
         self.assertEqual("2", row.d["key"].d)
 
-        l = df.map(lambda x: x.l).first()
+        l = df.rdd.map(lambda x: x.l).first()
         self.assertEqual(1, len(l))
         self.assertEqual('s', l[0].b)
 
-        d = df.map(lambda x: x.d).first()
+        d = df.rdd.map(lambda x: x.d).first()
         self.assertEqual(1, len(d))
         self.assertEqual(1.0, d["key"].c)
 
-        row = df.map(lambda x: x.d["key"]).first()
+        row = df.rdd.map(lambda x: x.d["key"]).first()
         self.assertEqual(1.0, row.c)
         self.assertEqual("2", row.d)
 
@@ -399,16 +399,16 @@ class SQLTests(ReusedPySparkTestCase):
             Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
         rdd = self.sc.parallelize(d)
         df = self.sqlCtx.createDataFrame(rdd)
-        self.assertEqual([], df.map(lambda r: r.l).first())
-        self.assertEqual([None, ""], df.map(lambda r: r.s).collect())
+        self.assertEqual([], df.rdd.map(lambda r: r.l).first())
+        self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
         df.registerTempTable("test")
         result = self.sqlCtx.sql("SELECT l[0].a from test where d['key'].d = '2'")
         self.assertEqual(1, result.head()[0])
 
         df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
         self.assertEqual(df.schema, df2.schema)
-        self.assertEqual({}, df2.map(lambda r: r.d).first())
-        self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
+        self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
+        self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
         df2.registerTempTable("test2")
         result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
         self.assertEqual(1, result.head()[0])
@@ -462,8 +462,8 @@ class SQLTests(ReusedPySparkTestCase):
             StructField("list1", ArrayType(ByteType(), False), False),
             StructField("null1", DoubleType(), True)])
         df = self.sqlCtx.createDataFrame(rdd, schema)
-        results = df.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
-                         x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
+        results = df.rdd.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1,
+                                        x.date1, x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
         r = (127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1),
              datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
         self.assertEqual(r, results.first())
@@ -570,7 +570,7 @@ class SQLTests(ReusedPySparkTestCase):
         from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
         row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
         df = self.sqlCtx.createDataFrame([row])
-        self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+        self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
         udf = UserDefinedFunction(lambda p: p.y, DoubleType())
         self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
         udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
@@ -578,7 +578,7 @@ class SQLTests(ReusedPySparkTestCase):
 
         row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
         df = self.sqlCtx.createDataFrame([row])
-        self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+        self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
         udf = UserDefinedFunction(lambda p: p.y, DoubleType())
         self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
         udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
--
cgit v1.2.3
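
For readers updating their own PySpark code against this change, the pattern the patch applies throughout is to go through `df.rdd` before calling the RDD-level operations; `foreach` and `foreachPartition` stay on `DataFrame` but no longer return anything. Below is a minimal sketch of the migration, assuming a local PySpark installation of roughly this era; the sample rows, app name, and variable names are invented for illustration and are not part of the patch.

```python
# Illustrative migration sketch (not from the patch): df.rdd.* replaces the
# removed DataFrame.map/flatMap/mapPartitions. Data and names are made up.
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext

sc = SparkContext("local[2]", "rdd-migration-example")
sqlCtx = SQLContext(sc)
df = sqlCtx.createDataFrame([Row(name="Alice", age=2), Row(name="Bob", age=5)])

# Before SPARK-13594 these were called directly on the DataFrame:
#   df.map(f), df.flatMap(f), df.mapPartitions(f)
# Afterwards, drop to the underlying RDD of Rows explicitly:
names = df.rdd.map(lambda row: row.name).collect()        # e.g. ['Alice', 'Bob']
letters = df.rdd.flatMap(lambda row: row.name).collect()  # e.g. ['A', 'l', 'i', 'c', 'e', ...]
rows_per_part = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

print(names, letters, rows_per_part)
sc.stop()
```

The same substitution (`df.map` to `df.rdd.map`, and likewise for `flatMap` and `mapPartitions`) is exactly what the doctest and test updates in the diff perform.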