aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-19830][SQL] Add parseTableSchema API to ParserInterfaceXiao Li2017-03-163-1/+104
| | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? Specifying the table schema in DDL formats is needed for different scenarios. For example, - [specifying the schema in SQL function `from_json` using DDL formats](https://issues.apache.org/jira/browse/SPARK-19637), which is suggested by marmbrus , - [specifying the customized JDBC data types](https://github.com/apache/spark/pull/16209). These two PRs need users to use the JSON format to specify the table schema. This is not user friendly. This PR is to provide a `parseTableSchema` API in `ParserInterface`. ### How was this patch tested? Added a test suite `TableSchemaParserSuite` Author: Xiao Li <gatorsmile@gmail.com> Closes #17171 from gatorsmile/parseDDLStmt.
* [SPARK-19751][SQL] Throw an exception if bean class has one's own class in ↵Takeshi Yamamuro2017-03-161-6/+13
| | | | | | | | | | | | | | | | | | | | | fields ## What changes were proposed in this pull request? The current master throws `StackOverflowError` in `createDataFrame`/`createDataset` if bean has one's own class in fields; ``` public class SelfClassInFieldBean implements Serializable { private SelfClassInFieldBean child; ... } ``` This pr added code to throw `UnsupportedOperationException` in that case as soon as possible. ## How was this patch tested? Added tests in `JavaDataFrameSuite` and `JavaDatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17188 from maropu/SPARK-19751.
* [SPARK-19961][SQL][MINOR] unify a erro msg when drop databse for ↵windpiger2017-03-161-1/+1
| | | | | | | | | | | | | | HiveExternalCatalog and InMemoryCatalog ## What changes were proposed in this pull request? unify a exception erro msg for dropdatabase when the database still have some tables for HiveExternalCatalog and InMemoryCatalog ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #17305 from windpiger/unifyErromsg.
* [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change ↵Tejas Patil2017-03-151-0/+30
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | CartesianProductExec, SortMergeJoin, WindowExec to use it ## What issue does this PR address ? Jira: https://issues.apache.org/jira/browse/SPARK-13450 In `SortMergeJoinExec`, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). Heap dump from a failed job confirms this : https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png . While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily. ## Change proposed in this pull request - Introduces `ExternalAppendOnlyUnsafeRowArray` - It holds `UnsafeRow`s in-memory upto a certain threshold. - After the threshold is hit, it switches to `UnsafeExternalSorter` which enables spilling of the rows to disk. It does NOT sort the data. - Allows iterating the array multiple times. However, any alteration to the array (using `add` or `clear`) will invalidate the existing iterator(s) - `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array - Changed `SortMergeJoinExec` to use the new array implementation - NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery and I will rather put up a separate PR for that once this gets in. - Changed `CartesianProductExec` to use the new array implementation #### Note for reviewers The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split. ## How was this patch tested ? #### Unit testing - Added unit tests `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns - Added unit test for `SortMergeExec` - with and without spill for inner join, left outer join, right outer join to confirm that the spill threshold config behaves as expected and output is as expected. - This PR touches the scanning logic in `SortMergeExec` for _all_ joins (except FULL OUTER JOIN). However, I expect existing test cases to cover that there is no regression in correctness. - Added unit test for `WindowExec` to check behavior of spilling and correctness of results. #### Stress testing - Confirmed that OOM is gone by running against a production job which used to OOM - Since I cannot share details about prod workload externally, created synthetic data to mimic the issue. Ran before and after the fix to demonstrate the issue and query success with this PR Generating the synthetic data ``` ./bin/spark-shell --driver-memory=6G import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table") val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table") ``` Ran this against trunk VS local build with this PR. OOM repros with trunk and with the fix this query runs fine. ``` ./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof" import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1") hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000") hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect hc.sql(""" CREATE TABLE spark_13450_result AS SELECT a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2, b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2 FROM spark_13450_one_row_table a JOIN spark_13450_large_table b ON a.i=b.i AND a.j=b.j """) ``` ## Performance comparison ### Macro-benchmark I ran a SMB join query over two real world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew so no spill happened. I saw improvement in CPU time by 2-4% over version without this PR. This did not add up as I was expected some regression. I think allocating array of capacity of 128 at the start (instead of starting with default size 16) is the sole reason for the perf. gain : https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43 . I could remove that and rerun, but effectively the change will be deployed in this form and I wanted to see the effect of it over large workload. ### Micro-benchmark Two types of benchmarking can be found in `ExternalAppendOnlyUnsafeRowArrayBenchmark`: [A] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `ArrayBuffer` when all rows fit in-memory and there is no spill ``` Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 7821 / 7941 33.5 29.8 1.0X ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 19200 / 19206 25.6 39.1 1.0X ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 5949 / 6028 17.2 58.1 1.0X ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X ``` [B] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `UnsafeExternalSorter` when there is spilling of data ``` Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X ``` Author: Tejas Patil <tejasp@fb.com> Closes #16909 from tejasapatil/SPARK-13450_smb_buffer_oom.
* [SPARK-19877][SQL] Restrict the nested level of a viewjiangxingbo2017-03-143-5/+26
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We should restrict the nested level of a view, to avoid stack overflow exception during the view resolution. ## How was this patch tested? Add new test case in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17241 from jiangxb1987/view-depth.
* [SPARK-19887][SQL] dynamic partition keys can be null or empty stringWenchen Fan2017-03-152-3/+8
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When dynamic partition value is null or empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`, when we read the data back, we should respect this special directory name and treat it as null. This is the same behavior of impala, see https://issues.apache.org/jira/browse/IMPALA-252 ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17277 from cloud-fan/partition.
* [SPARK-19817][SQL] Make it clear that `timeZone` option is a general option ↵Takuya UESHIN2017-03-144-6/+13
| | | | | | | | | | | | | | | | in DataFrameReader/Writer. ## What changes were proposed in this pull request? As timezone setting can also affect partition values, it works for all formats, we should make it clear. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #17281 from ueshin/issues/SPARK-19817.
* [SPARK-18966][SQL] NOT IN subquery with correlated expressions may return ↵Nattavut Sutyanyong2017-03-141-4/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | incorrect result ## What changes were proposed in this pull request? This PR fixes the following problem: ```` Seq((1, 2)).toDF("a1", "a2").createOrReplaceTempView("a") Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("b1", "b2").createOrReplaceTempView("b") // The expected result is 1 row of (1,2) as shown in the next statement. sql("select * from a where a1 not in (select b1 from b where b2 = a2)").show +---+---+ | a1| a2| +---+---+ +---+---+ sql("select * from a where a1 not in (select b1 from b where b2 = 2)").show +---+---+ | a1| a2| +---+---+ | 1| 2| +---+---+ ```` There are a number of scenarios to consider: 1. When the correlated predicate yields a match (i.e., B.B2 = A.A2) 1.1. When the NOT IN expression yields a match (i.e., A.A1 = B.B1) 1.2. When the NOT IN expression yields no match (i.e., A.A1 = B.B1 returns false) 1.3. When A.A1 is null 1.4. When B.B1 is null 1.4.1. When A.A1 is not null 1.4.2. When A.A1 is null 2. When the correlated predicate yields no match (i.e.,B.B2 = A.A2 is false or unknown) 2.1. When B.B2 is null and A.A2 is null 2.2. When B.B2 is null and A.A2 is not null 2.3. When the value of A.A2 does not match any of B.B2 ```` A.A1 A.A2 B.B1 B.B2 ----- ----- ----- ----- 1 1 1 1 (1.1) 2 1 (1.2) null 1 (1.3) 1 3 null 3 (1.4.1) null 3 (1.4.2) 1 null 1 null (2.1) null 2 (2.2 & 2.3) ```` We can divide the evaluation of the above correlated NOT IN subquery into 2 groups:- Group 1: The rows in A when there is a match from the correlated predicate (A.A1 = B.B1) In this case, the result of the subquery is not empty and the semantics of the NOT IN depends solely on the evaluation of the equality comparison of the columns of NOT IN, i.e., A1 = B1, which says - If A.A1 is null, the row is filtered (1.3 and 1.4.2) - If A.A1 = B.B1, the row is filtered (1.1) - If B.B1 is null, any rows of A in the same group (A.A2 = B.B2) is filtered (1.4.1 & 1.4.2) - Otherwise, the row is qualified. Hence, in this group, the result is the row from (1.2). Group 2: The rows in A when there is no match from the correlated predicate (A.A2 = B.B2) In this case, all the rows in A, including the rows where A.A1, are qualified because the subquery returns an empty set and by the semantics of the NOT IN, all rows from the parent side qualifies as the result set, that is, the rows from (2.1, 2.2 and 2.3). In conclusion, the correct result set of the above query is ```` A.A1 A.A2 ----- ----- 2 1 (1.2) 1 null (2.1) null 2 (2.2 & 2.3) ```` ## How was this patch tested? unit tests, regression tests, and new test cases focusing on the problem being fixed. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #17294 from nsyca/18966.
* [SPARK-19933][SQL] Do not change output of a subqueryHerman van Hovell2017-03-143-3/+28
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan. This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained. ## How was this patch tested? Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17278 from hvanhovell/SPARK-19933.
* [SPARK-18874][SQL] Fix 2.10 build after moving the subquery rules to ↵Herman van Hovell2017-03-141-2/+2
| | | | | | | | | | | | | | optimization ## What changes were proposed in this pull request? Commit https://github.com/apache/spark/commit/4ce970d71488c7de6025ef925f75b8b92a5a6a79 in accidentally broke the 2.10 build for Spark. This PR fixes this by simplifying the offending pattern match. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17288 from hvanhovell/SPARK-18874.
* [SPARK-19850][SQL] Allow the use of aliases in SQL function callsHerman van Hovell2017-03-143-5/+8
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We currently cannot use aliases in SQL function calls. This is inconvenient when you try to create a struct. This SQL query for example `select struct(1, 2) st`, will create a struct with column names `col1` and `col2`. This is even more problematic when we want to append a field to an existing struct. For example if we want to a field to struct `st` we would issue the following SQL query `select struct(st.*, 1) as st from src`, the result will be struct `st` with an a column with a non descriptive name `col3` (if `st` itself has 2 fields). This PR proposes to change this by allowing the use of aliased expression in function parameters. For example `select struct(1 as a, 2 as b) st`, will create a struct with columns `a` & `b`. ## How was this patch tested? Added a test to `ExpressionParserSuite` and added a test file for `SQLQueryTestSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17245 from hvanhovell/SPARK-19850.
* [SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalystReynold Xin2017-03-145-93/+1200
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch used type alias to still keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf). Motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then we have to duplicate config options that impact optimizer/analyzer in sql/catalyst using CatalystConf. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17285 from rxin/SPARK-19944.
* [SPARK-18874][SQL] First phase: Deferring the correlated predicate pull up ↵Nattavut Sutyanyong2017-03-1410-294/+667
| | | | | | | | | | | | | | | | | | | | | | | | | to Optimizer phase ## What changes were proposed in this pull request? Currently Analyzer as part of ResolveSubquery, pulls up the correlated predicates to its originating SubqueryExpression. The subquery plan is then transformed to remove the correlated predicates after they are moved up to the outer plan. In this PR, the task of pulling up correlated predicates is deferred to Optimizer. This is the initial work that will allow us to support the form of correlated subqueries that we don't support today. The design document from nsyca can be found in the following link : [DesignDoc](https://docs.google.com/document/d/1QDZ8JwU63RwGFS6KVF54Rjj9ZJyK33d49ZWbjFBaIgU/edit#) The brief description of code changes (hopefully to aid with code review) can be be found in the following link: [CodeChanges](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#) ## How was this patch tested? The test case PRs were submitted earlier using. [16337](https://github.com/apache/spark/pull/16337) [16759](https://github.com/apache/spark/pull/16759) [16841](https://github.com/apache/spark/pull/16841) [16915](https://github.com/apache/spark/pull/16915) [16798](https://github.com/apache/spark/pull/16798) [16712](https://github.com/apache/spark/pull/16712) [16710](https://github.com/apache/spark/pull/16710) [16760](https://github.com/apache/spark/pull/16760) [16802](https://github.com/apache/spark/pull/16802) Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #16954 from dilipbiswal/SPARK-18874.
* [SPARK-17495][SQL] Support date, timestamp and interval types in Hive hashTejas Patil2017-03-122-11/+268
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? - Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive - Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood which limits the ability to be in completely sync with hive's hashing function. I have explained this in the method doc. - Date type was already supported. This PR adds test for that. ## How was this patch tested? Added unit tests Author: Tejas Patil <tejasp@fb.com> Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
* [SPARK-19893][SQL] should not run DataFrame set oprations with map typeWenchen Fan2017-03-101-3/+22
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17236 from cloud-fan/map.
* [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating ↵Kazuaki Ishizaki2017-03-091-1/+37
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | boxing/unboxing ## What changes were proposed in this pull request? This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) with cloud-fan. Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method. This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`. The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program. Without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1923 / 1952 52.0 19.2 1.0X DataFrame 526 / 548 190.2 5.3 3.7X Dataset 3094 / 3154 32.3 30.9 0.6X ``` With this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ RDD 1883 / 1892 53.1 18.8 1.0X DataFrame 502 / 642 199.1 5.0 3.7X Dataset 657 / 784 152.2 6.6 2.9X ``` ```java def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { import spark.implicits._ val rdd = spark.sparkContext.range(0, numRows) val ds = spark.range(0, numRows) val func = (l: Long) => l + 1 val benchmark = new Benchmark("back-to-back map", numRows) ... benchmark.addCase("Dataset") { iter => var res = ds.as[Long] var i = 0 while (i < numChains) { res = res.map(func) i += 1 } res.queryExecution.toRdd.foreach(_ => Unit) } benchmark } ``` A motivating example ```java Seq(1, 2, 3).toDS.map(i => i * 7).show ``` Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ Object mapelements_funcResult = null; /* 054 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 055 */ if (mapelements_funcResult == null) { /* 056 */ mapelements_isNull = true; /* 057 */ } else { /* 058 */ mapelements_value = (Integer) mapelements_funcResult; /* 059 */ } /* 060 */ /* 061 */ } /* 062 */ /* 063 */ } /* 064 */ /* 065 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 066 */ /* 067 */ if (mapelements_isNull) { /* 068 */ serializefromobject_rowWriter.setNullAt(0); /* 069 */ } else { /* 070 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 071 */ } /* 072 */ append(serializefromobject_result); /* 073 */ if (shouldStop()) return; /* 074 */ } /* 075 */ } /* 076 */ } ``` Generated code with this PR (lines 48-56 are changed) ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow deserializetoobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 012 */ private int mapelements_argValue; /* 013 */ private UnsafeRow mapelements_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 016 */ private UnsafeRow serializefromobject_result; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 019 */ /* 020 */ public GeneratedIterator(Object[] references) { /* 021 */ this.references = references; /* 022 */ } /* 023 */ /* 024 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 025 */ partitionIndex = index; /* 026 */ this.inputs = inputs; /* 027 */ inputadapter_input = inputs[0]; /* 028 */ deserializetoobject_result = new UnsafeRow(1); /* 029 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0); /* 030 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 031 */ /* 032 */ mapelements_result = new UnsafeRow(1); /* 033 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0); /* 034 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 035 */ serializefromobject_result = new UnsafeRow(1); /* 036 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 037 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 038 */ /* 039 */ } /* 040 */ /* 041 */ protected void processNext() throws java.io.IOException { /* 042 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ int inputadapter_value = inputadapter_row.getInt(0); /* 045 */ /* 046 */ boolean mapelements_isNull = true; /* 047 */ int mapelements_value = -1; /* 048 */ if (!false) { /* 049 */ mapelements_argValue = inputadapter_value; /* 050 */ /* 051 */ mapelements_isNull = false; /* 052 */ if (!mapelements_isNull) { /* 053 */ mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue); /* 054 */ } /* 055 */ /* 056 */ } /* 057 */ /* 058 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 059 */ /* 060 */ if (mapelements_isNull) { /* 061 */ serializefromobject_rowWriter.setNullAt(0); /* 062 */ } else { /* 063 */ serializefromobject_rowWriter.write(0, mapelements_value); /* 064 */ } /* 065 */ append(serializefromobject_result); /* 066 */ if (shouldStop()) return; /* 067 */ } /* 068 */ } /* 069 */ } ``` Java bytecode for methods for `i => i * 7` ```java $ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class Compiled from "Test.scala" public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable { public static final long serialVersionUID; public final int apply(int); Code: 0: aload_0 1: iload_1 2: invokevirtual #18 // Method apply$mcII$sp:(I)I 5: ireturn public int apply$mcII$sp(int); Code: 0: iload_1 1: bipush 7 3: imul 4: ireturn public final java.lang.Object apply(java.lang.Object); Code: 0: aload_0 1: aload_1 2: invokestatic #29 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I 5: invokevirtual #31 // Method apply:(I)I 8: invokestatic #35 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer; 11: areturn public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5); Code: 0: aload_0 1: invokespecial #42 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V 4: return } ``` ## How was this patch tested? Added new test suites to `DatasetPrimitiveSuite`. Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17172 from kiszk/SPARK-19008.
* [SPARK-19611][SQL] Introduce configurable table schema inferenceBudde2017-03-095-4/+47
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## Summary of changes Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) ## How was this patch tested? The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde <budde@amazon.com> Closes #16944 from budde/SPARK-19611.
* [SPARK-19763][SQL] qualified external datasource table location stored in ↵windpiger2017-03-091-1/+13
| | | | | | | | | | | | | | | | | | | | | | | | | | | catalog ## What changes were proposed in this pull request? If we create a external datasource table with a non-qualified location , we should qualified it to store in catalog. ``` CREATE TABLE t(a string) USING parquet LOCATION '/path/xx' CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(b) LOCATION '/path/xx' ``` when we get the table from catalog, the location should be qualified, e.g.'file:/path/xxx' ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17095 from windpiger/tablepathQualified.
* [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.uncleGen2017-03-081-1/+8
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17221 from uncleGen/SPARK-19859.
* [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session ↵Kunal Khamar2017-03-084-5/+100
| | | | | | | | | | | | | | | | has an identical copy of the SessionState Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.) This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState. Subsequent changes to base session are not propagated to cloned session, clone is independent after creation. If the base is changed after clone has been created, say user registers new UDF, then the new UDF will not be available inside the clone. Same goes for configs and temp tables. Unit tests Author: Kunal Khamar <kkhamar@outlook.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #16826 from kunalkhamar/fork-sparksession.
* [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow ↵Shixiong Zhu2017-03-085-23/+344
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | invalid cases ## What changes were proposed in this pull request? Add a output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`. `UnsupportedOperationChecker` is modified to disallow unsupported cases. - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed. - For streaming (map/flatMap)GroupsWithState, see the following table: | Operators | Supported Query Output Mode | | ------------- | ------------- | | flatMapGroupsWithState(Update) without aggregation | Update | | flatMapGroupsWithState(Update) with aggregation | None | | flatMapGroupsWithState(Append) without aggregation | Append | | flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete | | flatMapGroupsWithState(Append) after aggregation | None | | Multiple flatMapGroupsWithState(Append)s | Append | | Multiple mapGroupsWithStates | None | | Mxing mapGroupsWithStates and flatMapGroupsWithStates | None | | Other cases of multiple flatMapGroupsWithState | None | ## How was this patch tested? The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #17197 from zsxwing/mapgroups-check.
* [SPARK-19727][SQL] Fix for round function that modifies original columnWojtek Szymanski2017-03-086-25/+42
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix for SQL round function that modifies original column when underlying data frame is created from a local product. import org.apache.spark.sql.functions._ case class NumericRow(value: BigDecimal) val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789")))) df.show() +--------------------+ | value| +--------------------+ |1.234567890000000000| +--------------------+ df.withColumn("value_rounded", round('value)).show() // before +--------------------+-------------+ | value|value_rounded| +--------------------+-------------+ |1.000000000000000000| 1| +--------------------+-------------+ // after +--------------------+-------------+ | value|value_rounded| +--------------------+-------------+ |1.234567890000000000| 1| +--------------------+-------------+ ## How was this patch tested? New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite` Author: Wojtek Szymanski <wk.szymanski@gmail.com> Closes #17075 from wojtek-szymanski/SPARK-19727.
* [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled ↵Xiao Li2017-03-084-38/+166
| | | | | | | | | | | | | | | | | | | | | | | | | | | Repartition ### What changes were proposed in this pull request? Observed by felixcheung in https://github.com/apache/spark/pull/16739, when users use the shuffle-enabled `repartition` API, they expect the partition they got should be the exact number they provided, even if they call shuffle-disabled `coalesce` later. Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result. ```Scala val df = spark.range(0, 10000, 1, 5) val df2 = df.repartition(10) assert(df2.coalesce(13).rdd.getNumPartitions == 5) assert(df2.coalesce(7).rdd.getNumPartitions == 5) assert(df2.coalesce(3).rdd.getNumPartitions == 3) ``` This PR is to fix the issue. We preserve shuffle-enabled Repartition. ### How was this patch tested? Added a test case Author: Xiao Li <gatorsmile@gmail.com> Closes #16933 from gatorsmile/CollapseRepartition.
* [SPARK-19865][SQL] remove the view identifier in SubqueryAliasjiangxingbo2017-03-0812-40/+39
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before. ## How was this patch tested? Update the related test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17210 from jiangxb1987/SubqueryAlias.
* [SPARK-17080][SQL] join reorderwangzhenhua2017-03-086-3/+504
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Reorder the joins using a dynamic programming algorithm (Selinger paper): First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them. When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows: ``` level 1: p({A}), p({B}), p({C}), p({D}) level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D}) level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D}) level 4: p({A, B, C, D}) ``` where p({A, B, C, D}) is the final output plan. For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs. ## How was this patch tested? add test cases Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17138 from wzhfy/joinReorder.
* [SPARK-18055][SQL] Use correct mirror in ExpresionEncoderMichael Armbrust2017-03-081-2/+2
| | | | | | | | | | Previously, we were using the mirror of passed in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on root classloader) but inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library. This patch changes us to always reflect based on a mirror created using the context classloader. Author: Michael Armbrust <michael@databricks.com> Closes #17201 from marmbrus/replSeqEncoder.
* [SPARK-19859][SS] The new watermark should override the old oneShixiong Zhu2017-03-071-0/+7
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, it may be unexpected. ## How was this patch tested? The new test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #17199 from zsxwing/SPARK-19859.
* [SPARK-19843][SQL] UTF8String => (int / long) conversion expensive for ↵Tejas Patil2017-03-071-31/+50
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | invalid inputs ## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-19843 Created wrapper classes (`IntWrapper`, `LongWrapper`) to wrap the result of parsing (which are primitive types). In case of problem in parsing, the method would return a boolean. ## How was this patch tested? - Added new unit tests - Ran a prod job which had conversion from string -> int and verified the outputs ## Performance Tiny regression when all strings are valid integers ``` conversion to int: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------- trunk 502 / 522 33.4 29.9 1.0X SPARK-19843 493 / 503 34.0 29.4 1.0X ``` Huge gain when all strings are invalid integers ``` conversion to int: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------- trunk 33913 / 34219 0.5 2021.4 1.0X SPARK-19843 154 / 162 108.8 9.2 220.0X ``` Author: Tejas Patil <tejasp@fb.com> Closes #17184 from tejasapatil/SPARK-19843_is_numeric_maybe.
* [SPARK-19637][SQL] Add to_json in FunctionRegistryTakeshi Yamamuro2017-03-072-2/+42
| | | | | | | | | | | | ## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `to_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #16981 from maropu/SPARK-19637.
* [SPARK-17075][SQL][FOLLOWUP] fix filter estimation issueswangzhenhua2017-03-062-274/+297
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? 1. support boolean type in binary expression estimation. 2. deal with compound Not conditions. 3. avoid convert BigInt/BigDecimal directly to double unless it's within range (0, 1). 4. reorganize test code. ## How was this patch tested? modify related test cases. Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17148 from wzhfy/fixFilter.
* [SPARK-19350][SQL] Cardinality estimation of Limit and Samplewangzhenhua2017-03-063-79/+145
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Before this pr, LocalLimit/GlobalLimit/Sample propagates the same row count and column stats from its child, which is incorrect. We can get the correct rowCount in Statistics for GlobalLimit/Sample whether cbo is enabled or not. We don't know the rowCount for LocalLimit because we don't know the partition number at that time. Column stats should not be propagated because we don't know the distribution of columns after Limit or Sample. ## How was this patch tested? Added test cases. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #16696 from wzhfy/limitEstimation.
* [SPARK-19211][SQL] Explicitly prevent Insert into View or Create View As Insertjiangxingbo2017-03-061-1/+5
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we don't explicitly forbid the following behaviors: 1. The statement CREATE VIEW AS INSERT INTO throws the following exception: ``` scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: at least one column must be specified for the table; scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: The number of columns produced by the SELECT clause (num: `0`) does not match the number of column names specified by CREATE VIEW (num: `2`).; ``` 2. The statement INSERT INTO view VALUES throws the following exception from checkAnalysis: ``` scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;; 'InsertIntoTable View (`default`.`testView`, [a#16,b#17]), false, false +- LocalRelation [col1#14, col2#15] ``` After this PR, the behavior changes to: ``` scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO; scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO; scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: `default`.`testView` is a view, inserting into a view is not allowed; ``` ## How was this patch tested? Add a new test case in `SparkSqlParserSuite`; Update the corresponding test case in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17125 from jiangxb1987/insert-with-view.
* [SPARK-19257][SQL] location for table/partition/database should be java.net.URIwindpiger2017-03-065-29/+56
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we treat the location of table/partition/database as URI string. It will be safer if we can make the type of location as java.net.URI. In this PR, there are following classes changes: **1. CatalogDatabase** ``` case class CatalogDatabase( name: String, description: String, locationUri: String, properties: Map[String, String]) ---> case class CatalogDatabase( name: String, description: String, locationUri: URI, properties: Map[String, String]) ``` **2. CatalogStorageFormat** ``` case class CatalogStorageFormat( locationUri: Option[String], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ----> case class CatalogStorageFormat( locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ``` Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally. Here list some operation related location: **1. whitespace in the location** e.g. `/a/b c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` , then `DESC EXTENDED t ` show the location is `/a/b c/d`, and the real path in the FileSystem also show `/a/b c/d` **2. colon(:) in the location** e.g. `/a/b:c/d` For both table location and partition location, when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` , **In linux file system** `DESC EXTENDED t ` show the location is `/a/b:c/d`, and the real path in the FileSystem also show `/a/b:c/d` **in HDFS** throw exception: `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.` **while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`, and the real path in the FileSystem also show `/xxx/a=a%3Ab` **3. percent sign(%) in the location** e.g. `/a/b%c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%c/d`, and the real path in the FileSystem also show `/a/b%c/d` **4. encoded(%25) in the location** e.g. `/a/b%25c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%25c/d`, and the real path in the FileSystem also show `/a/b%25c/d` **while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`, and the real path in the FileSystem also show `/xxx/a=%2525` **Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173) ### Summary: After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`, the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ). `DataBase` also have the same logic with `CREATE TABLE` while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem` In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString` which transfrom `str to uri `or `uri to str`. for example: ``` val str = '/a/b c/d' val uri = new Path(str).toUri --> '/a/b%20c/d' val strFromUri = new Path(uri).toString -> '/a/b c/d' ``` when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri ` ## How was this patch tested? unit test added. The `current master branch` also `passed all the test cases` added in this PR by a litter change. https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764 here `toURI` -> `toString` when test in master branch. This can show that this PR is transparent for user. Author: windpiger <songjun@outlook.com> Closes #17149 from windpiger/changeStringToURI.
* [SPARK-19737][SQL] New analysis rule for reporting unregistered functions ↵Cheng Lian2017-03-062-1/+43
| | | | | | | | | | | | | | | | | | | | | | without relying on relation resolution ## What changes were proposed in this pull request? This PR adds a new `Once` analysis rule batch consists of a single analysis rule `LookupFunctions` that performs simple existence check over `UnresolvedFunctions` without actually resolving them. The benefit of this rule is that it doesn't require function arguments to be resolved first and therefore doesn't rely on relation resolution, which may incur potentially expensive partition/schema discovery cost. Please refer to [SPARK-19737][1] for more details about the motivation. ## How was this patch tested? New test case added in `AnalysisErrorSuite`. [1]: https://issues.apache.org/jira/browse/SPARK-19737 Author: Cheng Lian <lian@databricks.com> Closes #17168 from liancheng/spark-19737-lookup-functions.
* [SPARK-17495][SQL] Support Decimal type in Hive-hashTejas Patil2017-03-062-3/+99
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Hive hash to support Decimal datatype. [Hive internally normalises decimals](https://github.com/apache/hive/blob/4ba713ccd85c3706d195aeef9476e6e6363f1c21/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveDecimalV1.java#L307) and I have ported that logic as-is to HiveHash. ## How was this patch tested? Added unit tests Author: Tejas Patil <tejasp@fb.com> Closes #17056 from tejasapatil/SPARK-17495_decimal.
* [SPARK-19595][SQL] Support json array in from_jsonhyukjinkwon2017-03-052-8/+107
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to both, **Do not allow json arrays with multiple elements and return null in `from_json` with `StructType` as the schema.** Currently, it only reads the single row when the input is a json array. So, the codes below: ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = StructType(StructField("a", IntegerType) :: Nil) Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show() ``` prints ``` +--------------------+ |jsontostruct(struct)| +--------------------+ | [1]| +--------------------+ ``` This PR simply suggests to print this as `null` if the schema is `StructType` and input is json array.with multiple elements ``` +--------------------+ |jsontostruct(struct)| +--------------------+ | null| +--------------------+ ``` **Support json arrays in `from_json` with `ArrayType` as the schema.** ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show() ``` prints ``` +-------------------+ |jsontostruct(array)| +-------------------+ | [[1], [2]]| +-------------------+ ``` ## How was this patch tested? Unit test in `JsonExpressionsSuite`, `JsonFunctionsSuite`, Python doctests and manual test. Author: hyukjinkwon <gurwls223@gmail.com> Closes #16929 from HyukjinKwon/disallow-array.
* [SPARK-19254][SQL] Support Seq, Map, and Struct in functions.litTakeshi Yamamuro2017-03-052-12/+90
| | | | | | | | | | | | | ## What changes were proposed in this pull request? This pr is to support Seq, Map, and Struct in functions.lit; it adds a new IF named `lit2` with `TypeTag` for avoiding type erasure. ## How was this patch tested? Added tests in `LiteralExpressionSuite` Author: Takeshi Yamamuro <yamamuro@apache.org> Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16610 from maropu/SPARK-19254.
* [SPARK-18939][SQL] Timezone support in partition values.Takuya UESHIN2017-03-034-8/+11
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up pr of #16308 and #16750. This pr enables timezone support in partition values. We should use `timeZone` option introduced at #16750 to parse/format partition values of the `TimestampType`. For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written by the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are: ```scala scala> spark.conf.set("spark.sql.session.timeZone", "GMT") scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts") df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp] scala> df.show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ scala> df.write.partitionBy("ts").save("/path/to/gmtpartition") ``` ```sh $ ls /path/to/gmtpartition/ _SUCCESS ts=2016-01-01 00%3A00%3A00 ``` whereas setting the option to `"PST"`, they are: ```scala scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition") ``` ```sh $ ls /path/to/pstpartition/ _SUCCESS ts=2015-12-31 16%3A00%3A00 ``` We can properly read the partition values if the session local timezone and the timezone of the partition values are the same: ```scala scala> spark.read.load("/path/to/gmtpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ ``` And even if the timezones are different, we can properly read the values with setting corrent timezone option: ```scala // wrong result scala> spark.read.load("/path/to/pstpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2015-12-31 16:00:00| +---+-------------------+ // correct result scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ ``` ## How was this patch tested? Existing tests and added some tests. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #17053 from ueshin/issues/SPARK-18939.
* [SPARK-19758][SQL] Resolving timezone aware expressions with time zone when ↵Liang-Chi Hsieh2017-03-033-22/+36
| | | | | | | | | | | | | | | | | | | | | | resolving inline table ## What changes were proposed in this pull request? When we resolve inline tables in analyzer, we will evaluate the expressions of inline tables. When it evaluates a `TimeZoneAwareExpression` expression, an error will happen because the `TimeZoneAwareExpression` is not associated with timezone yet. So we need to resolve these `TimeZoneAwareExpression`s with time zone when resolving inline tables. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17114 from viirya/resolve-timeawareexpr-inline-table.
* [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded ↵Stan Zhai2017-03-012-1/+15
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | by FoldablePropagation rule ## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`. For the following query(): ``` val sqlA = """ |create temporary view ta as |select a, 'a' as tag from t1 union all |select a, 'b' as tag from t2 """.stripMargin val sqlB = """ |create temporary view tb as |select a, 'a' as tag from t3 union all |select a, 'b' as tag from t4 """.stripMargin val sql = """ |select tb.* from ta inner join tb on |ta.a = tb.a and |ta.tag = tb.tag """.stripMargin ``` The tag column is an constant alias column, it's folded by `FoldablePropagation` like this: ``` TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation === Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, ((a#0 = a#4) && (a = a)) :- Union :- Union : :- Project [a#0, a AS tag#8] : :- Project [a#0, a AS tag#8] : : +- LocalRelation [a#0] : : +- LocalRelation [a#0] : +- Project [a#2, b AS tag#9] : +- Project [a#2, b AS tag#9] : +- LocalRelation [a#2] : +- LocalRelation [a#2] +- Union +- Union :- Project [a#4, a AS tag#14] :- Project [a#4, a AS tag#14] : +- LocalRelation [a#4] : +- LocalRelation [a#4] +- Project [a#6, b AS tag#15] +- Project [a#6, b AS tag#15] +- LocalRelation [a#6] +- LocalRelation [a#6] ``` Finally the Result of Batch Operator Optimizations is: ``` Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, (a#0 = a#4) ! :- SubqueryAlias ta, `ta` :- Union ! : +- Union : :- LocalRelation [a#0] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2] ! : : +- SubqueryAlias t1, `t1` +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#14] ! : : +- SubqueryAlias grouping +- LocalRelation [a#6, tag#15] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! : +- SubqueryAlias t2, `t2` ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb, `tb` ! +- Union ! :- Project [a#4, a AS tag#14] ! : +- SubqueryAlias t3, `t3` ! : +- Project [a#4] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#15] ! +- SubqueryAlias t4, `t4` ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong. After fix: ``` === Result of Batch LocalRelation === GlobalLimit 21 GlobalLimit 21 +- LocalLimit 21 +- LocalLimit 21 +- Project [a#4, tag#11] +- Project [a#4, tag#11] +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) ! :- SubqueryAlias ta :- Union ! : +- Union : :- LocalRelation [a#0, tag#8] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2, tag#9] ! : : +- SubqueryAlias t1 +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#11] ! : : +- SubqueryAlias grouping +- LocalRelation [a#6, tag#12] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! : +- SubqueryAlias t2 ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb ! +- Union ! :- Project [a#4, a AS tag#11] ! : +- SubqueryAlias t3 ! : +- Project [a#4] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#12] ! +- SubqueryAlias t4 ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` ## How was this patch tested? add sql-tests/inputs/inner-join.sql All tests passed. Author: Stan Zhai <zhaishidan@haizhi.com> Closes #17099 from stanzhai/fix-inner-join.
* [SPARK-19678][SQL] remove MetastoreRelationWenchen Fan2017-02-285-38/+51
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `MetastoreRelation` is used to represent table relation for hive tables, and provides some hive related information. We will resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as these 2 are the same essentially. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17015 from cloud-fan/table-relation.
* [MINOR][BUILD] Fix lint-java breaks in Javahyukjinkwon2017-02-271-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to fix the lint-breaks as below: ``` [ERROR] src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.network.buffer.ManagedBuffer. [ERROR] src/main/java/org/apache/spark/unsafe/types/UTF8String.java:[156,10] (modifier) ModifierOrder: 'Nonnull' annotation modifier does not precede non-annotation modifiers. [ERROR] src/main/java/org/apache/spark/SparkFirehoseListener.java:[122] (sizes) LineLength: Line is longer than 100 characters (found 105). [ERROR] src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:[164,78] (coding) OneStatementPerLine: Only one statement per line allowed. [ERROR] src/test/java/test/org/apache/spark/JavaAPISuite.java:[1157] (sizes) LineLength: Line is longer than 100 characters (found 121). [ERROR] src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java:[149] (sizes) LineLength: Line is longer than 100 characters (found 113). [ERROR] src/test/java/test/org/apache/spark/streaming/Java8APISuite.java:[146] (sizes) LineLength: Line is longer than 100 characters (found 122). [ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[32,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.Time. [ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[611] (sizes) LineLength: Line is longer than 100 characters (found 101). [ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[1317] (sizes) LineLength: Line is longer than 100 characters (found 102). [ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java:[91] (sizes) LineLength: Line is longer than 100 characters (found 102). [ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[113] (sizes) LineLength: Line is longer than 100 characters (found 101). [ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[164] (sizes) LineLength: Line is longer than 100 characters (found 110). [ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[212] (sizes) LineLength: Line is longer than 100 characters (found 114). [ERROR] src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java:[36] (sizes) LineLength: Line is longer than 100 characters (found 101). [ERROR] src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java:[26,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils. [ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[20,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils. [ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[94] (sizes) LineLength: Line is longer than 100 characters (found 103). [ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.sql.api.java.UDF1. [ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[72] (sizes) LineLength: Line is longer than 100 characters (found 104). [ERROR] src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java:[121] (sizes) LineLength: Line is longer than 100 characters (found 101). [ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaRDD. [ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaSparkContext. ``` ## How was this patch tested? Manually via ```bash ./dev/lint-java ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #17072 from HyukjinKwon/java-lint.
* [SPARK-17075][SQL][FOLLOWUP] fix some minor issues and clean up the codeWenchen Fan2017-02-255-337/+296
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/16395. It fixes some code style issues, naming issues, some missing cases in pattern match, etc. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #17065 from cloud-fan/follow-up.
* [SPARK-19735][SQL] Remove HOLD_DDLTIME from Catalog APIsXiao Li2017-02-243-12/+4
| | | | | | | | | | | | ### What changes were proposed in this pull request? As explained in Hive JIRA https://issues.apache.org/jira/browse/HIVE-12224, HOLD_DDLTIME was broken as soon as it landed. Hive 2.0 removes HOLD_DDLTIME from the API. In Spark SQL, we always set it to FALSE. Like Hive, we should also remove it from our Catalog APIs. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17063 from gatorsmile/removalHoldDDLTime.
* [SPARK-17078][SQL] Show stats when explainwangzhenhua2017-02-245-14/+39
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we can only check the estimated stats in logical plans by debugging. We need to provide an easier and more efficient way for developers/users. In this pr, we add EXPLAIN COST command to show stats in the optimized logical plan. E.g. ``` spark-sql> EXPLAIN COST select count(1) from store_returns; ... == Optimized Logical Plan == Aggregate [count(1) AS count(1)#24L], Statistics(sizeInBytes=16.0 B, rowCount=1, isBroadcastable=false) +- Project, Statistics(sizeInBytes=4.3 GB, rowCount=5.76E+8, isBroadcastable=false) +- Relation[sr_returned_date_sk#3,sr_return_time_sk#4,sr_item_sk#5,sr_customer_sk#6,sr_cdemo_sk#7,sr_hdemo_sk#8,sr_addr_sk#9,sr_store_sk#10,sr_reason_sk#11,sr_ticket_number#12,sr_return_quantity#13,sr_return_amt#14,sr_return_tax#15,sr_return_amt_inc_tax#16,sr_fee#17,sr_return_ship_cost#18,sr_refunded_cash#19,sr_reversed_charge#20,sr_store_credit#21,sr_net_loss#22] parquet, Statistics(sizeInBytes=28.6 GB, rowCount=5.76E+8, isBroadcastable=false) ... ``` ## How was this patch tested? Add test cases. Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #16594 from wzhfy/showStats.
* [SPARK-17075][SQL] Follow up: fix file line ending and improve the testsShuai Lin2017-02-242-512/+533
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fixed the line ending of `FilterEstimation.scala` (It's still using `\n\r`). Also improved the tests to cover the cases where the literals are on the left side of a binary operator. ## How was this patch tested? Existing unit tests. Author: Shuai Lin <linshuai2012@gmail.com> Closes #17051 from lins05/fix-cbo-filter-file-encoding.
* [SPARK-17495][SQL] Add more tests for hive hashTejas Patil2017-02-242-7/+251
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR adds tests hive-hash by comparing the outputs generated against Hive 1.2.1. Following datatypes are covered by this PR: - null - boolean - byte - short - int - long - float - double - string - array - map - struct Datatypes that I have _NOT_ covered but I will work on separately are: - Decimal (handled separately in https://github.com/apache/spark/pull/17056) - TimestampType - DateType - CalendarIntervalType ## How was this patch tested? NA Author: Tejas Patil <tejasp@fb.com> Closes #17049 from tejasapatil/SPARK-17495_remaining_types.
* [SPARK-17075][SQL] implemented filter estimationRon Hu2017-02-234-1/+939
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We traverse predicate and evaluate the logical expressions to compute the selectivity of a FILTER operator. ## How was this patch tested? We add a new test suite to test various logical operators. Author: Ron Hu <ron.hu@huawei.com> Closes #16395 from ron8hu/filterSelectivity.
* [SPARK-19497][SS] Implement streaming deduplicationShixiong Zhu2017-02-235-4/+121
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR adds a special streaming deduplication operator to support `dropDuplicates` with `aggregation` and watermark. It reuses the `dropDuplicates` API but creates new logical plan `Deduplication` and new physical plan `DeduplicationExec`. The following cases are supported: - one or multiple `dropDuplicates()` without aggregation (with or without watermark) - `dropDuplicates` before aggregation Not supported cases: - `dropDuplicates` after aggregation Breaking changes: - `dropDuplicates` without aggregation doesn't work with `complete` or `update` mode. ## How was this patch tested? The new unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16970 from zsxwing/dedup.
* [SPARK-19459] Support for nested char/varchar fields in ORCHerman van Hovell2017-02-232-15/+92
| | | | | | | | | | | | ## What changes were proposed in this pull request? This PR is a small follow-up on https://github.com/apache/spark/pull/16804. This PR also adds support for nested char/varchar fields in orc. ## How was this patch tested? I have added a regression test to the OrcSourceSuite. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17030 from hvanhovell/SPARK-19459-follow-up.