aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/dataframe.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r-- python/pyspark/sql/dataframe.py | 63
1 files changed, 46 insertions, 17 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f2c3b74a18..d76504f986 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -16,14 +16,19 @@
#
import sys
-import itertools
import warnings
import random
+if sys.version >= '3':
+ basestring = unicode = str
+ long = int
+else:
+ from itertools import imap as map
+
from py4j.java_collections import ListConverter, MapConverter
from pyspark.context import SparkContext
-from pyspark.rdd import RDD, _load_from_socket
+from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -65,19 +70,20 @@ class DataFrame(object):
self._sc = sql_ctx and sql_ctx._sc
self.is_cached = False
self._schema = None # initialized lazily
+ self._lazy_rdd = None
@property
def rdd(self):
"""Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
"""
- if not hasattr(self, '_lazy_rdd'):
+ if self._lazy_rdd is None:
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
- return itertools.imap(cls, it)
+ return map(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
@@ -89,13 +95,14 @@ class DataFrame(object):
"""
return DataFrameNaFunctions(self)
- def toJSON(self, use_unicode=False):
+ @ignore_unicode_prefix
+ def toJSON(self, use_unicode=True):
"""Converts a :class:`DataFrame` into a :class:`RDD` of string.
Each row is turned into a JSON document as one element in the returned RDD.
>>> df.toJSON().first()
- '{"age":2,"name":"Alice"}'
+ u'{"age":2,"name":"Alice"}'
"""
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
@@ -228,7 +235,7 @@ class DataFrame(object):
|-- name: string (nullable = true)
<BLANKLINE>
"""
- print (self._jdf.schema().treeString())
+ print(self._jdf.schema().treeString())
def explain(self, extended=False):
"""Prints the (logical and physical) plans to the console for debugging purpose.
@@ -250,9 +257,9 @@ class DataFrame(object):
== RDD ==
"""
if extended:
- print self._jdf.queryExecution().toString()
+ print(self._jdf.queryExecution().toString())
else:
- print self._jdf.queryExecution().executedPlan().toString()
+ print(self._jdf.queryExecution().executedPlan().toString())
def isLocal(self):
"""Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
@@ -270,7 +277,7 @@ class DataFrame(object):
2 Alice
5 Bob
"""
- print self._jdf.showString(n).encode('utf8', 'ignore')
+ print(self._jdf.showString(n))
def __repr__(self):
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
@@ -279,10 +286,11 @@ class DataFrame(object):
"""Returns the number of rows in this :class:`DataFrame`.
>>> df.count()
- 2L
+ 2
"""
- return self._jdf.count()
+ return int(self._jdf.count())
+ @ignore_unicode_prefix
def collect(self):
"""Returns all the records as a list of :class:`Row`.
@@ -295,6 +303,7 @@ class DataFrame(object):
cls = _create_cls(self.schema)
return [cls(r) for r in rs]
+ @ignore_unicode_prefix
def limit(self, num):
"""Limits the result count to the number specified.
@@ -306,6 +315,7 @@ class DataFrame(object):
jdf = self._jdf.limit(num)
return DataFrame(jdf, self.sql_ctx)
+ @ignore_unicode_prefix
def take(self, num):
"""Returns the first ``num`` rows as a :class:`list` of :class:`Row`.
@@ -314,6 +324,7 @@ class DataFrame(object):
"""
return self.limit(num).collect()
+ @ignore_unicode_prefix
def map(self, f):
 """ Returns a new :class:`RDD` by applying the ``f`` function to each :class:`Row`.
@@ -324,6 +335,7 @@ class DataFrame(object):
"""
return self.rdd.map(f)
+ @ignore_unicode_prefix
def flatMap(self, f):
""" Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
and then flattening the results.
@@ -353,7 +365,7 @@ class DataFrame(object):
This is a shorthand for ``df.rdd.foreach()``.
>>> def f(person):
- ... print person.name
+ ... print(person.name)
>>> df.foreach(f)
"""
return self.rdd.foreach(f)
@@ -365,7 +377,7 @@ class DataFrame(object):
>>> def f(people):
... for person in people:
- ... print person.name
+ ... print(person.name)
>>> df.foreachPartition(f)
"""
return self.rdd.foreachPartition(f)
@@ -412,7 +424,7 @@ class DataFrame(object):
"""Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
>>> df.distinct().count()
- 2L
+ 2
"""
return DataFrame(self._jdf.distinct(), self.sql_ctx)
@@ -420,10 +432,10 @@ class DataFrame(object):
"""Returns a sampled subset of this :class:`DataFrame`.
>>> df.sample(False, 0.5, 97).count()
- 1L
+ 1
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
- seed = seed if seed is not None else random.randint(0, sys.maxint)
+ seed = seed if seed is not None else random.randint(0, sys.maxsize)
rdd = self._jdf.sample(withReplacement, fraction, long(seed))
return DataFrame(rdd, self.sql_ctx)
@@ -437,6 +449,7 @@ class DataFrame(object):
return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]
@property
+ @ignore_unicode_prefix
def columns(self):
"""Returns all column names as a list.
@@ -445,6 +458,7 @@ class DataFrame(object):
"""
return [f.name for f in self.schema.fields]
+ @ignore_unicode_prefix
def join(self, other, joinExprs=None, joinType=None):
"""Joins with another :class:`DataFrame`, using the given join expression.
@@ -470,6 +484,7 @@ class DataFrame(object):
jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType)
return DataFrame(jdf, self.sql_ctx)
+ @ignore_unicode_prefix
def sort(self, *cols):
"""Returns a new :class:`DataFrame` sorted by the specified column(s).
@@ -513,6 +528,7 @@ class DataFrame(object):
jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols))
return DataFrame(jdf, self.sql_ctx)
+ @ignore_unicode_prefix
def head(self, n=None):
"""
Returns the first ``n`` rows as a list of :class:`Row`,
@@ -528,6 +544,7 @@ class DataFrame(object):
return rs[0] if rs else None
return self.take(n)
+ @ignore_unicode_prefix
def first(self):
"""Returns the first row as a :class:`Row`.
@@ -536,6 +553,7 @@ class DataFrame(object):
"""
return self.head()
+ @ignore_unicode_prefix
def __getitem__(self, item):
"""Returns the column as a :class:`Column`.
@@ -567,6 +585,7 @@ class DataFrame(object):
jc = self._jdf.apply(name)
return Column(jc)
+ @ignore_unicode_prefix
def select(self, *cols):
"""Projects a set of expressions and returns a new :class:`DataFrame`.
@@ -598,6 +617,7 @@ class DataFrame(object):
jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr))
return DataFrame(jdf, self.sql_ctx)
+ @ignore_unicode_prefix
def filter(self, condition):
"""Filters rows using the given condition.
@@ -626,6 +646,7 @@ class DataFrame(object):
where = filter
+ @ignore_unicode_prefix
def groupBy(self, *cols):
"""Groups the :class:`DataFrame` using the specified columns,
so we can run aggregation on them. See :class:`GroupedData`
@@ -775,6 +796,7 @@ class DataFrame(object):
cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)
return DataFrame(self._jdf.na().fill(value, cols), self.sql_ctx)
+ @ignore_unicode_prefix
def withColumn(self, colName, col):
"""Returns a new :class:`DataFrame` by adding a column.
@@ -786,6 +808,7 @@ class DataFrame(object):
"""
return self.select('*', col.alias(colName))
+ @ignore_unicode_prefix
def withColumnRenamed(self, existing, new):
 """Returns a new :class:`DataFrame` by renaming an existing column.
@@ -852,6 +875,7 @@ class GroupedData(object):
self._jdf = jdf
self.sql_ctx = sql_ctx
+ @ignore_unicode_prefix
def agg(self, *exprs):
"""Compute aggregates and returns the result as a :class:`DataFrame`.
@@ -1041,11 +1065,13 @@ class Column(object):
__sub__ = _bin_op("minus")
__mul__ = _bin_op("multiply")
__div__ = _bin_op("divide")
+ __truediv__ = _bin_op("divide")
__mod__ = _bin_op("mod")
__radd__ = _bin_op("plus")
__rsub__ = _reverse_op("minus")
__rmul__ = _bin_op("multiply")
__rdiv__ = _reverse_op("divide")
+ __rtruediv__ = _reverse_op("divide")
__rmod__ = _reverse_op("mod")
# logistic operators
@@ -1075,6 +1101,7 @@ class Column(object):
startswith = _bin_op("startsWith")
endswith = _bin_op("endsWith")
+ @ignore_unicode_prefix
def substr(self, startPos, length):
"""
Return a :class:`Column` which is a substring of the column
@@ -1097,6 +1124,7 @@ class Column(object):
__getslice__ = substr
+ @ignore_unicode_prefix
def inSet(self, *cols):
""" A boolean expression that is evaluated to true if the value of this
expression is contained by the evaluated values of the arguments.
@@ -1131,6 +1159,7 @@ class Column(object):
"""
return Column(getattr(self._jc, "as")(alias))
+ @ignore_unicode_prefix
def cast(self, dataType):
""" Convert the column into type `dataType`