author    Cheng Lian <lian@databricks.com>    2015-08-19 13:57:52 -0700
committer Michael Armbrust <michael@databricks.com>    2015-08-19 13:57:52 -0700
commit    21bdbe9fe69be47be562de24216a469e5ee64c7b (patch)
tree      160a4a88daaaabc9d7b851b55375415adcb8e015 /sql
parent    08887369c890e0dd87eb8b34e8c32bb03307bf24 (diff)
[SPARK-9627] [SQL] Stops using Scala runtime reflection in DictionaryEncoding
`DictionaryEncoding` uses Scala runtime reflection to avoid boxing costs while building the dictionary array. However, this code path may hit [SI-6240][1] and throw an exception.

[1]: https://issues.scala-lang.org/browse/SI-6240

Author: Cheng Lian <lian@databricks.com>

Closes #8306 from liancheng/spark-9627/in-memory-cache-scala-reflection.
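For context, a minimal sketch of the two approaches, shown for a plain `Int` column. `IntColumn` and its `extract` method are hypothetical stand-ins for Spark's `NativeColumnType`; in the real code the element type is the abstract `T#InternalType`, which is why reflection was involved at all:

```scala
import java.nio.ByteBuffer

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._ // requires the scala-reflect module

// Hypothetical stand-in for Spark's NativeColumnType: reads one Int per call.
object IntColumn {
  def extract(buffer: ByteBuffer): Int = buffer.getInt()
}

object DictionarySketch {
  // Before: derive a ClassTag through runtime reflection so that Array.fill
  // can build an unboxed array. Scala 2.10 runtime reflection is not
  // thread-safe (SI-6240), so concurrent decoder creation may throw here.
  def buildWithReflection(buffer: ByteBuffer): Array[Int] = {
    val mirror = runtimeMirror(getClass.getClassLoader)
    implicit val classTag: ClassTag[Int] =
      ClassTag(mirror.runtimeClass(typeOf[Int]))
    Array.fill(buffer.getInt())(IntColumn.extract(buffer))
  }

  // After: accept boxing and build an Array[Any], avoiding reflection
  // entirely; callers cast each element back on access.
  def buildWithoutReflection(buffer: ByteBuffer): Array[Any] =
    Array.fill[Any](buffer.getInt())(IntColumn.extract(buffer))
}
```

Since the decoder reads indices with `buffer.getShort()`, the dictionary holds at most a few tens of thousands of entries, so boxing them once per column is a minor cost next to an unpredictable reflection failure.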
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala      |  1 -
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala | 15 ++++-----------
2 files changed, 4 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 45f15fd04d..66d429bc06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -120,7 +120,6 @@ private[sql] case class InMemoryRelation(
       new Iterator[CachedBatch] {
         def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
-            val columnType = ColumnType(attribute.dataType)
             ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
           }.toArray
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index c91d960a09..ca910a99db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -270,20 +270,13 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
     extends compression.Decoder[T] {
 
-    private val dictionary = {
-      // TODO Can we clean up this mess? Maybe move this to `DataType`?
-      implicit val classTag = {
-        val mirror = runtimeMirror(Utils.getSparkClassLoader)
-        ClassTag[T#InternalType](mirror.runtimeClass(columnType.scalaTag.tpe))
-      }
-
-      Array.fill(buffer.getInt()) {
-        columnType.extract(buffer)
-      }
+    private val dictionary: Array[Any] = {
+      val elementNum = buffer.getInt()
+      Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
     }
 
     override def next(row: MutableRow, ordinal: Int): Unit = {
-      columnType.setField(row, ordinal, dictionary(buffer.getShort()))
+      columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
     }
 
     override def hasNext: Boolean = buffer.hasRemaining
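To make the buffer layout the decoder consumes concrete, here is a self-contained round-trip sketch. The shape (an `Int` element count, the dictionary entries, then `Short` indices) mirrors what the patched `Decoder` reads; the `DictionaryRoundTrip` object and the single-column `Int` encoding are illustrative, not Spark's actual wire format:

```scala
import java.nio.ByteBuffer

// Illustrative layout: [Int: dictionary size][dictionary entries][Short indices ...]
object DictionaryRoundTrip {
  def encode(values: Seq[Int]): ByteBuffer = {
    val dict = values.distinct
    val buf = ByteBuffer.allocate(4 + dict.length * 4 + values.length * 2)
    buf.putInt(dict.length)                                     // element count
    dict.foreach(v => buf.putInt(v))                            // dictionary entries
    values.foreach(v => buf.putShort(dict.indexOf(v).toShort))  // per-value indices
    buf.flip()
    buf
  }

  def decode(buf: ByteBuffer): Seq[Int] = {
    // Same shape as the patched Decoder: an Array[Any] built without reflection.
    val dictionary: Array[Any] = Array.fill[Any](buf.getInt())(buf.getInt())
    val out = Seq.newBuilder[Int]
    while (buf.hasRemaining) {
      // As in next(): look up the Short index and cast the boxed value back.
      out += dictionary(buf.getShort()).asInstanceOf[Int]
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(3, 3, 7, 3, 7)
    assert(decode(encode(values)) == values) // round-trips losslessly
  }
}
```

The per-element `asInstanceOf` cast is safe for the same reason it is in the patch: every entry in the array was produced by the column type's own `extract`, so only the static type information, not the runtime value, was weakened to `Any`.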