aboutsummaryrefslogtreecommitdiff
path: root/sql/core
Commit message (Collapse)AuthorAgeFilesLines
...
* [SPARK-19967][SQL] Add from_json in FunctionRegistryTakeshi Yamamuro2017-03-173-2/+154
| | | | | | | | | | | | ## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `from_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite` and `SQLQueryTestSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17320 from maropu/SPARK-19967.
* [SPARK-19882][SQL] Pivot with null as a distinct pivot value throws NPEAndrew Ray2017-03-171-0/+14
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Allows null values of the pivot column to be included in the pivot values list without throwing NPE Note this PR was made as an alternative to #17224 but preserves the two phase aggregate operation that is needed for good performance. ## How was this patch tested? Additional unit test Author: Andrew Ray <ray.andrew@gmail.com> Closes #17226 from aray/pivot-null.
* [SPARK-19987][SQL] Pass all filters into FileIndexReynold Xin2017-03-166-24/+34
| | | | | | | | | | | | ## What changes were proposed in this pull request? This is a tiny teeny refactoring to pass data filters also to the FileIndex, so FileIndex can have a more global view on predicates. ## How was this patch tested? Change should be covered by existing test cases. Author: Reynold Xin <rxin@databricks.com> Closes #17322 from rxin/SPARK-19987.
* [SPARK-19721][SS] Good error message for version mismatch in log filesLiwei Lin2017-03-169-26/+129
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## Problem There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message. ## What changes were proposed in this pull request? This patch made two major changes: 1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking): ``` HDFSMetadataLog - CompactibleFileStreamLog ------------> fixed with this patch - FileStreamSourceLog ---------------> inherited the fix of `CompactibleFileStreamLog` - FileStreamSinkLog -----------------> inherited the fix of `CompactibleFileStreamLog` - OffsetSeqLog ------------------------> fixed with this patch - anonymous subclass in KafkaSource ---> fixed with this patch ``` 2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"` - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"` ## Exception message with this patch ``` java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade. at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) ``` ## How was this patch tested? unit tests Author: Liwei Lin <lwlin7@gmail.com> Closes #17070 from lw-lin/better-msg.
* [SPARK-19751][SQL] Throw an exception if bean class has one's own class in ↵Takeshi Yamamuro2017-03-162-0/+119
| | | | | | | | | | | | | | | | | | | | | fields ## What changes were proposed in this pull request? The current master throws `StackOverflowError` in `createDataFrame`/`createDataset` if bean has one's own class in fields; ``` public class SelfClassInFieldBean implements Serializable { private SelfClassInFieldBean child; ... } ``` This pr added code to throw `UnsupportedOperationException` in that case as soon as possible. ## How was this patch tested? Added tests in `JavaDataFrameSuite` and `JavaDatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17188 from maropu/SPARK-19751.
* [SPARK-19961][SQL][MINOR] unify a erro msg when drop databse for ↵windpiger2017-03-161-6/+2
| | | | | | | | | | | | | | HiveExternalCatalog and InMemoryCatalog ## What changes were proposed in this pull request? unify a exception erro msg for dropdatabase when the database still have some tables for HiveExternalCatalog and InMemoryCatalog ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #17305 from windpiger/unifyErromsg.
* [SPARK-19948] Document that saveAsTable uses catalog as source of truth for ↵Juliusz Sompolski2017-03-161-0/+5
| | | | | | | | | | | | table existence. It is quirky behaviour that saveAsTable to e.g. a JDBC source with SaveMode other than Overwrite will nevertheless overwrite the table in the external source, if that table was not a catalog table. Author: Juliusz Sompolski <julek@databricks.com> Closes #17289 from juliuszsompolski/saveAsTableDoc.
* [SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning ↵Liang-Chi Hsieh2017-03-162-3/+44
| | | | | | | | | | | | | | | | | | | | and ordering when aliasing output attributes ## What changes were proposed in this pull request? Now `InMemoryTableScanExec` simply takes the `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`. However, `InMemoryTableScanExec` can alias the output attributes. In this case, its `outputPartitioning` and `outputOrdering` are not correct and its parent operators can't correctly determine its data distribution. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
* [MINOR][CORE] Fix a info message of `prunePartitions`Dongjoon Hyun2017-03-151-1/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `PrunedInMemoryFileIndex.prunePartitions` shows `pruned NaN% partitions` for the following case. ```scala scala> Seq.empty[(String, String)].toDF("a", "p").write.partitionBy("p").saveAsTable("t1") scala> sc.setLogLevel("INFO") scala> spark.table("t1").filter($"p" === "1").select($"a").show ... 17/03/13 00:33:04 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned NaN% partitions. ``` After this PR, the message looks like this. ```scala 17/03/15 10:39:48 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned 0 partitions. ``` ## How was this patch tested? Pass the Jenkins with the existing tests. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17273 from dongjoon-hyun/SPARK-EMPTY-PARTITION.
* [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change ↵Tejas Patil2017-03-1510-292/+1157
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | CartesianProductExec, SortMergeJoin, WindowExec to use it ## What issue does this PR address ? Jira: https://issues.apache.org/jira/browse/SPARK-13450 In `SortMergeJoinExec`, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). Heap dump from a failed job confirms this : https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png . While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily. ## Change proposed in this pull request - Introduces `ExternalAppendOnlyUnsafeRowArray` - It holds `UnsafeRow`s in-memory upto a certain threshold. - After the threshold is hit, it switches to `UnsafeExternalSorter` which enables spilling of the rows to disk. It does NOT sort the data. - Allows iterating the array multiple times. However, any alteration to the array (using `add` or `clear`) will invalidate the existing iterator(s) - `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array - Changed `SortMergeJoinExec` to use the new array implementation - NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery and I will rather put up a separate PR for that once this gets in. - Changed `CartesianProductExec` to use the new array implementation #### Note for reviewers The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split. ## How was this patch tested ? #### Unit testing - Added unit tests `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns - Added unit test for `SortMergeExec` - with and without spill for inner join, left outer join, right outer join to confirm that the spill threshold config behaves as expected and output is as expected. - This PR touches the scanning logic in `SortMergeExec` for _all_ joins (except FULL OUTER JOIN). However, I expect existing test cases to cover that there is no regression in correctness. - Added unit test for `WindowExec` to check behavior of spilling and correctness of results. #### Stress testing - Confirmed that OOM is gone by running against a production job which used to OOM - Since I cannot share details about prod workload externally, created synthetic data to mimic the issue. Ran before and after the fix to demonstrate the issue and query success with this PR Generating the synthetic data ``` ./bin/spark-shell --driver-memory=6G import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table") val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table") ``` Ran this against trunk VS local build with this PR. OOM repros with trunk and with the fix this query runs fine. ``` ./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof" import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1") hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000") hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect hc.sql(""" CREATE TABLE spark_13450_result AS SELECT a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2, b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2 FROM spark_13450_one_row_table a JOIN spark_13450_large_table b ON a.i=b.i AND a.j=b.j """) ``` ## Performance comparison ### Macro-benchmark I ran a SMB join query over two real world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew so no spill happened. I saw improvement in CPU time by 2-4% over version without this PR. This did not add up as I was expected some regression. I think allocating array of capacity of 128 at the start (instead of starting with default size 16) is the sole reason for the perf. gain : https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43 . I could remove that and rerun, but effectively the change will be deployed in this form and I wanted to see the effect of it over large workload. ### Micro-benchmark Two types of benchmarking can be found in `ExternalAppendOnlyUnsafeRowArrayBenchmark`: [A] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `ArrayBuffer` when all rows fit in-memory and there is no spill ``` Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 7821 / 7941 33.5 29.8 1.0X ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 19200 / 19206 25.6 39.1 1.0X ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 5949 / 6028 17.2 58.1 1.0X ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X ``` [B] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `UnsafeExternalSorter` when there is spilling of data ``` Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X ``` Author: Tejas Patil <tejasp@fb.com> Closes #16909 from tejasapatil/SPARK-13450_smb_buffer_oom.
* [SPARK-19877][SQL] Restrict the nested level of a viewjiangxingbo2017-03-141-0/+25
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We should restrict the nested level of a view, to avoid stack overflow exception during the view resolution. ## How was this patch tested? Add new test case in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17241 from jiangxb1987/view-depth.
* [SPARK-19817][SS] Make it clear that `timeZone` is a general option in ↵Liwei Lin2017-03-144-10/+42
| | | | | | | | | | | | | | | | DataStreamReader/Writer ## What changes were proposed in this pull request? As timezone setting can also affect partition values, it works for all formats, we should make it clear. ## How was this patch tested? N/A Author: Liwei Lin <lwlin7@gmail.com> Closes #17299 from lw-lin/timezone.
* [SPARK-19828][R] Support array type in from_json in Rhyukjinkwon2017-03-141-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since we could not directly define the array type in R, this PR proposes to support array types in R as string types that are used in `structField` as below: ```R jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]" df <- as.DataFrame(list(list("people" = jsonArr))) collect(select(df, alias(from_json(df$people, "array<struct<name:string>>"), "arrcol"))) ``` prints ```R arrcol 1 Bob, Alice ``` ## How was this patch tested? Unit tests in `test_sparkSQL.R`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17178 from HyukjinKwon/SPARK-19828.
* [SPARK-19918][SQL] Use TextFileFormat in implementation of ↵hyukjinkwon2017-03-155-94/+122
| | | | | | | | | | | | | | | | | | | | | | | | TextInputJsonDataSource ## What changes were proposed in this pull request? This PR proposes to use text datasource when Json schema inference. This basically proposes the similar approach in https://github.com/apache/spark/pull/15813 If we use Dataset for initial loading when inferring the schema, there are advantages. Please refer SPARK-18362 It seems JSON one was supposed to be fixed together but taken out according to https://github.com/apache/spark/pull/15813 > A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR. Also, this seems affecting some functionalities because it does not use `FileScanRDD`. This problem is described in SPARK-19885 (but it was CSV's case). ## How was this patch tested? Existing tests should cover this and manual test by `spark.read.json(path)` and check the UI. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17255 from HyukjinKwon/json-filescanrdd.
* [SPARK-19887][SQL] dynamic partition keys can be null or empty stringWenchen Fan2017-03-153-10/+6
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When dynamic partition value is null or empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`, when we read the data back, we should respect this special directory name and treat it as null. This is the same behavior of impala, see https://issues.apache.org/jira/browse/IMPALA-252 ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17277 from cloud-fan/partition.
* [SPARK-19817][SQL] Make it clear that `timeZone` option is a general option ↵Takuya UESHIN2017-03-1411-23/+59
| | | | | | | | | | | | | | | | in DataFrameReader/Writer. ## What changes were proposed in this pull request? As timezone setting can also affect partition values, it works for all formats, we should make it clear. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #17281 from ueshin/issues/SPARK-19817.
* [SPARK-18966][SQL] NOT IN subquery with correlated expressions may return ↵Nattavut Sutyanyong2017-03-142-1/+73
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | incorrect result ## What changes were proposed in this pull request? This PR fixes the following problem: ```` Seq((1, 2)).toDF("a1", "a2").createOrReplaceTempView("a") Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("b1", "b2").createOrReplaceTempView("b") // The expected result is 1 row of (1,2) as shown in the next statement. sql("select * from a where a1 not in (select b1 from b where b2 = a2)").show +---+---+ | a1| a2| +---+---+ +---+---+ sql("select * from a where a1 not in (select b1 from b where b2 = 2)").show +---+---+ | a1| a2| +---+---+ | 1| 2| +---+---+ ```` There are a number of scenarios to consider: 1. When the correlated predicate yields a match (i.e., B.B2 = A.A2) 1.1. When the NOT IN expression yields a match (i.e., A.A1 = B.B1) 1.2. When the NOT IN expression yields no match (i.e., A.A1 = B.B1 returns false) 1.3. When A.A1 is null 1.4. When B.B1 is null 1.4.1. When A.A1 is not null 1.4.2. When A.A1 is null 2. When the correlated predicate yields no match (i.e.,B.B2 = A.A2 is false or unknown) 2.1. When B.B2 is null and A.A2 is null 2.2. When B.B2 is null and A.A2 is not null 2.3. When the value of A.A2 does not match any of B.B2 ```` A.A1 A.A2 B.B1 B.B2 ----- ----- ----- ----- 1 1 1 1 (1.1) 2 1 (1.2) null 1 (1.3) 1 3 null 3 (1.4.1) null 3 (1.4.2) 1 null 1 null (2.1) null 2 (2.2 & 2.3) ```` We can divide the evaluation of the above correlated NOT IN subquery into 2 groups:- Group 1: The rows in A when there is a match from the correlated predicate (A.A1 = B.B1) In this case, the result of the subquery is not empty and the semantics of the NOT IN depends solely on the evaluation of the equality comparison of the columns of NOT IN, i.e., A1 = B1, which says - If A.A1 is null, the row is filtered (1.3 and 1.4.2) - If A.A1 = B.B1, the row is filtered (1.1) - If B.B1 is null, any rows of A in the same group (A.A2 = B.B2) is filtered (1.4.1 & 1.4.2) - Otherwise, the row is qualified. Hence, in this group, the result is the row from (1.2). Group 2: The rows in A when there is no match from the correlated predicate (A.A2 = B.B2) In this case, all the rows in A, including the rows where A.A1, are qualified because the subquery returns an empty set and by the semantics of the NOT IN, all rows from the parent side qualifies as the result set, that is, the rows from (2.1, 2.2 and 2.3). In conclusion, the correct result set of the above query is ```` A.A1 A.A2 ----- ----- 2 1 (1.2) 1 null (2.1) null 2 (2.2 & 2.3) ```` ## How was this patch tested? unit tests, regression tests, and new test cases focusing on the problem being fixed. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #17294 from nsyca/18966.
* [SPARK-19933][SQL] Do not change output of a subqueryHerman van Hovell2017-03-141-0/+14
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan. This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained. ## How was this patch tested? Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17278 from hvanhovell/SPARK-19933.
* [SPARK-18961][SQL] Support `SHOW TABLE EXTENDED ... PARTITION` statementjiangxingbo2017-03-147-83/+163
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? We should support the statement `SHOW TABLE EXTENDED LIKE 'table_identifier' PARTITION(partition_spec)`, just like that HIVE does. When partition is specified, the `SHOW TABLE EXTENDED` command should output the information of the partitions instead of the tables. Note that in this statement, we require exact matched partition spec. For example: ``` CREATE TABLE show_t1(a String, b Int) PARTITIONED BY (c String, d String); ALTER TABLE show_t1 ADD PARTITION (c='Us', d=1) PARTITION (c='Us', d=22); -- Output the extended information of Partition(c='Us', d=1) SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us', d=1); -- Throw an AnalysisException SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us'); ``` ## How was this patch tested? Add new test sqls in file `show-tables.sql`. Add new test case in `DDLSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #16373 from jiangxb1987/show-partition-extended.
* [SPARK-19850][SQL] Allow the use of aliases in SQL function callsHerman van Hovell2017-03-142-0/+80
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We currently cannot use aliases in SQL function calls. This is inconvenient when you try to create a struct. This SQL query for example `select struct(1, 2) st`, will create a struct with column names `col1` and `col2`. This is even more problematic when we want to append a field to an existing struct. For example if we want to a field to struct `st` we would issue the following SQL query `select struct(st.*, 1) as st from src`, the result will be struct `st` with an a column with a non descriptive name `col3` (if `st` itself has 2 fields). This PR proposes to change this by allowing the use of aliased expression in function parameters. For example `select struct(1 as a, 2 as b) st`, will create a struct with columns `a` & `b`. ## How was this patch tested? Added a test to `ExpressionParserSuite` and added a test file for `SQLQueryTestSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17245 from hvanhovell/SPARK-19850.
* [SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalystReynold Xin2017-03-141-1115/+0
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch used type alias to still keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf). Motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then we have to duplicate config options that impact optimizer/analyzer in sql/catalyst using CatalystConf. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17285 from rxin/SPARK-19944.
* [SPARK-18874][SQL] First phase: Deferring the correlated predicate pull up ↵Nattavut Sutyanyong2017-03-143-6/+8
| | | | | | | | | | | | | | | | | | | | | | | | | to Optimizer phase ## What changes were proposed in this pull request? Currently Analyzer as part of ResolveSubquery, pulls up the correlated predicates to its originating SubqueryExpression. The subquery plan is then transformed to remove the correlated predicates after they are moved up to the outer plan. In this PR, the task of pulling up correlated predicates is deferred to Optimizer. This is the initial work that will allow us to support the form of correlated subqueries that we don't support today. The design document from nsyca can be found in the following link : [DesignDoc](https://docs.google.com/document/d/1QDZ8JwU63RwGFS6KVF54Rjj9ZJyK33d49ZWbjFBaIgU/edit#) The brief description of code changes (hopefully to aid with code review) can be be found in the following link: [CodeChanges](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#) ## How was this patch tested? The test case PRs were submitted earlier using. [16337](https://github.com/apache/spark/pull/16337) [16759](https://github.com/apache/spark/pull/16759) [16841](https://github.com/apache/spark/pull/16841) [16915](https://github.com/apache/spark/pull/16915) [16798](https://github.com/apache/spark/pull/16798) [16712](https://github.com/apache/spark/pull/16712) [16710](https://github.com/apache/spark/pull/16710) [16760](https://github.com/apache/spark/pull/16760) [16802](https://github.com/apache/spark/pull/16802) Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #16954 from dilipbiswal/SPARK-18874.
* [SPARK-19921][SQL][TEST] Enable end-to-end testing using different Hive ↵Xiao Li2017-03-141-1/+1
| | | | | | | | | | | | | | | | | metastore versions. ### What changes were proposed in this pull request? To improve the quality of our Spark SQL in different Hive metastore versions, this PR is to enable end-to-end testing using different versions. This PR allows the test cases in sql/hive to pass the existing Hive client to create a SparkSession. - Since Derby does not allow concurrent connections, the pre-built Hive clients use different database from the TestHive's built-in 1.2.1 client. - Since our test cases in sql/hive only can create a single Spark context in the same JVM, the newly created SparkSession share the same spark context with the existing TestHive's corresponding SparkSession. ### How was this patch tested? Fixed the existing test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #17260 from gatorsmile/versionSuite.
* [SPARK-19916][SQL] simplify bad file handlingWenchen Fan2017-03-123-63/+43
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We should only have one centre place to try catch the exception for corrupted files. ## How was this patch tested? existing test Author: Wenchen Fan <wenchen@databricks.com> Closes #17253 from cloud-fan/bad-file.
* [DOCS][SS] fix structured streaming python exampleuncleGen2017-03-121-1/+1
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? - SS python example: `TypeError: 'xxx' object is not callable` - some other doc issue. ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17257 from uncleGen/docs-ss-python.
* [SPARK-19723][SQL] create datasource table with an non-existent location ↵windpiger2017-03-102-42/+67
| | | | | | | | | | | | | | | | | | | | | | | should work ## What changes were proposed in this pull request? This JIRA is a follow up work after [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583) As we discussed in that [PR](https://github.com/apache/spark/pull/16938) The following DDL for datasource table with an non-existent location should work: ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path ``` Currently it will throw exception that path not exists for datasource table for datasource table ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17055 from windpiger/CTDataSourcePathNotExists.
* [SPARK-19893][SQL] should not run DataFrame set oprations with map typeWenchen Fan2017-03-102-8/+25
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17236 from cloud-fan/map.
* [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tablesCheng Lian2017-03-101-0/+3
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables. This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables). ## How was this patch tested? New test case added in `HiveDDLSuite`. Author: Cheng Lian <lian@databricks.com> Closes #17247 from liancheng/spark-19905-hive-table-input-files.
* [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical planCarson Wang2017-03-101-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For Join, the same exchange coordinator is used for its two Exchanges. But the physical plan shows two different coordinator Ids which is confusing. This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator. ## How was this patch tested? Before the patch, the physical plan shows two different exchange coordinator id for Join. ``` == Physical Plan == *Project [key1#3L, value2#12L] +- *SortMergeJoin [key1#3L], [key2#11L], Inner :- *Sort [key1#3L ASC NULLS FIRST], false, 0 : +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864] : +- *Project [(id#0L % 500) AS key1#3L] : +- *Filter isnotnull((id#0L % 500)) : +- *Range (0, 1000, step=1, splits=Some(10)) +- *Sort [key2#11L ASC NULLS FIRST], false, 0 +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864] +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L] +- *Filter isnotnull((id#8L % 500)) +- *Range (0, 1000, step=1, splits=Some(10)) ``` After the patch, two exchange coordinator id are the same. Author: Carson Wang <carson.wang@intel.com> Closes #16952 from carsonwang/FixCoordinatorId.
* [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding ↵Kazuaki Ishizaki2017-03-105-7/+55
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | range() ## What changes were proposed in this pull request? This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables a for-loop with int local varibles 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance is improved by 7.6x. Benchmark program: ```java val N = 1 << 29 val iters = 2 val benchmark = new Benchmark("range.count", N * iters) benchmark.addCase(s"with this PR") { i => var n = 0 var len = 0 while (n < iters) { len += sparkSession.range(N).selectExpr("count(id)").collect.length n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ w/o this PR 1349 / 1356 796.2 1.3 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ with this PR 177 / 271 6065.3 0.2 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows; /* 013 */ private boolean range_initRange; /* 014 */ private long range_number; /* 015 */ private TaskContext range_taskContext; /* 016 */ private InputMetrics range_inputMetrics; /* 017 */ private long range_batchEnd; /* 018 */ private long range_numElementsTodo; /* 019 */ private scala.collection.Iterator range_input; /* 020 */ private UnsafeRow range_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 025 */ private UnsafeRow agg_result; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 028 */ /* 029 */ public GeneratedIterator(Object[] references) { /* 030 */ this.references = references; /* 031 */ } /* 032 */ /* 033 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 034 */ partitionIndex = index; /* 035 */ this.inputs = inputs; /* 036 */ agg_initAgg = false; /* 037 */ /* 038 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 039 */ this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 040 */ range_initRange = false; /* 041 */ range_number = 0L; /* 042 */ range_taskContext = TaskContext.get(); /* 043 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics(); /* 044 */ range_batchEnd = 0; /* 045 */ range_numElementsTodo = 0L; /* 046 */ range_input = inputs[0]; /* 047 */ range_result = new UnsafeRow(1); /* 048 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 049 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 050 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 051 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ /* 056 */ } /* 057 */ /* 058 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 059 */ // initialize aggregation buffer /* 060 */ agg_bufIsNull = false; /* 061 */ agg_bufValue = 0L; /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ initRange(partitionIndex); /* 067 */ } /* 068 */ /* 069 */ while (true) { /* 070 */ while (range_number != range_batchEnd) { /* 071 */ long range_value = range_number; /* 072 */ range_number += 1L; /* 073 */ /* 074 */ // do aggregate /* 075 */ // common sub-expressions /* 076 */ /* 077 */ // evaluate aggregate function /* 078 */ boolean agg_isNull1 = false; /* 079 */ /* 080 */ long agg_value1 = -1L; /* 081 */ agg_value1 = agg_bufValue + 1L; /* 082 */ // update aggregation buffer /* 083 */ agg_bufIsNull = false; /* 084 */ agg_bufValue = agg_value1; /* 085 */ /* 086 */ if (shouldStop()) return; /* 087 */ } /* 088 */ /* 089 */ if (range_taskContext.isInterrupted()) { /* 090 */ throw new TaskKilledException(); /* 091 */ } /* 092 */ /* 093 */ long range_nextBatchTodo; /* 094 */ if (range_numElementsTodo > 1000L) { /* 095 */ range_nextBatchTodo = 1000L; /* 096 */ range_numElementsTodo -= 1000L; /* 097 */ } else { /* 098 */ range_nextBatchTodo = range_numElementsTodo; /* 099 */ range_numElementsTodo = 0; /* 100 */ if (range_nextBatchTodo == 0) break; /* 101 */ } /* 102 */ range_numOutputRows.add(range_nextBatchTodo); /* 103 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo); /* 104 */ /* 105 */ range_batchEnd += range_nextBatchTodo * 1L; /* 106 */ } /* 107 */ /* 108 */ } /* 109 */ /* 110 */ private void initRange(int idx) { /* 111 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 112 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 113 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L); /* 114 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 115 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 117 */ /* 118 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 119 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 120 */ range_number = Long.MAX_VALUE; /* 121 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 122 */ range_number = Long.MIN_VALUE; /* 123 */ } else { /* 124 */ range_number = st.longValue(); /* 125 */ } /* 126 */ range_batchEnd = range_number; /* 127 */ /* 128 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 129 */ .multiply(step).add(start); /* 130 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 131 */ partitionEnd = Long.MAX_VALUE; /* 132 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 133 */ partitionEnd = Long.MIN_VALUE; /* 134 */ } else { /* 135 */ partitionEnd = end.longValue(); /* 136 */ } /* 137 */ /* 138 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 139 */ java.math.BigInteger.valueOf(range_number)); /* 140 */ range_numElementsTodo = startToEnd.divide(step).longValue(); /* 141 */ if (range_numElementsTodo < 0) { /* 142 */ range_numElementsTodo = 0; /* 143 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 144 */ range_numElementsTodo++; /* 145 */ } /* 146 */ } /* 147 */ /* 148 */ protected void processNext() throws java.io.IOException { /* 149 */ while (!agg_initAgg) { /* 150 */ agg_initAgg = true; /* 151 */ long agg_beforeAgg = System.nanoTime(); /* 152 */ agg_doAggregateWithoutKey(); /* 153 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 154 */ /* 155 */ // output the result /* 156 */ /* 157 */ agg_numOutputRows.add(1); /* 158 */ agg_rowWriter.zeroOutNullBytes(); /* 159 */ /* 160 */ if (agg_bufIsNull) { /* 161 */ agg_rowWriter.setNullAt(0); /* 162 */ } else { /* 163 */ agg_rowWriter.write(0, agg_bufValue); /* 164 */ } /* 165 */ append(agg_result); /* 166 */ } /* 167 */ } /* 168 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows; /* 013 */ private boolean range_initRange; /* 014 */ private long range_number; /* 015 */ private TaskContext range_taskContext; /* 016 */ private InputMetrics range_inputMetrics; /* 017 */ private long range_batchEnd; /* 018 */ private long range_numElementsTodo; /* 019 */ private scala.collection.Iterator range_input; /* 020 */ private UnsafeRow range_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 025 */ private UnsafeRow agg_result; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 028 */ /* 029 */ public GeneratedIterator(Object[] references) { /* 030 */ this.references = references; /* 031 */ } /* 032 */ /* 033 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 034 */ partitionIndex = index; /* 035 */ this.inputs = inputs; /* 036 */ agg_initAgg = false; /* 037 */ /* 038 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 039 */ this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 040 */ range_initRange = false; /* 041 */ range_number = 0L; /* 042 */ range_taskContext = TaskContext.get(); /* 043 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics(); /* 044 */ range_batchEnd = 0; /* 045 */ range_numElementsTodo = 0L; /* 046 */ range_input = inputs[0]; /* 047 */ range_result = new UnsafeRow(1); /* 048 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 049 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 050 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 051 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ /* 056 */ } /* 057 */ /* 058 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 059 */ // initialize aggregation buffer /* 060 */ agg_bufIsNull = false; /* 061 */ agg_bufValue = 0L; /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ initRange(partitionIndex); /* 067 */ } /* 068 */ /* 069 */ while (true) { /* 070 */ long range_range = range_batchEnd - range_number; /* 071 */ if (range_range != 0L) { /* 072 */ int range_localEnd = (int)(range_range / 1L); /* 073 */ for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) { /* 074 */ long range_value = ((long)range_localIdx * 1L) + range_number; /* 075 */ /* 076 */ // do aggregate /* 077 */ // common sub-expressions /* 078 */ /* 079 */ // evaluate aggregate function /* 080 */ boolean agg_isNull1 = false; /* 081 */ /* 082 */ long agg_value1 = -1L; /* 083 */ agg_value1 = agg_bufValue + 1L; /* 084 */ // update aggregation buffer /* 085 */ agg_bufIsNull = false; /* 086 */ agg_bufValue = agg_value1; /* 087 */ /* 088 */ // shouldStop check is eliminated /* 089 */ } /* 090 */ range_number = range_batchEnd; /* 091 */ } /* 092 */ /* 093 */ if (range_taskContext.isInterrupted()) { /* 094 */ throw new TaskKilledException(); /* 095 */ } /* 096 */ /* 097 */ long range_nextBatchTodo; /* 098 */ if (range_numElementsTodo > 1000L) { /* 099 */ range_nextBatchTodo = 1000L; /* 100 */ range_numElementsTodo -= 1000L; /* 101 */ } else { /* 102 */ range_nextBatchTodo = range_numElementsTodo; /* 103 */ range_numElementsTodo = 0; /* 104 */ if (range_nextBatchTodo == 0) break; /* 105 */ } /* 106 */ range_numOutputRows.add(range_nextBatchTodo); /* 107 */ range_inputMetrics.incRecordsRead(range_nextBatchTodo); /* 108 */ /* 109 */ range_batchEnd += range_nextBatchTodo * 1L; /* 110 */ } /* 111 */ /* 112 */ } /* 113 */ /* 114 */ private void initRange(int idx) { /* 115 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 116 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 117 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L); /* 118 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 119 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 120 */ long partitionEnd; /* 121 */ /* 122 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 123 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 124 */ range_number = Long.MAX_VALUE; /* 125 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 126 */ range_number = Long.MIN_VALUE; /* 127 */ } else { /* 128 */ range_number = st.longValue(); /* 129 */ } /* 130 */ range_batchEnd = range_number; /* 131 */ /* 132 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 133 */ .multiply(step).add(start); /* 134 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 135 */ partitionEnd = Long.MAX_VALUE; /* 136 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 137 */ partitionEnd = Long.MIN_VALUE; /* 138 */ } else { /* 139 */ partitionEnd = end.longValue(); /* 140 */ } /* 141 */ /* 142 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 143 */ java.math.BigInteger.valueOf(range_number)); /* 144 */ range_numElementsTodo = startToEnd.divide(step).longValue(); /* 145 */ if (range_numElementsTodo < 0) { /* 146 */ range_numElementsTodo = 0; /* 147 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 148 */ range_numElementsTodo++; /* 149 */ } /* 150 */ } /* 151 */ /* 152 */ protected void processNext() throws java.io.IOException { /* 153 */ while (!agg_initAgg) { /* 154 */ agg_initAgg = true; /* 155 */ long agg_beforeAgg = System.nanoTime(); /* 156 */ agg_doAggregateWithoutKey(); /* 157 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 158 */ /* 159 */ // output the result /* 160 */ /* 161 */ agg_numOutputRows.add(1); /* 162 */ agg_rowWriter.zeroOutNullBytes(); /* 163 */ /* 164 */ if (agg_bufIsNull) { /* 165 */ agg_rowWriter.setNullAt(0); /* 166 */ } else { /* 167 */ agg_rowWriter.write(0, agg_bufValue); /* 168 */ } /* 169 */ append(agg_result); /* 170 */ } /* 171 */ } /* 172 */ } ``` A part of suppressing `shouldStop()` was originally developed by inouehrs ## How was this patch tested? Add new tests into `DataFrameRangeSuite` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17122 from kiszk/SPARK-19786.
* [SPARK-19891][SS] Await Batch Lock notified on stream execution exitTyson Condie2017-03-091-0/+7
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown. ## How was this patch tested? Current tests that throw exceptions at runtime will finish faster as a result of this update. zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <tcondie@gmail.com> Closes #17231 from tcondie/kafka-writer.
* [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating ↵Kazuaki Ishizaki2017-03-093-8/+171
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | boxing/unboxing ## What changes were proposed in this pull request? This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) with cloud-fan. Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method. This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`. The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program. Without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1923 / 1952 52.0 19.2 1.0X DataFrame 526 / 548 190.2 5.3 3.7X Dataset 3094 / 3154 32.3 30.9 0.6X ``` With this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1883 / 1892 53.1 18.8 1.0X DataFrame 502 / 642 199.1 5.0 3.7X Dataset 657 / 784 152.2 6.6 2.9X ``` ```java def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { import spark.implicits._ val rdd = spark.sparkContext.range(0, numRows) val ds = spark.range(0, numRows) val func = (l: Long) => l + 1 val benchmark = new Benchmark("back-to-back map", numRows) ... benchmark.addCase("Dataset") { iter => var res = ds.as[Long] var i = 0 while (i < numChains) { res = res.map(func) i += 1 } res.queryExecution.toRdd.foreach(_ => Unit) } benchmark } ``` A motivating example ```java Seq(1, 2, 3).toDS.map(i => i * 7).show ``` Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ Object mapelements_funcResult = null; /* 054 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 055 */ if (mapelements_funcResult == null) { /* 056 */ mapelements_isNull = true; /* 057 */ } else { /* 058 */ mapelements_value = (Integer) mapelements_funcResult; /* 059 */ } /* 060 */ /* 061 */ } /* 062 */ /* 063 */ } /* 064 */ /* 065 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 066 */ /* 067 */ if (mapelements_isNull) { /* 068 */ serializefromobject_rowWriter.setNullAt(0); /* 069 */ } else { /* 070 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 071 */ } /* 072 */ append(serializefromobject_result); /* 073 */ if (shouldStop()) return; /* 074 */ } /* 075 */ } /* 076 */ } ``` Generated code with this PR (lines 48-56 are changed) ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue); /* 054 */ } /* 055 */ /* 056 */ } /* 057 */ /* 058 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 059 */ /* 060 */ if (mapelements_isNull) { /* 061 */ serializefromobject_rowWriter.setNullAt(0); /* 062 */ } else { /* 063 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 064 */ } /* 065 */ append(serializefromobject_result); /* 066 */ if (shouldStop()) return; /* 067 */ } /* 068 */ } /* 069 */ } ``` Java bytecode for methods for `i => i * 7` ```java $ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class Compiled from "Test.scala" public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable { public static final long serialVersionUID; public final int apply(int); Code: 0: aload_0 1: iload_1 2: invokevirtual #18 // Method apply$mcII$sp:(I)I 5: ireturn public int apply$mcII$sp(int); Code: 0: iload_1 1: bipush 7 3: imul 4: ireturn public final java.lang.Object apply(java.lang.Object); Code: 0: aload_0 1: aload_1 2: invokestatic #29 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I 5: invokevirtual #31 // Method apply:(I)I 8: invokestatic #35 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer; 11: areturn public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5); Code: 0: aload_0 1: invokespecial #42 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V 4: return } ``` ## How was this patch tested? Added new test suites to `DatasetPrimitiveSuite`. Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17172 from kiszk/SPARK-19008.
* [SPARK-19611][SQL] Introduce configurable table schema inferenceBudde2017-03-093-147/+22
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## Summary of changes Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) ## How was this patch tested? The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde <budde@amazon.com> Closes #16944 from budde/SPARK-19611.
* [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc ↵Jeff Zhang2017-03-091-3/+3
| | | | | | | | | | | | file in DataFrameReader.orc Beside the issue in spark api, also fix 2 minor issues in pyspark - support read from multiple input paths for orc - support read from multiple input paths for text Author: Jeff Zhang <zjffdu@apache.org> Closes #10307 from zjffdu/SPARK-12334.
* [SPARK-19861][SS] watermark should not be a negative time.uncleGen2017-03-092-1/+26
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `watermark` should not be negative. This behavior is invalid, check it before real run. ## How was this patch tested? add new unit test. Author: uncleGen <hustyugm@gmail.com> Author: dylon <hustyugm@gmail.com> Closes #17202 from uncleGen/SPARK-19861.
* [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSourceLiwei Lin2017-03-093-19/+62
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log). ## Usage ```scala spark .readStream .option("fileNameOnly", true) .text("s3n://bucket/dir1/dir2") .writeStream ... ``` ## How was this patch tested? Added a test case Author: Liwei Lin <lwlin7@gmail.com> Closes #17120 from lw-lin/filename-only.
* [SPARK-19561][SQL] add int case handling for TimestampTypeJason White2017-03-091-0/+2
| | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int. These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range. Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3. ## How was this patch tested? Added a new PySpark-side test that fails without the change. The contribution is my original work and I license the work to the project under the project’s open source license. Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks. Author: Jason White <jason.white@shopify.com> Closes #17200 from JasonMWhite/SPARK-19561.
* [SPARK-19763][SQL] qualified external datasource table location stored in ↵windpiger2017-03-094-21/+46
| | | | | | | | | | | | | | | | | | | | | | | | | | | catalog ## What changes were proposed in this pull request? If we create a external datasource table with a non-qualified location , we should qualified it to store in catalog. ``` CREATE TABLE t(a string) USING parquet LOCATION '/path/xx' CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(b) LOCATION '/path/xx' ``` when we get the table from catalog, the location should be qualified, e.g.'file:/path/xxx' ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17095 from windpiger/tablepathQualified.
* [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.uncleGen2017-03-081-8/+11
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17221 from uncleGen/SPARK-19859.
* [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive MetastoreXiao Li2017-03-082-191/+270
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? So far, the test cases in DDLSuites only verify the behaviors of InMemoryCatalog. That means, they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using Hive metastore. When porting these test cases, a bug of `SET LOCATION` is found. `path` is not set when the location is changed. After this PR, a few changes are made, as summarized below, - `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`. - `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`. - `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded: 1. The following test cases only make sense for `InMemoryCatalog`: ``` test("desc table for parquet data source table using in-memory catalog") test("create a managed Hive source table") { test("create an external Hive source table") test("Create Hive Table As Select") ``` 2. The following test cases are unable to be ported because we are unable to alter table provider when using Hive metastore. In the future PRs we need to improve the test cases so that altering table provider is not needed: ``` test("alter table: set location (datasource table)") test("alter table: set properties (datasource table)") test("alter table: unset properties (datasource table)") test("alter table: set serde (datasource table)") test("alter table: set serde partition (datasource table)") test("alter table: change column (datasource table)") test("alter table: add partition (datasource table)") test("alter table: drop partition (datasource table)") test("alter table: rename partition (datasource table)") test("drop table - data source table") ``` **TODO** : in the future PRs, we need to remove `HiveDDLSuite` and move the test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Author: gatorsmile <gatorsmile@gmail.com> Closes #16592 from gatorsmile/refactorDDLSuite.
* [MINOR][SQL] The analyzer rules are fired twice for cases when ↵Dilip Biswal2017-03-081-2/+7
| | | | | | | | | | | | | | | | | AnalysisException is raised from analyzer. ## What changes were proposed in this pull request? In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery. I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when i was not expecting it. ## How was this patch tested? Tested manually. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17214 from dilipbiswal/analyis_twice.
* [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in ↵Burak Yavuz2017-03-083-38/+63
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days. This causes a problem when both latestFirst = true maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <brkyvz@gmail.com> Closes #17153 from brkyvz/maxFileAge.
* [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSVhyukjinkwon2017-03-085-30/+121
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to add an API that loads `DataFrame` from `Dataset[String]` storing csv. It allows pre-processing before loading into CSV, which means allowing a lot of workarounds for many narrow cases, for example, as below: - Case 1 - pre-processing ```scala val df = spark.read.text("...") // Pre-processing with this. spark.read.csv(df.as[String]) ``` - Case 2 - use other input formats ```scala val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength)) spark.read.csv(stringRdd.toDS) ``` ## How was this patch tested? Added tests in `CSVSuite` and build with Scala 2.10. ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16854 from HyukjinKwon/SPARK-15463.
* [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session ↵Kunal Khamar2017-03-089-111/+421
| | | | | | | | | | | | | | | | has an identical copy of the SessionState Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.) This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState. Subsequent changes to base session are not propagated to cloned session, clone is independent after creation. If the base is changed after clone has been created, say user registers new UDF, then the new UDF will not be available inside the clone. Same goes for configs and temp tables. Unit tests Author: Kunal Khamar <kkhamar@outlook.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #16826 from kunalkhamar/fork-sparksession.
* [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow ↵Shixiong Zhu2017-03-088-84/+141
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | invalid cases ## What changes were proposed in this pull request? Add a output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`. `UnsupportedOperationChecker` is modified to disallow unsupported cases. - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed. - For streaming (map/flatMap)GroupsWithState, see the following table: | Operators | Supported Query Output Mode | | ------------- | ------------- | | flatMapGroupsWithState(Update) without aggregation | Update | | flatMapGroupsWithState(Update) with aggregation | None | | flatMapGroupsWithState(Append) without aggregation | Append | | flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete | | flatMapGroupsWithState(Append) after aggregation | None | | Multiple flatMapGroupsWithState(Append)s | Append | | Multiple mapGroupsWithStates | None | | Mxing mapGroupsWithStates and flatMapGroupsWithStates | None | | Other cases of multiple flatMapGroupsWithState | None | ## How was this patch tested? The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #17197 from zsxwing/mapgroups-check.
* [SPARK-19727][SQL] Fix for round function that modifies original columnWojtek Szymanski2017-03-081-0/+12
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix for SQL round function that modifies original column when underlying data frame is created from a local product. import org.apache.spark.sql.functions._ case class NumericRow(value: BigDecimal) val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789")))) df.show() +--------------------+ | value| +--------------------+ |1.234567890000000000| +--------------------+ df.withColumn("value_rounded", round('value)).show() // before +--------------------+-------------+ | value|value_rounded| +--------------------+-------------+ |1.000000000000000000| 1| +--------------------+-------------+ // after +--------------------+-------------+ | value|value_rounded| +--------------------+-------------+ |1.234567890000000000| 1| +--------------------+-------------+ ## How was this patch tested? New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite` Author: Wojtek Szymanski <wk.szymanski@gmail.com> Closes #17075 from wojtek-szymanski/SPARK-19727.
* [SPARK-19864][SQL][TEST] provide a makeQualifiedPath functions to optimize ↵windpiger2017-03-082-11/+14
| | | | | | | | | | | | | | | some code ## What changes were proposed in this pull request? Currently there are lots of places to make the path qualified, it is better to provide a function to do this, then the code will be more simple. ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #17204 from windpiger/addQualifiledPathUtil.
* [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled ↵Xiao Li2017-03-082-9/+10
| | | | | | | | | | | | | | | | | | | | | | | | | | | Repartition ### What changes were proposed in this pull request? Observed by felixcheung in https://github.com/apache/spark/pull/16739, when users use the shuffle-enabled `repartition` API, they expect the partition they got should be the exact number they provided, even if they call shuffle-disabled `coalesce` later. Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result. ```Scala val df = spark.range(0, 10000, 1, 5) val df2 = df.repartition(10) assert(df2.coalesce(13).rdd.getNumPartitions == 5) assert(df2.coalesce(7).rdd.getNumPartitions == 5) assert(df2.coalesce(3).rdd.getNumPartitions == 3) ``` This PR is to fix the issue. We preserve shuffle-enabled Repartition. ### How was this patch tested? Added a test case Author: Xiao Li <gatorsmile@gmail.com> Closes #16933 from gatorsmile/CollapseRepartition.
* [SPARK-19865][SQL] remove the view identifier in SubqueryAliasjiangxingbo2017-03-082-4/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before. ## How was this patch tested? Update the related test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17210 from jiangxb1987/SubqueryAlias.
* [SPARK-17080][SQL] join reorderwangzhenhua2017-03-082-1/+17
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Reorder the joins using a dynamic programming algorithm (Selinger paper): First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them. When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows: ``` level 1: p({A}), p({B}), p({C}), p({D}) level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D}) level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D}) level 4: p({A, B, C, D}) ``` where p({A, B, C, D}) is the final output plan. For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs. ## How was this patch tested? add test cases Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17138 from wzhfy/joinReorder.