aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorTejas Patil <tejasp@fb.com>2016-10-22 20:43:43 -0700
committergatorsmile <gatorsmile@gmail.com>2016-10-22 20:43:43 -0700
commiteff4aed1ac1e500d4aa40665dd06b527dffbc111 (patch)
tree130c5f6f52410aefec45b5b04b5cb2c5c0fb1fee /sql/catalyst
parentbc167a2a53f5a795d089e8a884569b1b3e2cd439 (diff)
downloadspark-eff4aed1ac1e500d4aa40665dd06b527dffbc111.tar.gz
spark-eff4aed1ac1e500d4aa40665dd06b527dffbc111.tar.bz2
spark-eff4aed1ac1e500d4aa40665dd06b527dffbc111.zip
[SPARK-18035][SQL] Introduce performant and memory efficient APIs to create ArrayBasedMapData
## What changes were proposed in this pull request? Jira: https://issues.apache.org/jira/browse/SPARK-18035 In HiveInspectors, I saw that converting Java map to Spark's `ArrayBasedMapData` spent quite sometime in buffer copying : https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L658 The reason being `map.toSeq` allocates a new buffer and copies the map entries to it: https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/MapLike.scala#L323 This copy is not needed as we get rid of it once we extract the key and value arrays. Here is the call trace: ``` org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$41.apply(HiveInspectors.scala:664) scala.collection.AbstractMap.toSeq(Map.scala:59) scala.collection.MapLike$class.toSeq(MapLike.scala:323) scala.collection.AbstractMap.toBuffer(Map.scala:59) scala.collection.MapLike$class.toBuffer(MapLike.scala:326) scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104) scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) scala.collection.AbstractIterable.foreach(Iterable.scala:54) scala.collection.IterableLike$class.foreach(IterableLike.scala:72) scala.collection.AbstractIterator.foreach(Iterator.scala:1336) scala.collection.Iterator$class.foreach(Iterator.scala:893) scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) ``` Also, earlier code was populating keys and values arrays separately by iterating twice. The PR avoids double iteration of the map and does it in one iteration. EDIT: During code review, there were several more places in the code which were found to do similar thing. The PR dedupes those instances and introduces convenient APIs which are performant and memory efficient ## Performance gains The number is subjective and depends on how many map columns are accessed in the query and average entries per map. For one the queries that I tried out, I saw 3% CPU savings (end-to-end) for the query. ## How was this patch tested? This does not change the end result produced so relying on existing tests. Author: Tejas Patil <tejasp@fb.com> Closes #15573 from tejasapatil/SPARK-18035_avoid_toSeq.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala53
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala32
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala81
3 files changed, 111 insertions, 55 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index f542f5cf40..5b9161551a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -199,34 +199,14 @@ object CatalystTypeConverters {
private[this] val keyConverter = getConverterForType(keyType)
private[this] val valueConverter = getConverterForType(valueType)
- override def toCatalystImpl(scalaValue: Any): MapData = scalaValue match {
- case m: Map[_, _] =>
- val length = m.size
- val convertedKeys = new Array[Any](length)
- val convertedValues = new Array[Any](length)
-
- var i = 0
- for ((key, value) <- m) {
- convertedKeys(i) = keyConverter.toCatalyst(key)
- convertedValues(i) = valueConverter.toCatalyst(value)
- i += 1
- }
- ArrayBasedMapData(convertedKeys, convertedValues)
-
- case jmap: JavaMap[_, _] =>
- val length = jmap.size()
- val convertedKeys = new Array[Any](length)
- val convertedValues = new Array[Any](length)
-
- var i = 0
- val iter = jmap.entrySet.iterator
- while (iter.hasNext) {
- val entry = iter.next()
- convertedKeys(i) = keyConverter.toCatalyst(entry.getKey)
- convertedValues(i) = valueConverter.toCatalyst(entry.getValue)
- i += 1
- }
- ArrayBasedMapData(convertedKeys, convertedValues)
+ override def toCatalystImpl(scalaValue: Any): MapData = {
+ val keyFunction = (k: Any) => keyConverter.toCatalyst(k)
+ val valueFunction = (k: Any) => valueConverter.toCatalyst(k)
+
+ scalaValue match {
+ case map: Map[_, _] => ArrayBasedMapData(map, keyFunction, valueFunction)
+ case javaMap: JavaMap[_, _] => ArrayBasedMapData(javaMap, keyFunction, valueFunction)
+ }
}
override def toScala(catalystValue: MapData): Map[Any, Any] = {
@@ -433,18 +413,11 @@ object CatalystTypeConverters {
case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Any] => new GenericArrayData(arr.map(convertToCatalyst))
- case m: Map[_, _] =>
- val length = m.size
- val convertedKeys = new Array[Any](length)
- val convertedValues = new Array[Any](length)
-
- var i = 0
- for ((key, value) <- m) {
- convertedKeys(i) = convertToCatalyst(key)
- convertedValues(i) = convertToCatalyst(value)
- i += 1
- }
- ArrayBasedMapData(convertedKeys, convertedValues)
+ case map: Map[_, _] =>
+ ArrayBasedMapData(
+ map,
+ (key: Any) => convertToCatalyst(key),
+ (value: Any) => convertToCatalyst(value))
case other => other
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 09e22aaf3e..917aa08731 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -427,18 +427,28 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
}
}
- override def nullSafeEval(str: Any, delim1: Any, delim2: Any): Any = {
- val array = str.asInstanceOf[UTF8String]
- .split(delim1.asInstanceOf[UTF8String], -1)
- .map { kv =>
- val arr = kv.split(delim2.asInstanceOf[UTF8String], 2)
- if (arr.length < 2) {
- Array(arr(0), null)
- } else {
- arr
- }
+ override def nullSafeEval(
+ inputString: Any,
+ stringDelimiter: Any,
+ keyValueDelimiter: Any): Any = {
+ val keyValues =
+ inputString.asInstanceOf[UTF8String].split(stringDelimiter.asInstanceOf[UTF8String], -1)
+
+ val iterator = new Iterator[(UTF8String, UTF8String)] {
+ var index = 0
+ val keyValueDelimiterUTF8String = keyValueDelimiter.asInstanceOf[UTF8String]
+
+ override def hasNext: Boolean = {
+ keyValues.length > index
}
- ArrayBasedMapData(array.map(_ (0)), array.map(_ (1)))
+
+ override def next(): (UTF8String, UTF8String) = {
+ val keyValueArray = keyValues(index).split(keyValueDelimiterUTF8String, 2)
+ index += 1
+ (keyValueArray(0), if (keyValueArray.length < 2) null else keyValueArray(1))
+ }
+ }
+ ArrayBasedMapData(iterator, keyValues.size, identity, identity)
}
override def prettyName: String = "str_to_map"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 4449da13c0..91b3139443 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.util
+import java.util.{Map => JavaMap}
+
class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) extends MapData {
require(keyArray.numElements() == valueArray.numElements())
@@ -30,12 +32,83 @@ class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) exte
}
object ArrayBasedMapData {
- def apply(map: Map[Any, Any]): ArrayBasedMapData = {
- val array = map.toArray
- ArrayBasedMapData(array.map(_._1), array.map(_._2))
+ /**
+ * Creates a [[ArrayBasedMapData]] by applying the given converters over
+ * each (key -> value) pair of the input [[java.util.Map]]
+ *
+ * @param javaMap Input map
+ * @param keyConverter This function is applied over all the keys of the input map to
+ * obtain the output map's keys
+ * @param valueConverter This function is applied over all the values of the input map to
+ * obtain the output map's values
+ */
+ def apply(
+ javaMap: JavaMap[_, _],
+ keyConverter: (Any) => Any,
+ valueConverter: (Any) => Any): ArrayBasedMapData = {
+ import scala.language.existentials
+
+ val keys: Array[Any] = new Array[Any](javaMap.size())
+ val values: Array[Any] = new Array[Any](javaMap.size())
+
+ var i: Int = 0
+ val iterator = javaMap.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ keys(i) = keyConverter(entry.getKey)
+ values(i) = valueConverter(entry.getValue)
+ i += 1
+ }
+ ArrayBasedMapData(keys, values)
+ }
+
+ /**
+ * Creates a [[ArrayBasedMapData]] by applying the given converters over
+ * each (key -> value) pair of the input map
+ *
+ * @param map Input map
+ * @param keyConverter This function is applied over all the keys of the input map to
+ * obtain the output map's keys
+ * @param valueConverter This function is applied over all the values of the input map to
+ * obtain the output map's values
+ */
+ def apply(
+ map: scala.collection.Map[_, _],
+ keyConverter: (Any) => Any = identity,
+ valueConverter: (Any) => Any = identity): ArrayBasedMapData = {
+ ArrayBasedMapData(map.iterator, map.size, keyConverter, valueConverter)
+ }
+
+ /**
+ * Creates a [[ArrayBasedMapData]] by applying the given converters over
+ * each (key -> value) pair from the given iterator
+ *
+ * @param iterator Input iterator
+ * @param size Number of elements
+ * @param keyConverter This function is applied over all the keys extracted from the
+ * given iterator to obtain the output map's keys
+ * @param valueConverter This function is applied over all the values extracted from the
+ * given iterator to obtain the output map's values
+ */
+ def apply(
+ iterator: Iterator[(_, _)],
+ size: Int,
+ keyConverter: (Any) => Any,
+ valueConverter: (Any) => Any): ArrayBasedMapData = {
+
+ val keys: Array[Any] = new Array[Any](size)
+ val values: Array[Any] = new Array[Any](size)
+
+ var i = 0
+ for ((key, value) <- iterator) {
+ keys(i) = keyConverter(key)
+ values(i) = valueConverter(value)
+ i += 1
+ }
+ ArrayBasedMapData(keys, values)
}
- def apply(keys: Array[Any], values: Array[Any]): ArrayBasedMapData = {
+ def apply(keys: Array[_], values: Array[_]): ArrayBasedMapData = {
new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
}