diff options
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 63 |
1 files changed, 46 insertions, 17 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f2c3b74a18..d76504f986 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -16,14 +16,19 @@ # import sys -import itertools import warnings import random +if sys.version >= '3': + basestring = unicode = str + long = int +else: + from itertools import imap as map + from py4j.java_collections import ListConverter, MapConverter from pyspark.context import SparkContext -from pyspark.rdd import RDD, _load_from_socket +from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync @@ -65,19 +70,20 @@ class DataFrame(object): self._sc = sql_ctx and sql_ctx._sc self.is_cached = False self._schema = None # initialized lazily + self._lazy_rdd = None @property def rdd(self): """Returns the content as an :class:`pyspark.RDD` of :class:`Row`. """ - if not hasattr(self, '_lazy_rdd'): + if self._lazy_rdd is None: jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) schema = self.schema def applySchema(it): cls = _create_cls(schema) - return itertools.imap(cls, it) + return map(cls, it) self._lazy_rdd = rdd.mapPartitions(applySchema) @@ -89,13 +95,14 @@ class DataFrame(object): """ return DataFrameNaFunctions(self) - def toJSON(self, use_unicode=False): + @ignore_unicode_prefix + def toJSON(self, use_unicode=True): """Converts a :class:`DataFrame` into a :class:`RDD` of string. Each row is turned into a JSON document as one element in the returned RDD. >>> df.toJSON().first() - '{"age":2,"name":"Alice"}' + u'{"age":2,"name":"Alice"}' """ rdd = self._jdf.toJSON() return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode)) @@ -228,7 +235,7 @@ class DataFrame(object): |-- name: string (nullable = true) <BLANKLINE> """ - print (self._jdf.schema().treeString()) + print(self._jdf.schema().treeString()) def explain(self, extended=False): """Prints the (logical and physical) plans to the console for debugging purpose. @@ -250,9 +257,9 @@ class DataFrame(object): == RDD == """ if extended: - print self._jdf.queryExecution().toString() + print(self._jdf.queryExecution().toString()) else: - print self._jdf.queryExecution().executedPlan().toString() + print(self._jdf.queryExecution().executedPlan().toString()) def isLocal(self): """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally @@ -270,7 +277,7 @@ class DataFrame(object): 2 Alice 5 Bob """ - print self._jdf.showString(n).encode('utf8', 'ignore') + print(self._jdf.showString(n)) def __repr__(self): return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) @@ -279,10 +286,11 @@ class DataFrame(object): """Returns the number of rows in this :class:`DataFrame`. >>> df.count() - 2L + 2 """ - return self._jdf.count() + return int(self._jdf.count()) + @ignore_unicode_prefix def collect(self): """Returns all the records as a list of :class:`Row`. @@ -295,6 +303,7 @@ class DataFrame(object): cls = _create_cls(self.schema) return [cls(r) for r in rs] + @ignore_unicode_prefix def limit(self, num): """Limits the result count to the number specified. @@ -306,6 +315,7 @@ class DataFrame(object): jdf = self._jdf.limit(num) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def take(self, num): """Returns the first ``num`` rows as a :class:`list` of :class:`Row`. @@ -314,6 +324,7 @@ class DataFrame(object): """ return self.limit(num).collect() + @ignore_unicode_prefix def map(self, f): """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`. @@ -324,6 +335,7 @@ class DataFrame(object): """ return self.rdd.map(f) + @ignore_unicode_prefix def flatMap(self, f): """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`, and then flattening the results. @@ -353,7 +365,7 @@ class DataFrame(object): This is a shorthand for ``df.rdd.foreach()``. >>> def f(person): - ... print person.name + ... print(person.name) >>> df.foreach(f) """ return self.rdd.foreach(f) @@ -365,7 +377,7 @@ class DataFrame(object): >>> def f(people): ... for person in people: - ... print person.name + ... print(person.name) >>> df.foreachPartition(f) """ return self.rdd.foreachPartition(f) @@ -412,7 +424,7 @@ class DataFrame(object): """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. >>> df.distinct().count() - 2L + 2 """ return DataFrame(self._jdf.distinct(), self.sql_ctx) @@ -420,10 +432,10 @@ class DataFrame(object): """Returns a sampled subset of this :class:`DataFrame`. >>> df.sample(False, 0.5, 97).count() - 1L + 1 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction - seed = seed if seed is not None else random.randint(0, sys.maxint) + seed = seed if seed is not None else random.randint(0, sys.maxsize) rdd = self._jdf.sample(withReplacement, fraction, long(seed)) return DataFrame(rdd, self.sql_ctx) @@ -437,6 +449,7 @@ class DataFrame(object): return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields] @property + @ignore_unicode_prefix def columns(self): """Returns all column names as a list. @@ -445,6 +458,7 @@ class DataFrame(object): """ return [f.name for f in self.schema.fields] + @ignore_unicode_prefix def join(self, other, joinExprs=None, joinType=None): """Joins with another :class:`DataFrame`, using the given join expression. @@ -470,6 +484,7 @@ class DataFrame(object): jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def sort(self, *cols): """Returns a new :class:`DataFrame` sorted by the specified column(s). @@ -513,6 +528,7 @@ class DataFrame(object): jdf = self._jdf.describe(self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols)) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def head(self, n=None): """ Returns the first ``n`` rows as a list of :class:`Row`, @@ -528,6 +544,7 @@ class DataFrame(object): return rs[0] if rs else None return self.take(n) + @ignore_unicode_prefix def first(self): """Returns the first row as a :class:`Row`. @@ -536,6 +553,7 @@ class DataFrame(object): """ return self.head() + @ignore_unicode_prefix def __getitem__(self, item): """Returns the column as a :class:`Column`. @@ -567,6 +585,7 @@ class DataFrame(object): jc = self._jdf.apply(name) return Column(jc) + @ignore_unicode_prefix def select(self, *cols): """Projects a set of expressions and returns a new :class:`DataFrame`. @@ -598,6 +617,7 @@ class DataFrame(object): jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr)) return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix def filter(self, condition): """Filters rows using the given condition. @@ -626,6 +646,7 @@ class DataFrame(object): where = filter + @ignore_unicode_prefix def groupBy(self, *cols): """Groups the :class:`DataFrame` using the specified columns, so we can run aggregation on them. See :class:`GroupedData` @@ -775,6 +796,7 @@ class DataFrame(object): cols = self.sql_ctx._sc._jvm.PythonUtils.toSeq(cols) return DataFrame(self._jdf.na().fill(value, cols), self.sql_ctx) + @ignore_unicode_prefix def withColumn(self, colName, col): """Returns a new :class:`DataFrame` by adding a column. @@ -786,6 +808,7 @@ class DataFrame(object): """ return self.select('*', col.alias(colName)) + @ignore_unicode_prefix def withColumnRenamed(self, existing, new): """REturns a new :class:`DataFrame` by renaming an existing column. @@ -852,6 +875,7 @@ class GroupedData(object): self._jdf = jdf self.sql_ctx = sql_ctx + @ignore_unicode_prefix def agg(self, *exprs): """Compute aggregates and returns the result as a :class:`DataFrame`. @@ -1041,11 +1065,13 @@ class Column(object): __sub__ = _bin_op("minus") __mul__ = _bin_op("multiply") __div__ = _bin_op("divide") + __truediv__ = _bin_op("divide") __mod__ = _bin_op("mod") __radd__ = _bin_op("plus") __rsub__ = _reverse_op("minus") __rmul__ = _bin_op("multiply") __rdiv__ = _reverse_op("divide") + __rtruediv__ = _reverse_op("divide") __rmod__ = _reverse_op("mod") # logistic operators @@ -1075,6 +1101,7 @@ class Column(object): startswith = _bin_op("startsWith") endswith = _bin_op("endsWith") + @ignore_unicode_prefix def substr(self, startPos, length): """ Return a :class:`Column` which is a substring of the column @@ -1097,6 +1124,7 @@ class Column(object): __getslice__ = substr + @ignore_unicode_prefix def inSet(self, *cols): """ A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments. @@ -1131,6 +1159,7 @@ class Column(object): """ return Column(getattr(self._jc, "as")(alias)) + @ignore_unicode_prefix def cast(self, dataType): """ Convert the column into type `dataType` |