author    Reynold Xin <rxin@databricks.com>    2015-01-28 19:10:32 -0800
committer Reynold Xin <rxin@databricks.com>    2015-01-28 19:10:32 -0800
commit    5b9760de8dd2dab7cf9a4f5c65869e4ed296a938 (patch)
tree      9e6b1f6c6077f3379276de229ccad2e63ff2e2be /python
parent    4ee79c71afc5175ba42b5e3d4088fe23db3e45d1 (diff)
[SPARK-5445][SQL] Made DataFrame dsl usable in Java
Also removed the literal implicit transformation since it is pretty scary for API design. Instead, created a new lit method for creating literals. This doesn't break anything from a compatibility perspective because Literal was added two days ago.

Author: Reynold Xin <rxin@databricks.com>

Closes #4241 from rxin/df-docupdate and squashes the following commits:

c0f4810 [Reynold Xin] Fix Python merge conflict.
094c7d7 [Reynold Xin] Minor style fix. Reset Python tests.
3c89f4a [Reynold Xin] Package.
dfe6962 [Reynold Xin] Updated Python aggregate.
5dd4265 [Reynold Xin] Made dsl Java callable.
14b3c27 [Reynold Xin] Fix literal expression for symbols.
68b31cb [Reynold Xin] Literal.
4cfeb78 [Reynold Xin] [SPARK-5097][SQL] Address DataFrame code review feedback.
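On the Python side this reroutes literal creation through the new Java-callable dsl instead of the Scala implicit. A minimal sketch of the conversion path, assuming an active SparkContext sc and its Py4J gateway:

    # Before this patch a Python literal was wrapped on the Scala side via the
    # implicit Literal conversion; now it goes through an explicit lit() call:
    jc = sc._jvm.org.apache.spark.sql.api.java.dsl.lit(42)  # JVM Column for the literal 42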
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql.py | 38
1 file changed, 22 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index c3a6938f56..fdd8034c98 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -931,7 +931,7 @@ def _parse_schema_abstract(s):
def _infer_schema_type(obj, dataType):
"""
- Fill the dataType with types infered from obj
+ Fill the dataType with types inferred from obj
>>> schema = _parse_schema_abstract("a b c d")
>>> row = (1, 1.0, "str", datetime.date(2014, 10, 10))
@@ -2140,7 +2140,7 @@ class DataFrame(object):
return Column(self._jdf.apply(name))
raise AttributeError
- def As(self, name):
+ def alias(self, name):
""" Alias the current DataFrame """
return DataFrame(getattr(self._jdf, "as")(name), self.sql_ctx)
@@ -2216,7 +2216,7 @@ class DataFrame(object):
"""
return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx)
- def Except(self, other):
+ def subtract(self, other):
""" Return a new [[DataFrame]] containing rows in this frame
but not in another frame.
@@ -2234,7 +2234,7 @@ class DataFrame(object):
def addColumn(self, colName, col):
""" Return a new [[DataFrame]] by adding a column. """
- return self.select('*', col.As(colName))
+ return self.select('*', col.alias(colName))
def removeColumn(self, colName):
raise NotImplemented
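The capitalized As and Except, originally chosen to dodge the Python keywords as and except, become the Java-friendly alias and subtract. A brief sketch of the renamed surface, assuming two DataFrames df and df2 with matching schemas:

    df.alias("d")                  # was df.As("d")
    df.subtract(df2)               # was df.Except(df2); rows in df but not in df2
    df.addColumn("age2", df.age)   # now routes through Column.alias internally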
@@ -2342,7 +2342,7 @@ SCALA_METHOD_MAPPINGS = {
def _create_column_from_literal(literal):
sc = SparkContext._active_spark_context
- return sc._jvm.Literal.apply(literal)
+ return sc._jvm.org.apache.spark.sql.api.java.dsl.lit(literal)
def _create_column_from_name(name):
@@ -2371,13 +2371,20 @@ def _unary_op(name):
return _
-def _bin_op(name):
- """ Create a method for given binary operator """
+def _bin_op(name, pass_literal_through=False):
+ """ Create a method for given binary operator
+
+ Keyword arguments:
+ pass_literal_through -- whether to pass literal value directly through to the JVM.
+ """
def _(self, other):
if isinstance(other, Column):
jc = other._jc
else:
- jc = _create_column_from_literal(other)
+ if pass_literal_through:
+ jc = other
+ else:
+ jc = _create_column_from_literal(other)
return Column(getattr(self._jc, _scalaMethod(name))(jc), self._jdf, self.sql_ctx)
return _
@@ -2458,10 +2465,10 @@ class Column(DataFrame):
# __getattr__ = _bin_op("getField")
# string methods
- rlike = _bin_op("rlike")
- like = _bin_op("like")
- startswith = _bin_op("startsWith")
- endswith = _bin_op("endsWith")
+ rlike = _bin_op("rlike", pass_literal_through=True)
+ like = _bin_op("like", pass_literal_through=True)
+ startswith = _bin_op("startsWith", pass_literal_through=True)
+ endswith = _bin_op("endsWith", pass_literal_through=True)
upper = _unary_op("upper")
lower = _unary_op("lower")
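For these four string operators the underlying JVM methods take a plain string pattern rather than a Column, so the Python literal must cross Py4J untouched; wrapping it with lit() first would hand the JVM a Column where it expects a String. A sketch of the two paths, assuming a DataFrame df with a string column name:

    df.name.like("Ali%")    # pass_literal_through=True: "Ali%" is sent as a plain string
    df.name == "Alice"      # default path: "Alice" is first wrapped via _create_column_from_literal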
@@ -2487,7 +2494,7 @@ class Column(DataFrame):
isNotNull = _unary_op("isNotNull")
# `as` is keyword
- def As(self, alias):
+ def alias(self, alias):
return Column(getattr(self._jsc, "as")(alias), self._jdf, self.sql_ctx)
def cast(self, dataType):
@@ -2501,15 +2508,14 @@ class Column(DataFrame):
def _aggregate_func(name):
- """ Creat a function for aggregator by name"""
+ """ Create a function for aggregator by name"""
def _(col):
sc = SparkContext._active_spark_context
if isinstance(col, Column):
jcol = col._jc
else:
jcol = _create_column_from_name(col)
- # FIXME: can not access dsl.min/max ...
- jc = getattr(sc._jvm.org.apache.spark.sql.dsl(), name)(jcol)
+ jc = getattr(sc._jvm.org.apache.spark.sql.api.java.dsl, name)(jcol)
return Column(jc)
return staticmethod(_)
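With the JVM path fixed, the generated functions can now reach min/max through the Java dsl. A hypothetical sketch of how such a generated aggregator might be attached and used; the attachment site is outside this hunk, so the holder class name here is illustrative, not from this diff:

    class Dsl(object):              # illustrative holder; name assumed
        max = _aggregate_func("max")
        min = _aggregate_func("min")

    Dsl.max(df.age)                 # Column wrapping the JVM aggregate expression
    Dsl.min("age")                  # a plain name is resolved via _create_column_from_name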