author     Reynold Xin <rxin@databricks.com>    2015-07-26 23:01:04 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-07-26 23:01:04 -0700
commit     aa80c64fcf9626b3720ee000a653db9266b74839 (patch)
tree       f9eca6289b36785a26fa8a586d04643bf9887b51 /sql
parent     945d8bcbf67032edd7bdd201cf9f88c75b3464f7 (diff)
[SPARK-9368][SQL] Support get(ordinal, dataType) generic getter in UnsafeRow.
Author: Reynold Xin <rxin@databricks.com>

Closes #7682 from rxin/unsaferow-generic-getter and squashes the following commits:

3063788 [Reynold Xin] Reset the change for real this time.
0f57c55 [Reynold Xin] Reset the changes in ExpressionEvalHelper.
fb6ca30 [Reynold Xin] Support BinaryType.
24a3e46 [Reynold Xin] Added support for DateType/TimestampType.
9989064 [Reynold Xin] JoinedRow.
11f80a3 [Reynold Xin] [SPARK-9368][SQL] Support get(ordinal, dataType) generic getter in UnsafeRow.
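As a quick illustration of the new call shape (not part of the patch), here is a minimal Scala sketch using GenericInternalRow, which also gains the get(ordinal, dataType) signature in this change; the row contents are made up:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.{IntegerType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    // Hypothetical two-field row; in practice it would come from an operator or projection.
    val row: InternalRow = new GenericInternalRow(Array[Any](42, UTF8String.fromString("spark")))

    // The caller supplies the DataType; the row decides how to read the field.
    val id   = row.get(0, IntegerType)   // 42
    val name = row.get(1, StringType)    // UTF8String "spark"

Callers that already know the concrete field type can pass the matching DataType and avoid the untyped path.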
Diffstat (limited to 'sql')
 sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java                    | 52
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala                            |  4
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala                 |  2
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala         |  2
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala |  2
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala                       |  4
6 files changed, 58 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 87e5a89c19..0fb33dd5a1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -24,7 +24,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
@@ -236,6 +236,41 @@ public final class UnsafeRow extends MutableRow {
}
@Override
+ public Object get(int ordinal, DataType dataType) {
+ if (dataType instanceof NullType) {
+ return null;
+ } else if (dataType instanceof BooleanType) {
+ return getBoolean(ordinal);
+ } else if (dataType instanceof ByteType) {
+ return getByte(ordinal);
+ } else if (dataType instanceof ShortType) {
+ return getShort(ordinal);
+ } else if (dataType instanceof IntegerType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof LongType) {
+ return getLong(ordinal);
+ } else if (dataType instanceof FloatType) {
+ return getFloat(ordinal);
+ } else if (dataType instanceof DoubleType) {
+ return getDouble(ordinal);
+ } else if (dataType instanceof DecimalType) {
+ return getDecimal(ordinal);
+ } else if (dataType instanceof DateType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof TimestampType) {
+ return getLong(ordinal);
+ } else if (dataType instanceof BinaryType) {
+ return getBinary(ordinal);
+ } else if (dataType instanceof StringType) {
+ return getUTF8String(ordinal);
+ } else if (dataType instanceof StructType) {
+ return getStruct(ordinal, ((StructType) dataType).size());
+ } else {
+ throw new UnsupportedOperationException("Unsupported data type " + dataType.simpleString());
+ }
+ }
+
+ @Override
public boolean isNullAt(int ordinal) {
assertIndexIsValid(ordinal);
return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
@@ -436,4 +471,19 @@ public final class UnsafeRow extends MutableRow {
public boolean anyNull() {
return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
}
+
+ /**
+ * Writes the content of this row into a memory address, identified by an object and an offset.
+ * The target memory address must already be allocated, and have enough space to hold all the
+ * bytes in this row.
+ */
+ public void writeToMemory(Object target, long targetOffset) {
+ PlatformDependent.copyMemory(
+ baseObject,
+ baseOffset,
+ target,
+ targetOffset,
+ sizeInBytes
+ );
+ }
}
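The get(ordinal, dataType) added above dispatches on the runtime DataType with an instanceof chain. As a sketch of the same pattern (not code from the patch), assuming the specialized getters on InternalRow, a Scala equivalent trimmed to the primitive and string cases could look like:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    // Mirror of the dispatch in UnsafeRow.get(ordinal, dataType), limited to a few types.
    def typedGet(row: InternalRow, ordinal: Int, dataType: DataType): Any = dataType match {
      case NullType                 => null
      case BooleanType              => row.getBoolean(ordinal)
      case ByteType                 => row.getByte(ordinal)
      case ShortType                => row.getShort(ordinal)
      case IntegerType | DateType   => row.getInt(ordinal)    // dates are stored as an int
      case LongType | TimestampType => row.getLong(ordinal)   // timestamps are stored as a long
      case FloatType                => row.getFloat(ordinal)
      case DoubleType               => row.getDouble(ordinal)
      case StringType               => row.getUTF8String(ordinal)
      case other =>
        throw new UnsupportedOperationException("Unsupported data type " + other.simpleString)
    }

The DateType/TimestampType branches reuse the int/long getters because that is how the patch maps them in UnsafeRow.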
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index 385d967138..ad3977281d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -30,11 +30,11 @@ abstract class InternalRow extends Serializable {
def numFields: Int
- def get(ordinal: Int): Any
+ def get(ordinal: Int): Any = get(ordinal, null)
def genericGet(ordinal: Int): Any = get(ordinal, null)
- def get(ordinal: Int, dataType: DataType): Any = get(ordinal)
+ def get(ordinal: Int, dataType: DataType): Any
def getAs[T](ordinal: Int, dataType: DataType): T = get(ordinal, dataType).asInstanceOf[T]
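The default now flows the other way: the untyped get(ordinal) delegates to get(ordinal, null), and concrete rows implement only the typed variant. A tiny check of that delegation (illustrative, with a made-up value):

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    val r = new GenericInternalRow(Array[Any](7))
    // Both calls go through get(ordinal, dataType); the untyped form just passes null.
    assert(r.get(0) == r.get(0, null))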
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index cc89d74146..27d6ff587a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -198,7 +198,7 @@ class JoinedRow extends InternalRow {
if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
}
- override def get(i: Int): Any =
+ override def get(i: Int, dataType: DataType): Any =
if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
override def isNullAt(i: Int): Boolean =
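JoinedRow's typed getter routes the call to whichever underlying row owns the ordinal, re-basing the index for the right-hand row. A small sketch of that behaviour, assuming JoinedRow's two-argument constructor and made-up rows:

    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
    import org.apache.spark.sql.types.{IntegerType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    val left   = new GenericInternalRow(Array[Any](1))
    val right  = new GenericInternalRow(Array[Any](UTF8String.fromString("a")))
    val joined = new JoinedRow(left, right)

    joined.get(0, IntegerType)   // ordinal 0 < left.numFields, served by `left`
    joined.get(1, StringType)    // re-based to ordinal 0 of `right`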
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 5953a093dc..b877ce47c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -219,7 +219,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
values(i).isNull = true
}
- override def get(i: Int): Any = values(i).boxed
+ override def get(i: Int, dataType: DataType): Any = values(i).boxed
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
values(ordinal).boxed.asInstanceOf[InternalRow]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index a361b216eb..3592014710 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -183,7 +183,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
public void setNullAt(int i) { nullBits[i] = true; }
public boolean isNullAt(int i) { return nullBits[i]; }
- public Object get(int i) {
+ public Object get(int i, ${classOf[DataType].getName} dataType) {
if (isNullAt(i)) return null;
switch (i) {
$getCases
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index daeabe8e90..b7c4ece4a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -99,7 +99,7 @@ class GenericInternalRow(protected[sql] val values: Array[Any]) extends Internal
override def numFields: Int = values.length
- override def get(i: Int): Any = values(i)
+ override def get(i: Int, dataType: DataType): Any = values(i)
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
values(ordinal).asInstanceOf[InternalRow]
@@ -130,7 +130,7 @@ class GenericMutableRow(val values: Array[Any]) extends MutableRow {
override def numFields: Int = values.length
- override def get(i: Int): Any = values(i)
+ override def get(i: Int, dataType: DataType): Any = values(i)
override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
values(ordinal).asInstanceOf[InternalRow]