author    Michael Davies <Michael.BellDavies@gmail.com>    2015-01-29 15:40:59 -0800
committer Michael Armbrust <michael@databricks.com>    2015-01-29 15:40:59 -0800
commit    940f3756116647a25fddb54111112b95ba9b8740
tree      1ad5582a69902ebc171839e47984656ad5a076f0 /sql
parent    bce0ba1fbd05788f1c08549b2fd0c6a9e320a41a
[SPARK-5309][SQL] Add support for dictionaries in PrimitiveConverter for Strings.

Parquet converters allow developers to take advantage of dictionary encoding of column data to reduce the cost of Binary decoding. The Spark PrimitiveConverter was not using that API, so for String columns stored with dictionary compression it repeated the Binary-to-String conversion for the same String value. In measurements this could account for over 25% of total query time. For example, a 500M-row table split across 16 blocks was aggregated and summed in a little under 30s before this change and a little under 20s after it.

Author: Michael Davies <Michael.BellDavies@gmail.com>

Closes #4187 from MickDavies/SPARK-5309-2 and squashes the following commits:

327287e [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.
33c002c [Michael Davies] SPARK-5309: Add support for dictionaries in PrimitiveConverter for Strings.
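The mechanism is easiest to see in isolation. Below is a minimal sketch of the dictionary-aware converter pattern the patch applies; StringSink is a hypothetical stand-in for the parent CatalystConverter, while the overridden hooks (hasDictionarySupport, setDictionary, addValueFromDictionary, addBinary) are the parquet.io.api.PrimitiveConverter methods used in the diff below.

    import parquet.column.Dictionary
    import parquet.io.api.{Binary, PrimitiveConverter}

    // Hypothetical stand-in for the parent CatalystConverter in the patch.
    trait StringSink {
      def updateString(fieldIndex: Int, value: String): Unit
    }

    class DictionaryStringConverter(sink: StringSink, fieldIndex: Int)
      extends PrimitiveConverter {

      // Dictionary entries decoded to Strings, filled once per row group.
      private[this] var dict: Array[String] = _

      // Tell the Parquet reader this converter can consume dictionary ids.
      override def hasDictionarySupport: Boolean = true

      // Decode each dictionary entry to a String exactly once, up front.
      override def setDictionary(dictionary: Dictionary): Unit =
        dict = Array.tabulate(dictionary.getMaxId + 1) {
          dictionary.decodeToBinary(_).toStringUsingUTF8
        }

      // Dictionary-encoded pages hand over an id; reuse the cached String.
      override def addValueFromDictionary(dictionaryId: Int): Unit =
        sink.updateString(fieldIndex, dict(dictionaryId))

      // Plain-encoded pages still arrive as Binary, decoded per value.
      override def addBinary(value: Binary): Unit =
        sink.updateString(fieldIndex, value.toStringUsingUTF8)
    }

Without hasDictionarySupport returning true, the reader materializes every value as a Binary and addBinary pays the UTF-8 decode on each repeated occurrence, which is exactly the overhead this change removes.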
Diffstat (limited to 'sql')
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala  | 48
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 11
2 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 9d9150246c..10df8c3310 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType
@@ -102,12 +103,8 @@ private[sql] object CatalystConverter {
}
// Strings, Shorts and Bytes do not have a corresponding type in Parquet
// so we need to treat them separately
- case StringType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addBinary(value: Binary): Unit =
- parent.updateString(fieldIndex, value)
- }
- }
+ case StringType =>
+ new CatalystPrimitiveStringConverter(parent, fieldIndex)
case ShortType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addInt(value: Int): Unit =
@@ -197,8 +194,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, value.getBytes)
- protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
- updateField(fieldIndex, value.toStringUsingUTF8)
+ protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+ updateField(fieldIndex, value)
protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
@@ -384,8 +381,8 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
current.update(fieldIndex, value.getBytes)
- override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
- current.setString(fieldIndex, value.toStringUsingUTF8)
+ override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+ current.setString(fieldIndex, value)
override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
@@ -426,6 +423,33 @@ private[parquet] class CatalystPrimitiveConverter(
parent.updateLong(fieldIndex, value)
}
+/**
+ * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String.
+ * Supports dictionaries to reduce Binary to String conversion overhead.
+ *
+ * Follows the pattern in Parquet of using dictionaries, where supported, for String conversion.
+ *
+ * @param parent The parent group converter.
+ * @param fieldIndex The index inside the record.
+ */
+private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
+ extends CatalystPrimitiveConverter(parent, fieldIndex) {
+
+ private[this] var dict: Array[String] = null
+
+ override def hasDictionarySupport: Boolean = true
+
+ override def setDictionary(dictionary: Dictionary): Unit =
+ dict = Array.tabulate(dictionary.getMaxId + 1) { dictionary.decodeToBinary(_).toStringUsingUTF8 }
+
+
+ override def addValueFromDictionary(dictionaryId: Int): Unit =
+ parent.updateString(fieldIndex, dict(dictionaryId))
+
+ override def addBinary(value: Binary): Unit =
+ parent.updateString(fieldIndex, value.toStringUsingUTF8)
+}
+
private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}
@@ -583,9 +607,9 @@ private[parquet] class CatalystNativeArrayConverter(
elements += 1
}
- override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
+ override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = {
checkGrowBuffer()
- buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType]
+ buffer(elements) = value.asInstanceOf[NativeType]
elements += 1
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 1263ff818e..3d82f4bce7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -85,4 +85,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}
+
+ test("SPARK-5309 strings stored using dictionary compression in parquet") {
+ withParquetTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
+
+ checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+ (0 until 10).map(i => Row("same", "run_" + i, 100)))
+
+ checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
+ List(Row("same", "run_5", 100)))
+ }
+ }
}