From 0c0e09f567deb775ee378f5385a16884f68b332d Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 9 Oct 2014 14:59:03 -0700 Subject: [SPARK-3412][SQL]add missing row api chenghao-intel assigned this to me, check PR #2284 for previous discussion Author: Daoyuan Wang Closes #2529 from adrian-wang/rowapi and squashes the following commits: c6594b2 [Daoyuan Wang] using boxed 7b7e6e3 [Daoyuan Wang] update pattern match 7a39456 [Daoyuan Wang] rename file and refresh getAs[T] 4c18c29 [Daoyuan Wang] remove setAs[T] and null judge 1614493 [Daoyuan Wang] add missing row api --- .../sql/catalyst/expressions/Projection.scala | 15 + .../spark/sql/catalyst/expressions/Row.scala | 20 +- .../catalyst/expressions/SpecificMutableRow.scala | 313 +++++++++++++++++++++ .../sql/catalyst/expressions/SpecificRow.scala | 309 -------------------- 4 files changed, 339 insertions(+), 318 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index ef1d12531f..204904ecf0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -137,6 +137,9 @@ class JoinedRow extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -226,6 +229,9 @@ class JoinedRow2 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -309,6 +315,9 @@ class JoinedRow3 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -392,6 +401,9 @@ class JoinedRow4 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) @@ -475,6 +487,9 @@ class JoinedRow5 extends Row { def getString(i: Int): String = if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) + override def getAs[T](i: Int): T = + if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) + def copy() = { val totalSize = row1.size + row2.size val copiedValues = new Array[Any](totalSize) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index d68a4fabea..d00ec39774 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -64,6 +64,7 @@ trait Row extends Seq[Any] with Serializable { def getShort(i: Int): Short def getByte(i: Int): Byte def getString(i: Int): String + def getAs[T](i: Int): T = apply(i).asInstanceOf[T] override def toString() = s"[${this.mkString(",")}]" @@ -118,6 +119,7 @@ object EmptyRow extends Row { def getShort(i: Int): Short = throw new UnsupportedOperationException def getByte(i: Int): Byte = throw new UnsupportedOperationException def getString(i: Int): String = throw new UnsupportedOperationException + override def getAs[T](i: Int): T = throw new UnsupportedOperationException def copy() = this } @@ -217,19 +219,19 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { /** No-arg constructor for serialization. */ def this() = this(0) - override def setBoolean(ordinal: Int,value: Boolean): Unit = { values(ordinal) = value } - override def setByte(ordinal: Int,value: Byte): Unit = { values(ordinal) = value } - override def setDouble(ordinal: Int,value: Double): Unit = { values(ordinal) = value } - override def setFloat(ordinal: Int,value: Float): Unit = { values(ordinal) = value } - override def setInt(ordinal: Int,value: Int): Unit = { values(ordinal) = value } - override def setLong(ordinal: Int,value: Long): Unit = { values(ordinal) = value } - override def setString(ordinal: Int,value: String): Unit = { values(ordinal) = value } + override def setBoolean(ordinal: Int, value: Boolean): Unit = { values(ordinal) = value } + override def setByte(ordinal: Int, value: Byte): Unit = { values(ordinal) = value } + override def setDouble(ordinal: Int, value: Double): Unit = { values(ordinal) = value } + override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value } + override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value } + override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value } + override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value } override def setNullAt(i: Int): Unit = { values(i) = null } - override def setShort(ordinal: Int,value: Short): Unit = { values(ordinal) = value } + override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value } - override def update(ordinal: Int,value: Any): Unit = { values(ordinal) = value } + override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value } override def copy() = new GenericRow(values.clone()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala new file mode 100644 index 0000000000..570379c533 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.types._ + +/** + * A parent class for mutable container objects that are reused when the values are changed, + * resulting in less garbage. These values are held by a [[SpecificMutableRow]]. + * + * The following code was roughly used to generate these objects: + * {{{ + * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",") + * types.map {tpe => + * s""" + * final class Mutable$tpe extends MutableValue { + * var value: $tpe = 0 + * def boxed = if (isNull) null else value + * def update(v: Any) = value = { + * isNull = false + * v.asInstanceOf[$tpe] + * } + * def copy() = { + * val newCopy = new Mutable$tpe + * newCopy.isNull = isNull + * newCopy.value = value + * newCopy.asInstanceOf[this.type] + * } + * }""" + * }.foreach(println) + * + * types.map { tpe => + * s""" + * override def set$tpe(ordinal: Int, value: $tpe): Unit = { + * val currentValue = values(ordinal).asInstanceOf[Mutable$tpe] + * currentValue.isNull = false + * currentValue.value = value + * } + * + * override def get$tpe(i: Int): $tpe = { + * values(i).asInstanceOf[Mutable$tpe].value + * }""" + * }.foreach(println) + * }}} + */ +abstract class MutableValue extends Serializable { + var isNull: Boolean = true + def boxed: Any + def update(v: Any) + def copy(): this.type +} + +final class MutableInt extends MutableValue { + var value: Int = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Int] + } + def copy() = { + val newCopy = new MutableInt + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableFloat extends MutableValue { + var value: Float = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Float] + } + def copy() = { + val newCopy = new MutableFloat + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableBoolean extends MutableValue { + var value: Boolean = false + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Boolean] + } + def copy() = { + val newCopy = new MutableBoolean + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableDouble extends MutableValue { + var value: Double = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Double] + } + def copy() = { + val newCopy = new MutableDouble + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableShort extends MutableValue { + var value: Short = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Short] + } + def copy() = { + val newCopy = new MutableShort + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableLong extends MutableValue { + var value: Long = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Long] + } + def copy() = { + val newCopy = new MutableLong + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableByte extends MutableValue { + var value: Byte = 0 + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Byte] + } + def copy() = { + val newCopy = new MutableByte + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +final class MutableAny extends MutableValue { + var value: Any = _ + def boxed = if (isNull) null else value + def update(v: Any) = value = { + isNull = false + v.asInstanceOf[Any] + } + def copy() = { + val newCopy = new MutableAny + newCopy.isNull = isNull + newCopy.value = value + newCopy.asInstanceOf[this.type] + } +} + +/** + * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen + * based on the dataTypes of each column. The intent is to decrease garbage when modifying the + * values of primitive columns. + */ +final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow { + + def this(dataTypes: Seq[DataType]) = + this( + dataTypes.map { + case IntegerType => new MutableInt + case ByteType => new MutableByte + case FloatType => new MutableFloat + case ShortType => new MutableShort + case DoubleType => new MutableDouble + case BooleanType => new MutableBoolean + case LongType => new MutableLong + case _ => new MutableAny + }.toArray) + + def this() = this(Seq.empty) + + override def length: Int = values.length + + override def setNullAt(i: Int): Unit = { + values(i).isNull = true + } + + override def apply(i: Int): Any = values(i).boxed + + override def isNullAt(i: Int): Boolean = values(i).isNull + + override def copy(): Row = { + val newValues = new Array[MutableValue](values.length) + var i = 0 + while (i < values.length) { + newValues(i) = values(i).copy() + i += 1 + } + new SpecificMutableRow(newValues) + } + + override def update(ordinal: Int, value: Any): Unit = { + if (value == null) setNullAt(ordinal) else values(ordinal).update(value) + } + + override def iterator: Iterator[Any] = values.map(_.boxed).iterator + + override def setString(ordinal: Int, value: String) = update(ordinal, value) + + override def getString(ordinal: Int) = apply(ordinal).asInstanceOf[String] + + override def setInt(ordinal: Int, value: Int): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableInt] + currentValue.isNull = false + currentValue.value = value + } + + override def getInt(i: Int): Int = { + values(i).asInstanceOf[MutableInt].value + } + + override def setFloat(ordinal: Int, value: Float): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableFloat] + currentValue.isNull = false + currentValue.value = value + } + + override def getFloat(i: Int): Float = { + values(i).asInstanceOf[MutableFloat].value + } + + override def setBoolean(ordinal: Int, value: Boolean): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableBoolean] + currentValue.isNull = false + currentValue.value = value + } + + override def getBoolean(i: Int): Boolean = { + values(i).asInstanceOf[MutableBoolean].value + } + + override def setDouble(ordinal: Int, value: Double): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableDouble] + currentValue.isNull = false + currentValue.value = value + } + + override def getDouble(i: Int): Double = { + values(i).asInstanceOf[MutableDouble].value + } + + override def setShort(ordinal: Int, value: Short): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableShort] + currentValue.isNull = false + currentValue.value = value + } + + override def getShort(i: Int): Short = { + values(i).asInstanceOf[MutableShort].value + } + + override def setLong(ordinal: Int, value: Long): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableLong] + currentValue.isNull = false + currentValue.value = value + } + + override def getLong(i: Int): Long = { + values(i).asInstanceOf[MutableLong].value + } + + override def setByte(ordinal: Int, value: Byte): Unit = { + val currentValue = values(ordinal).asInstanceOf[MutableByte] + currentValue.isNull = false + currentValue.value = value + } + + override def getByte(i: Int): Byte = { + values(i).asInstanceOf[MutableByte].value + } + + override def getAs[T](i: Int): T = { + values(i).boxed.asInstanceOf[T] + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala deleted file mode 100644 index 9cbab3d5d0..0000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions - -import org.apache.spark.sql.catalyst.types._ - -/** - * A parent class for mutable container objects that are reused when the values are changed, - * resulting in less garbage. These values are held by a [[SpecificMutableRow]]. - * - * The following code was roughly used to generate these objects: - * {{{ - * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",") - * types.map {tpe => - * s""" - * final class Mutable$tpe extends MutableValue { - * var value: $tpe = 0 - * def boxed = if (isNull) null else value - * def update(v: Any) = value = { - * isNull = false - * v.asInstanceOf[$tpe] - * } - * def copy() = { - * val newCopy = new Mutable$tpe - * newCopy.isNull = isNull - * newCopy.value = value - * newCopy.asInstanceOf[this.type] - * } - * }""" - * }.foreach(println) - * - * types.map { tpe => - * s""" - * override def set$tpe(ordinal: Int, value: $tpe): Unit = { - * val currentValue = values(ordinal).asInstanceOf[Mutable$tpe] - * currentValue.isNull = false - * currentValue.value = value - * } - * - * override def get$tpe(i: Int): $tpe = { - * values(i).asInstanceOf[Mutable$tpe].value - * }""" - * }.foreach(println) - * }}} - */ -abstract class MutableValue extends Serializable { - var isNull: Boolean = true - def boxed: Any - def update(v: Any) - def copy(): this.type -} - -final class MutableInt extends MutableValue { - var value: Int = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Int] - } - def copy() = { - val newCopy = new MutableInt - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableFloat extends MutableValue { - var value: Float = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Float] - } - def copy() = { - val newCopy = new MutableFloat - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableBoolean extends MutableValue { - var value: Boolean = false - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Boolean] - } - def copy() = { - val newCopy = new MutableBoolean - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableDouble extends MutableValue { - var value: Double = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Double] - } - def copy() = { - val newCopy = new MutableDouble - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableShort extends MutableValue { - var value: Short = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Short] - } - def copy() = { - val newCopy = new MutableShort - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableLong extends MutableValue { - var value: Long = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Long] - } - def copy() = { - val newCopy = new MutableLong - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableByte extends MutableValue { - var value: Byte = 0 - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Byte] - } - def copy() = { - val newCopy = new MutableByte - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -final class MutableAny extends MutableValue { - var value: Any = _ - def boxed = if (isNull) null else value - def update(v: Any) = value = { - isNull = false - v.asInstanceOf[Any] - } - def copy() = { - val newCopy = new MutableAny - newCopy.isNull = isNull - newCopy.value = value - newCopy.asInstanceOf[this.type] - } -} - -/** - * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen - * based on the dataTypes of each column. The intent is to decrease garbage when modifying the - * values of primitive columns. - */ -final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow { - - def this(dataTypes: Seq[DataType]) = - this( - dataTypes.map { - case IntegerType => new MutableInt - case ByteType => new MutableByte - case FloatType => new MutableFloat - case ShortType => new MutableShort - case DoubleType => new MutableDouble - case BooleanType => new MutableBoolean - case LongType => new MutableLong - case _ => new MutableAny - }.toArray) - - def this() = this(Seq.empty) - - override def length: Int = values.length - - override def setNullAt(i: Int): Unit = { - values(i).isNull = true - } - - override def apply(i: Int): Any = values(i).boxed - - override def isNullAt(i: Int): Boolean = values(i).isNull - - override def copy(): Row = { - val newValues = new Array[MutableValue](values.length) - var i = 0 - while (i < values.length) { - newValues(i) = values(i).copy() - i += 1 - } - new SpecificMutableRow(newValues) - } - - override def update(ordinal: Int, value: Any): Unit = { - if (value == null) setNullAt(ordinal) else values(ordinal).update(value) - } - - override def iterator: Iterator[Any] = values.map(_.boxed).iterator - - def setString(ordinal: Int, value: String) = update(ordinal, value) - - def getString(ordinal: Int) = apply(ordinal).asInstanceOf[String] - - override def setInt(ordinal: Int, value: Int): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableInt] - currentValue.isNull = false - currentValue.value = value - } - - override def getInt(i: Int): Int = { - values(i).asInstanceOf[MutableInt].value - } - - override def setFloat(ordinal: Int, value: Float): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableFloat] - currentValue.isNull = false - currentValue.value = value - } - - override def getFloat(i: Int): Float = { - values(i).asInstanceOf[MutableFloat].value - } - - override def setBoolean(ordinal: Int, value: Boolean): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableBoolean] - currentValue.isNull = false - currentValue.value = value - } - - override def getBoolean(i: Int): Boolean = { - values(i).asInstanceOf[MutableBoolean].value - } - - override def setDouble(ordinal: Int, value: Double): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableDouble] - currentValue.isNull = false - currentValue.value = value - } - - override def getDouble(i: Int): Double = { - values(i).asInstanceOf[MutableDouble].value - } - - override def setShort(ordinal: Int, value: Short): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableShort] - currentValue.isNull = false - currentValue.value = value - } - - override def getShort(i: Int): Short = { - values(i).asInstanceOf[MutableShort].value - } - - override def setLong(ordinal: Int, value: Long): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableLong] - currentValue.isNull = false - currentValue.value = value - } - - override def getLong(i: Int): Long = { - values(i).asInstanceOf[MutableLong].value - } - - override def setByte(ordinal: Int, value: Byte): Unit = { - val currentValue = values(ordinal).asInstanceOf[MutableByte] - currentValue.isNull = false - currentValue.value = value - } - - override def getByte(i: Int): Byte = { - values(i).asInstanceOf[MutableByte].value - } -} -- cgit v1.2.3