Diffstat (limited to 'python/pyspark/sql')
-rw-r--r--  python/pyspark/sql/context.py    |  2
-rw-r--r--  python/pyspark/sql/dataframe.py  | 42
-rw-r--r--  python/pyspark/sql/functions.py  |  4
-rw-r--r--  python/pyspark/sql/tests.py      | 24
4 files changed, 17 insertions, 55 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 87e32c04ea..8e324169d8 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -94,7 +94,7 @@ class SQLContext(object):
... 'from allTypes where b and i > 0').collect()
[Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
- >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
+ >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
[(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]
"""
self._sc = sparkContext
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 76fbb0c9aa..99d665fafe 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -267,44 +267,6 @@ class DataFrame(object):
self._jdf, num)
return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
- @ignore_unicode_prefix
- @since(1.3)
- def map(self, f):
- """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
-
- This is a shorthand for ``df.rdd.map()``.
-
- >>> df.map(lambda p: p.name).collect()
- [u'Alice', u'Bob']
- """
- return self.rdd.map(f)
-
- @ignore_unicode_prefix
- @since(1.3)
- def flatMap(self, f):
- """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
- and then flattening the results.
-
- This is a shorthand for ``df.rdd.flatMap()``.
-
- >>> df.flatMap(lambda p: p.name).collect()
- [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
- """
- return self.rdd.flatMap(f)
-
- @since(1.3)
- def mapPartitions(self, f, preservesPartitioning=False):
- """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
-
- This is a shorthand for ``df.rdd.mapPartitions()``.
-
- >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
- >>> def f(iterator): yield 1
- >>> rdd.mapPartitions(f).sum()
- 4
- """
- return self.rdd.mapPartitions(f, preservesPartitioning)
-
@since(1.3)
def foreach(self, f):
"""Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
@@ -315,7 +277,7 @@ class DataFrame(object):
... print(person.name)
>>> df.foreach(f)
"""
- return self.rdd.foreach(f)
+ self.rdd.foreach(f)
@since(1.3)
def foreachPartition(self, f):
@@ -328,7 +290,7 @@ class DataFrame(object):
... print(person.name)
>>> df.foreachPartition(f)
"""
- return self.rdd.foreachPartition(f)
+ self.rdd.foreachPartition(f)
@since(1.3)
def cache(self):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b30cc6799e..92e724fef4 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -616,10 +616,10 @@ def log(arg1, arg2=None):
If there is only one argument, then this takes the natural logarithm of the argument.
- >>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
+ >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
- >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
+ >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
"""
sc = SparkContext._active_spark_context
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 90fd769691..b5b848e1db 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -346,7 +346,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_apply_schema_to_row(self):
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
- df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
+ df2 = self.sqlCtx.createDataFrame(df.rdd.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())
rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
@@ -382,15 +382,15 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(1, row.l[0].a)
self.assertEqual("2", row.d["key"].d)
- l = df.map(lambda x: x.l).first()
+ l = df.rdd.map(lambda x: x.l).first()
self.assertEqual(1, len(l))
self.assertEqual('s', l[0].b)
- d = df.map(lambda x: x.d).first()
+ d = df.rdd.map(lambda x: x.d).first()
self.assertEqual(1, len(d))
self.assertEqual(1.0, d["key"].c)
- row = df.map(lambda x: x.d["key"]).first()
+ row = df.rdd.map(lambda x: x.d["key"]).first()
self.assertEqual(1.0, row.c)
self.assertEqual("2", row.d)
@@ -399,16 +399,16 @@ class SQLTests(ReusedPySparkTestCase):
Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
rdd = self.sc.parallelize(d)
df = self.sqlCtx.createDataFrame(rdd)
- self.assertEqual([], df.map(lambda r: r.l).first())
- self.assertEqual([None, ""], df.map(lambda r: r.s).collect())
+ self.assertEqual([], df.rdd.map(lambda r: r.l).first())
+ self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
df.registerTempTable("test")
result = self.sqlCtx.sql("SELECT l[0].a from test where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
self.assertEqual(df.schema, df2.schema)
- self.assertEqual({}, df2.map(lambda r: r.d).first())
- self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
+ self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
+ self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
df2.registerTempTable("test2")
result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
@@ -462,8 +462,8 @@ class SQLTests(ReusedPySparkTestCase):
StructField("list1", ArrayType(ByteType(), False), False),
StructField("null1", DoubleType(), True)])
df = self.sqlCtx.createDataFrame(rdd, schema)
- results = df.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
- x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
+ results = df.rdd.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1,
+ x.date1, x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
r = (127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1),
datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
self.assertEqual(r, results.first())
@@ -570,7 +570,7 @@ class SQLTests(ReusedPySparkTestCase):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
@@ -578,7 +578,7 @@ class SQLTests(ReusedPySparkTestCase):
row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
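
The pattern applied across all four files is the same: the DataFrame.map, DataFrame.flatMap, and DataFrame.mapPartitions shorthands are removed, every call site hops through df.rdd explicitly, and the foreach/foreachPartition wrappers stay on DataFrame but no longer forward a return value. A minimal sketch of the resulting usage follows; the local SparkContext, sqlCtx, and sample rows are illustrative only and not part of the patch (the doctests above rely on their own sc/sqlCtx/df fixtures).

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

# Illustrative setup only; not taken from the patch.
sc = SparkContext("local[2]", "df-rdd-migration-sketch")
sqlCtx = SQLContext(sc)
df = sqlCtx.createDataFrame([Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)])

# Removed shorthands -> explicit hop through the underlying RDD:
names = df.rdd.map(lambda p: p.name).collect()        # was df.map(...)
chars = df.rdd.flatMap(lambda p: p.name).collect()    # was df.flatMap(...)
sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()  # was df.mapPartitions(...)

# foreach/foreachPartition remain on DataFrame; after this change they return
# None instead of forwarding the RDD action's result.
df.foreach(lambda person: None)

sc.stop()

Code already written against df.rdd is unaffected by this change.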