about summary refs log tree commit diff
path: root/sql/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-04-07 10:45:31 -0700
committerReynold Xin <rxin@apache.org>2014-04-07 10:45:31 -0700
commit83f2a2f14e4145a04672e42216d43100a66b1fc2 (patch)
tree07aaf15a57154de78128de3a5942ec8f4dfdb31b /sql/core
parenta3c51c6ea2320efdeb2a6a5c1cd11d714f8994aa (diff)
downloadspark-83f2a2f14e4145a04672e42216d43100a66b1fc2.tar.gz
spark-83f2a2f14e4145a04672e42216d43100a66b1fc2.tar.bz2
spark-83f2a2f14e4145a04672e42216d43100a66b1fc2.zip
[sql] Rename Expression.apply to eval for better readability.
Also used this opportunity to add a bunch of override's and made some members private. Author: Reynold Xin <rxin@apache.org> Closes #340 from rxin/eval and squashes the following commits: a7c7ca7 [Reynold Xin] Fixed conflicts in merge. 9069de6 [Reynold Xin] Merge branch 'master' into eval 3ccc313 [Reynold Xin] Merge branch 'master' into eval 1a47e10 [Reynold Xin] Renamed apply to eval for generators and added a bunch of override's. ea061de [Reynold Xin] Rename Expression.apply to eval for better readability.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala       | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala     | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala       | 6
4 files changed, 10 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index e902e6ced5..cff4887936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -36,10 +36,10 @@ case class Generate(
child: SparkPlan)
extends UnaryNode {
- def output =
+ override def output =
if (join) child.output ++ generator.output else generator.output
- def execute() = {
+ override def execute() = {
if (join) {
child.execute().mapPartitions { iter =>
val nullValues = Seq.fill(generator.output.size)(Literal(null))
@@ -52,7 +52,7 @@ case class Generate(
val joinedRow = new JoinedRow
iter.flatMap {row =>
- val outputRows = generator(row)
+ val outputRows = generator.eval(row)
if (outer && outputRows.isEmpty) {
outerProjection(row) :: Nil
} else {
@@ -61,7 +61,7 @@ case class Generate(
}
}
} else {
- child.execute().mapPartitions(iter => iter.flatMap(generator))
+ child.execute().mapPartitions(iter => iter.flatMap(row => generator.eval(row)))
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
index 2a4f7b5670..0890faa33b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
@@ -144,7 +144,7 @@ case class Aggregate(
var i = 0
while (i < buffer.length) {
- aggregateResults(i) = buffer(i).apply(EmptyRow)
+ aggregateResults(i) = buffer(i).eval(EmptyRow)
i += 1
}
@@ -190,7 +190,7 @@ case class Aggregate(
while (i < currentBuffer.length) {
// Evaluating an aggregate buffer returns the result. No row is required since we
// already added all rows in the group using update.
- aggregateResults(i) = currentBuffer(i).apply(EmptyRow)
+ aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
i += 1
}
resultProjection(joinedRow(aggregateResults, currentGroup))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 524e5022ee..ab2e624637 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -41,7 +41,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
override def output = child.output
override def execute() = child.execute().mapPartitions { iter =>
- iter.filter(condition.apply(_).asInstanceOf[Boolean])
+ iter.filter(condition.eval(_).asInstanceOf[Boolean])
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala
index ca5c8b8eb6..e55648b8ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TgfSuite.scala
@@ -39,9 +39,9 @@ case class ExampleTGF(input: Seq[Attribute] = Seq('name, 'age)) extends Generato
val Seq(nameAttr, ageAttr) = input
- override def apply(input: Row): TraversableOnce[Row] = {
- val name = nameAttr.apply(input)
- val age = ageAttr.apply(input).asInstanceOf[Int]
+ override def eval(input: Row): TraversableOnce[Row] = {
+ val name = nameAttr.eval(input)
+ val age = ageAttr.eval(input).asInstanceOf[Int]
Iterator(
new GenericRow(Array[Any](s"$name is $age years old")),