path: root/sql/hive/src/test
Commit message · Author · Age · Files · Lines
...
* [SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables (Eric Liang, 2016-11-10, 1 file, -3/+158)

## What changes were proposed in this pull request?
As of current 2.1, INSERT OVERWRITE with dynamic partitions against a Datasource table will overwrite the entire table instead of only the partitions matching the static keys, as in Hive. It also doesn't respect custom partition locations. This PR adds support for all these operations to Datasource tables managed by the Hive metastore. It is implemented as follows:
- During planning time, the full set of partitions affected by an INSERT or OVERWRITE command is read from the Hive metastore.
- The planner identifies any partitions with custom locations and includes this in the write task metadata.
- FileFormatWriter tasks refer to this custom locations map when determining where to write for dynamic partition output.
- When the write job finishes, the set of written partitions is compared against the initial set of matched partitions, and the Hive metastore is updated to reflect the newly added / removed partitions.

It was necessary to introduce a method for staging files with absolute output paths to `FileCommitProtocol`. These files are not handled by the Hadoop output committer but are moved to their final locations when the job commits. The overwrite behavior of legacy Datasource tables is also changed: the entire table is no longer overwritten if a partial partition spec is present. cc cloud-fan yhuai

## How was this patch tested?
Unit tests, existing tests.

Author: Eric Liang <ekl@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #15814 from ericl/sc-5027.

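A minimal sketch of the fixed behavior, assuming a Hive-metastore-managed Datasource table; the `sales` and `staged_sales` names are hypothetical:

```scala
// Hypothetical partitioned Datasource table managed by the Hive metastore.
spark.sql(
  """CREATE TABLE sales (v INT, country STRING, day STRING)
    |USING parquet
    |PARTITIONED BY (country, day)""".stripMargin)

// `country` is a static key, `day` is dynamic. Per the message above, after
// this change only partitions matching country=US that receive output rows
// are replaced, and custom partition locations are respected; before it,
// the entire table would have been overwritten.
spark.sql(
  """INSERT OVERWRITE TABLE sales PARTITION (country = 'US', day)
    |SELECT v, day FROM staged_sales WHERE country = 'US'""".stripMargin)
```
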
* [SPARK-18403][SQL] Temporarily disable flaky ObjectHashAggregateSuite (Cheng Lian, 2016-11-10, 1 file, -1/+2)

## What changes were proposed in this pull request?
The randomized tests in `ObjectHashAggregateSuite` are flaky and break PR builds. This PR disables them temporarily to bring back the PR build.

## How was this patch tested?
N/A

Author: Cheng Lian <lian@databricks.com>

Closes #15845 from liancheng/ignore-flaky-object-hash-agg-suite.

* [SPARK-17990][SPARK-18302][SQL] Correct several partition-related behaviours of ExternalCatalog (Wenchen Fan, 2016-11-10, 4 files, -5/+5)

## What changes were proposed in this pull request?
This PR corrects several partition-related behaviours of `ExternalCatalog`:
1. The default partition location should not always lowercase the partition column names in the path string (fix `HiveExternalCatalog`).
2. Renaming a partition should not always lowercase the partition column names in the updated partition path string (fix `HiveExternalCatalog`).
3. Renaming a partition should update the partition location only for managed tables (fix `InMemoryCatalog`).
4. Creating a partition with an existing directory should succeed (fix `InMemoryCatalog`).
5. Creating a partition with a non-existing directory should create that directory (fix `InMemoryCatalog`).
6. Dropping a partition from an external table should not delete the directory (fix `InMemoryCatalog`).

## How was this patch tested?
New tests in `ExternalCatalogSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15797 from cloud-fan/partition.

* [SPARK-17993][SQL] Fix Parquet log output redirection (Michael Allman, 2016-11-10, 1 file, -0/+4)

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)

## What changes were proposed in this pull request?
PR #14690 broke parquet log output redirection for converted partitioned Hive tables. For example, when querying parquet files written by Parquet-mr 1.6.0, Spark prints a torrent of (harmless) warning messages from the Parquet reader:

```
Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

This only happens during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (versions < 1.9) doesn't use slf4j for logging. Note that you can tell log redirection is not working here because the log message format does not conform to the default Spark log message format.

This is a regression I noted as something we needed to fix as a follow-up. It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion; that call is what triggered the output redirection.

## How was this patch tested?
I tested this manually in four ways:
1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`.
2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0.
3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0.
4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0.

I ran each test with a new instance of `spark-shell` or `spark-sql`. Incidentally, I found that test case 3 was not a regression: redirection was not occurring in the master codebase prior to #14690. I spent some time working on a unit test, but based on my experience working on this ticket I feel that automated testing here is far from feasible. cc ericl dongjoon-hyun

Author: Michael Allman <michael@videoamp.com>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.

* [SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand (Herman van Hovell, 2016-11-09, 1 file, -2/+4)

## What changes were proposed in this pull request?
`InsertIntoHadoopFsRelationCommand` does not keep track of whether it inserts into a table, or which table it inserts into. This can make debugging these statements problematic. This PR adds table information to `InsertIntoHadoopFsRelationCommand`. Explaining the SQL command `insert into prq select * from range(0, 100000)` now yields the following executed plan:

```
== Physical Plan ==
ExecutedCommand
+- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq, ParquetFormat, <function1>, Map(serialization.format -> 1, path -> file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable(
        Table: `default`.`prq`
        Owner: hvanhovell
        Created: Wed Nov 09 17:42:30 CET 2016
        Last Access: Thu Jan 01 01:00:00 CET 1970
        Type: MANAGED
        Schema: [StructField(id,LongType,true)]
        Provider: parquet
        Properties: [transient_lastDdlTime=1478709750]
        Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [serialization.format=1]))
   +- Project [id#7L]
      +- Range (0, 100000, step=1, splits=None)
```

## How was this patch tested?
Added extra checks to the `ParquetMetastoreSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15832 from hvanhovell/SPARK-18370.

* [SPARK-18338][SQL][TEST-MAVEN] Fix test case initialization order under Maven builds (Cheng Lian, 2016-11-09, 1 file, -13/+10)

## What changes were proposed in this pull request?
Test case initialization order differs between Maven and SBT. Maven always creates instances of all test cases and then runs them all together. This fails `ObjectHashAggregateSuite` because the randomized test cases there register a temporary Hive function right before creating a test case, and the function can be cleared while initializing other successive test cases. In SBT this is fine, since each created test case is executed immediately after creating the temporary function. To fix this issue, we should put initialization/destruction code into `beforeAll()` and `afterAll()`.

## How was this patch tested?
Existing tests.

Author: Cheng Lian <lian@databricks.com>

Closes #15802 from liancheng/fix-flaky-object-hash-agg-suite.

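A hedged ScalaTest sketch of the fix pattern; the suite name and function name are hypothetical, and only the beforeAll/afterAll placement reflects the commit message:

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Resources every test depends on are created once in beforeAll() and torn
// down in afterAll(), so Maven's create-all-instances-then-run-all ordering
// cannot clear them between test case constructions.
class TempFunctionSuite extends FunSuite with BeforeAndAfterAll {
  private var spark: SparkSession = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
    spark.sql(
      "CREATE TEMPORARY FUNCTION hive_max AS " +
        "'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'")
  }

  override protected def afterAll(): Unit = {
    try spark.sql("DROP TEMPORARY FUNCTION IF EXISTS hive_max")
    finally super.afterAll()
  }

  test("temporary Hive function is available") {
    assert(spark.sql("SELECT hive_max(id) FROM range(10)").head().getLong(0) == 9L)
  }
}
```
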
* [SPARK-18292][SQL] LogicalPlanToSQLSuite should not use resource-dependent path for golden file generation (Dongjoon Hyun, 2016-11-09, 1 file, -1/+9)

## What changes were proposed in this pull request?
`LogicalPlanToSQLSuite` uses the following command to update the existing answer files:

```bash
SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "hive/test-only *LogicalPlanToSQLSuite"
```

However, after the introduction of `getTestResourcePath`, it fails to update the previous golden answer files in the predefined directory. This issue aims to fix that.

## How was this patch tested?
It's a test suite update. Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15789 from dongjoon-hyun/SPARK-18292.

* [SPARK-17659][SQL] Partitioned View is Not Supported By SHOW CREATE TABLE (gatorsmile, 2016-11-09, 1 file, -0/+28)

### What changes were proposed in this pull request?
Partitioned views are not supported by Spark SQL. For a Hive partitioned view, SHOW CREATE TABLE is unable to generate the right DDL. Thus, SHOW CREATE TABLE should reject partitioned views, as it does for the other Hive-only features. This PR issues an exception when it detects that the view is a partitioned view.

### How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15233 from gatorsmile/partitionedView.

* [SPARK-18346][SQL] TRUNCATE TABLE should fail if no partition is matched for the given non-partial partition spec (Wenchen Fan, 2016-11-08, 1 file, -7/+5)

## What changes were proposed in this pull request?
A follow-up of https://github.com/apache/spark/pull/15688

## How was this patch tested?
Updated test in `DDLSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15805 from cloud-fan/truncate.

* [SPARK-18137][SQL] Fix RewriteDistinctAggregates UnresolvedException when a UDAF has a foldable TypeCheck (root, 2016-11-08, 1 file, -0/+35)

## What changes were proposed in this pull request?
In the `RewriteDistinctAggregates` rewrite function, the UDAF's children are mapped to `AttributeReference`s. If the UDAF (such as `ApproximatePercentile`) has a foldable type check on its input, the check fails because an `AttributeReference` is not foldable; the UDAF is then left unresolved, and calling `nullify` on the unresolved object throws an exception. In this PR, only non-foldable children are mapped to `AttributeReference`s, which avoids the UDAF's foldability type check; likewise, only non-foldable children are expanded, since there is no need to `Expand` a static (foldable) value.

**Before, the SQL result was:**
> select percentile_approx(key, 0.99999), count(distinct key), sum(distinct key) from src limit 1
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'percentile_approx(CAST(src.`key` AS DOUBLE), CAST(0.99999BD AS DOUBLE), 10000)
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:92)
> at org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates$.org$apache$spark$sql$catalyst$optimizer$RewriteDistinctAggregates$$nullify(RewriteDistinctAggregates.scala:261)

**After, the SQL result is:**
> select percentile_approx(key, 0.99999), count(distinct key), sum(distinct key) from src limit 1
> [498.0,309,79136]

## How was this patch tested?
Added a test case in `HiveUDFSuite`.

Author: root <root@iZbp1gsnrlfzjxh82cz80vZ.(none)>

Closes #15668 from windpiger/RewriteDistinctUDAFUnresolveExcep.

* [SPARK-18217][SQL] Disallow creating permanent views based on temporary views or UDFs (gatorsmile, 2016-11-07, 1 file, -9/+90)

### What changes were proposed in this pull request?
Based on the discussion in [SPARK-18209](https://issues.apache.org/jira/browse/SPARK-18209), it doesn't really make sense to create permanent views based on temporary views or temporary UDFs. To disallow this and issue the appropriate exceptions, this PR needs to detect whether a temporary view/UDF is being used when defining a permanent view. The PR splits into two sub-tasks:

**Task 1:** detect a temporary view in the query plan of the view definition. When the Analyzer finds an unresolved temporary view, it replaces it with a `SubqueryAlias` wrapping the corresponding logical plan, which is stored in an in-memory HashMap. After replacement, it is impossible to tell whether a `SubqueryAlias` was generated from a temporary view. Thus, to detect the usage of a temporary view in a view definition, this PR traverses the unresolved logical plan and uses the name of each `UnresolvedRelation` to decide whether it refers to a (global) temporary view.

**Task 2:** detect a temporary UDF in the query plan of the view definition. This is not straightforward. First, the analyzed plan represents functions in different forms; more importantly, some classes (e.g., `HiveGenericUDF`) are not accessible from `CreateViewCommand`, which is part of `sql/core`. Thus, we use the unanalyzed plan `child` of `CreateViewCommand` to detect temporary-UDF usage; because the plan has already been successfully analyzed, we can assume the functions have been defined/registered. Second, Spark functions come in four forms: Spark built-in functions, built-in hash functions, permanent UDFs, and temporary UDFs, and there is no direct way to determine whether a function is temporary. We therefore introduce a function `isTemporaryFunction` in `SessionCatalog` containing the detailed logic.

### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15764 from gatorsmile/blockTempFromPermViewCreation.

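A hedged sketch of the Task 1 detection described above; the exact helper names in the actual PR may differ, and `isTemporaryTable` availability on `SessionCatalog` is assumed:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Walk the *unresolved* plan of the view definition: any relation whose
// identifier resolves to a temporary view marks the definition as invalid
// for a permanent view.
def referencesTempView(catalog: SessionCatalog, plan: LogicalPlan): Boolean =
  plan.collect {
    case u: UnresolvedRelation => u.tableIdentifier
  }.exists(catalog.isTemporaryTable)
```
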
* [SPARK-18086] Add support for Hive session vars (Ryan Blue, 2016-11-07, 1 file, -0/+50)

## What changes were proposed in this pull request?
This adds support for Hive variables:
* Makes values set via `spark-sql --hivevar name=value` accessible
* Adds `getHiveVar` and `setHiveVar` to the `HiveClient` interface
* Adds a `SessionVariables` trait for sessions, like Hive's, that support variables (including Hive vars)
* Adds `SessionVariables` support to variable substitution
* Adds `SessionVariables` support to the SET command

## How was this patch tested?
* Adds a test to all supported Hive versions for accessing Hive variables
* Adds `HiveVariableSubstitutionSuite`

Author: Ryan Blue <blue@apache.org>

Closes #15738 from rdblue/SPARK-18086-add-hivevar-support.

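A hedged usage sketch; the variable name is made up, and the session-level `SET hivevar:` form is assumed from the SET-command bullet above rather than verified:

```scala
// From the shell (per the commit message, now visible to substitution):
//   spark-sql --hivevar target_db=prod
// Assumed session-level equivalent via the SET command:
spark.sql("SET hivevar:target_db=prod")
// The substitution layer expands ${hivevar:target_db} before parsing.
spark.sql("SELECT count(*) FROM ${hivevar:target_db}.events").show()
```
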
* [SPARK-17108][SQL] Fix BIGINT and INT comparison failure in Spark SQL (Weiqing Yang, 2016-11-07, 1 file, -0/+12)

## What changes were proposed in this pull request?
Add a function to check whether two integer types are compatible when invoking `acceptsType()` in `DataType`.

## How was this patch tested?
Manually. E.g.

```
spark.sql("create table t3(a map<bigint, array<string>>)")
spark.sql("select * from t3 where a[1] is not null")
```

Before:

```
cannot resolve 't.`a`[1]' due to data type mismatch: argument 2 requires bigint type, however, '1' is of int type.; line 1 pos 22
org.apache.spark.sql.AnalysisException: cannot resolve 't.`a`[1]' due to data type mismatch: argument 2 requires bigint type, however, '1' is of int type.; line 1 pos 22
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:307)
```

After: running the SQL queries above produces no errors.

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #15448 from weiqingy/SPARK_17108.

* [SPARK-18167][SQL] Disable flaky hive partition pruning test (Reynold Xin, 2016-11-06, 1 file, -1/+1)

* [SPARK-18173][SQL] Data source tables should support truncating partition (Wenchen Fan, 2016-11-06, 1 file, -0/+64)

## What changes were proposed in this pull request?
Previously, `TRUNCATE TABLE ... PARTITION` would always truncate the whole table for data source tables. This PR fixes that and improves `InMemoryCatalog` so the command works with it as well.

## How was this patch tested?
Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15688 from cloud-fan/truncate.

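An illustrative invocation of the now-partition-aware command; table and partition names are hypothetical:

```scala
// After this change, only the files of the matching partition of a data
// source table are removed, not the whole table.
spark.sql("TRUNCATE TABLE logs PARTITION (ds = '2016-11-06')")
```
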
* [SPARK-17183][SPARK-17983][SPARK-18101][SQL] Put hive serde table schema to table properties like data source table (Wenchen Fan, 2016-11-05, 7 files, -20/+17)

## What changes were proposed in this pull request?
For data source tables, we put the table schema, partition columns, etc. into table properties to work around some Hive metastore issues, e.g. it is not case-preserving and has poor decimal type support. We should do the same for Hive serde tables, to reduce the differences between Hive serde tables and data source tables, e.g. column names should be case-preserving.

## How was this patch tested?
Existing tests, and a new test in `HiveExternalCatalog`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14750 from cloud-fan/minor1.

* [SPARK-18167] Re-enable the non-flaky parts of SQLQuerySuite (Eric Liang, 2016-11-04, 1 file, -21/+10)

## What changes were proposed in this pull request?
It seems the proximate cause of the test failures is that `cast(str as decimal)` in Derby raises an exception instead of returning NULL. This is a problem since Hive sometimes inserts `__HIVE_DEFAULT_PARTITION__` entries into the partition table, as documented here: https://github.com/apache/hive/blob/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java#L1034

Basically, when these special default partitions are present, partition pruning pushdown using the SQL-direct mode fails due to this cast exception. As commented in `MetaStoreDirectSql.java` above, this is normally fine since Hive falls back to JDO pruning; however, when the pruning predicate contains an unsupported operator such as `>`, that will fail as well.

The only remaining question is why this behavior is nondeterministic. We know that when the test flakes, retries do not help, so the cause must be environmental. The current best hypothesis is that some config differs between Jenkins runs, which is why this PR prints out the Spark SQL and Hive confs for the test. The hope is that by comparing the config state for failure vs. success we can isolate the root cause of the flakiness.

**Update:** we could not isolate the issue. It does not seem to be due to configuration differences. As such, I'm going to enable the non-flaky parts of the test, since we are fairly confident these issues only occur with Derby (which is not used in production).

## How was this patch tested?
N/A

Author: Eric Liang <ekl@databricks.com>

Closes #15725 from ericl/print-confs-out.

* [SPARK-17949][SQL] A JVM object based aggregate operator (Cheng Lian, 2016-11-03, 3 files, -0/+750)

## What changes were proposed in this pull request?
This PR adds a new hash-based aggregate operator named `ObjectHashAggregateExec` that supports `TypedImperativeAggregate`, which may use arbitrary Java objects as aggregation states. Please refer to the [design doc](https://issues.apache.org/jira/secure/attachment/12834260/%5BDesign%20Doc%5D%20Support%20for%20Arbitrary%20Aggregation%20States.pdf) attached in [SPARK-17949](https://issues.apache.org/jira/browse/SPARK-17949) for more details. The major benefit of this operator is better performance when evaluating `TypedImperativeAggregate` functions, especially when there are relatively few distinct groups. Functions like Hive UDAFs, `collect_list`, and `collect_set` may also benefit from this after being migrated to `TypedImperativeAggregate`.

The following feature flag is introduced to enable or disable the new aggregate operator:
- Name: `spark.sql.execution.useObjectHashAggregateExec`
- Default value: `true`

We can also configure the fallback threshold using the following SQL configuration:
- Name: `spark.sql.objectHashAggregate.sortBased.fallbackThreshold`
- Default value: 128

The operator falls back to sort-based aggregation when more than 128 distinct groups are accumulated in the aggregation hash map. This number is intentionally made small to avoid GC problems, since the aggregation buffers of this operator may contain arbitrary Java objects. This may be improved by implementing size tracking for the operator, but that can be done in a separate PR. Code generation and size tracking are planned to be implemented in follow-up PRs.

## Benchmark results

### `ObjectHashAggregateExec` vs `SortAggregateExec`

The first benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating `typed_count`, a testing `TypedImperativeAggregate` version of the SQL `count` function.

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz

object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
object agg w/ group by w/o fallback          6903 / 7141          15.2          65.8       4.5X
object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
sort agg w/o group by                        4734 / 5463          22.1          45.2       6.6X
object agg w/o group by w/o fallback         4310 / 4529          24.3          41.1       7.3X
```

The next benchmark compares `ObjectHashAggregateExec` and `SortAggregateExec` by evaluating the Spark native version of `percentile_approx`. Note that `percentile_approx` is so heavy an aggregate function that the bottleneck of the benchmark is evaluating the aggregate function itself rather than the aggregate operator, since I couldn't run a large-scale benchmark on my laptop. That's why the results are so close and look counter-intuitive (aggregation with grouping is even faster than aggregation without grouping).

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz

object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort agg w/ group by                         3418 / 3530           0.6        1630.0       1.0X
object agg w/ group by w/o fallback          3210 / 3314           0.7        1530.7       1.1X
object agg w/ group by w/ fallback           3419 / 3511           0.6        1630.1       1.0X
sort agg w/o group by                        4336 / 4499           0.5        2067.3       0.8X
object agg w/o group by w/o fallback         4271 / 4372           0.5        2036.7       0.8X
```

### Hive UDAF vs Spark AF

This benchmark compares the following two kinds of aggregate functions:
- "hive udaf": the Hive implementation of `percentile_approx`, without partial aggregation support, evaluated using `SortAggregateExec`.
- "spark af": the Spark native implementation of `percentile_approx`, with partial aggregation support, evaluated using `ObjectHashAggregateExec`

The performance differences are mostly due to the faster implementation and partial aggregation support in the Spark native version of `percentile_approx`. This benchmark basically shows the gap between the worst case, where an aggregate function without partial aggregation support is evaluated using `SortAggregateExec`, and the best case, where a `TypedImperativeAggregate` with partial aggregation support is evaluated using `ObjectHashAggregateExec`.

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hive udaf w/o group by                       5326 / 5408           0.0       81264.2       1.0X
spark af w/o group by                          93 / 111            0.7        1415.6      57.4X
hive udaf w/ group by                        3804 / 3946           0.0       58050.1       1.4X
spark af w/ group by w/o fallback              71 / 90             0.9        1085.7      74.8X
spark af w/ group by w/ fallback               98 / 111            0.7        1501.6      54.1X
```

### Real world benchmark

We also ran a relatively large benchmark using a real-world query involving `percentile_approx`:
- Hive UDAF implementation, sort-based aggregation, w/o partial aggregation support: 24.77 minutes
- Native implementation, sort-based aggregation, w/ partial aggregation support: 4.64 minutes
- Native implementation, object hash aggregator, w/ partial aggregation support: 1.80 minutes

## How was this patch tested?
New unit tests and randomized test cases are added in `ObjectAggregateFunctionSuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #15590 from liancheng/obj-hash-agg.

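The two knobs described above can also be set programmatically; a short sketch using the names and defaults quoted from the message:

```scala
// Enable or disable the new operator (default: true per the message).
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "true")
// Fall back to sort-based aggregation once more than 128 distinct groups
// accumulate in the aggregation hash map (default: 128 per the message).
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "128")
```
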
* [SPARK-17963][SQL][DOCUMENTATION] Add examples (extend) in each expression and improve documentation (hyukjinkwon, 2016-11-02, 1 file, -10/+14)

## What changes were proposed in this pull request?
This PR proposes to change the documentation for functions. Please refer to the discussion in https://github.com/apache/spark/pull/15513. The changes include:
- Re-indenting the documentation
- Adding examples/arguments in `extended` where the arguments are multiple or have a specific format (e.g. xml/json)

For example, the documentation was updated as below:

### Functions with single-line usage

**Before**

- `pow`

```sql
Usage: pow(x1, x2) - Raise x1 to the power of x2.
Extended Usage:
    > SELECT pow(2, 3);
     8.0
```

- `current_timestamp`

```sql
Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
Extended Usage:
    No example for current_timestamp.
```

**After**

- `pow`

```sql
Usage: pow(expr1, expr2) - Raises `expr1` to the power of `expr2`.
Extended Usage:
    Examples:
      > SELECT pow(2, 3);
       8.0
```

- `current_timestamp`

```sql
Usage: current_timestamp() - Returns the current timestamp at the start of query evaluation.
Extended Usage:
    No example/argument for current_timestamp.
```

### Functions with (already) multiple-line usage

**Before**

- `approx_count_distinct`

```sql
Usage: approx_count_distinct(expr) - Returns the estimated cardinality by HyperLogLog++.
    approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated cardinality by HyperLogLog++
      with relativeSD, the maximum estimation error allowed.
Extended Usage:
    No example for approx_count_distinct.
```

- `percentile_approx`

```sql
Usage:
    percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
      column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The
      `accuracy` parameter (default: 10000) is a positive integer literal which controls approximation
      accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy`
      is the relative error of the approximation.

    percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) - Returns the approximate
      percentile array of column `col` at the given percentage array. Each value of the percentage array
      must be between 0.0 and 1.0. The `accuracy` parameter (default: 10000) is a positive integer literal
      which controls approximation accuracy at the cost of memory. Higher value of `accuracy` yields better
      accuracy, `1.0/accuracy` is the relative error of the approximation.
Extended Usage:
    No example for percentile_approx.
```

**After**

- `approx_count_distinct`

```sql
Usage: approx_count_distinct(expr[, relativeSD]) - Returns the estimated cardinality by HyperLogLog++.
    `relativeSD` defines the maximum estimation error allowed.
Extended Usage:
    No example/argument for approx_count_distinct.
```

- `percentile_approx`

```sql
Usage:
    percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
      column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The
      `accuracy` parameter (default: 10000) is a positive numeric literal which controls approximation
      accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy`
      is the relative error of the approximation. When `percentage` is an array, each value of the
      percentage array must be between 0.0 and 1.0. In this case, returns the approximate percentile
      array of column `col` at the given percentage array.
Extended Usage:
    Examples:
      > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
       [10.0,10.0,10.0]
      > SELECT percentile_approx(10.0, 0.5, 100);
       10.0
```

## How was this patch tested?
Manually tested.

**When examples are multiple**

```sql
spark-sql> describe function extended reflect;
Function: reflect
Class: org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection
Usage: reflect(class, method[, arg1[, arg2 ..]]) - Calls a method with reflection.
Extended Usage:
    Examples:
      > SELECT reflect('java.util.UUID', 'randomUUID');
       c33fb387-8500-4bfa-81d2-6e0e3e930df2
      > SELECT reflect('java.util.UUID', 'fromString', 'a5cf6c42-0c85-418f-af6c-3e4e5b1328f2');
       a5cf6c42-0c85-418f-af6c-3e4e5b1328f2
```

**When `Usage` is in a single line**

```sql
spark-sql> describe function extended min;
Function: min
Class: org.apache.spark.sql.catalyst.expressions.aggregate.Min
Usage: min(expr) - Returns the minimum value of `expr`.
Extended Usage:
    No example/argument for min.
```

**When `Usage` is already in multiple lines**

```sql
spark-sql> describe function extended percentile_approx;
Function: percentile_approx
Class: org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
Usage:
    percentile_approx(col, percentage [, accuracy]) - Returns the approximate percentile value of numeric
      column `col` at the given percentage. The value of percentage must be between 0.0 and 1.0. The
      `accuracy` parameter (default: 10000) is a positive numeric literal which controls approximation
      accuracy at the cost of memory. Higher value of `accuracy` yields better accuracy, `1.0/accuracy`
      is the relative error of the approximation. When `percentage` is an array, each value of the
      percentage array must be between 0.0 and 1.0. In this case, returns the approximate percentile
      array of column `col` at the given percentage array.
Extended Usage:
    Examples:
      > SELECT percentile_approx(10.0, array(0.5, 0.4, 0.1), 100);
       [10.0,10.0,10.0]
      > SELECT percentile_approx(10.0, 0.5, 100);
       10.0
```

**When example/argument is missing**

```sql
spark-sql> describe function extended rank;
Function: rank
Class: org.apache.spark.sql.catalyst.expressions.Rank
Usage: rank() - Computes the rank of a value in a group of values. The result is one plus the number of
    rows preceding or equal to the current row in the ordering of the partition. The values will produce
    gaps in the sequence.
Extended Usage:
    No example/argument for rank.
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15677 from HyukjinKwon/SPARK-17963-1.

* [SPARK-17470][SQL] Unify path for data source table and locationUri for hive serde table (Wenchen Fan, 2016-11-02, 5 files, -21/+30)

## What changes were proposed in this pull request?
Due to a limitation of the Hive metastore (a table location must be a directory path, not a file path), we always store the `path` for a data source table in its storage properties instead of in the `locationUri` field. However, we should not expose this difference at the `CatalogTable` level; it should just be treated as a hack inside `HiveExternalCatalog`, in the same way we store the table schema of a data source table in table properties. This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`: both data source tables and Hive serde tables now use the `locationUri` field.

This PR also unifies the way we handle the default table location for managed tables. Previously, the default table location of a Hive serde managed table was set by the external catalog, while that of a data source table was set by the command. After this PR, we follow the Hive way and the default table location is always set by the external catalog.

For managed non-file-based tables, we will assign a default table location and create an empty directory for it; the table location will be removed when the table is dropped. This is reasonable, as the metastore doesn't care whether a table is file-based or not, and an empty table directory is harmless. For external non-file-based tables, ideally we could omit the table location, but due to a Hive metastore issue we assign a random location and remove it right after the table is created. See SPARK-15269 for more details. This is fine, as it's well isolated in `HiveExternalCatalog`.

To keep the existing behaviour of the `path` option, this PR always adds the `locationUri` to storage properties under the key `path` before passing the storage properties to `DataSource` as data source options.

## How was this patch tested?
Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15024 from cloud-fan/path.

* [SPARK-16839][SQL] Simplify Struct creation code path (eyal farago, 2016-11-02, 2 files, -5/+9)

## What changes were proposed in this pull request?
Simplify struct creation, especially the aspect of `CleanupAliases`, which missed some aliases when handling trees created by `CreateStruct`. This PR includes:
1. A failing test (create struct with nested aliases; some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from an unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. Simplified `CleanupAliases` code, as it no longer has to deal with `CreateStruct`'s top-level columns.

## How was this patch tested?
Ran all test suites in package org.apache.spark.sql, especially the analysis suite, making sure the added test initially fails and the entire analysis package passes after applying the suggested fix. Modified a few tests that expected `CreateStruct`, which is now transformed into `CreateNamedStruct`.

Author: eyal farago <eyal farago>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: eyal farago <eyal.farago@gmail.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>

Closes #15718 from hvanhovell/SPARK-16839-2.

* [SPARK-18076][CORE][SQL] Fix default Locale used in DateFormat, NumberFormat to Locale.US (Sean Owen, 2016-11-02, 1 file, -1/+2)

## What changes were proposed in this pull request?
Fix the locale to `Locale.US` for all usages of `DateFormat` and `NumberFormat`.

## How was this patch tested?
Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15610 from srowen/SPARK-18076.

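A minimal sketch of the fix pattern, using only standard JDK APIs:

```scala
import java.text.{DateFormat, NumberFormat, SimpleDateFormat}
import java.util.Locale

// Pin Locale.US explicitly so that formatting and parsing do not vary
// with the JVM's default locale.
val dateFormat: DateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
val numberFormat: NumberFormat = NumberFormat.getInstance(Locale.US)
```
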
* [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables (Eric Liang, 2016-11-02, 1 file, -0/+52)

## What changes were proposed in this pull request?
There are a couple of issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive:
(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables, or when `manageFilesourcePartitions = false`, is unchanged. There is one other issue: INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but that behavior is fairly complicated to implement for Datasource tables. We should address it in a future release.

## How was this patch tested?
Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #15705 from ericl/sc-4942.

* [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter (Michael Allman, 2016-11-01, 3 files, -57/+137)

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)

## What changes were proposed in this pull request?
We recently added table partition pruning for partitioned Hive tables converted to use `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive throws an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column throws an `org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a PostgreSQL metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column fail. Rather than throw an exception during query planning, this patch catches the exception, logs a warning, and returns all table partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.

## How was this patch tested?
A unit test was added.

Author: Michael Allman <michael@videoamp.com>

Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.

* [SPARK-18167] Disable flaky SQLQuerySuite test (Eric Liang, 2016-11-01, 1 file, -1/+1)

We now know it's a persistent environmental issue that is causing this test to sometimes fail. One hypothesis is that some configuration is leaked from another suite, and depending on suite ordering this can cause this test to fail. I am planning on mining the Jenkins logs to try to narrow down which suite could be causing this. For now, disable the test.

Author: Eric Liang <ekl@databricks.com>

Closes #15720 from ericl/disable-flaky-test.

* Revert "[SPARK-16839][SQL] redundant aliases after cleanupAliases"Herman van Hovell2016-11-012-9/+5
| | | | This reverts commit 5441a6269e00e3903ae6c1ea8deb4ddf3d2e9975.
* [SPARK-16839][SQL] Redundant aliases after cleanupAliases (eyal farago, 2016-11-01, 2 files, -5/+9)

## What changes were proposed in this pull request?
Simplify struct creation, especially the aspect of `CleanupAliases`, which missed some aliases when handling trees created by `CreateStruct`. This PR includes:
1. A failing test (create struct with nested aliases; some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from an unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. Simplified `CleanupAliases` code, as it no longer has to deal with `CreateStruct`'s top-level columns.

## How was this patch tested?
Ran all test suites in package org.apache.spark.sql, especially the analysis suite, making sure the added test initially fails and the entire analysis package passes after applying the suggested fix. Modified a few tests that expected `CreateStruct`, which is now transformed into `CreateNamedStruct`. Credit goes to hvanhovell for assisting with this PR.

Author: eyal farago <eyal farago>
Author: eyal farago <eyal.farago@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>
Author: Eyal Farago <eyal.farago@actimize.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>
Author: eyalfa <eyal.farago@gmail.com>

Closes #14444 from eyalfa/SPARK-16839_redundant_aliases_after_cleanupAliases.

* [SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client (Liang-Chi Hsieh, 2016-11-01, 1 file, -0/+33)

## What changes were proposed in this pull request?
As reported on the Jira, an insert overwrite statement runs much slower in Spark than in hive-client. There is a patch, [HIVE-11940](https://github.com/apache/hive/commit/ba21806b77287e237e1aa68fa169d2a81e07346d), which largely improves insert overwrite performance in Hive, but it landed after Hive 2.0.0. Because Spark SQL uses an older Hive library, we cannot benefit from that improvement. The reporter verified that there is also a big performance gap between Hive 1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) on insert overwrite execution.

Instead of upgrading to Hive 2.0 in Spark SQL, which might not be a trivial task, this patch takes the approach of deleting the partition before asking Hive to load data files into the partition.

Note: the case reported on the Jira is insert overwrite to a partition. Since `Hive.loadTable` also uses the same function to replace files, insert overwrite to a table should have the same issue; we can take the same approach and delete the table contents first. I will update this PR to include that.

## How was this patch tested?
Jenkins tests. There are existing tests using insert overwrite statements; those tests should pass. I added a new test that specifically exercises insert overwrite into a partition. As for the performance issue, I don't have a Hive 2.0 environment, so this needs the reporter to verify it; please refer to the Jira.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15667 from viirya/improve-hive-insertoverwrite.

* [SPARK-18024][SQL] Introduce an internal commit protocol API (Reynold Xin, 2016-10-31, 2 files, -17/+12)

## What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation, which uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.

## How was this patch tested?
Should be covered by existing write tests.

Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>

Closes #15707 from rxin/SPARK-18024-2.

* [SPARK-18167][SQL] Retry when the SQLQuerySuite test flakes (Eric Liang, 2016-10-31, 1 file, -8/+20)

## What changes were proposed in this pull request?
This will re-run the flaky test a few times after it fails. This will help determine whether the failures are due to nondeterministic test setup or to some environment issue (e.g. leaked config from another test). cc yhuai

Author: Eric Liang <ekl@databricks.com>

Closes #15708 from ericl/spark-18167-3.

* [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex (Eric Liang, 2016-10-30, 3 files, -9/+9)

## What changes were proposed in this pull request?
To reduce the number of components in SQL named *Catalog, rename *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions / files to scan given a filtering expression.

```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```

cc yhuai marmbrus

## How was this patch tested?
N/A

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15634 from ericl/rename-file-provider.

* [SPARK-18121][SQL] Unable to query global temp views when hive support is enabled (Sunitha Kambhampati, 2016-10-28, 1 file, -0/+16)

## What changes were proposed in this pull request?
Issue: querying a global temp view throws a "Table or view not found" exception.
Fix: update `lookupRelation` in `HiveSessionCatalog` to check for global temp views, similar to `SessionCatalog.lookupRelation`.

Before the fix, querying a global temp view (e.g. `select * from global_temp.v1`) throws a "Table or view not found" exception. After the fix, the query succeeds and returns the right result.

## How was this patch tested?
- Two unit tests are added to check the global temp view code path when Hive support is enabled.
- Regression unit tests were run successfully. (build/sbt -Phive hive/test, build/sbt sql/test, build/sbt catalyst/test)

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #15649 from skambha/lookuprelationChanges.

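A short sketch of the reported scenario; the view name is hypothetical:

```scala
// With Hive support enabled, a global temp view must resolve through the
// global_temp database. This query failed with "Table or view not found"
// before the fix.
spark.range(3).createGlobalTempView("v1")
spark.sql("SELECT * FROM global_temp.v1").show()
```
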
* [SPARK-17970][SQL] Store partition spec in metastore for data source table (Eric Liang, 2016-10-27, 6 files, -61/+268)

## What changes were proposed in this pull request?
We should follow Hive tables and also store the partition spec in the metastore for data source tables. This brings two benefits:
1. It's more flexible to manage the table data files, as users can use `ADD PARTITION`, `DROP PARTITION`, and `RENAME PARTITION`.
2. We don't need to cache all file statuses for a data source table anymore.

## How was this patch tested?
Existing tests.

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #15515 from cloud-fan/partition.

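Illustrative partition DDL that this change enables for data source tables; the table name and partition values are made up:

```scala
// With the partition spec in the metastore, standard partition management
// statements apply to data source tables as well.
spark.sql("ALTER TABLE events ADD PARTITION (ds = '2016-10-27')")
spark.sql("ALTER TABLE events PARTITION (ds = '2016-10-27') RENAME TO PARTITION (ds = '2016-10-28')")
spark.sql("ALTER TABLE events DROP PARTITION (ds = '2016-10-28')")
```
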
* [SPARK-18026][SQL] Should not always lowercase partition columns of partition spec in parser (Wenchen Fan, 2016-10-25, 2 files, -16/+4)

## What changes were proposed in this pull request?
Currently we always lowercase the partition columns of a partition spec in the parser, on the assumption that table partition columns are always lowercased. However, this is not true for data source tables, which are case-preserving. It's safe for now because data source tables don't store the partition spec in the metastore and don't support `ADD PARTITION`, `DROP PARTITION`, or `RENAME PARTITION`, but we should make our code future-proof. This PR makes the partition spec case-preserving in the parser, and improves the `PreprocessTableInsertion` analyzer rule to normalize the partition columns in the partition spec with respect to the table partition columns.

## How was this patch tested?
Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15566 from cloud-fan/partition-spec.

* [SPARK-17409][SQL][FOLLOW-UP] Do Not Optimize Query in CTAS More Than Once (gatorsmile, 2016-10-25, 1 file, -2/+18)

### What changes were proposed in this pull request?
This follow-up PR addresses the [comment](https://github.com/apache/spark/pull/15048). We added two test cases based on the suggestion from yhuai: one new test case using the `saveAsTable` API to create a data source table, and another for CTAS on a Hive serde table.

Note: there is no need to backport this PR to 2.0. A new PR will be submitted to backport the whole fix with new test cases to Spark 2.0.

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15459 from gatorsmile/ctasOptimizedTestCases.

* [SPARK-18028][SQL] Simplify TableFileCatalog (Wenchen Fan, 2016-10-25, 2 files, -7/+41)

## What changes were proposed in this pull request?
Simplify/clean up `TableFileCatalog`:
1. Pass a `CatalogTable` instead of `databaseName` and `tableName` into `TableFileCatalog`, so that we don't need to fetch the table metadata from the metastore again.
2. In `TableFileCatalog.filterPartitions0`, DO NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our need. What's more, if we set this parameter, we may break case 2 mentioned in the classdoc.
3. Add `equals` and `hashCode` to `TableFileCatalog`.
4. Add `SessionCatalog.listPartitionsByFilter`, which handles case sensitivity.

## How was this patch tested?
Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15568 from cloud-fan/table-file-catalog.

* [SPARK-17810][SQL] Default spark.sql.warehouse.dir is relative to local FS but can resolve as HDFS path (Sean Owen, 2016-10-24, 2 files, -3/+6)

## What changes were proposed in this pull request?
Always resolve spark.sql.warehouse.dir as a local path, and as relative to the working dir, not the home dir.

## How was this patch tested?
Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15382 from srowen/SPARK-17810.

* [SPARK-18045][SQL][TESTS] Move `HiveDataFrameAnalyticsSuite` to package `sql` (jiangxingbo, 2016-10-23, 1 file, -72/+0)

## What changes were proposed in this pull request?
The test suite `HiveDataFrameAnalyticsSuite` has nothing to do with Hive, so we should move it to package `sql`. The original test cases in that suite are split into two existing test suites: `DataFrameAggregateSuite` tests the functions and ~~`SQLQuerySuite`~~`SQLQueryTestSuite` tests the SQL statements.

## How was this patch tested?
~~Modified `SQLQuerySuite` in package `sql`.~~ Added a query file for `SQLQueryTestSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15582 from jiangxb1987/group-analytics-test.

* [SPARK-18038][SQL] Move output partitioning definition from UnaryNodeExec to its children (Tejas Patil, 2016-10-23, 1 file, -0/+4)

## What changes were proposed in this pull request?
Jira: https://issues.apache.org/jira/browse/SPARK-18038

This was a suggestion by rxin in one of the dev list discussions: http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html His words:

>> It would be better (safer) to move the output partitioning definition into each of the operator and remove it from UnaryExecNode.

With this PR, the output partitioning and ordering for all the implementations of `UnaryExecNode` are as follows:

| UnaryExecNode impl | outputPartitioning | outputOrdering | comment |
| ------------ | ------------- | ------------ | ------------ |
| AppendColumnsExec | child's | Nil | child's ordering can be used |
| AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used |
| BroadcastExchangeExec | BroadcastPartitioning | Nil | - |
| CoalesceExec | UnknownPartitioning | Nil | - |
| CollectLimitExec | SinglePartition | Nil | - |
| DebugExec | child's | Nil | child's ordering can be used |
| DeserializeToObjectExec | child's | Nil | child's ordering can be used |
| ExpandExec | UnknownPartitioning | Nil | - |
| FilterExec | child's | child's | - |
| FlatMapGroupsInRExec | child's | Nil | child's ordering can be used |
| GenerateExec | child's | Nil | need to dig more |
| GlobalLimitExec | child's | child's | - |
| HashAggregateExec | child's | Nil | - |
| InputAdapter | child's | child's | - |
| InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning |
| LocalLimitExec | child's | child's | - |
| MapElementsExec | child's | child's | - |
| MapGroupsExec | child's | Nil | child's ordering can be used |
| MapPartitionsExec | child's | Nil | child's ordering can be used |
| ProjectExec | child's | child's | - |
| SampleExec | child's | Nil | child's ordering can be used |
| ScriptTransformation | child's | Nil | child's ordering can be used |
| SerializeFromObjectExec | child's | Nil | child's ordering can be used |
| ShuffleExchange | custom | Nil | - |
| SortAggregateExec | child's | sort over grouped exprs | - |
| SortExec | child's | custom | - |
| StateStoreRestoreExec | child's | Nil | child's ordering can be used |
| StateStoreSaveExec | child's | Nil | child's ordering can be used |
| SubqueryExec | child's | child's | - |
| TakeOrderedAndProjectExec | SinglePartition | custom | - |
| WholeStageCodegenExec | child's | child's | - |
| WindowExec | child's | child's | - |

## How was this patch tested?
This does NOT change any existing functionality, so it relies on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning.

* [SPARK-17994][SQL] Add back a file status cache for catalog tables (Eric Liang, 2016-10-22, 2 files, -17/+126)

## What changes were proposed in this pull request?
In SPARK-16980, we removed the full in-memory cache of table partitions in favor of loading only the needed partitions from the metastore. This greatly improves the initial latency of queries that read only a small fraction of table partitions. However, since the metastore does not store file statistics, we need to discover those from remote storage. With the loss of the in-memory file status cache, this has to happen on each query, increasing the latency of repeated queries over the same partitions.

The proposal is to add back a per-table cache of partition contents, i.e. `Map[Path, Array[FileStatus]]`. This cache is retained per table and can be invalidated through refreshTable() and refreshByPath(). Unlike the prior cache, it can be incrementally updated as new partitions are read.

## How was this patch tested?
Existing tests and new tests in `HiveTablePerfStatsSuite`. cc mallman

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15539 from ericl/meta-cache.

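A sketch of the two invalidation entry points named above; the table name and path are hypothetical:

```scala
// Drop the cached file statuses for one table, or for everything under a path.
spark.catalog.refreshTable("db.partitioned_table")
spark.catalog.refreshByPath("/warehouse/db.db/partitioned_table")
```
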
* [SPARK-18042][SQL] OutputWriter should expose file path written (Reynold Xin, 2016-10-21, 2 files, -0/+6)
## What changes were proposed in this pull request?

This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the work necessary to consolidate the structured streaming and batch write paths.

The batch write path has a nice feature in that each data source can define the extension of its files, while Spark specifies the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that.

## How was this patch tested?

N/A - there is no behavior change and this should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15580 from rxin/SPARK-18042.
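An abridged sketch of the interface shape this adds; the other members and the exact `write` signature here are assumptions, not the verbatim Spark trait:

```scala
// Sketch: with `path` exposed, the streaming sink can record exactly which
// file each writer produced when a task commits.
abstract class OutputWriterSketch {
  /** Full path of the file this writer writes. Known up front because the
   *  framework picks the directory and prefix; the source adds its suffix. */
  def path: String

  def write(row: org.apache.spark.sql.Row): Unit
  def close(): Unit
}
```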
* [SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation (Wenchen Fan, 2016-10-21, 3 files, -4/+80)

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we will replace the `LogicalRelation` with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes it.

## How was this patch tested?

the new `PruneFileSourcePartitionsSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15569 from cloud-fan/partition-bug.
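A sketch of the kind of fix described, with the field names treated as assumptions: when the rule swaps in the pruned relation, it pins the rebuilt node's output to the original attributes so expression IDs elsewhere in the plan keep resolving.

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Without expectedOutputAttributes, the rebuilt LogicalRelation would mint
// fresh attribute IDs and silently change the plan's output.
def withPrunedRelation(
    original: LogicalRelation,
    pruned: HadoopFsRelation): LogicalRelation =
  original.copy(
    relation = pruned,
    expectedOutputAttributes = Some(original.output))
```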
* [SPARK-18021][SQL] Refactor file name specification for data sources (Reynold Xin, 2016-10-20, 3 files, -22/+15)
## What changes were proposed in this pull request?

Currently each data source OutputWriter is responsible for specifying the entire file name for each file output. This, however, does not make any sense because we rely on file naming schemes for certain behaviors in Spark SQL, e.g. bucket id. The current approach allows individual data sources to break the implementation of bucketing. On the flip side, we also don't want to move file naming entirely out of data sources, because different data sources do want to specify different extensions.

This patch divides file name specification into two parts: the first part is a prefix specified by the caller of OutputWriter (in WriteOutput), and the second part is the suffix that can be specified by the OutputWriter itself. Note that a side effect of this change is that now all file based data sources also support bucketing automatically.

There are also some other minor cleanups:

- Removed the UUID passed through generic Configuration string
- Some minor rewrites for better clarity
- Renamed "path" in multiple places to "stagingDir", to more accurately reflect its meaning

## How was this patch tested?

This should be covered by existing data source tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15562 from rxin/SPARK-18021.
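A toy illustration of the two-part split (the helper name and prefix format are made up): the caller owns the prefix, which is what encodes bucket identity, and the writer owns only the extension.

```scala
import org.apache.hadoop.fs.Path

// Framework-chosen prefix + source-chosen extension = final file name.
def stagedFileName(stagingDir: String, prefix: String, extension: String): String =
  new Path(stagingDir, prefix + extension).toString

// e.g. stagedFileName("/tmp/_temporary", "part-00000-4f2d", ".snappy.parquet")
//   => "/tmp/_temporary/part-00000-4f2d.snappy.parquet"
```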
* [SPARK-17698][SQL] Join predicates should not contain filter clauses (Tejas Patil, 2016-10-20, 1 file, -23/+101)
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-17698

`ExtractEquiJoinKeys` is incorrectly using filter predicates as the join condition for joins. `canEvaluate` [0] tries to see if an `Expression` can be evaluated using the output of a given `Plan`. In the case of filter predicates (e.g. `a.id='1'`), the `Expression` passed for the right hand side (i.e. '1') is a `Literal` which does not have any attribute references. Thus `expr.references` is an empty set, which is trivially a subset of any set. This leads to `canEvaluate` returning `true`, so `a.id='1'` is treated as a join predicate. While this does not lead to incorrect results, in the case of bucketed + sorted tables we might miss out on avoiding an unnecessary shuffle + sort. See the example below.

[0]: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91

```scala
val df = (1 until 10).toDF("id").coalesce(1)
hc.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
hc.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

sqlContext.sql("""
SELECT a.id, b.id
FROM table1 a
FULL OUTER JOIN table2 b
ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```

BEFORE: This is doing shuffle + sort over the table scan outputs, which is not needed as both tables are bucketed and sorted on the same columns and have the same number of buckets. This should be a single stage job.

```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

AFTER:

```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

## How was this patch tested?

- Added a new test case for this scenario: `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`

Author: Tejas Patil <tejasp@fb.com>

Closes #15272 from tejasapatil/SPARK-17698_join_predicate_filter_clause.
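The root cause in miniature (a simplified model, not Spark's actual `canEvaluate` signature): the empty reference set of a `Literal` is vacuously a subset of any plan's output.

```scala
// Simplified subset check: Literal('1') has no attribute references, so it
// appears evaluable against either join side, and a.id = '1' gets classified
// as an equi-join key instead of a plain filter.
def canEvaluateSimplified(exprReferences: Set[String], planOutput: Set[String]): Boolean =
  exprReferences.subsetOf(planOutput)

assert(canEvaluateSimplified(Set.empty, Set("id#39")))  // true, vacuously
```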
* [SPARK-17860][SQL] SHOW COLUMN's database conflict check should respect case sensitivity configuration (Dilip Biswal, 2016-10-20, 2 files, -23/+2)

## What changes were proposed in this pull request?

The SHOW COLUMNS command validates the user-supplied database name against the database name from the qualified table name to make sure both of them are consistent. This comparison should respect case sensitivity.

## How was this patch tested?

Added tests in DDLSuite; existing tests were moved to use the new SQL-based test infrastructure.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15423 from dilipbiswal/dkb_show_column_fix.
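An illustrative repro of the check in question (the table and database names are made up): under the default case-insensitive resolution, both statements should succeed, since `DB1` and `db1` name the same database.

```scala
// Hypothetical example; with spark.sql.caseSensitive=false the database
// given in the qualified table name and the one after FROM must be treated
// as consistent even though their cases differ.
spark.sql("CREATE TABLE db1.showcolumn (col1 INT)")
spark.sql("SHOW COLUMNS IN db1.showcolumn FROM DB1")
```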
* [SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL INPATH (Dongjoon Hyun, 2016-10-20, 1 file, -0/+30)

## What changes were proposed in this pull request?

Currently, Spark 2.0 raises an `input path does not exist` AnalysisException if the file name contains '*'. This is misleading, since it occurs even when matching files exist. It was also a supported feature in Spark 1.6.2. This PR aims to support wildcard characters in the filename for the `LOAD DATA LOCAL INPATH` SQL command, as in Spark 1.6.2.

**Reported Error Scenario**

```scala
scala> sql("CREATE TABLE t(a string)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("LOAD DATA LOCAL INPATH '/tmp/x*' INTO TABLE t")
org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: /tmp/x*;
```

## How was this patch tested?

Pass the Jenkins tests with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15376 from dongjoon-hyun/SPARK-17796.
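One plausible way to expand the wildcard with Hadoop's glob API, offered as an assumption about the approach rather than the PR's actual code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Expand '/tmp/x*' to the concrete files it matches; only a genuinely empty
// result should then surface as "input path does not exist".
def expandLocalGlob(pattern: String): Seq[Path] = {
  val path = new Path(pattern)
  val fs = path.getFileSystem(new Configuration())
  Option(fs.globStatus(path)).toSeq.flatten.map(_.getPath)
}
```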
* [SPARK-17980][SQL] Fix refreshByPath for converted Hive tables (Eric Liang, 2016-10-19, 1 file, -2/+19)
## What changes were proposed in this pull request?

There was a bug introduced in https://github.com/apache/spark/pull/14690 which broke refreshByPath with converted Hive tables (though, it turns out it was very difficult to refresh converted Hive tables anyway, since you had to specify the exact path of one of the partitions). This changes refreshByPath to invalidate by prefix instead of exact match, which fixes the issue.

cc sameeragarwal for the refreshByPath changes, mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <ekl@databricks.com>

Closes #15521 from ericl/fix-caching.
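A usage-level illustration in the shell (the path is made up): with prefix matching, refreshing the table root is enough to invalidate every cached partition beneath it.

```scala
// Exact-match invalidation required knowing a specific partition directory;
// prefix invalidation makes the table's root path work instead:
spark.catalog.refreshByPath("/user/hive/warehouse/db.db/tbl")
```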
* [SPARK-17899][SQL][FOLLOW-UP] debug mode should work for corrupted table (Wenchen Fan, 2016-10-18, 1 file, -3/+15)
## What changes were proposed in this pull request?

Debug mode should work for a corrupted table, so that we can actually debug it.

## How was this patch tested?

New test in `MetastoreDataSourcesSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15528 from cloud-fan/debug.
* [SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables (Dilip Biswal, 2016-10-17, 2 files, -6/+59)

## What changes were proposed in this pull request?

Reopens the closed PR https://github.com/apache/spark/pull/15190 (please refer to the above link for review comments on the PR).

Make sure hive.default.fileformat is used when creating the storage format metadata.

Output
```SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```

Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```

After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```

## How was this patch tested?

Added new tests to HiveDDLCommandSuite and SQLQuerySuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15495 from dilipbiswal/orc2.
* [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query (Michael Allman, 2016-10-14, 5 files, -5/+191)

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by #14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #14690 from mallman/spark-16980-lazy_partition_fetching.
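The shape of the new hook from change 1, inferred from the prose (the trait name below is a stand-in and the signature should be treated as approximate):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

// Given partition-pruning predicates, return metadata only for the matching
// partitions instead of loading every partition of the table.
trait PrunedPartitionListing {
  def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression]): Seq[CatalogTablePartition]
}
```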