author     Wenchen Fan <wenchen@databricks.com>    2016-03-02 15:26:34 -0800
committer  Reynold Xin <rxin@databricks.com>       2016-03-02 15:26:34 -0800
commit     4dd24811d9035c52c5965fca2fc6431aac6963fc (patch)
tree       49771acf8775633de5fe38cf9b38c929b70c99c6 /python
parent     e2780ce8252ded93a695125c0a745d8b93193cca (diff)
[SPARK-13594][SQL] remove typed operations(e.g. map, flatMap) from python DataFrame
## What changes were proposed in this pull request?

Remove `map`, `flatMap`, `mapPartitions` from python DataFrame, to prepare for Dataset API in the future.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #11445 from cloud-fan/python-clean.
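For user code that called the removed methods, the replacement is to reach the underlying RDD explicitly via `df.rdd`, exactly as the updated doctests and tests in this diff do. A minimal sketch of that migration, assuming a local SparkContext and a toy DataFrame (both are illustration only, not part of this patch):

```python
# Sketch of migrating off the removed DataFrame methods; `sqlContext` and the
# sample DataFrame below are assumptions for illustration, not part of this patch.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="migration-sketch")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([("Alice", 2), ("Bob", 5)], ["name", "age"])

# Before this patch: df.map(...), df.flatMap(...), df.mapPartitions(...)
# After this patch: go through the underlying RDD explicitly.
names = df.rdd.map(lambda row: row.name).collect()          # ['Alice', 'Bob']
chars = df.rdd.flatMap(lambda row: row.name).collect()      # one element per character
sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()  # rows per partition
```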
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/mllib/linalg/distributed.py |  6
-rw-r--r--  python/pyspark/mllib/tests.py              |  4
-rw-r--r--  python/pyspark/sql/context.py              |  2
-rw-r--r--  python/pyspark/sql/dataframe.py            | 42
-rw-r--r--  python/pyspark/sql/functions.py            |  4
-rw-r--r--  python/pyspark/sql/tests.py                | 24
6 files changed, 22 insertions, 60 deletions
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index e1f022187d..43cb0beef1 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -256,7 +256,7 @@ class IndexedRowMatrix(DistributedMatrix):
# on the Scala/Java side. Then we map each Row in the
# DataFrame back to an IndexedRow on this side.
rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
- rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
+ rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
return rows
def numRows(self):
@@ -475,7 +475,7 @@ class CoordinateMatrix(DistributedMatrix):
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a MatrixEntry on this side.
entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
- entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
+ entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
return entries
def numRows(self):
@@ -700,7 +700,7 @@ class BlockMatrix(DistributedMatrix):
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a sub-matrix block on this side.
blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
- blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+ blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
return blocks
@property
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 25a7c29982..5f515b666c 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -697,7 +697,7 @@ class VectorUDTTests(MLlibTestCase):
schema = df.schema
field = [f for f in schema.fields if f.name == "features"][0]
self.assertEqual(field.dataType, self.udt)
- vectors = df.map(lambda p: p.features).collect()
+ vectors = df.rdd.map(lambda p: p.features).collect()
self.assertEqual(len(vectors), 2)
for v in vectors:
if isinstance(v, SparseVector):
@@ -729,7 +729,7 @@ class MatrixUDTTests(MLlibTestCase):
df = rdd.toDF()
schema = df.schema
self.assertTrue(schema.fields[1].dataType, self.udt)
- matrices = df.map(lambda x: x._2).collect()
+ matrices = df.rdd.map(lambda x: x._2).collect()
self.assertEqual(len(matrices), 2)
for m in matrices:
if isinstance(m, DenseMatrix):
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 87e32c04ea..8e324169d8 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -94,7 +94,7 @@ class SQLContext(object):
... 'from allTypes where b and i > 0').collect()
[Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
- >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
+ >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
[(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]
"""
self._sc = sparkContext
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 76fbb0c9aa..99d665fafe 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -267,44 +267,6 @@ class DataFrame(object):
self._jdf, num)
return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
- @ignore_unicode_prefix
- @since(1.3)
- def map(self, f):
- """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
-
- This is a shorthand for ``df.rdd.map()``.
-
- >>> df.map(lambda p: p.name).collect()
- [u'Alice', u'Bob']
- """
- return self.rdd.map(f)
-
- @ignore_unicode_prefix
- @since(1.3)
- def flatMap(self, f):
- """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
- and then flattening the results.
-
- This is a shorthand for ``df.rdd.flatMap()``.
-
- >>> df.flatMap(lambda p: p.name).collect()
- [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
- """
- return self.rdd.flatMap(f)
-
- @since(1.3)
- def mapPartitions(self, f, preservesPartitioning=False):
- """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
-
- This is a shorthand for ``df.rdd.mapPartitions()``.
-
- >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
- >>> def f(iterator): yield 1
- >>> rdd.mapPartitions(f).sum()
- 4
- """
- return self.rdd.mapPartitions(f, preservesPartitioning)
-
@since(1.3)
def foreach(self, f):
"""Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
@@ -315,7 +277,7 @@ class DataFrame(object):
... print(person.name)
>>> df.foreach(f)
"""
- return self.rdd.foreach(f)
+ self.rdd.foreach(f)
@since(1.3)
def foreachPartition(self, f):
@@ -328,7 +290,7 @@ class DataFrame(object):
... print(person.name)
>>> df.foreachPartition(f)
"""
- return self.rdd.foreachPartition(f)
+ self.rdd.foreachPartition(f)
@since(1.3)
def cache(self):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b30cc6799e..92e724fef4 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -616,10 +616,10 @@ def log(arg1, arg2=None):
If there is only one argument, then this takes the natural logarithm of the argument.
- >>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
+ >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
- >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
+ >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
"""
sc = SparkContext._active_spark_context
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 90fd769691..b5b848e1db 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -346,7 +346,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_apply_schema_to_row(self):
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
- df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
+ df2 = self.sqlCtx.createDataFrame(df.rdd.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())
rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
@@ -382,15 +382,15 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(1, row.l[0].a)
self.assertEqual("2", row.d["key"].d)
- l = df.map(lambda x: x.l).first()
+ l = df.rdd.map(lambda x: x.l).first()
self.assertEqual(1, len(l))
self.assertEqual('s', l[0].b)
- d = df.map(lambda x: x.d).first()
+ d = df.rdd.map(lambda x: x.d).first()
self.assertEqual(1, len(d))
self.assertEqual(1.0, d["key"].c)
- row = df.map(lambda x: x.d["key"]).first()
+ row = df.rdd.map(lambda x: x.d["key"]).first()
self.assertEqual(1.0, row.c)
self.assertEqual("2", row.d)
@@ -399,16 +399,16 @@ class SQLTests(ReusedPySparkTestCase):
Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
rdd = self.sc.parallelize(d)
df = self.sqlCtx.createDataFrame(rdd)
- self.assertEqual([], df.map(lambda r: r.l).first())
- self.assertEqual([None, ""], df.map(lambda r: r.s).collect())
+ self.assertEqual([], df.rdd.map(lambda r: r.l).first())
+ self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
df.registerTempTable("test")
result = self.sqlCtx.sql("SELECT l[0].a from test where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
self.assertEqual(df.schema, df2.schema)
- self.assertEqual({}, df2.map(lambda r: r.d).first())
- self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
+ self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
+ self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
df2.registerTempTable("test2")
result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
@@ -462,8 +462,8 @@ class SQLTests(ReusedPySparkTestCase):
StructField("list1", ArrayType(ByteType(), False), False),
StructField("null1", DoubleType(), True)])
df = self.sqlCtx.createDataFrame(rdd, schema)
- results = df.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
- x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
+ results = df.rdd.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1,
+ x.date1, x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
r = (127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1),
datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
self.assertEqual(r, results.first())
@@ -570,7 +570,7 @@ class SQLTests(ReusedPySparkTestCase):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
@@ -578,7 +578,7 @@ class SQLTests(ReusedPySparkTestCase):
row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())