author    Nathan Howell <nhowell@godaddy.com>    2015-05-06 22:56:53 -0700
committer Yin Huai <yhuai@databricks.com>        2015-05-06 22:56:53 -0700
commit    2d6612cc8b98f767d73c4d15e4065bf3d6c12ea7 (patch)
tree      b8b410071f36da0a5aaa22cdcd7fc2cdbb66aa16 /sql/catalyst
parent    9cfa9a516ed991de6c5900c7285b47380a396142 (diff)
[SPARK-5938] [SPARK-5443] [SQL] Improve JsonRDD performance
This patch comprises a few related pieces of work:

* Schema inference is performed directly on the JSON token stream
* `String => Row` conversion populates Spark SQL structures without intermediate types
* Projection pushdown is implemented via CatalystScan for DataFrame queries
* Support for the legacy parser is retained by setting `spark.sql.json.useJacksonStreamingAPI` to `false`

Performance improvements depend on the schema and queries being executed, but it should be faster across the board. Below are benchmarks using the last.fm Million Song dataset:

```
Command                                            | Baseline | Patched
---------------------------------------------------|----------|--------
import sqlContext.implicits._                      |          |
val df = sqlContext.jsonFile("/tmp/lastfm.json")   |    70.0s |   14.6s
df.count()                                         |    28.8s |    6.2s
df.rdd.count()                                     |    35.3s |   21.5s
df.where($"artist" === "Robert Hood").collect()    |    28.3s |   16.9s
```

To prepare this dataset for benchmarking, follow these steps:

```
# Fetch the datasets from http://labrosa.ee.columbia.edu/millionsong/lastfm
wget http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_test.zip \
     http://labrosa.ee.columbia.edu/millionsong/sites/default/files/lastfm/lastfm_train.zip

# Decompress and combine, pipe through `jq -c` to ensure there is one record per line
unzip -p lastfm_test.zip lastfm_train.zip | jq -c . > lastfm.json
```

Author: Nathan Howell <nhowell@godaddy.com>

Closes #5801 from NathanHowell/json-performance and squashes the following commits:

26fea31 [Nathan Howell] Recreate the baseRDD for each scan operation
a7ebeb2 [Nathan Howell] Increase coverage of inserts into a JSONRelation
e06a1dd [Nathan Howell] Add comments to the `useJacksonStreamingAPI` config flag
6822712 [Nathan Howell] Split up JsonRDD2 into multiple objects
fa8234f [Nathan Howell] Wrap long lines
b31917b [Nathan Howell] Rename `useJsonRDD2` to `useJacksonStreamingAPI`
15c5d1b [Nathan Howell] JSONRelation's baseRDD need not be lazy
f8add6e [Nathan Howell] Add comments on lack of support for precision and scale DecimalTypes
fa0be47 [Nathan Howell] Remove unused default case in the field parser
80dba17 [Nathan Howell] Add comments regarding null handling and empty strings
842846d [Nathan Howell] Point the empty schema inference test at JsonRDD2
ab6ee87 [Nathan Howell] Add projection pushdown support to JsonRDD/JsonRDD2
f636c14 [Nathan Howell] Enable JsonRDD2 by default, add a flag to switch back to JsonRDD
0bbc445 [Nathan Howell] Improve JSON parsing and type inference performance
7ca70c1 [Nathan Howell] Eliminate arrow pattern, replace with pattern matches
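After this patch the streaming Jackson parser is the default, and the flag named in the commit message switches back to the legacy `JsonRDD` path. A minimal usage sketch, assuming an existing `sqlContext` and the benchmark file prepared above (the opt-out semantics of the flag are taken from the commit message, not verified here):

```
import sqlContext.implicits._

// Default after this patch: schema inference and String => Row conversion
// go through the streaming Jackson parser (JsonRDD2).
val df = sqlContext.jsonFile("/tmp/lastfm.json")
df.where($"artist" === "Robert Hood").collect()

// Assumed opt-out: setting the flag from the commit message to "false"
// restores the legacy JsonRDD parser.
sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "false")
val legacyDf = sqlContext.jsonFile("/tmp/lastfm.json")
```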
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 43
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala                   |  4
2 files changed, 27 insertions(+), 20 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 96e2aee4de..873c75c525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -26,7 +26,14 @@ object HiveTypeCoercion {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
// The conversion for integral and floating point types have a linear widening hierarchy:
private val numericPrecedence =
- Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited)
+ IndexedSeq(
+ ByteType,
+ ShortType,
+ IntegerType,
+ LongType,
+ FloatType,
+ DoubleType,
+ DecimalType.Unlimited)
/**
* Find the tightest common type of two types that might be used in a binary expression.
@@ -34,25 +41,21 @@ object HiveTypeCoercion {
* with primitive types, because in that case the precision and scale of the result depends on
* the operation. Those rules are implemented in [[HiveTypeCoercion.DecimalPrecision]].
*/
- def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
- val valueTypes = Seq(t1, t2).filter(t => t != NullType)
- if (valueTypes.distinct.size > 1) {
- // Promote numeric types to the highest of the two and all numeric types to unlimited decimal
- if (numericPrecedence.contains(t1) && numericPrecedence.contains(t2)) {
- Some(numericPrecedence.filter(t => t == t1 || t == t2).last)
- } else if (t1.isInstanceOf[DecimalType] && t2.isInstanceOf[DecimalType]) {
- // Fixed-precision decimals can up-cast into unlimited
- if (t1 == DecimalType.Unlimited || t2 == DecimalType.Unlimited) {
- Some(DecimalType.Unlimited)
- } else {
- None
- }
- } else {
- None
- }
- } else {
- Some(if (valueTypes.size == 0) NullType else valueTypes.head)
- }
+ val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
+ case (t1, t2) if t1 == t2 => Some(t1)
+ case (NullType, t1) => Some(t1)
+ case (t1, NullType) => Some(t1)
+
+ // Promote numeric types to the highest of the two and all numeric types to unlimited decimal
+ case (t1, t2) if Seq(t1, t2).forall(numericPrecedence.contains) =>
+ val index = numericPrecedence.lastIndexWhere(t => t == t1 || t == t2)
+ Some(numericPrecedence(index))
+
+ // Fixed-precision decimals can up-cast into unlimited
+ case (DecimalType.Unlimited, _: DecimalType) => Some(DecimalType.Unlimited)
+ case (_: DecimalType, DecimalType.Unlimited) => Some(DecimalType.Unlimited)
+
+ case _ => None
}
}
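The hunk above turns `findTightestCommonType` from a method into a pattern-matching function value over the `numericPrecedence` IndexedSeq. A REPL-style sketch of the intended behaviour, assuming the call site can see the catalyst `analysis` package:

```
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.types._

// Equal types are returned unchanged; NullType defers to the other side.
HiveTypeCoercion.findTightestCommonType(StringType, StringType)   // Some(StringType)
HiveTypeCoercion.findTightestCommonType(NullType, IntegerType)    // Some(IntegerType)

// Numeric types widen to whichever sits later in numericPrecedence.
HiveTypeCoercion.findTightestCommonType(IntegerType, DoubleType)  // Some(DoubleType)

// Fixed-precision decimals only widen to unlimited decimal.
HiveTypeCoercion.findTightestCommonType(DecimalType(10, 2), DecimalType.Unlimited)
// Some(DecimalType.Unlimited)

// Anything else has no tightest common type.
HiveTypeCoercion.findTightestCommonType(StringType, IntegerType)  // None
```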
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index d80ffca18e..7e00a27dfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -134,6 +134,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
}
+ private[sql] def getFieldIndex(name: String): Option[Int] = {
+ nameToIndex.get(name)
+ }
+
protected[sql] def toAttributes: Seq[AttributeReference] =
map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
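The new `getFieldIndex` helper is `private[sql]`, so it is only callable from within the sql package; from user code the same lookup can be approximated with `fieldNames`. A small sketch, with field names made up for illustration:

```
import org.apache.spark.sql.types._

// Hypothetical schema, loosely modelled on the last.fm records from the benchmark.
val schema = StructType(Array(
  StructField("artist", StringType),
  StructField("title", StringType)))

// Inside package org.apache.spark.sql, the new helper returns Option[Int]
// instead of throwing when the field is absent:
//   schema.getFieldIndex("title")    // Some(1)
//   schema.getFieldIndex("missing")  // None
// An equivalent lookup from user code:
schema.fieldNames.indexOf("title")    // 1
schema.fieldNames.indexOf("missing")  // -1
```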