author     Davies Liu <davies@databricks.com>    2015-06-12 23:06:31 -0700
committer  Reynold Xin <rxin@databricks.com>     2015-06-12 23:06:31 -0700
commit     d46f8e5d4b5c1278e0fae3ad133b2229ac01b197 (patch)
tree       7ec124d4ac2ff365a9675d89113cbfee1e8abad8 /sql/catalyst
parent     6e9c3ff1ecaf12a0126d83f27f5a4153ae420a34 (diff)
[SPARK-7186] [SQL] Decouple internal Row from external Row
Currently, we use o.a.s.sql.Row both internally and externally. The external interface is wider than what the internals need because it is designed to facilitate end-user programming. This design has proven to be very error prone and cumbersome for internal Row implementations.

As a first step, we create an InternalRow interface in the catalyst module, which is identical to the current Row interface, and switch all internal operators/expressions to use InternalRow instead. When we need to expose rows, we convert the InternalRow implementation into Row for users. For all public APIs (for example, data source APIs), we use Row, which is converted into/from InternalRow by CatalystTypeConverters. For all internal data sources (Json, Parquet, JDBC, Hive), we use InternalRow for better performance, cast into Row in buildScan() (without changing the public API). When creating a PhysicalRDD, we cast them back to InternalRow.

cc rxin marmbrus JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #6792 from davies/internal_row and squashes the following commits:

f2abd13 [Davies Liu] fix scalastyle
a7e025c [Davies Liu] move InternalRow into catalyst
30db8ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into internal_row
7cbced8 [Davies Liu] separate Row and InternalRow
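The split can be pictured with a small, self-contained Scala sketch. The classes below are hypothetical stand-ins for illustration only (not Spark's actual Row, InternalRow, or CatalystTypeConverters): internal operators evaluate against a narrow internal row type, and conversion to the public-facing row happens only at the API boundary, which is the role CatalystTypeConverters plays in this patch.

// Hypothetical stand-ins, not Spark classes: illustrate the internal/external split.
final case class ExternalRow(values: Seq[Any])            // wide, user-facing row
final case class SimpleInternalRow(values: Array[Any]) {  // narrow row used by operators
  def get(i: Int): Any = values(i)
  def isNullAt(i: Int): Boolean = values(i) == null
}

// Conversion happens only at the public API boundary (e.g. a data source's buildScan()).
object Boundary {
  def toInternal(row: ExternalRow): SimpleInternalRow = SimpleInternalRow(row.values.toArray)
  def toExternal(row: SimpleInternalRow): ExternalRow = ExternalRow(row.values.toSeq)
}

object Demo extends App {
  val fromUser = ExternalRow(Seq(1, "spark"))
  val internal = Boundary.toInternal(fromUser)   // operators only ever see this type
  println(Boundary.toExternal(internal))         // converted back when exposed to users
}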
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java)  0
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java)  5
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java  8
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala  54
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala  57
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala  11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala  12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala  104
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala  51
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala  66
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala  73
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala  15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala  11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala  17
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala  15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala  13
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala  15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala  9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala  9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala  28
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala  20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala  23
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala  10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala  12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala  12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala  9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala  7
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala  8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala  22
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala  16
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala  10
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala  1
49 files changed, 459 insertions, 345 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java
index acec2bf452..acec2bf452 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
index e91daf17f8..611e02d8fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
@@ -25,10 +25,11 @@ import java.util.List;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.StructType;
-public abstract class BaseRow implements Row {
+public abstract class BaseRow extends InternalRow {
@Override
final public int length() {
@@ -176,7 +177,7 @@ public abstract class BaseRow implements Row {
}
@Override
- public Row copy() {
+ public InternalRow copy() {
final int n = size();
Object[] arr = new Object[n];
for (int i = 0; i < n; i++) {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index 299ff3728a..b23e0efc83 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.PlatformDependent;
@@ -107,7 +107,7 @@ public final class UnsafeFixedWidthAggregationMap {
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
*/
public UnsafeFixedWidthAggregationMap(
- Row emptyAggregationBuffer,
+ InternalRow emptyAggregationBuffer,
StructType aggregationBufferSchema,
StructType groupingKeySchema,
TaskMemoryManager memoryManager,
@@ -125,7 +125,7 @@ public final class UnsafeFixedWidthAggregationMap {
/**
* Convert a Java object row into an UnsafeRow, allocating it into a new long array.
*/
- private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
+ private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
final long writtenLength =
@@ -138,7 +138,7 @@ public final class UnsafeFixedWidthAggregationMap {
* Return the aggregation buffer for the current group. For efficiency, all calls to this method
* return the same object.
*/
- public UnsafeRow getAggregationBuffer(Row groupingKey) {
+ public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
final int groupingKeySize = groupingKeyToUnsafeRowConverter.getSizeRequirement(groupingKey);
// Make sure that the buffer is large enough to hold the key. If it's not, grow it:
if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 143acc9f5e..aec88c9241 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -26,7 +26,7 @@ import java.util.Set;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.BaseMutableRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
@@ -334,7 +334,7 @@ public final class UnsafeRow extends BaseMutableRow {
@Override
- public Row copy() {
+ public InternalRow copy() {
throw new UnsupportedOperationException();
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 7e4b11a495..6175456c58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -105,7 +106,7 @@ object CatalystTypeConverters {
/**
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
*/
- final def toScala(row: Row, column: Int): ScalaOutputType = {
+ final def toScala(row: InternalRow, column: Int): ScalaOutputType = {
if (row.isNullAt(column)) null.asInstanceOf[ScalaOutputType] else toScalaImpl(row, column)
}
@@ -125,20 +126,20 @@ object CatalystTypeConverters {
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
* This method will only be called on non-null columns.
*/
- protected def toScalaImpl(row: Row, column: Int): ScalaOutputType
+ protected def toScalaImpl(row: InternalRow, column: Int): ScalaOutputType
}
private object IdentityConverter extends CatalystTypeConverter[Any, Any, Any] {
override def toCatalystImpl(scalaValue: Any): Any = scalaValue
override def toScala(catalystValue: Any): Any = catalystValue
- override def toScalaImpl(row: Row, column: Int): Any = row(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Any = row(column)
}
private case class UDTConverter(
udt: UserDefinedType[_]) extends CatalystTypeConverter[Any, Any, Any] {
override def toCatalystImpl(scalaValue: Any): Any = udt.serialize(scalaValue)
override def toScala(catalystValue: Any): Any = udt.deserialize(catalystValue)
- override def toScalaImpl(row: Row, column: Int): Any = toScala(row(column))
+ override def toScalaImpl(row: InternalRow, column: Int): Any = toScala(row(column))
}
/** Converter for arrays, sequences, and Java iterables. */
@@ -170,7 +171,7 @@ object CatalystTypeConverters {
}
}
- override def toScalaImpl(row: Row, column: Int): Seq[Any] =
+ override def toScalaImpl(row: InternalRow, column: Int): Seq[Any] =
toScala(row(column).asInstanceOf[Seq[Any]])
}
@@ -209,16 +210,16 @@ object CatalystTypeConverters {
}
}
- override def toScalaImpl(row: Row, column: Int): Map[Any, Any] =
+ override def toScalaImpl(row: InternalRow, column: Int): Map[Any, Any] =
toScala(row(column).asInstanceOf[Map[Any, Any]])
}
private case class StructConverter(
- structType: StructType) extends CatalystTypeConverter[Any, Row, Row] {
+ structType: StructType) extends CatalystTypeConverter[Any, Row, InternalRow] {
private[this] val converters = structType.fields.map { f => getConverterForType(f.dataType) }
- override def toCatalystImpl(scalaValue: Any): Row = scalaValue match {
+ override def toCatalystImpl(scalaValue: Any): InternalRow = scalaValue match {
case row: Row =>
val ar = new Array[Any](row.size)
var idx = 0
@@ -239,7 +240,7 @@ object CatalystTypeConverters {
new GenericRowWithSchema(ar, structType)
}
- override def toScala(row: Row): Row = {
+ override def toScala(row: InternalRow): Row = {
if (row == null) {
null
} else {
@@ -253,7 +254,8 @@ object CatalystTypeConverters {
}
}
- override def toScalaImpl(row: Row, column: Int): Row = toScala(row(column).asInstanceOf[Row])
+ override def toScalaImpl(row: InternalRow, column: Int): Row =
+ toScala(row(column).asInstanceOf[InternalRow])
}
private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
@@ -266,14 +268,14 @@ object CatalystTypeConverters {
case str: String => str
case utf8: UTF8String => utf8.toString()
}
- override def toScalaImpl(row: Row, column: Int): String = row(column).toString
+ override def toScalaImpl(row: InternalRow, column: Int): String = row(column).toString
}
private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
- override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
+ override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
}
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
@@ -282,7 +284,8 @@ object CatalystTypeConverters {
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
- override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
+ override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
+ toScala(row.getLong(column))
}
private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
@@ -292,10 +295,11 @@ object CatalystTypeConverters {
case d: Decimal => d
}
override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal
- override def toScalaImpl(row: Row, column: Int): JavaBigDecimal = row.get(column) match {
- case d: JavaBigDecimal => d
- case d: Decimal => d.toJavaBigDecimal
- }
+ override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
+ row.get(column) match {
+ case d: JavaBigDecimal => d
+ case d: Decimal => d.toJavaBigDecimal
+ }
}
private abstract class PrimitiveConverter[T] extends CatalystTypeConverter[T, Any, Any] {
@@ -304,31 +308,31 @@ object CatalystTypeConverters {
}
private object BooleanConverter extends PrimitiveConverter[Boolean] {
- override def toScalaImpl(row: Row, column: Int): Boolean = row.getBoolean(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Boolean = row.getBoolean(column)
}
private object ByteConverter extends PrimitiveConverter[Byte] {
- override def toScalaImpl(row: Row, column: Int): Byte = row.getByte(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Byte = row.getByte(column)
}
private object ShortConverter extends PrimitiveConverter[Short] {
- override def toScalaImpl(row: Row, column: Int): Short = row.getShort(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Short = row.getShort(column)
}
private object IntConverter extends PrimitiveConverter[Int] {
- override def toScalaImpl(row: Row, column: Int): Int = row.getInt(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Int = row.getInt(column)
}
private object LongConverter extends PrimitiveConverter[Long] {
- override def toScalaImpl(row: Row, column: Int): Long = row.getLong(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Long = row.getLong(column)
}
private object FloatConverter extends PrimitiveConverter[Float] {
- override def toScalaImpl(row: Row, column: Int): Float = row.getFloat(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Float = row.getFloat(column)
}
private object DoubleConverter extends PrimitiveConverter[Double] {
- override def toScalaImpl(row: Row, column: Int): Double = row.getDouble(column)
+ override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column)
}
/**
@@ -382,7 +386,7 @@ object CatalystTypeConverters {
case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
case seq: Seq[Any] => seq.map(convertToCatalyst)
- case r: Row => Row(r.toSeq.map(convertToCatalyst): _*)
+ case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
case m: Map[Any, Any] =>
m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
new file mode 100644
index 0000000000..e3c2cc2433
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+
+/**
+ * An abstract class for row used internal in Spark SQL, which only contain the columns as
+ * internal types.
+ */
+abstract class InternalRow extends Row {
+ // A default implementation to change the return type
+ override def copy(): InternalRow = {this}
+}
+
+object InternalRow {
+ def unapplySeq(row: InternalRow): Some[Seq[Any]] = Some(row.toSeq)
+
+ /**
+ * This method can be used to construct a [[Row]] with the given values.
+ */
+ def apply(values: Any*): InternalRow = new GenericRow(values.toArray)
+
+ /**
+ * This method can be used to construct a [[Row]] from a [[Seq]] of values.
+ */
+ def fromSeq(values: Seq[Any]): InternalRow = new GenericRow(values.toArray)
+
+ def fromTuple(tuple: Product): InternalRow = fromSeq(tuple.productIterator.toSeq)
+
+ /**
+ * Merge multiple rows into a single row, one after another.
+ */
+ def merge(rows: InternalRow*): InternalRow = {
+ // TODO: Improve the performance of this if used in performance critical part.
+ new GenericRow(rows.flatMap(_.toSeq).toArray)
+ }
+
+ /** Returns an empty row. */
+ val empty = apply()
+}
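As a side note, the companion object in the new file above exposes a few factory helpers (apply, fromSeq, fromTuple, merge, empty). Below is a hedged usage sketch written against the interface exactly as shown in this diff; it assumes a Spark build that already contains this commit is on the classpath.

import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: exercises the factory methods defined in InternalRow's companion object above.
object InternalRowSketch extends App {
  val a = InternalRow(1, "x")                  // apply(values: Any*)
  val b = InternalRow.fromSeq(Seq(2L, null))   // fromSeq(values: Seq[Any])
  val c = InternalRow.fromTuple((3, true))     // fromTuple(tuple: Product)
  val merged = InternalRow.merge(a, b, c)      // columns of a, then b, then c
  println(merged.length)                       // 6 columns in total
  println(InternalRow.empty.length)            // 0
}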
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index bbb150c1e8..5de188d418 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
@@ -67,7 +68,7 @@ case class UnresolvedAttribute(nameParts: Seq[String])
override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)
// Unresolved attributes are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"'$name"
@@ -85,7 +86,7 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E
override lazy val resolved = false
// Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"'$name(${children.mkString(",")})"
@@ -107,7 +108,7 @@ trait Star extends NamedExpression with trees.LeafNode[Expression] {
override lazy val resolved = false
// Star gets expanded at runtime so we never evaluate a Star.
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression]
@@ -166,7 +167,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
override lazy val resolved = false
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"$child AS $names"
@@ -200,7 +201,7 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression)
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override lazy val resolved = false
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"$child[$extraction]"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index fcadf9595e..c4dd11a451 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.{InternalRow, trees}
/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
@@ -33,7 +33,7 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
override def toString: String = s"input[$ordinal]"
- override def eval(input: Row): Any = input(ordinal)
+ override def eval(input: InternalRow): Any = input(ordinal)
override def name: String = s"i[$ordinal]"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 4c7123fcb7..afbf30af33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
@@ -393,7 +394,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}
// TODO: Could be faster?
val newRow = new GenericMutableRow(from.fields.size)
- buildCast[Row](_, row => {
+ buildCast[catalyst.InternalRow](_, row => {
var i = 0
while (i < row.length) {
val v = row(i)
@@ -425,7 +426,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] lazy val cast: Any => Any = cast(child.dataType, dataType)
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evaluated = child.eval(input)
if (evaluated == null) null else cast(evaluated)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 0b9f621fed..61de34bfa4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types._
@@ -59,7 +59,7 @@ abstract class Expression extends TreeNode[Expression] {
def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))
/** Returns the result of evaluating this expression on a given input Row */
- def eval(input: Row = null): Any
+ def eval(input: InternalRow = null): Any
/**
* Returns an [[GeneratedExpressionCode]], which contains Java source code that
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
index a1e0819e8a..16f3ccc3d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import scala.collection.Map
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{catalyst, AnalysisException}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.types._
@@ -105,8 +105,8 @@ case class GetStructField(child: Expression, field: StructField, ordinal: Int)
override def foldable: Boolean = child.foldable
override def toString: String = s"$child.${field.name}"
- override def eval(input: Row): Any = {
- val baseValue = child.eval(input).asInstanceOf[Row]
+ override def eval(input: catalyst.InternalRow): Any = {
+ val baseValue = child.eval(input).asInstanceOf[catalyst.InternalRow]
if (baseValue == null) null else baseValue(ordinal)
}
}
@@ -125,8 +125,8 @@ case class GetArrayStructFields(
override def foldable: Boolean = child.foldable
override def toString: String = s"$child.${field.name}"
- override def eval(input: Row): Any = {
- val baseValue = child.eval(input).asInstanceOf[Seq[Row]]
+ override def eval(input: catalyst.InternalRow): Any = {
+ val baseValue = child.eval(input).asInstanceOf[Seq[catalyst.InternalRow]]
if (baseValue == null) null else {
baseValue.map { row =>
if (row == null) null else row(ordinal)
@@ -146,7 +146,7 @@ abstract class ExtractValueWithOrdinal extends ExtractValue {
override def toString: String = s"$child[$ordinal]"
override def children: Seq[Expression] = child :: ordinal :: Nil
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val value = child.eval(input)
if (value == null) {
null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 8cae548279..d6806f78ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
+
/**
* A [[Projection]] that is calculated by calling the `eval` of each of the specified expressions.
@@ -30,7 +32,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
// null check is required for when Kryo invokes the no-arg constructor.
protected val exprArray = if (expressions != null) expressions.toArray else null
- def apply(input: Row): Row = {
+ def apply(input: catalyst.InternalRow): catalyst.InternalRow = {
val outputArray = new Array[Any](exprArray.length)
var i = 0
while (i < exprArray.length) {
@@ -55,14 +57,14 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
private[this] val exprArray = expressions.toArray
private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
- def currentValue: Row = mutableRow
+ def currentValue: catalyst.InternalRow = mutableRow
override def target(row: MutableRow): MutableProjection = {
mutableRow = row
this
}
- override def apply(input: Row): Row = {
+ override def apply(input: catalyst.InternalRow): catalyst.InternalRow = {
var i = 0
while (i < exprArray.length) {
mutableRow(i) = exprArray(i).eval(input)
@@ -76,31 +78,31 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
* A mutable wrapper that makes two rows appear as a single concatenated row. Designed to
* be instantiated once per thread and reused.
*/
-class JoinedRow extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -142,7 +144,7 @@ class JoinedRow extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
@@ -176,31 +178,31 @@ class JoinedRow extends Row {
* Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
* crazy but in benchmarks it had noticeable effects.
*/
-class JoinedRow2 extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow2 extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -242,7 +244,7 @@ class JoinedRow2 extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
@@ -270,31 +272,31 @@ class JoinedRow2 extends Row {
/**
* JIT HACK: Replace with macros
*/
-class JoinedRow3 extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow3 extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -336,7 +338,7 @@ class JoinedRow3 extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
@@ -364,31 +366,31 @@ class JoinedRow3 extends Row {
/**
* JIT HACK: Replace with macros
*/
-class JoinedRow4 extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow4 extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -430,7 +432,7 @@ class JoinedRow4 extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
@@ -458,31 +460,31 @@ class JoinedRow4 extends Row {
/**
* JIT HACK: Replace with macros
*/
-class JoinedRow5 extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow5 extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -524,7 +526,7 @@ class JoinedRow5 extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
@@ -552,31 +554,31 @@ class JoinedRow5 extends Row {
/**
* JIT HACK: Replace with macros
*/
-class JoinedRow6 extends Row {
- private[this] var row1: Row = _
- private[this] var row2: Row = _
+class JoinedRow6 extends catalyst.InternalRow {
+ private[this] var row1: catalyst.InternalRow = _
+ private[this] var row2: catalyst.InternalRow = _
- def this(left: Row, right: Row) = {
+ def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
this()
row1 = left
row2 = right
}
/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: Row, r2: Row): Row = {
+ def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
row1 = r1
row2 = r2
this
}
/** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: Row): Row = {
+ def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
row1 = newLeft
this
}
/** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: Row): Row = {
+ def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
row2 = newRight
this
}
@@ -618,7 +620,7 @@ class JoinedRow6 extends Row {
override def getAs[T](i: Int): T =
if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
- override def copy(): Row = {
+ override def copy(): catalyst.InternalRow = {
val totalSize = row1.length + row2.length
val copiedValues = new Array[Any](totalSize)
var i = 0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 5b45347872..40f235fc19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.DataType
@@ -45,7 +46,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
val func = function.asInstanceOf[($anys) => Any]
$childs
$converters
- (input: Row) => {
+ (input: InternalRow) => {
func(
$evals)
}
@@ -57,7 +58,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
private[this] val f = children.size match {
case 0 =>
val func = function.asInstanceOf[() => Any]
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func()
}
@@ -65,7 +66,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
val func = function.asInstanceOf[(Any) => Any]
val child0 = children(0)
lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)))
}
@@ -76,7 +77,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
val child1 = children(1)
lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)))
@@ -90,7 +91,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -107,7 +108,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -127,7 +128,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -150,7 +151,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -176,7 +177,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -205,7 +206,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -237,7 +238,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -272,7 +273,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -310,7 +311,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -351,7 +352,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -395,7 +396,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -442,7 +443,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -492,7 +493,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -545,7 +546,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -601,7 +602,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -660,7 +661,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -722,7 +723,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -787,7 +788,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -855,7 +856,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -926,7 +927,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
lazy val converter21 = CatalystTypeConverters.createToScalaConverter(child21.dataType)
- (input: Row) => {
+ (input: catalyst.InternalRow) => {
func(
converter0(child0.eval(input)),
converter1(child1.eval(input)),
@@ -955,6 +956,6 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
// scalastyle:on
private[this] val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
- override def eval(input: Row): Any = converter(f(input))
+ override def eval(input: catalyst.InternalRow): Any = converter(f(input))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 99340a14c9..8a34355999 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.{InternalRow, trees}
import org.apache.spark.sql.types.DataType
abstract sealed class SortDirection
@@ -36,7 +36,7 @@ case class SortOrder(child: Expression, direction: SortDirection) extends Expres
override def nullable: Boolean = child.nullable
// SortOrder itself is never evaluated.
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 98eda61a80..05aab34559 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -222,7 +222,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
override def isNullAt(i: Int): Boolean = values(i).isNull
- override def copy(): Row = {
+ override def copy(): InternalRow = {
val newValues = new Array[Any](values.length)
var i = 0
while (i < values.length) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index 5350123bf4..d771e454b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -48,7 +48,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
/**
* Compute the amount of space, in bytes, required to encode the given row.
*/
- def getSizeRequirement(row: Row): Int = {
+ def getSizeRequirement(row: InternalRow): Int = {
var fieldNumber = 0
var variableLengthFieldSize: Int = 0
while (fieldNumber < writers.length) {
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
* @param baseOffset the base offset of the destination address
* @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
*/
- def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
+ def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
var fieldNumber = 0
var appendCursor: Int = fixedLengthSize
@@ -99,12 +99,12 @@ private abstract class UnsafeColumnWriter {
* used for calculating where variable-length data should be written
* @return the number of variable-length bytes written
*/
- def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int
+ def write(source: InternalRow, target: UnsafeRow, column: Int, appendCursor: Int): Int
/**
* Return the number of bytes that are needed to write this variable-length value.
*/
- def getSize(source: Row, column: Int): Int
+ def getSize(source: InternalRow, column: Int): Int
}
private object UnsafeColumnWriter {
@@ -140,72 +140,108 @@ private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter
private abstract class PrimitiveUnsafeColumnWriter extends UnsafeColumnWriter {
// Primitives don't write to the variable-length region:
- def getSize(sourceRow: Row, column: Int): Int = 0
+ def getSize(sourceRow: InternalRow, column: Int): Int = 0
}
private class NullUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setNullAt(column)
0
}
}
private class BooleanUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setBoolean(column, source.getBoolean(column))
0
}
}
private class ByteUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setByte(column, source.getByte(column))
0
}
}
private class ShortUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setShort(column, source.getShort(column))
0
}
}
private class IntUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setInt(column, source.getInt(column))
0
}
}
private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setLong(column, source.getLong(column))
0
}
}
private class FloatUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setFloat(column, source.getFloat(column))
0
}
}
private class DoubleUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
target.setDouble(column, source.getDouble(column))
0
}
}
private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter {
- def getSize(source: Row, column: Int): Int = {
+ def getSize(source: InternalRow, column: Int): Int = {
val numBytes = source.get(column).asInstanceOf[UTF8String].getBytes.length
8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
}
- override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+ override def write(
+ source: InternalRow,
+ target: UnsafeRow,
+ column: Int,
+ appendCursor: Int): Int = {
val value = source.get(column).asInstanceOf[UTF8String]
val baseObject = target.getBaseObject
val baseOffset = target.getBaseOffset
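
The column-writer hunks above only re-type the source row and wrap signatures that became too long; the per-type copy logic is untouched. A rough, stand-alone sketch of the writer shape, with a simplified in-memory target instead of the real UnsafeRow (all names here are illustrative):

    // Simplified stand-ins; the real UnsafeRow writers operate on raw memory.
    trait InternalRow {
      def isNullAt(i: Int): Boolean
      def getInt(i: Int): Int
    }

    final class TargetRowSketch(n: Int) {
      private val values = new Array[Any](n)
      def setNullAt(i: Int): Unit = values(i) = null
      def setInt(i: Int, v: Int): Unit = values(i) = v
    }

    abstract class ColumnWriterSketch {
      // Returns the number of variable-length bytes written; primitives use none.
      def write(source: InternalRow, target: TargetRowSketch, column: Int, appendCursor: Int): Int
    }

    object IntColumnWriterSketch extends ColumnWriterSketch {
      override def write(
          source: InternalRow,
          target: TargetRowSketch,
          column: Int,
          appendCursor: Int): Int = {
        target.setInt(column, source.getInt(column))
        0 // fixed-width value: nothing written to the variable-length region
      }
    }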
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 0266084a6d..f9e8150a68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.catalyst.expressions
import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
abstract class AggregateExpression extends Expression {
@@ -37,7 +38,7 @@ abstract class AggregateExpression extends Expression {
* [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
* replaced with a physical aggregate operator at runtime.
*/
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -80,7 +81,7 @@ abstract class AggregateFunction
override def nullable: Boolean = base.nullable
override def dataType: DataType = base.dataType
- def update(input: Row): Unit
+ def update(input: catalyst.InternalRow): Unit
// Do we really need this?
override def newInstance(): AggregateFunction = {
@@ -108,7 +109,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr
val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType)
val cmp = GreaterThan(currentMin, expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (currentMin.value == null) {
currentMin.value = expr.eval(input)
} else if (cmp.eval(input) == true) {
@@ -116,7 +117,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr
}
}
- override def eval(input: Row): Any = currentMin.value
+ override def eval(input: catalyst.InternalRow): Any = currentMin.value
}
case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -139,7 +140,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType)
val cmp = LessThan(currentMax, expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (currentMax.value == null) {
currentMax.value = expr.eval(input)
} else if (cmp.eval(input) == true) {
@@ -147,7 +148,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
}
}
- override def eval(input: Row): Any = currentMax.value
+ override def eval(input: catalyst.InternalRow): Any = currentMax.value
}
case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -205,14 +206,14 @@ case class CollectHashSetFunction(
@transient
val distinctValue = new InterpretedProjection(expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = distinctValue(input)
if (!evaluatedExpr.anyNull) {
seen.add(evaluatedExpr)
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
seen
}
}
@@ -238,7 +239,7 @@ case class CombineSetsAndCountFunction(
val seen = new OpenHashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
val inputIterator = inputSetEval.iterator
while (inputIterator.hasNext) {
@@ -246,7 +247,7 @@ case class CombineSetsAndCountFunction(
}
}
- override def eval(input: Row): Any = seen.size.toLong
+ override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
}
/** The data type of ApproxCountDistinctPartition since its output is a HyperLogLog object. */
@@ -453,7 +454,7 @@ case class CombineSetsAndSumFunction(
val seen = new OpenHashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
val inputIterator = inputSetEval.iterator
while (inputIterator.hasNext) {
@@ -461,8 +462,8 @@ case class CombineSetsAndSumFunction(
}
}
- override def eval(input: Row): Any = {
- val casted = seen.asInstanceOf[OpenHashSet[Row]]
+ override def eval(input: catalyst.InternalRow): Any = {
+ val casted = seen.asInstanceOf[OpenHashSet[catalyst.InternalRow]]
if (casted.size == 0) {
null
} else {
@@ -524,7 +525,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
private def addFunction(value: Any) = Add(sum,
Cast(Literal.create(value, expr.dataType), calcType))
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (count == 0L) {
null
} else {
@@ -541,7 +542,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
}
}
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
count += 1
@@ -555,14 +556,14 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag
var count: Long = _
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
count += 1L
}
}
- override def eval(input: Row): Any = count
+ override def eval(input: catalyst.InternalRow): Any = count
}
case class ApproxCountDistinctPartitionFunction(
@@ -574,14 +575,14 @@ case class ApproxCountDistinctPartitionFunction(
private val hyperLogLog = new HyperLogLog(relativeSD)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
hyperLogLog.offer(evaluatedExpr)
}
}
- override def eval(input: Row): Any = hyperLogLog
+ override def eval(input: catalyst.InternalRow): Any = hyperLogLog
}
case class ApproxCountDistinctMergeFunction(
@@ -593,12 +594,12 @@ case class ApproxCountDistinctMergeFunction(
private val hyperLogLog = new HyperLogLog(relativeSD)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog])
}
- override def eval(input: Row): Any = hyperLogLog.cardinality()
+ override def eval(input: catalyst.InternalRow): Any = hyperLogLog.cardinality()
}
case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -619,11 +620,11 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr
private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
sum.update(addFunction, input)
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
expr.dataType match {
case DecimalType.Fixed(_, _) =>
Cast(sum, dataType).eval(null)
@@ -652,7 +653,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val result = expr.eval(input)
// partial sum result can be null only when no input rows present
if(result != null) {
@@ -660,7 +661,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
expr.dataType match {
case DecimalType.Fixed(_, _) =>
Cast(sum, dataType).eval(null)
@@ -676,14 +677,14 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
private val seen = new scala.collection.mutable.HashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
seen += evaluatedExpr
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (seen.size == 0) {
null
} else {
@@ -707,14 +708,14 @@ case class CountDistinctFunction(
@transient
val distinctValue = new InterpretedProjection(expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = distinctValue(input)
if (!evaluatedExpr.anyNull) {
seen.add(evaluatedExpr)
}
}
- override def eval(input: Row): Any = seen.size.toLong
+ override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
}
case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -722,13 +723,13 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag
var result: Any = null
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (result == null) {
result = expr.eval(input)
}
}
- override def eval(input: Row): Any = result
+ override def eval(input: catalyst.InternalRow): Any = result
}
case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -736,11 +737,11 @@ case class LastFunction(expr: Expression, base: AggregateExpression) extends Agg
var result: Any = null
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
result = input
}
- override def eval(input: Row): Any = {
- if (result != null) expr.eval(result.asInstanceOf[Row]) else null
+ override def eval(input: catalyst.InternalRow): Any = {
+ if (result != null) expr.eval(result.asInstanceOf[catalyst.InternalRow]) else null
}
}
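
Every aggregate function in this file follows the same update(InternalRow) / eval(InternalRow) shape, and only the parameter type changes in the diff. A small stand-alone sketch of that shape, in the style of MaxFunction (simplified names and types, not the Spark classes):

    // Stand-in types; the real AggregateFunction also tracks its parent expression.
    trait InternalRow
    trait Expression { def eval(input: InternalRow): Any }

    // Running maximum: update() folds in one input row, eval() reports the
    // aggregate accumulated so far.
    class MaxFunctionSketch(expr: Expression) {
      private var currentMax: Option[Int] = None

      def update(input: InternalRow): Unit = {
        val v = expr.eval(input).asInstanceOf[Int]
        if (currentMax.forall(_ < v)) currentMax = Some(v)
      }

      def eval(input: InternalRow): Any = currentMax.getOrElse(null)
    }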
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 124274c942..0ba2ff75aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
@@ -29,7 +30,7 @@ abstract class UnaryArithmetic extends UnaryExpression {
override def nullable: Boolean = child.nullable
override def dataType: DataType = child.dataType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
@@ -124,7 +125,7 @@ abstract class BinaryArithmetic extends BinaryExpression {
protected def checkTypesInternal(t: DataType): TypeCheckResult
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
@@ -219,7 +220,7 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE2 = right.eval(input)
if (evalE2 == null || evalE2 == 0) {
null
@@ -279,7 +280,7 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE2 = right.eval(input)
if (evalE2 == null || evalE2 == 0) {
null
@@ -330,7 +331,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
private lazy val ordering = TypeUtils.getOrdering(dataType)
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
val evalE2 = right.eval(input)
if (evalE1 == null) {
@@ -384,7 +385,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
private lazy val ordering = TypeUtils.getOrdering(dataType)
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
val evalE2 = right.eval(input)
if (evalE1 == null) {
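
The arithmetic hunks are again pure signature changes around the same null-propagating evaluation skeleton. A compact sketch of that skeleton under stand-in types (the real BinaryArithmetic dispatches on the data type instead of casting to Int):

    // Stand-in types only.
    trait InternalRow
    trait Expression { def eval(input: InternalRow): Any }

    // Null-propagating binary evaluation: a null on either side short-circuits
    // to null, otherwise the two values are combined.
    class AddSketch(left: Expression, right: Expression) extends Expression {
      override def eval(input: InternalRow): Any = {
        val evalE1 = left.eval(input)
        if (evalE1 == null) {
          null
        } else {
          val evalE2 = right.eval(input)
          if (evalE2 == null) null
          else evalE1.asInstanceOf[Int] + evalE2.asInstanceOf[Int]
        }
      }
    }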
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 536e477330..244a06638f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -24,6 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.codehaus.janino.ClassBodyEvaluator
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -34,7 +35,7 @@ class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]
/**
- * Java source for evaluating an [[Expression]] given a [[Row]] of input.
+ * Java source for evaluating an [[Expression]] given a [[catalyst.InternalRow]] of input.
*
* @param code The sequence of statements required to evaluate the expression.
* @param isNull A term that holds a boolean value representing whether the expression evaluated
@@ -183,13 +184,13 @@ class CodeGenContext {
}
/**
- * List of data types that have special accessors and setters in [[Row]].
+ * List of data types that have special accessors and setters in [[catalyst.InternalRow]].
*/
val nativeTypes =
Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType)
/**
- * Returns true if the data type has a special accessor and setter in [[Row]].
+ * Returns true if the data type has a special accessor and setter in [[catalyst.InternalRow]].
*/
def isNativeType(dt: DataType): Boolean = nativeTypes.contains(dt)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index ed3df547d1..35cb954c54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
// MutableProjection is not accessible in Java
@@ -24,7 +25,7 @@ abstract class BaseMutableProjection extends MutableProjection {}
/**
* Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
- * input [[Row]] for a fixed set of [[Expression Expressions]].
+ * input [[catalyst.InternalRow]] for a fixed set of [[Expression Expressions]].
*/
object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
@@ -47,7 +48,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
"""
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificProjection generate($exprType[] expr) {
return new SpecificProjection(expr);
@@ -69,12 +70,12 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
}
/* Provide immutable access to the last projected row. */
- public Row currentValue() {
- return mutableRow;
+ public InternalRow currentValue() {
+ return (InternalRow) mutableRow;
}
public Object apply(Object _i) {
- Row i = (Row) _i;
+ InternalRow i = (InternalRow) _i;
$projectionCode
return mutableRow;
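
In the generated-source hunks the substantive change is a single line of Java text: the import at the top of the synthesized class now pulls in catalyst.InternalRow. A toy sketch of how such a source string is assembled in Scala; the class and method names mirror the hunk, the Object[] parameter is a simplification, and the real generator additionally splices in expression code and compiles with Janino:

    object CodeTemplateSketch {
      // Assemble the header of a generated class as a plain Java source string;
      // swapping rowClass is all the diff above does to this template.
      def classHeader(rowClass: String): String =
        s"""import $rowClass;
           |
           |public SpecificProjection generate(Object[] expr) {
           |  return new SpecificProjection(expr);
           |}""".stripMargin

      def main(args: Array[String]): Unit =
        println(classHeader("org.apache.spark.sql.catalyst.InternalRow"))
    }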
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 56ecc5fc06..db5d570aeb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.Logging
import org.apache.spark.annotation.Private
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{catalyst, Row}
import org.apache.spark.sql.catalyst.expressions._
/**
* Inherits some default implementation for Java from `Ordering[Row]`
*/
@Private
-class BaseOrdering extends Ordering[Row] {
- def compare(a: Row, b: Row): Int = {
+class BaseOrdering extends Ordering[catalyst.InternalRow] {
+ def compare(a: catalyst.InternalRow, b: catalyst.InternalRow): Int = {
throw new UnsupportedOperationException
}
}
@@ -36,7 +36,8 @@ class BaseOrdering extends Ordering[Row] {
* Generates bytecode for an [[Ordering]] of [[Row Rows]] for a given set of
* [[Expression Expressions]].
*/
-object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
+object GenerateOrdering
+ extends CodeGenerator[Seq[SortOrder], Ordering[catalyst.InternalRow]] with Logging {
import scala.reflect.runtime.universe._
protected def canonicalize(in: Seq[SortOrder]): Seq[SortOrder] =
@@ -45,7 +46,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] =
in.map(BindReferences.bindReference(_, inputSchema))
- protected def create(ordering: Seq[SortOrder]): Ordering[Row] = {
+ protected def create(ordering: Seq[SortOrder]): Ordering[catalyst.InternalRow] = {
val a = newTermName("a")
val b = newTermName("b")
val ctx = newCodeGenContext()
@@ -75,7 +76,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificOrdering generate($exprType[] expr) {
return new SpecificOrdering(expr);
@@ -90,8 +91,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
}
@Override
- public int compare(Row a, Row b) {
- Row i = null; // Holds current row being evaluated.
+ public int compare(InternalRow a, InternalRow b) {
+ InternalRow i = null; // Holds current row being evaluated.
$comparisons
return 0;
}
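
GenerateOrdering now produces an Ordering over InternalRow rather than Row; the generated comparator walks the sort columns and returns the first non-zero comparison. A plain Scala sketch of that loop with a stand-in row type (the generated SpecificOrdering does the same thing per expression, with proper null handling):

    // Stand-in row type; illustrative only.
    trait InternalRow { def getInt(i: Int): Int }

    // Compare two rows column by column; the first differing column decides.
    class RowOrderingSketch(columns: Seq[Int]) extends Ordering[InternalRow] {
      override def compare(a: InternalRow, b: InternalRow): Int = {
        var i = 0
        while (i < columns.length) {
          val c = columns(i)
          val cmp = java.lang.Integer.compare(a.getInt(c), b.getInt(c))
          if (cmp != 0) return cmp
          i += 1
        }
        0
      }
    }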
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 4a547b5ce9..9e191dc2e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -17,30 +17,31 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
/**
* Interface for generated predicate
*/
abstract class Predicate {
- def eval(r: Row): Boolean
+ def eval(r: catalyst.InternalRow): Boolean
}
/**
- * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[Row]].
+ * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[InternalRow]].
*/
-object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
+object GeneratePredicate extends CodeGenerator[Expression, (catalyst.InternalRow) => Boolean] {
protected def canonicalize(in: Expression): Expression = ExpressionCanonicalizer.execute(in)
protected def bind(in: Expression, inputSchema: Seq[Attribute]): Expression =
BindReferences.bindReference(in, inputSchema)
- protected def create(predicate: Expression): ((Row) => Boolean) = {
+ protected def create(predicate: Expression): ((catalyst.InternalRow) => Boolean) = {
val ctx = newCodeGenContext()
val eval = predicate.gen(ctx)
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificPredicate generate($exprType[] expr) {
return new SpecificPredicate(expr);
@@ -53,7 +54,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
}
@Override
- public boolean eval(Row i) {
+ public boolean eval(InternalRow i) {
${eval.code}
return !${eval.isNull} && ${eval.primitive};
}
@@ -65,6 +66,6 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
// fetch the only one method `generate(Expression[])`
val m = c.getDeclaredMethods()(0)
val p = m.invoke(c.newInstance(), ctx.references.toArray).asInstanceOf[Predicate]
- (r: Row) => p.eval(r)
+ (r: catalyst.InternalRow) => p.eval(r)
}
}
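
GeneratePredicate's external contract is now a function from InternalRow to Boolean. A trivial sketch of how callers consume such a predicate, with stand-in types rather than the real codegen path:

    // Stand-in row type; the real path compiles Java with Janino and hands
    // back an equivalent closure.
    trait InternalRow { def getInt(i: Int): Int }

    object PredicateSketch {
      // A "generated" predicate is consumed as a plain InternalRow => Boolean.
      def create(column: Int, threshold: Int): InternalRow => Boolean =
        (r: InternalRow) => r.getInt(column) > threshold

      def main(args: Array[String]): Unit = {
        val row = new InternalRow { def getInt(i: Int): Int = 42 }
        val pred = create(column = 0, threshold = 10)
        println(pred(row)) // prints: true
      }
    }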
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 9b906c3ff5..8b5dc194be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.types._
abstract class BaseProject extends Projection {}
/**
- * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
- * [[Expression Expressions]] and a given input [[Row]]. The returned [[Row]] object is custom
- * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
+ * Generates bytecode that produces a new [[InternalRow]] object based on a fixed set of input
+ * [[Expression Expressions]] and a given input [[InternalRow]]. The returned [[InternalRow]]
+ * object is custom generated based on the output types of the [[Expression]] to avoid boxing of
+ * primitive values.
*/
object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
import scala.reflect.runtime.universe._
@@ -146,7 +147,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificProjection generate($exprType[] expr) {
return new SpecificProjection(expr);
@@ -161,7 +162,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
@Override
public Object apply(Object r) {
- return new SpecificRow(expressions, (Row) r);
+ return new SpecificRow(expressions, (InternalRow) r);
}
}
@@ -169,7 +170,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
$columns
- public SpecificRow($exprType[] expressions, Row i) {
+ public SpecificRow($exprType[] expressions, InternalRow i) {
$initColumns
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 6398b8f9e4..a6913cc03c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.types._
@@ -41,7 +42,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
children.map(_.eval(input))
}
@@ -69,7 +70,7 @@ case class CreateStruct(children: Seq[NamedExpression]) extends Expression {
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
- Row(children.map(_.eval(input)): _*)
+ override def eval(input: catalyst.InternalRow): Any = {
+ InternalRow(children.map(_.eval(input)): _*)
}
}
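
CreateStruct's eval now packs the evaluated children into an InternalRow instead of an external Row, via the new InternalRow factory. A stand-in sketch of that shape (the factory and row here are simplified placeholders, not the companion object added by this patch):

    // Stand-ins for InternalRow and its factory.
    trait InternalRow { def toSeq: Seq[Any] }

    object InternalRowSketch {
      def apply(values: Any*): InternalRow = new InternalRow {
        override def toSeq: Seq[Any] = values.toSeq
      }
    }

    trait Expression { def eval(input: InternalRow): Any }

    // CreateStruct-style eval: evaluate every child and pack the results into
    // a fresh internal row rather than an external Row.
    class CreateStructSketch(children: Seq[Expression]) extends Expression {
      override def eval(input: InternalRow): Any =
        InternalRowSketch(children.map(_.eval(input)): _*)
    }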
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 72b9f23456..a119c31300 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types.{BooleanType, DataType}
@@ -42,7 +43,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
override def dataType: DataType = trueValue.dataType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (true == predicate.eval(input)) {
trueValue.eval(input)
} else {
@@ -137,7 +138,7 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike {
}
/** Written in imperative fashion for performance considerations. */
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val len = branchesArr.length
var i = 0
// If all branches fail and an elseVal is not provided, the whole statement
@@ -229,7 +230,7 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
}
/** Written in imperative fashion for performance considerations. */
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evaluatedKey = key.eval(input)
val len = branchesArr.length
var i = 0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
index 8ab6d977dd..de8b66bc3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.types._
/** Return the unscaled Long value of a Decimal, assuming it fits in a Long */
@@ -28,7 +29,7 @@ case class UnscaledValue(child: Expression) extends UnaryExpression {
override def nullable: Boolean = child.nullable
override def toString: String = s"UnscaledValue($child)"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val childResult = child.eval(input)
if (childResult == null) {
null
@@ -50,7 +51,7 @@ case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends Un
override def nullable: Boolean = child.nullable
override def toString: String = s"MakeDecimal($child,$precision,$scale)"
- override def eval(input: Row): Decimal = {
+ override def eval(input: catalyst.InternalRow): Decimal = {
val childResult = child.eval(input)
if (childResult == null) {
null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index b6191eafba..a80c255a29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import scala.collection.Map
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.types._
@@ -53,13 +54,13 @@ abstract class Generator extends Expression {
def elementTypes: Seq[(DataType, Boolean)]
/** Should be implemented by child classes to perform specific Generators. */
- override def eval(input: Row): TraversableOnce[Row]
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow]
/**
* Notifies that there are no more rows to process, clean up code, and additional
* rows can be made here.
*/
- def terminate(): TraversableOnce[Row] = Nil
+ def terminate(): TraversableOnce[catalyst.InternalRow] = Nil
}
/**
@@ -67,22 +68,22 @@ abstract class Generator extends Expression {
*/
case class UserDefinedGenerator(
elementTypes: Seq[(DataType, Boolean)],
- function: Row => TraversableOnce[Row],
+ function: catalyst.InternalRow => TraversableOnce[catalyst.InternalRow],
children: Seq[Expression])
extends Generator {
@transient private[this] var inputRow: InterpretedProjection = _
- @transient private[this] var convertToScala: (Row) => Row = _
+ @transient private[this] var convertToScala: (catalyst.InternalRow) => catalyst.InternalRow = _
private def initializeConverters(): Unit = {
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
CatalystTypeConverters.createToScalaConverter(inputSchema)
- }.asInstanceOf[(Row => Row)]
+ }.asInstanceOf[(catalyst.InternalRow => catalyst.InternalRow)]
}
- override def eval(input: Row): TraversableOnce[Row] = {
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
if (inputRow == null) {
initializeConverters()
}
@@ -108,7 +109,7 @@ case class Explode(child: Expression)
case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, valueContainsNull) :: Nil
}
- override def eval(input: Row): TraversableOnce[Row] = {
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
child.dataType match {
case ArrayType(_, _) =>
val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
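
A Generator's eval now yields a TraversableOnce of InternalRow, so exploding an array column means mapping each element to its own one-column internal row. A stand-alone sketch of that shape under simplified, hypothetical types:

    // Stand-in types; the real Generator also declares its element schema.
    trait InternalRow { def get(i: Int): Any }
    case class OneColumnRow(value: Any) extends InternalRow {
      override def get(i: Int): Any = value
    }

    trait Expression { def eval(input: InternalRow): Any }

    // Explode-style generator: one output row per element of the evaluated
    // array, returned as TraversableOnce[InternalRow] as in the new signature.
    class ExplodeSketch(child: Expression) {
      def eval(input: InternalRow): TraversableOnce[InternalRow] = {
        val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
        if (inputArray == null) Nil else inputArray.map(OneColumnRow(_))
      }
    }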
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index a33007bda1..d8fff2b84d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -87,7 +88,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
case _ => false
}
- override def eval(input: Row): Any = value
+ override def eval(input: catalyst.InternalRow): Any = value
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
// change the isNull and primitive to consts, to inline them
@@ -142,9 +143,9 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
extends LeafExpression {
- def update(expression: Expression, input: Row): Unit = {
+ def update(expression: Expression, input: catalyst.InternalRow): Unit = {
value = expression.eval(input)
}
- override def eval(input: Row): Any = value
+ override def eval(input: catalyst.InternalRow): Any = value
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index 97e960b8d6..6f90d607dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types.{DataType, DoubleType}
@@ -34,7 +35,7 @@ abstract class LeafMathExpression(c: Double, name: String)
override def nullable: Boolean = false
override def toString: String = s"$name()"
- override def eval(input: Row): Any = c
+ override def eval(input: catalyst.InternalRow): Any = c
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
s"""
@@ -60,7 +61,7 @@ abstract class UnaryMathExpression(f: Double => Double, name: String)
override def nullable: Boolean = true
override def toString: String = s"$name($child)"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
@@ -103,7 +104,7 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
override def dataType: DataType = DoubleType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
@@ -215,7 +216,7 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
case class Atan2(left: Expression, right: Expression)
extends BinaryMathExpression(math.atan2, "ATAN2") {
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 2e4b9ba678..20505129e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
@@ -114,7 +115,7 @@ case class Alias(child: Expression, name: String)(
// Alias(Generator, xx) need to be transformed into Generate(generator, ...)
override lazy val resolved = childrenResolved && !child.isInstanceOf[Generator]
- override def eval(input: Row): Any = child.eval(input)
+ override def eval(input: catalyst.InternalRow): Any = child.eval(input)
override def gen(ctx: CodeGenContext): GeneratedExpressionCode = child.gen(ctx)
@@ -230,7 +231,7 @@ case class AttributeReference(
}
// Unresolved attributes are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"$name#${exprId.id}$typeSuffix"
@@ -252,7 +253,7 @@ case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[E
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
override def qualifiers: Seq[String] = throw new UnsupportedOperationException
override def exprId: ExprId = throw new UnsupportedOperationException
- override def eval(input: Row): Any = throw new UnsupportedOperationException
+ override def eval(input: catalyst.InternalRow): Any = throw new UnsupportedOperationException
override def nullable: Boolean = throw new UnsupportedOperationException
override def dataType: DataType = NullType
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
index c2d1a4eada..292d626f01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
@@ -43,7 +44,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
this, s"Coalesce cannot have children of different types. $childTypes")
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
var i = 0
var result: Any = null
val childIterator = children.iterator
@@ -77,7 +78,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expr
override def foldable: Boolean = child.foldable
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
child.eval(input) == null
}
@@ -96,7 +97,7 @@ case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[E
override def nullable: Boolean = false
override def toString: String = s"IS NOT NULL $child"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
child.eval(input) != null
}
@@ -118,7 +119,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate
private[this] val childrenArray = children.toArray
- override def eval(input: Row): Boolean = {
+ override def eval(input: catalyst.InternalRow): Boolean = {
var numNonNulls = 0
var i = 0
while (i < childrenArray.length && numNonNulls < n) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index fbc97b2e75..c2e57b4715 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst
+
/**
* A set of classes that can be used to represent trees of relational expressions. A key goal of
* the expression library is to hide the details of naming and scoping from developers who want to
@@ -49,30 +51,30 @@ package org.apache.spark.sql.catalyst
*/
package object expressions {
- type Row = org.apache.spark.sql.Row
+ type InternalRow = catalyst.InternalRow
- val Row = org.apache.spark.sql.Row
+ val InternalRow = catalyst.InternalRow
/**
- * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
- * new row. If the schema of the input row is specified, then the given expression will be bound
- * to that schema.
+ * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+ * column of the new row. If the schema of the input row is specified, then the given expression
+ * will be bound to that schema.
*/
- abstract class Projection extends (Row => Row)
+ abstract class Projection extends (InternalRow => InternalRow)
/**
- * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
- * new row. If the schema of the input row is specified, then the given expression will be bound
- * to that schema.
+ * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+ * column of the new row. If the schema of the input row is specified, then the given expression
+ * will be bound to that schema.
*
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
* each time an input row is added. This significantly reduces the cost of calculating the
- * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
- * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
- * and hold on to the returned [[Row]] before calling `next()`.
+ * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
+ * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
+ * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
*/
abstract class MutableProjection extends Projection {
- def currentValue: Row
+ def currentValue: InternalRow
/** Uses the given row to store the output of the projection. */
def target(row: MutableRow): MutableProjection
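
The package object keeps the old aliasing trick: the row type is defined one package up and re-exported inside org.apache.spark.sql.catalyst.expressions, so most expression files keep compiling without import churn, and Projection stays a plain function over rows. A toy, stand-alone illustration of that arrangement (simplified names, not the real packages):

    // The "row" type lives in one object, and is aliased from another.
    object catalystSketch {
      trait InternalRow { def get(i: Int): Any }
    }

    object expressionsSketch {
      type InternalRow = catalystSketch.InternalRow

      // A Projection maps one internal row to another.
      abstract class Projection extends (InternalRow => InternalRow)

      // A MutableProjection reuses one output row; callers must copy() it
      // before advancing the producing iterator to keep a stable reference.
      abstract class MutableProjection extends Projection {
        def currentValue: InternalRow
      }
    }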
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 7574d1cbda..082d72eb43 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._
object InterpretedPredicate {
- def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
+ def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
create(BindReferences.bindReference(expression, inputSchema))
- def create(expression: Expression): (Row => Boolean) = {
- (r: Row) => expression.eval(r).asInstanceOf[Boolean]
+ def create(expression: Expression): (InternalRow => Boolean) = {
+ (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
}
}
@@ -77,7 +77,7 @@ case class Not(child: Expression) extends UnaryExpression with Predicate with Ex
override def expectedChildTypes: Seq[DataType] = Seq(BooleanType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
child.eval(input) match {
case null => null
case b: Boolean => !b
@@ -98,7 +98,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evaluatedValue = value.eval(input)
list.exists(e => e.eval(input) == evaluatedValue)
}
@@ -117,7 +117,7 @@ case class InSet(value: Expression, hset: Set[Any])
override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
override def toString: String = s"$value INSET ${hset.mkString("(", ",", ")")}"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
hset.contains(value.eval(input))
}
}
@@ -129,7 +129,7 @@ case class And(left: Expression, right: Expression)
override def symbol: String = "&&"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == false) {
false
@@ -178,7 +178,7 @@ case class Or(left: Expression, right: Expression)
override def symbol: String = "||"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == true) {
true
@@ -235,7 +235,7 @@ abstract class BinaryComparison extends BinaryExpression with Predicate {
protected def checkTypesInternal(t: DataType): TypeCheckResult
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
@@ -288,7 +288,7 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
override protected def checkTypesInternal(t: DataType) = TypeCheckResult.TypeCheckSuccess
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == null && r == null) {
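
The And/Or hunks preserve SQL's three-valued logic while swapping only the row type: false (or true, for Or) short-circuits, and null is returned only when the other side cannot decide. A self-contained sketch of And under that logic, with stand-in types:

    // Stand-in types only.
    trait InternalRow
    trait Expression { def eval(input: InternalRow): Any }

    // SQL three-valued AND, as in the And.eval hunk above: false dominates
    // null, and null survives only when neither side settles the result.
    class AndSketch(left: Expression, right: Expression) extends Expression {
      override def eval(input: InternalRow): Any = {
        val l = left.eval(input)
        if (l == false) {
          false
        } else {
          val r = right.eval(input)
          if (r == false) false
          else if (l == null || r == null) null
          else true
        }
      }
    }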
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 6e4e9cb1be..7e8033307e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -48,7 +48,7 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
/** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
case class Rand(seed: Long) extends RDG(seed) {
- override def eval(input: Row): Double = rng.nextDouble()
+ override def eval(input: InternalRow): Double = rng.nextDouble()
}
object Rand {
@@ -62,7 +62,7 @@ object Rand {
/** Generate a random column with i.i.d. gaussian random distribution. */
case class Randn(seed: Long) extends RDG(seed) {
- override def eval(input: Row): Double = rng.nextGaussian()
+ override def eval(input: InternalRow): Double = rng.nextGaussian()
}
object Randn {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 5d2d82077f..534dac1f92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.types.{DataType, StructType, AtomicType}
import org.apache.spark.unsafe.types.UTF8String
/**
- * An extended interface to [[Row]] that allows the values for each column to be updated. Setting
- * a value through a primitive function implicitly marks that column as not null.
+ * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
+ * Setting a value through a primitive function implicitly marks that column as not null.
*/
-trait MutableRow extends Row {
+trait MutableRow extends InternalRow {
def setNullAt(i: Int): Unit
def update(ordinal: Int, value: Any)
@@ -37,13 +37,12 @@ trait MutableRow extends Row {
def setByte(ordinal: Int, value: Byte)
def setFloat(ordinal: Int, value: Float)
def setString(ordinal: Int, value: String)
- // TODO(davies): add setDate() and setDecimal()
}
/**
* A row with no data. Calling any methods will result in an error. Can be used as a placeholder.
*/
-object EmptyRow extends Row {
+object EmptyRow extends InternalRow {
override def apply(i: Int): Any = throw new UnsupportedOperationException
override def toSeq: Seq[Any] = Seq.empty
override def length: Int = 0
@@ -57,7 +56,7 @@ object EmptyRow extends Row {
override def getByte(i: Int): Byte = throw new UnsupportedOperationException
override def getString(i: Int): String = throw new UnsupportedOperationException
override def getAs[T](i: Int): T = throw new UnsupportedOperationException
- override def copy(): Row = this
+ override def copy(): InternalRow = this
}
/**
@@ -65,7 +64,7 @@ object EmptyRow extends Row {
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
*/
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
+class GenericRow(protected[sql] val values: Array[Any]) extends InternalRow {
/** No-arg constructor for serialization. */
protected def this() = this(null)
@@ -154,7 +153,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
}
override def equals(o: Any): Boolean = o match {
- case other: Row =>
+ case other: InternalRow =>
if (values.length != other.length) {
return false
}
@@ -174,7 +173,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
case _ => false
}
- override def copy(): Row = this
+ override def copy(): InternalRow = this
}
class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
@@ -207,15 +206,15 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value }
- override def copy(): Row = new GenericRow(values.clone())
+ override def copy(): InternalRow = new GenericRow(values.clone())
}
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
- def compare(a: Row, b: Row): Int = {
+ def compare(a: InternalRow, b: InternalRow): Int = {
var i = 0
while (i < ordering.size) {
val order = ordering(i)
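
GenericMutableRow.copy() returns a frozen snapshot (a new GenericRow over a cloned array), which is what makes the reuse-then-copy contract of MutableProjection safe. A small stand-alone sketch of that copy-on-request pattern, with simplified stand-ins for GenericRow and GenericMutableRow:

    // Simplified stand-ins; not the real catalyst row classes.
    class GenericRowSketch(protected val values: Array[Any]) {
      def apply(i: Int): Any = values(i)
      def copy(): GenericRowSketch = this // immutable, so sharing is safe
    }

    class GenericMutableRowSketch(size: Int) extends GenericRowSketch(new Array[Any](size)) {
      def update(i: Int, value: Any): Unit = values(i) = value
      // A mutable row must snapshot its backing array; otherwise later updates
      // would leak into every row previously handed out via copy().
      override def copy(): GenericRowSketch = new GenericRowSketch(values.clone())
    }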
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
index 2bcb960e91..30e41677b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
@@ -57,7 +57,7 @@ case class NewSet(elementType: DataType) extends LeafExpression {
override def dataType: OpenHashSetUDT = new OpenHashSetUDT(elementType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
new OpenHashSet[Any]()
}
@@ -87,7 +87,7 @@ case class AddItemToSet(item: Expression, set: Expression) extends Expression {
override def dataType: OpenHashSetUDT = set.dataType.asInstanceOf[OpenHashSetUDT]
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val itemEval = item.eval(input)
val setEval = set.eval(input).asInstanceOf[OpenHashSet[Any]]
@@ -137,7 +137,7 @@ case class CombineSets(left: Expression, right: Expression) extends BinaryExpres
override def symbol: String = "++="
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val leftEval = left.eval(input).asInstanceOf[OpenHashSet[Any]]
if(leftEval != null) {
val rightEval = right.eval(input).asInstanceOf[OpenHashSet[Any]]
@@ -183,7 +183,7 @@ case class CountSet(child: Expression) extends UnaryExpression {
override def dataType: DataType = LongType
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val childEval = child.eval(input).asInstanceOf[OpenHashSet[Any]]
if (childEval != null) {
childEval.size.toLong
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 4f4c19526e..8ca8d22bc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -49,7 +49,7 @@ trait StringRegexExpression extends ExpectsInputTypes {
protected def pattern(str: String) = if (cache == null) compile(str) else cache
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == null) {
null
@@ -121,7 +121,7 @@ trait CaseConversionExpression extends ExpectsInputTypes {
override def dataType: DataType = StringType
override def expectedChildTypes: Seq[DataType] = Seq(StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evaluated = child.eval(input)
if (evaluated == null) {
null
@@ -169,7 +169,7 @@ trait StringComparison extends ExpectsInputTypes {
override def expectedChildTypes: Seq[DataType] = Seq(StringType, StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val leftEval = left.eval(input)
if(leftEval == null) {
null
@@ -262,7 +262,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
(start, end)
}
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val string = str.eval(input)
val po = pos.eval(input)
val ln = len.eval(input)
@@ -303,7 +303,7 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
override def dataType: DataType = IntegerType
override def expectedChildTypes: Seq[DataType] = Seq(StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val string = child.eval(input)
if (string == null) null else string.asInstanceOf[UTF8String].length
}
@@ -314,5 +314,3 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
defineCodeGen(ctx, ev, c => s"($c).length()")
}
}
-
-
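
A small sketch of the updated string expressions (illustrative assumption only; the bound reference, row contents, and expected results are made up for this example):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Evaluate StringLength and Substring against an InternalRow holding a UTF8String.
val row = InternalRow(UTF8String.fromString("spark"))
val col = BoundReference(0, StringType, nullable = true)
assert(StringLength(col).eval(row) == 5)
assert(Substring(col, Literal(1), Literal(3)).eval(row) == UTF8String.fromString("spa"))
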
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 82c4d462cc..056f170539 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -74,7 +74,7 @@ case class WindowSpecDefinition(
override def toString: String = simpleString
- override def eval(input: Row): Any = throw new UnsupportedOperationException
+ override def eval(input: InternalRow): Any = throw new UnsupportedOperationException
override def nullable: Boolean = true
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException
@@ -259,7 +259,7 @@ trait WindowFunction extends Expression {
def reset(): Unit
- def prepareInputParameters(input: Row): AnyRef
+ def prepareInputParameters(input: InternalRow): AnyRef
def update(input: AnyRef): Unit
@@ -286,7 +286,7 @@ case class UnresolvedWindowFunction(
throw new UnresolvedException(this, "init")
override def reset(): Unit =
throw new UnresolvedException(this, "reset")
- override def prepareInputParameters(input: Row): AnyRef =
+ override def prepareInputParameters(input: InternalRow): AnyRef =
throw new UnresolvedException(this, "prepareInputParameters")
override def update(input: AnyRef): Unit =
throw new UnresolvedException(this, "update")
@@ -297,7 +297,7 @@ case class UnresolvedWindowFunction(
override def get(index: Int): Any =
throw new UnresolvedException(this, "get")
// Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"'$name(${children.mkString(",")})"
@@ -316,7 +316,7 @@ case class UnresolvedWindowExpression(
override lazy val resolved = false
// Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -327,7 +327,7 @@ case class WindowExpression(
override def children: Seq[Expression] =
windowFunction :: windowSpec :: Nil
- override def eval(input: Row): Any =
+ override def eval(input: InternalRow): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def dataType: DataType = windowFunction.dataType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e070f0ff..2c946cd12f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, analysis}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{StructType, StructField}
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters, analysis}
+import org.apache.spark.sql.types.{StructField, StructType}
object LocalRelation {
def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
@@ -32,11 +31,11 @@ object LocalRelation {
def fromProduct(output: Seq[Attribute], data: Seq[Product]): LocalRelation = {
val schema = StructType.fromAttributes(output)
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
- LocalRelation(output, data.map(converter(_).asInstanceOf[Row]))
+ LocalRelation(output, data.map(converter(_).asInstanceOf[InternalRow]))
}
}
-case class LocalRelation(output: Seq[Attribute], data: Seq[Row] = Nil)
+case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
extends LeafNode with analysis.MultiInstanceRelation {
/**
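
To make the LocalRelation change concrete, a hedged sketch (the case class and attribute names are invented for illustration) of building a relation whose data is now Seq[InternalRow]:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

case class Person(name: String, age: Int)

// fromProduct runs each Product through CatalystTypeConverters and stores InternalRows.
val output = Seq('name.string, 'age.int)
val relation = LocalRelation.fromProduct(output, Seq(Person("ann", 1), Person("bob", 2)))
// relation.data: Seq[InternalRow]
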
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 80ba57a082..42dead7c28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Expression, Row, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.types.{DataType, IntegerType}
/**
@@ -169,7 +170,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
override def keyExpressions: Seq[Expression] = expressions
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -213,6 +214,6 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
override def keyExpressions: Seq[Expression] = ordering.map(_.child)
- override def eval(input: Row): Any =
+ override def eval(input: InternalRow): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 9a24b23024..b4d5e013f3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -21,7 +21,7 @@ import java.math.BigInteger
import java.sql.{Date, Timestamp}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
case class PrimitiveData(
@@ -257,7 +257,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
test("convert PrimitiveData to catalyst") {
val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
- val convertedData = Row(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
+ val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
val dataType = schemaFor[PrimitiveData].dataType
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
}
@@ -267,8 +267,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
val data = OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
Some(primitiveData))
val dataType = schemaFor[OptionalData].dataType
- val convertedData = Row(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
- Row(1, 1, 1, 1, 1, 1, true))
+ val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
+ InternalRow(1, 1, 1, 1, 1, 1, true))
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
}
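
The conversion path exercised by these tests can be summarized with a small assumed example (the Point case class is invented here):

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

case class Point(x: Int, y: Int)

// External Scala objects convert to InternalRow via the schema derived by ScalaReflection.
val dataType = schemaFor[Point].dataType
assert(CatalystTypeConverters.convertToCatalyst(Point(1, 2), dataType) == InternalRow(1, 2))
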
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 969c6cc15f..e407f6f166 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -437,14 +437,14 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
test("cast from struct") {
val struct = Literal.create(
- Row("123", "abc", "", null),
+ InternalRow("123", "abc", "", null),
StructType(Seq(
StructField("a", StringType, nullable = true),
StructField("b", StringType, nullable = true),
StructField("c", StringType, nullable = true),
StructField("d", StringType, nullable = true))))
val struct_notNull = Literal.create(
- Row("123", "abc", ""),
+ InternalRow("123", "abc", ""),
StructType(Seq(
StructField("a", StringType, nullable = false),
StructField("b", StringType, nullable = false),
@@ -457,7 +457,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("c", IntegerType, nullable = true),
StructField("d", IntegerType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(123, null, null, null))
+ checkEvaluation(ret, InternalRow(123, null, null, null))
}
{
val ret = cast(struct, StructType(Seq(
@@ -474,7 +474,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("c", BooleanType, nullable = true),
StructField("d", BooleanType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false, null))
+ checkEvaluation(ret, InternalRow(true, true, false, null))
}
{
val ret = cast(struct, StructType(Seq(
@@ -491,7 +491,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", IntegerType, nullable = true),
StructField("c", IntegerType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(123, null, null))
+ checkEvaluation(ret, InternalRow(123, null, null))
}
{
val ret = cast(struct_notNull, StructType(Seq(
@@ -506,7 +506,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", BooleanType, nullable = true),
StructField("c", BooleanType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false))
+ checkEvaluation(ret, InternalRow(true, true, false))
}
{
val ret = cast(struct_notNull, StructType(Seq(
@@ -514,7 +514,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", BooleanType, nullable = true),
StructField("c", BooleanType, nullable = false))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false))
+ checkEvaluation(ret, InternalRow(true, true, false))
}
{
@@ -532,10 +532,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
test("complex casting") {
val complex = Literal.create(
- Row(
+ InternalRow(
Seq("123", "abc", ""),
Map("a" -> "123", "b" -> "abc", "c" -> ""),
- Row(0)),
+ InternalRow(0)),
StructType(Seq(
StructField("a",
ArrayType(StringType, containsNull = false), nullable = true),
@@ -555,10 +555,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("l", LongType, nullable = true)))))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(
+ checkEvaluation(ret, InternalRow(
Seq(123, null, null),
Map("a" -> true, "b" -> true, "c" -> false),
- Row(0L)))
+ InternalRow(0L)))
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index bcc594cb7c..2b0f4618b2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.unsafe.types.UTF8String
class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
test("CreateStruct") {
- val row = Row(1, 2, 3)
+ val row = InternalRow(1, 2, 3)
val c1 = 'a.int.at(0).as("a")
val c3 = 'c.int.at(2).as("c")
- checkEvaluation(CreateStruct(Seq(c1, c3)), Row(1, 3), row)
+ checkEvaluation(CreateStruct(Seq(c1, c3)), InternalRow(1, 3), row)
}
test("complex type") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 4a241d3603..12d2da8b33 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -32,26 +32,26 @@ import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
trait ExpressionEvalHelper {
self: SparkFunSuite =>
- protected def create_row(values: Any*): Row = {
+ protected def create_row(values: Any*): InternalRow = {
new GenericRow(values.map(CatalystTypeConverters.convertToCatalyst).toArray)
}
protected def checkEvaluation(
- expression: Expression, expected: Any, inputRow: Row = EmptyRow): Unit = {
+ expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
checkEvaluationWithoutCodegen(expression, expected, inputRow)
checkEvaluationWithGeneratedMutableProjection(expression, expected, inputRow)
checkEvaluationWithGeneratedProjection(expression, expected, inputRow)
checkEvaluationWithOptimization(expression, expected, inputRow)
}
- protected def evaluate(expression: Expression, inputRow: Row = EmptyRow): Any = {
+ protected def evaluate(expression: Expression, inputRow: InternalRow = EmptyRow): Any = {
expression.eval(inputRow)
}
protected def checkEvaluationWithoutCodegen(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
@@ -66,7 +66,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithGeneratedMutableProjection(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val plan = try {
GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
@@ -92,7 +92,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithGeneratedProjection(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val ctx = GenerateProjection.newCodeGenContext()
lazy val evaluated = expression.gen(ctx)
@@ -128,7 +128,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithOptimization(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
@@ -137,7 +137,7 @@ trait ExpressionEvalHelper {
protected def checkDoubleEvaluation(
expression: Expression,
expected: Spread[Double],
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
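
For context, a suite mixing in this helper would now drive evaluation with InternalRows, e.g. (a sketch assuming a simple Add expression; not taken from the patch):

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._

class AddEvalSuite extends SparkFunSuite with ExpressionEvalHelper {
  test("Add evaluates against an InternalRow") {
    // create_row now returns an InternalRow; checkEvaluation accepts it directly.
    val row = create_row(1, 2)
    checkEvaluation(Add('a.int.at(0), 'b.int.at(1)), 3, row)
  }
}
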
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 72bbc4efeb..7aae2bbd8a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -37,7 +37,7 @@ class UnsafeFixedWidthAggregationMapSuite
private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
- private def emptyAggregationBuffer: Row = new GenericRow(Array[Any](0))
+ private def emptyAggregationBuffer: InternalRow = new GenericRow(Array[Any](0))
private var memoryManager: TaskMemoryManager = null
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 61722f1ffa..577c7a0de0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -86,7 +86,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
DoubleType)
val converter = new UnsafeRowConverter(fieldTypes)
- val rowWithAllNullColumns: Row = {
+ val rowWithAllNullColumns: InternalRow = {
val r = new SpecificMutableRow(fieldTypes)
for (i <- 0 to fieldTypes.length - 1) {
r.setNullAt(i)
@@ -117,7 +117,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
// If we have an UnsafeRow with columns that are initially non-null and we null out those
// columns, then the serialized row representation should be identical to what we would get by
// creating an entirely null row via the converter
- val rowWithNoNullColumns: Row = {
+ val rowWithNoNullColumns: InternalRow = {
val r = new SpecificMutableRow(fieldTypes)
r.setNullAt(0)
r.setBoolean(1, false)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
index 6841bd9890..54e8c6462e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -37,13 +37,11 @@ class ConvertToLocalRelationSuite extends PlanTest {
test("Project on LocalRelation should be turned into a single LocalRelation") {
val testRelation = LocalRelation(
LocalRelation('a.int, 'b.int).output,
- Row(1, 2) ::
- Row(4, 5) :: Nil)
+ InternalRow(1, 2) :: InternalRow(4, 5) :: Nil)
val correctAnswer = LocalRelation(
LocalRelation('a1.int, 'b1.int).output,
- Row(1, 3) ::
- Row(4, 6) :: Nil)
+ InternalRow(1, 3) :: InternalRow(4, 6) :: Nil)
val projectOnLocal = testRelation.select(
UnresolvedAttribute("a").as("a1"),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 8ec79c3d4d..bda217935c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -28,7 +28,7 @@ case class Dummy(optKey: Option[Expression]) extends Expression {
override def nullable: Boolean = true
override def dataType: NullType = NullType
override lazy val resolved = true
- override def eval(input: Row): Any = null.asInstanceOf[Any]
+ override def eval(input: InternalRow): Any = null.asInstanceOf[Any]
}
case class ComplexPlan(exprs: Seq[Seq[Expression]])
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
index a4245545ff..4d8fe4ac5e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
@@ -21,7 +21,6 @@ import java.sql.Timestamp
import org.apache.spark.SparkFunSuite
-
class DateUtilsSuite extends SparkFunSuite {
test("timestamp") {