aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-03-31 15:23:46 -0700
committerReynold Xin <rxin@apache.org>2014-03-31 15:23:46 -0700
commit5731af5be65ccac831445f351baf040a0d007687 (patch)
tree992d6369f4311802eb8f33993997e6c757fea85a /sql/catalyst
parent841721e03cc44ee7d8fe72c882db8c0f9f3af365 (diff)
downloadspark-5731af5be65ccac831445f351baf040a0d007687.tar.gz
spark-5731af5be65ccac831445f351baf040a0d007687.tar.bz2
spark-5731af5be65ccac831445f351baf040a0d007687.zip
[SQL] Rewrite join implementation to allow streaming of one relation.
Before we were materializing everything in memory. This also uses the projection interface so will be easier to plug in code gen (its ported from that branch). @rxin @liancheng Author: Michael Armbrust <michael@databricks.com> Closes #250 from marmbrus/hashJoin and squashes the following commits: 1ad873e [Michael Armbrust] Change hasNext logic back to the correct version. 8e6f2a2 [Michael Armbrust] Review comments. 1e9fb63 [Michael Armbrust] style bc0cb84 [Michael Armbrust] Rewrite join implementation to allow streaming of one relation.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala6
2 files changed, 16 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 31d42b9ee7..6f939e6c41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -44,6 +44,16 @@ trait Row extends Seq[Any] with Serializable {
s"[${this.mkString(",")}]"
def copy(): Row
+
+ /** Returns true if there are any NULL values in this row. */
+ def anyNull: Boolean = {
+ var i = 0
+ while (i < length) {
+ if (isNullAt(i)) { return true }
+ i += 1
+ }
+ false
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 722ff517d2..02fedd16b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -21,6 +21,12 @@ import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.types.{BooleanType, StringType}
+object InterpretedPredicate {
+ def apply(expression: Expression): (Row => Boolean) = {
+ (r: Row) => expression.apply(r).asInstanceOf[Boolean]
+ }
+}
+
trait Predicate extends Expression {
self: Product =>