author     Cheng Lian <lian@databricks.com>          2015-08-19 13:57:52 -0700
committer  Michael Armbrust <michael@databricks.com> 2015-08-19 13:57:52 -0700
commit     21bdbe9fe69be47be562de24216a469e5ee64c7b
tree       160a4a88daaaabc9d7b851b55375415adcb8e015
parent     08887369c890e0dd87eb8b34e8c32bb03307bf24
[SPARK-9627] [SQL] Stops using Scala runtime reflection in DictionaryEncoding
`DictionaryEncoding` uses Scala runtime reflection to avoid boxing costs while building the dictionary array. However, this code path may hit [SI-6240] [1] and throw an exception.
[1]: https://issues.scala-lang.org/browse/SI-6240
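
For context, a rough sketch of the reflective pattern being removed (simplified, with a hypothetical helper name; the real code lives in the `Decoder` below):

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe._

    // Materialize a ClassTag[T] from a runtime Type so that Array.fill can
    // allocate an unboxed primitive array. runtimeMirror goes through shared
    // reflection state that is not thread-safe in Scala 2.10 (SI-6240), so
    // concurrent decoders can throw from inside this call.
    def classTagFor[T](tpe: Type, loader: ClassLoader): ClassTag[T] =
      ClassTag[T](runtimeMirror(loader).runtimeClass(tpe))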
Author: Cheng Lian <lian@databricks.com>
Closes #8306 from liancheng/spark-9627/in-memory-cache-scala-reflection.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala      |  1 -
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala | 15 ++++-----------
2 files changed, 4 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 45f15fd04d..66d429bc06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -120,7 +120,6 @@ private[sql] case class InMemoryRelation(
       new Iterator[CachedBatch] {
         def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
-            val columnType = ColumnType(attribute.dataType)
             ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
           }.toArray
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index c91d960a09..ca910a99db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -270,20 +270,13 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
     extends compression.Decoder[T] {
 
-    private val dictionary = {
-      // TODO Can we clean up this mess? Maybe move this to `DataType`?
-      implicit val classTag = {
-        val mirror = runtimeMirror(Utils.getSparkClassLoader)
-        ClassTag[T#InternalType](mirror.runtimeClass(columnType.scalaTag.tpe))
-      }
-
-      Array.fill(buffer.getInt()) {
-        columnType.extract(buffer)
-      }
+    private val dictionary: Array[Any] = {
+      val elementNum = buffer.getInt()
+      Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
     }
 
     override def next(row: MutableRow, ordinal: Int): Unit = {
-      columnType.setField(row, ordinal, dictionary(buffer.getShort()))
+      columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
     }
 
     override def hasNext: Boolean = buffer.hasRemaining
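
The patch trades the unboxed dictionary array for a boxed `Array[Any]`. A minimal illustration of why that sidesteps reflection (illustrative only, not taken from the patch): `Array.fill[T]` needs an implicit `ClassTag[T]` to choose the right runtime array class, which is what the removed reflective block manufactured, whereas `ClassTag[Any]` is trivially available. Since dictionary indices are read with `buffer.getShort()`, the dictionary is small and the extra boxing cost is modest.

    // Array.fill[T] requires an implicit ClassTag[T] to allocate the array;
    // for Int that yields a primitive int[]. Array[Any] always stores boxed
    // references (Object[]), so no ClassTag -- and no reflection -- is needed.
    val unboxed: Array[Int] = Array.fill(4)(0)       // primitive int[], needs ClassTag[Int]
    val boxed:   Array[Any] = Array.fill[Any](4)(0)  // Object[] holding boxed java.lang.Integer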