aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-16213][SQL] Reduce runtime overhead of a program that creates an ↵Kazuaki Ishizaki2016-12-297-89/+222
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | primitive array in DataFrame ## What changes were proposed in this pull request? This PR reduces runtime overhead of a program the creates an primitive array in DataFrame by using the similar approach to #15044. Generated code performs boxing operation in an assignment from InternalRow to an `Object[]` temporary array (at Lines 051 and 061 in the generated code before without this PR). If we know that type of array elements is primitive, we apply the following optimizations: 1. Eliminate a pair of `isNullAt()` and a null assignment 2. Allocate an primitive array instead of `Object[]` (eliminate boxing operations) 3. Create `UnsafeArrayData` by using `UnsafeArrayWriter` to keep a primitive array in a row format instead of doing non-lightweight operations in constructor of `GenericArrayData` The PR also performs the same things for `CreateMap`. Here are performance results of [DataFrame programs](https://github.com/kiszk/spark/blob/6bf54ec5e227689d69f6db991e9ecbc54e153d0a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala#L83-L112) by up to 17.9x over without this PR. ``` Without SPARK-16043 OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64 Intel Xeon E3-12xx v2 (Ivy Bridge) Read a primitive array in DataFrame: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Int 3805 / 4150 0.0 507308.9 1.0X Double 3593 / 3852 0.0 479056.9 1.1X With SPARK-16043 Read a primitive array in DataFrame: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Int 213 / 271 0.0 28387.5 1.0X Double 204 / 223 0.0 27250.9 1.0X ``` Note : #15780 is enabled for these measurements An motivating example ``` java val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF df.selectExpr("Array(value + 1.1d, value + 2.2d)").show ``` Generated code without this PR ``` java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow serializefromobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 012 */ private Object[] project_values; /* 013 */ private UnsafeRow project_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter; /* 017 */ /* 018 */ public GeneratedIterator(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ inputadapter_input = inputs[0]; /* 026 */ serializefromobject_result = new UnsafeRow(1); /* 027 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 028 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 029 */ this.project_values = null; /* 030 */ project_result = new UnsafeRow(1); /* 031 */ this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32); /* 032 */ this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1); /* 033 */ this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 034 */ /* 035 */ } /* 036 */ /* 037 */ protected void processNext() throws java.io.IOException { /* 038 */ while (inputadapter_input.hasNext()) { /* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 040 */ double inputadapter_value = inputadapter_row.getDouble(0); /* 041 */ /* 042 */ final boolean project_isNull = false; /* 043 */ this.project_values = new Object[2]; /* 044 */ boolean project_isNull1 = false; /* 045 */ /* 046 */ double project_value1 = -1.0; /* 047 */ project_value1 = inputadapter_value + 1.1D; /* 048 */ if (false) { /* 049 */ project_values[0] = null; /* 050 */ } else { /* 051 */ project_values[0] = project_value1; /* 052 */ } /* 053 */ /* 054 */ boolean project_isNull4 = false; /* 055 */ /* 056 */ double project_value4 = -1.0; /* 057 */ project_value4 = inputadapter_value + 2.2D; /* 058 */ if (false) { /* 059 */ project_values[1] = null; /* 060 */ } else { /* 061 */ project_values[1] = project_value4; /* 062 */ } /* 063 */ /* 064 */ final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values); /* 065 */ this.project_values = null; /* 066 */ project_holder.reset(); /* 067 */ /* 068 */ project_rowWriter.zeroOutNullBytes(); /* 069 */ /* 070 */ if (project_isNull) { /* 071 */ project_rowWriter.setNullAt(0); /* 072 */ } else { /* 073 */ // Remember the current cursor so that we can calculate how many bytes are /* 074 */ // written later. /* 075 */ final int project_tmpCursor = project_holder.cursor; /* 076 */ /* 077 */ if (project_value instanceof UnsafeArrayData) { /* 078 */ final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes(); /* 079 */ // grow the global buffer before writing data. /* 080 */ project_holder.grow(project_sizeInBytes); /* 081 */ ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor); /* 082 */ project_holder.cursor += project_sizeInBytes; /* 083 */ /* 084 */ } else { /* 085 */ final int project_numElements = project_value.numElements(); /* 086 */ project_arrayWriter.initialize(project_holder, project_numElements, 8); /* 087 */ /* 088 */ for (int project_index = 0; project_index < project_numElements; project_index++) { /* 089 */ if (project_value.isNullAt(project_index)) { /* 090 */ project_arrayWriter.setNullDouble(project_index); /* 091 */ } else { /* 092 */ final double project_element = project_value.getDouble(project_index); /* 093 */ project_arrayWriter.write(project_index, project_element); /* 094 */ } /* 095 */ } /* 096 */ } /* 097 */ /* 098 */ project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor); /* 099 */ } /* 100 */ project_result.setTotalSize(project_holder.totalSize()); /* 101 */ append(project_result); /* 102 */ if (shouldStop()) return; /* 103 */ } /* 104 */ } /* 105 */ } ``` Generated code with this PR ``` java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow serializefromobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 012 */ private UnsafeArrayData project_arrayData; /* 013 */ private UnsafeRow project_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter; /* 017 */ /* 018 */ public GeneratedIterator(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ inputadapter_input = inputs[0]; /* 026 */ serializefromobject_result = new UnsafeRow(1); /* 027 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 028 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 029 */ /* 030 */ project_result = new UnsafeRow(1); /* 031 */ this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32); /* 032 */ this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1); /* 033 */ this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 034 */ /* 035 */ } /* 036 */ /* 037 */ protected void processNext() throws java.io.IOException { /* 038 */ while (inputadapter_input.hasNext()) { /* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 040 */ double inputadapter_value = inputadapter_row.getDouble(0); /* 041 */ /* 042 */ byte[] project_array = new byte[32]; /* 043 */ project_arrayData = new UnsafeArrayData(); /* 044 */ Platform.putLong(project_array, 16, 2); /* 045 */ project_arrayData.pointTo(project_array, 16, 32); /* 046 */ /* 047 */ boolean project_isNull1 = false; /* 048 */ /* 049 */ double project_value1 = -1.0; /* 050 */ project_value1 = inputadapter_value + 1.1D; /* 051 */ if (false) { /* 052 */ project_arrayData.setNullAt(0); /* 053 */ } else { /* 054 */ project_arrayData.setDouble(0, project_value1); /* 055 */ } /* 056 */ /* 057 */ boolean project_isNull4 = false; /* 058 */ /* 059 */ double project_value4 = -1.0; /* 060 */ project_value4 = inputadapter_value + 2.2D; /* 061 */ if (false) { /* 062 */ project_arrayData.setNullAt(1); /* 063 */ } else { /* 064 */ project_arrayData.setDouble(1, project_value4); /* 065 */ } /* 066 */ project_holder.reset(); /* 067 */ /* 068 */ // Remember the current cursor so that we can calculate how many bytes are /* 069 */ // written later. /* 070 */ final int project_tmpCursor = project_holder.cursor; /* 071 */ /* 072 */ if (project_arrayData instanceof UnsafeArrayData) { /* 073 */ final int project_sizeInBytes = ((UnsafeArrayData) project_arrayData).getSizeInBytes(); /* 074 */ // grow the global buffer before writing data. /* 075 */ project_holder.grow(project_sizeInBytes); /* 076 */ ((UnsafeArrayData) project_arrayData).writeToMemory(project_holder.buffer, project_holder.cursor); /* 077 */ project_holder.cursor += project_sizeInBytes; /* 078 */ /* 079 */ } else { /* 080 */ final int project_numElements = project_arrayData.numElements(); /* 081 */ project_arrayWriter.initialize(project_holder, project_numElements, 8); /* 082 */ /* 083 */ for (int project_index = 0; project_index < project_numElements; project_index++) { /* 084 */ if (project_arrayData.isNullAt(project_index)) { /* 085 */ project_arrayWriter.setNullDouble(project_index); /* 086 */ } else { /* 087 */ final double project_element = project_arrayData.getDouble(project_index); /* 088 */ project_arrayWriter.write(project_index, project_element); /* 089 */ } /* 090 */ } /* 091 */ } /* 092 */ /* 093 */ project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor); /* 094 */ project_result.setTotalSize(project_holder.totalSize()); /* 095 */ append(project_result); /* 096 */ if (shouldStop()) return; /* 097 */ } /* 098 */ } /* 099 */ } ``` ## How was this patch tested? Added unit tests into `DataFrameComplexTypeSuite` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #13909 from kiszk/SPARK-16213.
* [SPARK-18999][SQL][MINOR] simplify Literal codegenWenchen Fan2016-12-274-35/+18
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `Literal` can use `CodegenContex.addReferenceObj` to implement codegen, instead of `CodegenFallback`. This can also simplify the generated code a little bit, before we will generate: `((Expression) references[1]).eval(null)`, now it's just `references[1]`. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #16402 from cloud-fan/minor.
* [SPARK-18980][SQL] implement Aggregator with TypedImperativeAggregateWenchen Fan2016-12-264-15/+30
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we implement `Aggregator` with `DeclarativeAggregate`, which will serialize/deserialize the buffer object every time we process an input. This PR implements `Aggregator` with `TypedImperativeAggregate` and avoids to serialize/deserialize buffer object many times. The benchmark shows we get about 2 times speed up. For simple buffer object that doesn't need serialization, we still go with `DeclarativeAggregate`, to avoid performance regression. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #16383 from cloud-fan/aggregator.
* [SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and ↵wangzhenhua2016-12-242-5/+33
| | | | | | | | | | | | | | | | | | | convert it to Statistics in relations ## What changes were proposed in this pull request? Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. But CatalogTable doesn't have the concepts of attribute or broadcast hint in Statistics. Therefore, putting Statistics in CatalogTable is confusing. We define a different statistic structure in CatalogTable, which is only responsible for interacting with metastore, and is converted to statistics in LogicalPlan when it is used. ## How was this patch tested? add test cases Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #16323 from wzhfy/nameToAttr.
* [SPARK-18973][SQL] Remove SortPartitions and RedistributeDataReynold Xin2016-12-227-54/+26
| | | | | | | | | | | | ## What changes were proposed in this pull request? SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions. ## How was this patch tested? Also updated test cases to reflect the removal. Author: Reynold Xin <rxin@databricks.com> Closes #16381 from rxin/SPARK-18973.
* [SPARK-18234][SS] Made update mode publicTathagata Das2016-12-214-4/+15
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Made update mode public. As part of that here are the changes. - Update DatastreamWriter to accept "update" - Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst - Added update mode state removing with watermark to StateStoreSaveExec ## How was this patch tested? Added new tests in changed modules Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16360 from tdas/SPARK-18234.
* [SPARK-17807][CORE] split test-tags into test-JARRyan Williams2016-12-211-0/+12
| | | | | | | | | | Remove spark-tag's compile-scope dependency (and, indirectly, spark-core's compile-scope transitive-dependency) on scalatest by splitting test-oriented tags into spark-tags' test JAR. Alternative to #16303. Author: Ryan Williams <ryan.blake.williams@gmail.com> Closes #16311 from ryan-williams/tt.
* [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking ↵Wenchen Fan2016-12-192-0/+47
| | | | | | | | | | | | | | | | | | | | | | when append data to an existing table ## What changes were proposed in this pull request? When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data. However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, we forget to check bucketing, etc. This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs: * SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files. * SPARK-18912: We forget to check the number of columns for non-file-based data source table * SPARK-18913: We don't support append data to a table with special column names. ## How was this patch tested? new regression test. Author: Wenchen Fan <wenchen@databricks.com> Closes #16313 from cloud-fan/bug1.
* [SPARK-18624][SQL] Implicit cast ArrayType(InternalType)jiangxingbo2016-12-195-49/+92
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently `ImplicitTypeCasts` doesn't handle casts between `ArrayType`s, this is not convenient, we should add a rule to enable casting from `ArrayType(InternalType)` to `ArrayType(newInternalType)`. Goals: 1. Add a rule to `ImplicitTypeCasts` to enable casting between `ArrayType`s; 2. Simplify `Percentile` and `ApproximatePercentile`. ## How was this patch tested? Updated test cases in `TypeCoercionSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #16057 from jiangxb1987/implicit-cast-complex-types.
* [SPARK-18892][SQL] Alias percentile_approx approx_percentileReynold Xin2016-12-152-2/+3
| | | | | | | | | | | | ## What changes were proposed in this pull request? percentile_approx is the name used in Hive, and approx_percentile is the name used in Presto. approx_percentile is actually more consistent with our approx_count_distinct. Given the cost to alias SQL functions is low (one-liner), it'd be better to just alias them so it is easier to use. ## How was this patch tested? Technically I could add an end-to-end test to verify this one-line change, but it seemed too trivial to me. Author: Reynold Xin <rxin@databricks.com> Closes #16300 from rxin/SPARK-18892.
* [SPARK-18870] Disallowed Distinct Aggregations on Streaming DatasetsTathagata Das2016-12-152-2/+26
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true. ## How was this patch tested? Added unit test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16289 from tdas/SPARK-18870.
* [SPARK-17910][SQL] Allow users to update the comment of a columnjiangxingbo2016-12-151-2/+8
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Right now, once a user set the comment of a column with create table command, he/she cannot update the comment. It will be useful to provide a public interface (e.g. SQL) to do that. This PR implements the following SQL statement: ``` ALTER TABLE table [PARTITION partition_spec] CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment] [FIRST | AFTER column_name]; ``` For further expansion, we could support alter `name`/`dataType`/`index` of a column too. ## How was this patch tested? Add new test cases in `ExternalCatalogSuite` and `SessionCatalogSuite`. Add sql file test for `ALTER TABLE CHANGE COLUMN` statement. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #15717 from jiangxb1987/change-column.
* [SPARK-18869][SQL] Add TreeNode.p that returns BaseTypeReynold Xin2016-12-142-10/+9
| | | | | | | | | | | | ## What changes were proposed in this pull request? After the bug fix in SPARK-18854, TreeNode.apply now returns TreeNode[_] rather than a more specific type. It would be easier for interactive debugging to introduce a function that returns the BaseType. ## How was this patch tested? N/A - this is a developer only feature used for interactive debugging. As long as it compiles, it should be good to go. I tested this in spark-shell. Author: Reynold Xin <rxin@databricks.com> Closes #16288 from rxin/SPARK-18869.
* [SPARK-18854][SQL] numberedTreeString and apply(i) inconsistent for subqueriesReynold Xin2016-12-143-21/+36
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries. This patch fixes the bug. ## How was this patch tested? Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent. Author: Reynold Xin <rxin@databricks.com> Closes #16277 from rxin/SPARK-18854.
* [SPARK-18853][SQL] Project (UnaryNode) is way too aggressive in estimating ↵Reynold Xin2016-12-143-13/+13
| | | | | | | | | | | | | | statistics ## What changes were proposed in this pull request? This patch reduces the default number element estimation for arrays and maps from 100 to 1. The issue with the 100 number is that when nested (e.g. an array of map), 100 * 100 would be used as the default size. This sounds like just an overestimation which doesn't seem that bad (since it is usually better to overestimate than underestimate). However, due to the way we assume the size output for Project (new estimated column size / old estimated column size), this overestimation can become underestimation. It is actually in general in this case safer to assume 1 default element. ## How was this patch tested? This should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #16274 from rxin/SPARK-18853.
* [SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32Nattavut Sutyanyong2016-12-141-8/+23
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis to Analysis to fix a regression caused by SPARK-18504. This problem can be reproduced with a simple script now. Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p") Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c") sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show The requirements are: 1. We need to reference the same table twice in both the parent and the subquery. Here is the table c. 2. We need to have a correlated predicate but to a different table. Here is from c (as c1) in the subquery to p in the parent. 3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at `Project` above `Aggregate` of `avg`. Then when we compare `ck#<n1>#<n2>` and the original group by column `ck#<n1>` by their canonicalized form, which is #<n2> != #<n1>. That's how we trigger the exception added in SPARK-18504. ## How was this patch tested? SubquerySuite and a simplified version of TPCDS-Q32 Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #16246 from nsyca/18814.
* [SPARK-18566][SQL] remove OverwriteOptionsWenchen Fan2016-12-144-37/+7
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactor, this information becomes duplicated and we can remove `OverwriteOptions`. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #15995 from cloud-fan/overwrite.
* [SPARK-18752][SQL] Follow-up: add scaladoc explaining isSrcLocal arg.Marcelo Vanzin2016-12-131-0/+12
| | | | | | Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16257 from vanzin/SPARK-18752.2.
* [SPARK-17932][SQL][FOLLOWUP] Change statement `SHOW TABLES EXTENDED` to ↵jiangxingbo2016-12-131-2/+4
| | | | | | | | | | | | | | | | | | | | | `SHOW TABLE EXTENDED` ## What changes were proposed in this pull request? Change the statement `SHOW TABLES [EXTENDED] [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'] [PARTITION(partition_spec)]` to the following statements: - SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'] - SHOW TABLE EXTENDED [(IN|FROM) database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)] After this change, the statements `SHOW TABLE/SHOW TABLES` have the same syntax with that HIVE has. ## How was this patch tested? Modified the test sql file `show-tables.sql`; Modified the test suite `DDLSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #16262 from jiangxb1987/show-table-extended.
* [SPARK-18835][SQL] Don't expose Guava types in the JavaTypeInference API.Marcelo Vanzin2016-12-131-1/+11
| | | | | | | | This avoids issues during maven tests because of shading. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16260 from vanzin/SPARK-18835.
* [SPARK-18717][SQL] Make code generation for Scala Map work with ↵Andrew Ray2016-12-131-1/+1
| | | | | | | | | | | | | | | | immutable.Map also ## What changes were proposed in this pull request? Fixes compile errors in generated code when user has case class with a `scala.collections.immutable.Map` instead of a `scala.collections.Map`. Since ArrayBasedMapData.toScalaMap returns the immutable version we can make it work with both. ## How was this patch tested? Additional unit tests. Author: Andrew Ray <ray.andrew@gmail.com> Closes #16161 from aray/fix-map-codegen.
* [SPARK-18752][HIVE] isSrcLocal" value should be set from user query.Marcelo Vanzin2016-12-123-8/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | The value of the "isSrcLocal" parameter passed to Hive's loadTable and loadPartition methods needs to be set according to the user query (e.g. "LOAD DATA LOCAL"), and not the current code that tries to guess what it should be. For existing versions of Hive the current behavior is probably ok, but some recent changes in the Hive code changed the semantics slightly, making code that sets "isSrcLocal" to "true" incorrectly to do the wrong thing. It would end up moving the parent directory of the files into the final location, instead of the file themselves, resulting in a table that cannot be read. I modified HiveCommandSuite so that existing "LOAD DATA" tests are run both in local and non-local mode, since the semantics are slightly different. The tests include a few new checks to make sure the semantics follow what Hive describes in its documentation. Tested with existing unit tests and also ran some Hive integration tests with a version of Hive containing the changes that surfaced the problem. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16179 from vanzin/SPARK-18752.
* [SQL][MINOR] simplify a test to fix the maven testsWenchen Fan2016-12-111-12/+7
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/15620 , all of the Maven-based 2.0 Jenkins jobs time out consistently. As I pointed out in https://github.com/apache/spark/pull/15620#discussion_r91829129 , it seems that the regression test is an overkill and may hit constants pool size limitation, which is a known issue and hasn't been fixed yet. Since #15620 only fix the code size limitation problem, we can simplify the test to avoid hitting constants pool size limitation. ## How was this patch tested? test only change Author: Wenchen Fan <wenchen@databricks.com> Closes #16244 from cloud-fan/minor.
* [SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary ↵wangzhenhua2016-12-101-3/+6
| | | | | | | | | | | | | | | | column having only null values ## What changes were proposed in this pull request? During column stats collection, average and max length will be null if a column of string/binary type has only null values. To fix this, I use default size when avg/max length is null. ## How was this patch tested? Add a test for handling null columns Author: wangzhenhua <wangzhenhua@huawei.com> Closes #16243 from wzhfy/nullStats.
* [SPARK-17460][SQL] Make sure sizeInBytes in Statistics will not overflowHuaxin Gao2016-12-101-1/+2
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? 1. In SparkStrategies.canBroadcast, I will add the check plan.statistics.sizeInBytes >= 0 2. In LocalRelations.statistics, when calculate the statistics, I will change the size to BigInt so it won't overflow. ## How was this patch tested? I will add a test case to make sure the statistics.sizeInBytes won't overflow. Author: Huaxin Gao <huaxing@us.ibm.com> Closes #16175 from huaxingao/spark-17460.
* [MINOR][CORE][SQL][DOCS] Typo fixesJacek Laskowski2016-12-095-7/+7
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Typo fixes ## How was this patch tested? Local build. Awaiting the official build. Author: Jacek Laskowski <jacek@japila.pl> Closes #16144 from jaceklaskowski/typo-fixes.
* [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverterNathan Howell2016-12-071-35/+21
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `makeRootConverter` is only called with a `StructType` value. By making this method less general we can remove pattern matches, which are never actually hit outside of the test suite. ## How was this patch tested? The existing tests. Author: Nathan Howell <nhowell@godaddy.com> Closes #16084 from NathanHowell/SPARK-18654.
* [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy ↵Andrew Ray2016-12-071-2/+3
| | | | | | | | | | | | | | | | column is not attribute ## What changes were proposed in this pull request? Fixes AnalysisException for pivot queries that have group by columns that are expressions and not attributes by substituting the expressions output attribute in the second aggregation and final projection. ## How was this patch tested? existing and additional unit tests Author: Andrew Ray <ray.andrew@gmail.com> Closes #16177 from aray/SPARK-17760.
* [SPARK-18634][SQL][TRIVIAL] Touch-up GenerateHerman van Hovell2016-12-061-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? I jumped the gun on merging https://github.com/apache/spark/pull/16120, and missed a tiny potential problem. This PR fixes that by changing a val into a def; this should prevent potential serialization/initialization weirdness from happening. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16170 from hvanhovell/SPARK-18634.
* [SPARK-18572][SQL] Add a method `listPartitionNames` to `ExternalCatalog`Michael Allman2016-12-065-2/+125
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | (Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572) ## What changes were proposed in this pull request? Currently Spark answers the `SHOW PARTITIONS` command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a `getPartitionNames` method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table. To test the performance impact of this PR, I ran the `SHOW PARTITIONS` command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table `table1` and the latter table `table2`. I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows: Spark at bdc8153, `SHOW PARTITIONS table1`, times in seconds: 7.901 3.983 4.018 4.331 4.261 Spark at bdc8153, `SHOW PARTITIONS table2` (Timed out after 10 minutes with a `SocketTimeoutException`.) Spark at this PR, `SHOW PARTITIONS table1`, times in seconds: 3.801 0.449 0.395 0.348 0.336 Spark at this PR, `SHOW PARTITIONS table2`, times in seconds: 5.184 1.63 1.474 1.519 1.41 Taking the best times from each trial, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master. This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x. ## How was this patch tested? I added a unit test to `VersionsSuite` which tests that the Hive client's `getPartitionNames` method returns the correct number of partitions. Author: Michael Allman <michael@videoamp.com> Closes #15998 from mallman/spark-18572-list_partition_names.
* [SPARK-18634][PYSPARK][SQL] Corruption and Correctness issues with exploding ↵Liang-Chi Hsieh2016-12-051-6/+6
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Python UDFs ## What changes were proposed in this pull request? As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL. The following test code can reproduce it. Notice: the following test code is reported to return wrong results in the Jira. However, as I tested on master branch, it causes exception and so can't return any result. >>> from pyspark.sql.functions import * >>> from pyspark.sql.types import * >>> >>> df = spark.range(10) >>> >>> def return_range(value): ... return [(i, str(i)) for i in range(value - 1, value + 1)] ... >>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()), ... StructField("string_val", StringType())]))) >>> >>> df.select("id", explode(range_udf(df.id))).show() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/spark/python/pyspark/sql/dataframe.py", line 318, in show print(self._jdf.showString(n, 20)) File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ File "/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120) at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57) The cause of this issue is, in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to original plan. In above case, the original `Range` only has one output `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs `id` and `pythonUDF0`. Because the output of `GenerateExec` is given after analysis phase, in above case, it is the combination of `id`, i.e., the output of `Range`, and `col`. But in planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec` with additional output attributes. It will cause no problem in non wholestage codegen. Because when evaluating the additional attributes are projected out the final output of `GenerateExec`. However, as `GenerateExec` now supports wholestage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes is different to the output variables in wholestage codegen. To solve this issue, this patch only gives the generator's output to `GenerateExec` after analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output. So when we change `GenerateExec`'s child, its output is still correct. ## How was this patch tested? Added test cases to PySpark. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #16120 from viirya/fix-py-udf-with-generator.
* [SPARK-18711][SQL] should disable subexpression elimination for LambdaVariableWenchen Fan2016-12-051-1/+5
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is kind of a long-standing bug, it's hidden until https://github.com/apache/spark/pull/15780 , which may add `AssertNotNull` on top of `LambdaVariable` and thus enables subexpression elimination. However, subexpression elimination will evaluate the common expressions at the beginning, which is invalid for `LambdaVariable`. `LambdaVariable` usually represents loop variable, which can't be evaluated ahead of the loop. This PR skips expressions containing `LambdaVariable` when doing subexpression elimination. ## How was this patch tested? updated test in `DatasetAggregatorSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #16143 from cloud-fan/aggregator.
* [SPARK-18702][SQL] input_file_block_start and input_file_block_lengthReynold Xin2016-12-043-49/+96
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? We currently have function input_file_name to get the path of the input file, but don't have functions to get the block start offset and length. This patch introduces two functions: 1. input_file_block_start: returns the file block start offset, or -1 if not available. 2. input_file_block_length: returns the file block length, or -1 if not available. ## How was this patch tested? Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions. Author: Reynold Xin <rxin@databricks.com> Closes #16133 from rxin/SPARK-18702.
* [SPARK-18091][SQL] Deep if expressions cause Generated ↵Kapil Singh2016-12-042-13/+90
| | | | | | | | | | | | | | | | | SpecificUnsafeProjection code to exceed JVM code size limit ## What changes were proposed in this pull request? Fix for SPARK-18091 which is a bug related to large if expressions causing generated SpecificUnsafeProjection code to exceed JVM code size limit. This PR changes if expression's code generation to place its predicate, true value and false value expressions' generated code in separate methods in context so as to never generate too long combined code. ## How was this patch tested? Added a unit test and also tested manually with the application (having transformations similar to the unit test) which caused the issue to be identified in the first place. Author: Kapil Singh <kapsingh@adobe.com> Closes #15620 from kapilsingh5050/SPARK-18091-IfCodegenFix.
* [SPARK-18582][SQL] Whitelist LogicalPlan operators allowed in correlated ↵Nattavut Sutyanyong2016-12-033-53/+111
| | | | | | | | | | | | | | | | subqueries ## What changes were proposed in this pull request? This fix puts an explicit list of operators that Spark supports for correlated subqueries. ## How was this patch tested? Run sql/test, catalyst/test and add a new test case on Generate. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #16046 from nsyca/spark18455.0.
* [SPARK-18695] Bump master branch version to 2.2.0-SNAPSHOTReynold Xin2016-12-021-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? This patch bumps master branch version to 2.2.0-SNAPSHOT. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #16126 from rxin/SPARK-18695.
* [SPARK-18677] Fix parsing ['key'] in JSON path expressions.Ryan Blue2016-12-022-1/+25
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This fixes the parser rule to match named expressions, which doesn't work for two reasons: 1. The name match is not coerced to a regular expression (missing .r) 2. The surrounding literals are incorrect and attempt to escape a single quote, which is unnecessary ## How was this patch tested? This adds test cases for named expressions using the bracket syntax, including one with quoted spaces. Author: Ryan Blue <blue@apache.org> Closes #16107 from rdblue/SPARK-18677-fix-json-path.
* [SPARK-18674][SQL][FOLLOW-UP] improve the error message of using joingatorsmile2016-12-022-7/+17
| | | | | | | | | | | | ### What changes were proposed in this pull request? Added a test case for using joins with nested fields. ### How was this patch tested? N/A Author: gatorsmile <gatorsmile@gmail.com> Closes #16110 from gatorsmile/followup-18674.
* [SPARK-18659][SQL] Incorrect behaviors in overwrite table for datasource tablesEric Liang2016-12-025-22/+44
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Two bugs are addressed here 1. INSERT OVERWRITE TABLE sometime crashed when catalog partition management was enabled. This was because when dropping partitions after an overwrite operation, the Hive client will attempt to delete the partition files. If the entire partition directory was dropped, this would fail. The PR fixes this by adding a flag to control whether the Hive client should attempt to delete files. 2. The static partition spec for OVERWRITE TABLE was not correctly resolved to the case-sensitive original partition names. This resulted in the entire table being overwritten if you did not correctly capitalize your partition names. cc yhuai cloud-fan ## How was this patch tested? Unit tests. Surprisingly, the existing overwrite table tests did not catch these edge cases. Author: Eric Liang <ekl@databricks.com> Closes #16088 from ericl/spark-18659.
* [SPARK-18658][SQL] Write text records directly to a FileOutputStreamNathan Howell2016-12-011-0/+4
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering. The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric. ## How was this patch tested? Existing unit tests. Author: Nathan Howell <nhowell@godaddy.com> Closes #16089 from NathanHowell/SPARK-18658.
* [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementationReynold Xin2016-12-013-230/+103
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? SPARK-18429 introduced count-min sketch aggregate function for SQL, but the implementation and testing is more complicated than needed. This simplifies the test cases and removes support for data types that don't have clear equality semantics: 1. Removed support for floating point and decimal types. 2. Removed the heavy randomized tests. The underlying CountMinSketch implementation already had pretty good test coverage through randomized tests, and the SPARK-18429 implementation is just to add an aggregate function wrapper around CountMinSketch. There is no need for randomized tests at three different levels of the implementations. ## How was this patch tested? A lot of the change is to simplify test cases. Author: Reynold Xin <rxin@databricks.com> Closes #16093 from rxin/SPARK-18663.
* [SPARK-18284][SQL] Make ExpressionEncoder.serializer.nullable preciseKazuaki Ishizaki2016-12-026-19/+44
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR makes `ExpressionEncoder.serializer.nullable` for flat encoder for a primitive type `false`. Since it is `true` for now, it is too conservative. While `ExpressionEncoder.schema` has correct information (e.g. `<IntegerType, false>`), `serializer.head.nullable` of `ExpressionEncoder`, which got from `encoderFor[T]`, is always false. It is too conservative. This is accomplished by checking whether a type is one of primitive types. If it is `true`, `nullable` should be `false`. ## How was this patch tested? Added new tests for encoder and dataframe Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #15780 from kiszk/SPARK-18284.
* [SPARK-18674][SQL] improve the error message of using joinWenchen Fan2016-12-016-57/+33
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? The current error message of USING join is quite confusing, for example: ``` scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1") df1: org.apache.spark.sql.DataFrame = [c1: int] scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2") df2: org.apache.spark.sql.DataFrame = [c2: int] scala> df1.join(df2, usingColumn = "c1") org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved given input columns: [c1, c2] ;; 'Join UsingJoin(Inner,List('c1)) :- Project [value#1 AS c1#3] : +- LocalRelation [value#1] +- Project [value#7 AS c2#9] +- LocalRelation [value#7] ``` after this PR, it becomes: ``` scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1") df1: org.apache.spark.sql.DataFrame = [c1: int] scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2") df2: org.apache.spark.sql.DataFrame = [c2: int] scala> df1.join(df2, usingColumn = "c1") org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved with the right join side, the right output is: [c2]; ``` ## How was this patch tested? updated tests Author: Wenchen Fan <wenchen@databricks.com> Closes #16100 from cloud-fan/natural.
* [SPARK-18635][SQL] Partition name/values not escaped correctly in some casesEric Liang2016-12-011-0/+3
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Due to confusion between URI vs paths, in certain cases we escape partition values too many times, which causes some Hive client operations to fail or write data to the wrong location. This PR fixes at least some of these cases. To my understanding this is how values, filesystem paths, and URIs interact. - Hive stores raw (unescaped) partition values that are returned to you directly when you call listPartitions. - Internally, we convert these raw values to filesystem paths via `ExternalCatalogUtils.[un]escapePathName`. - In some circumstances we store URIs instead of filesystem paths. When a path is converted to a URI via `path.toURI`, the escaped partition values are further URI-encoded. This means that to get a path back from a URI, you must call `new Path(new URI(uriTxt))` in order to decode the URI-encoded string. - In `CatalogStorageFormat` we store URIs as strings. This makes it easy to forget to URI-decode the value before converting it into a path. - Finally, the Hive client itself uses mostly Paths for representing locations, and only URIs occasionally. In the future we should probably clean this up, perhaps by dropping use of URIs when unnecessary. We should also try fixing escaping for partition names as well as values, though names are unlikely to contain special characters. cc mallman cloud-fan yhuai ## How was this patch tested? Unit tests. Author: Eric Liang <ekl@databricks.com> Closes #16071 from ericl/spark-18635.
* [SPARK-18251][SQL] the type of Dataset can't be Option of non-flat typeWenchen Fan2016-11-302-2/+25
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL doesn't allow the entire row to be null, only its columns can be null. That's the reason we forbid users to use top level null objects in https://github.com/apache/spark/pull/13469 However, if users wrap non-flat type with `Option`, then we may still encoder top level null object to row, which is not allowed. This PR fixes this case, and suggests users to wrap their type with `Tuple1` if they do wanna top level null objects. ## How was this patch tested? new test Author: Wenchen Fan <wenchen@databricks.com> Closes #15979 from cloud-fan/option.
* [SPARK-17932][SQL] Support SHOW TABLES EXTENDED LIKE ↵jiangxingbo2016-11-301-2/+2
| | | | | | | | | | | | | | | | | | | | 'identifier_with_wildcards' statement ## What changes were proposed in this pull request? Currently we haven't implemented `SHOW TABLE EXTENDED` in Spark 2.0. This PR is to implement the statement. Goals: 1. Support `SHOW TABLES EXTENDED LIKE 'identifier_with_wildcards'`; 2. Explicitly output an unsupported error message for `SHOW TABLES [EXTENDED] ... PARTITION` statement; 3. Improve test cases for `SHOW TABLES` statement. ## How was this patch tested? 1. Add new test cases in file `show-tables.sql`. 2. Modify tests for `SHOW TABLES` in `DDLSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #15958 from jiangxb1987/show-table-extended.
* [SPARK-17897][SQL] Fixed IsNotNull Constraint Inference Rulegatorsmile2016-11-302-7/+29
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.) Below is the existing code we have for `IsNotNull` pushdown. ```Scala private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match { case a: Attribute => Seq(a) case _: NullIntolerant | IsNotNull(_: NullIntolerant) => expr.children.flatMap(scanNullIntolerantExpr) case _ => Seq.empty[Attribute] } ``` **`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root. Without the fix, the following test case will return empty. ```Scala val data = Seq[java.lang.Integer](1, null).toDF("key") data.filter("not key is not null").show() ``` Before the fix, the optimized plan is like ``` == Optimized Logical Plan == Project [value#1 AS key#3] +- Filter (isnotnull(value#1) && NOT isnotnull(value#1)) +- LocalRelation [value#1] ``` After the fix, the optimized plan is like ``` == Optimized Logical Plan == Project [value#1 AS key#3] +- Filter NOT isnotnull(value#1) +- LocalRelation [value#1] ``` ### How was this patch tested? Added a test Author: gatorsmile <gatorsmile@gmail.com> Closes #16067 from gatorsmile/isNotNull2.
* [SPARK-18622][SQL] Fix the datatype of the Sum aggregate functionHerman van Hovell2016-11-301-3/+3
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? The result of a `sum` aggregate function is typically a Decimal, Double or a Long. Currently the output dataType is based on input's dataType. The `FunctionArgumentConversion` rule will make sure that the input is promoted to the largest type, and that also ensures that the output uses a (hopefully) sufficiently large output dataType. The issue is that sum is in a resolved state when we cast the input type, this means that rules assuming that the dataType of the expression does not change anymore could have been applied in the mean time. This is what happens if we apply `WidenSetOperationTypes` before applying the casts, and this breaks analysis. The most straight forward and future proof solution is to make `sum` always output the widest dataType in its class (Long for IntegralTypes, Decimal for DecimalTypes & Double for FloatType and DoubleType). This PR implements that solution. We should move expression specific type casting rules into the given Expression at some point. ## How was this patch tested? Added (regression) tests to SQLQueryTestSuite's `union.sql`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16063 from hvanhovell/SPARK-18622.
* [SPARK-18632][SQL] AggregateFunction should not implement ImplicitCastInputTypesHerman van Hovell2016-11-2918-48/+57
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `AggregateFunction` currently implements `ImplicitCastInputTypes` (which enables implicit input type casting). There are actually quite a few situations in which we don't need this, or require more control over our input. A recent example is the aggregate for `CountMinSketch` which should only take string, binary or integral types inputs. This PR removes `ImplicitCastInputTypes` from the `AggregateFunction` and makes a case-by-case decision on what kind of input validation we should use. ## How was this patch tested? Refactoring only. Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16066 from hvanhovell/SPARK-18632.
* [SPARK-18614][SQL] Incorrect predicate pushdown from ExistenceJoinNattavut Sutyanyong2016-11-292-2/+19
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? ExistenceJoin should be treated the same as LeftOuter and LeftAnti, not InnerLike and LeftSemi. This is not currently exposed because the rewrite of [NOT] EXISTS OR ... to ExistenceJoin happens in rule RewritePredicateSubquery, which is in a separate rule set and placed after the rule PushPredicateThroughJoin. During the transformation in the rule PushPredicateThroughJoin, an ExistenceJoin never exists. The semantics of ExistenceJoin says we need to preserve all the rows from the left table through the join operation as if it is a regular LeftOuter join. The ExistenceJoin augments the LeftOuter operation with a new column called exists, set to true when the join condition in the ON clause is true and false otherwise. The filter of any rows will happen in the Filter operation above the ExistenceJoin. Example: A(c1, c2): { (1, 1), (1, 2) } // B can be any value as it is irrelevant in this example B(c1): { (NULL) } select A.* from A where exists (select 1 from B where A.c1 = A.c2) or A.c2=2 In this example, the correct result is all the rows from A. If the pattern ExistenceJoin around line 935 in Optimizer.scala is indeed active, the code will push down the predicate A.c1 = A.c2 to be a Filter on relation A, which will incorrectly filter the row (1,2) from A. ## How was this patch tested? Since this is not an exposed case, no new test cases is added. The scenario is discovered via a code review of another PR and confirmed to be valid with peer. Author: Nattavut Sutyanyong <nsy.can@gmail.com> Closes #16044 from nsyca/spark-18614.