aboutsummaryrefslogtreecommitdiff
path: root/bin/spark-shell
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-07-28 10:59:53 -0700
committerMichael Armbrust <michael@databricks.com>2014-07-28 10:59:53 -0700
commit2b8d89e30ebfe2272229a1eddd7542d7437c9924 (patch)
tree631e24b0ed25be8f7b5a5eb3074e1c283e54a1f4 /bin/spark-shell
parenta7d145e98c55fa66a541293930f25d9cdc25f3b4 (diff)
downloadspark-2b8d89e30ebfe2272229a1eddd7542d7437c9924.tar.gz
spark-2b8d89e30ebfe2272229a1eddd7542d7437c9924.tar.bz2
spark-2b8d89e30ebfe2272229a1eddd7542d7437c9924.zip
[SPARK-2523] [SQL] Hadoop table scan bug fixing
In HiveTableScan.scala, ObjectInspector was created for all of the partition based records, which probably causes ClassCastException if the object inspector is not identical among table & partitions. This is the follow up with: https://github.com/apache/spark/pull/1408 https://github.com/apache/spark/pull/1390 I've run a micro benchmark in my local with 15000000 records totally, and got the result as below: With This Patch | Partition-Based Table | Non-Partition-Based Table ------------ | ------------- | ------------- No | 1927 ms | 1885 ms Yes | 1541 ms | 1524 ms It showed this patch will also improve the performance. PS: the benchmark code is also attached. (thanks liancheng ) ``` package org.apache.spark.sql.hive import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql._ object HiveTableScanPrepare extends App { case class Record(key: String, value: String) val sparkContext = new SparkContext( new SparkConf() .setMaster("local") .setAppName(getClass.getSimpleName.stripSuffix("$"))) val hiveContext = new LocalHiveContext(sparkContext) val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i"))) import hiveContext._ hql("SHOW TABLES") hql("DROP TABLE if exists part_scan_test") hql("DROP TABLE if exists scan_test") hql("DROP TABLE if exists records") rdd.registerAsTable("records") hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE """.stripMargin) hql("""CREATE TABLE scan_test (key STRING, value STRING) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE """.stripMargin) for (part1 <- 2000 until 2001) { for (part2 <- 1 to 5) { hql(s"""from records | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2') | select key, value """.stripMargin) hql(s"""from records | insert into table scan_test select key, value """.stripMargin) } } } object HiveTableScanTest extends App { val sparkContext = new SparkContext( new SparkConf() .setMaster("local") .setAppName(getClass.getSimpleName.stripSuffix("$"))) val hiveContext = new LocalHiveContext(sparkContext) import hiveContext._ hql("SHOW TABLES") val part_scan_test = hql("select key, value from part_scan_test") val scan_test = hql("select key, value from scan_test") val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test)) val r_scan_test = (0 to 5).map(i => benchmark(scan_test)) println("Scanning Partition-Based Table") r_part_scan_test.foreach(printResult) println("Scanning Non-Partition-Based Table") r_scan_test.foreach(printResult) def printResult(result: (Long, Long)) { println(s"Duration: ${result._1} ms Result: ${result._2}") } def benchmark(srdd: SchemaRDD) = { val begin = System.currentTimeMillis() val result = srdd.count() val end = System.currentTimeMillis() ((end - begin), result) } } ``` Author: Cheng Hao <hao.cheng@intel.com> Closes #1439 from chenghao-intel/hadoop_table_scan and squashes the following commits: 888968f [Cheng Hao] Fix issues in code style 27540ba [Cheng Hao] Fix the TableScan Bug while partition serde differs 40a24a7 [Cheng Hao] Add Unit Test
Diffstat (limited to 'bin/spark-shell')
0 files changed, 0 insertions, 0 deletions