aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
authorKazuaki Ishizaki <ishizaki@jp.ibm.com>2016-08-31 12:40:53 +0800
committerWenchen Fan <wenchen@databricks.com>2016-08-31 12:40:53 +0800
commitd92cd227cf245be9ab8f9bce714386f8283a97cb (patch)
tree28739daba548b110250cc9f434e5b395f739e449 /launcher
parent231f973295129dca976f2e4a8222a63318d4aafe (diff)
downloadspark-d92cd227cf245be9ab8f9bce714386f8283a97cb.tar.gz
spark-d92cd227cf245be9ab8f9bce714386f8283a97cb.tar.bz2
spark-d92cd227cf245be9ab8f9bce714386f8283a97cb.zip
[SPARK-15985][SQL] Eliminate redundant cast from an array without null or a map without null
## What changes were proposed in this pull request? This PR eliminates redundant cast from an `ArrayType` with `containsNull = false` or a `MapType` with `containsNull = false`. For example, in `ArrayType` case, current implementation leaves a cast `cast(value#63 as array<double>).toDoubleArray`. However, we can eliminate `cast(value#63 as array<double>)` if we know `value#63` does not include `null`. This PR apply this elimination for `ArrayType` and `MapType` in `SimplifyCasts` at a plan optimization phase. In summary, we got 1.2-1.3x performance improvements over the code before applying this PR. Here are performance results of benchmark programs: ``` test("Read array in Dataset") { import sparkSession.implicits._ val iters = 5 val n = 1024 * 1024 val rows = 15 val benchmark = new Benchmark("Read primnitive array", n) val rand = new Random(511) val intDS = sparkSession.sparkContext.parallelize(0 until rows, 1) .map(i => Array.tabulate(n)(i => i)).toDS() intDS.count() // force to create ds val lastElement = n - 1 val randElement = rand.nextInt(lastElement) benchmark.addCase(s"Read int array in Dataset", numIters = iters)(iter => { val idx0 = randElement val idx1 = lastElement intDS.map(a => a(0) + a(idx0) + a(idx1)).collect }) val doubleDS = sparkSession.sparkContext.parallelize(0 until rows, 1) .map(i => Array.tabulate(n)(i => i.toDouble)).toDS() doubleDS.count() // force to create ds benchmark.addCase(s"Read double array in Dataset", numIters = iters)(iter => { val idx0 = randElement val idx1 = lastElement doubleDS.map(a => a(0) + a(idx0) + a(idx1)).collect }) benchmark.run() } Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4 Intel(R) Core(TM) i5-5257U CPU 2.70GHz without this PR Read primnitive array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Read int array in Dataset 525 / 690 2.0 500.9 1.0X Read double array in Dataset 947 / 1209 1.1 902.7 0.6X with this PR Read primnitive array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Read int array in Dataset 400 / 492 2.6 381.5 1.0X Read double array in Dataset 788 / 870 1.3 751.4 0.5X ``` An example program that originally caused this performance issue. ``` val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS() val ds2 = ds.map(p => { var s = 0.0 for (i <- 0 to 2) { s += p(i) } s }) ds2.show ds2.explain(true) ``` Plans before this PR ``` == Parsed Logical Plan == 'SerializeFromObject [input[0, double, true] AS value#68] +- 'MapElements <function1>, obj#67: double +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#66: [D +- LocalRelation [value#63] == Analyzed Logical Plan == value: double SerializeFromObject [input[0, double, true] AS value#68] +- MapElements <function1>, obj#67: double +- DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D +- LocalRelation [value#63] == Optimized Logical Plan == SerializeFromObject [input[0, double, true] AS value#68] +- MapElements <function1>, obj#67: double +- DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D +- LocalRelation [value#63] == Physical Plan == *SerializeFromObject [input[0, double, true] AS value#68] +- *MapElements <function1>, obj#67: double +- *DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D +- LocalTableScan [value#63] ``` Plans after this PR ``` == Parsed Logical Plan == 'SerializeFromObject [input[0, double, true] AS value#6] +- 'MapElements <function1>, obj#5: double +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#4: [D +- LocalRelation [value#1] == Analyzed Logical Plan == value: double SerializeFromObject [input[0, double, true] AS value#6] +- MapElements <function1>, obj#5: double +- DeserializeToObject cast(value#1 as array<double>).toDoubleArray, obj#4: [D +- LocalRelation [value#1] == Optimized Logical Plan == SerializeFromObject [input[0, double, true] AS value#6] +- MapElements <function1>, obj#5: double +- DeserializeToObject value#1.toDoubleArray, obj#4: [D +- LocalRelation [value#1] == Physical Plan == *SerializeFromObject [input[0, double, true] AS value#6] +- *MapElements <function1>, obj#5: double +- *DeserializeToObject value#1.toDoubleArray, obj#4: [D +- LocalTableScan [value#1] ``` ## How was this patch tested? Tested by new test cases in `SimplifyCastsSuite` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #13704 from kiszk/SPARK-15985.
Diffstat (limited to 'launcher')
0 files changed, 0 insertions, 0 deletions