aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-20253][SQL] Remove unnecessary nullchecks of a return value from ↵Kazuaki Ishizaki2017-04-103-33/+41
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Spark runtime routines in generated Java code ## What changes were proposed in this pull request? This PR elminates unnecessary nullchecks of a return value from known Spark runtime routines. We know whether a given Spark runtime routine returns ``null`` or not (e.g. ``ArrayData.toDoubleArray()`` never returns ``null``). Thus, we can eliminate a null check for the return value from the Spark runtime routine. When we run the following example program, now we get the Java code "Without this PR". In this code, since we know ``ArrayData.toDoubleArray()`` never returns ``null```, we can eliminate null checks at lines 90-92, and 97. ```java val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache ds.count ds.map(e => e).show ``` Without this PR ```java /* 050 */ protected void processNext() throws java.io.IOException { /* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 055 */ /* 056 */ ArrayData deserializetoobject_value1 = null; /* 057 */ /* 058 */ if (!inputadapter_isNull) { /* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 060 */ /* 061 */ Double[] deserializetoobject_convertedArray = null; /* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength]; /* 063 */ /* 064 */ int deserializetoobject_loopIndex = 0; /* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex)); /* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 068 */ /* 069 */ if (MapObjects_loopIsNull2) { /* 070 */ throw new RuntimeException(((java.lang.String) references[0])); /* 071 */ } /* 072 */ if (false) { /* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 074 */ } else { /* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2; /* 076 */ } /* 077 */ /* 078 */ deserializetoobject_loopIndex += 1; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/ /* 082 */ } /* 083 */ boolean deserializetoobject_isNull = true; /* 084 */ double[] deserializetoobject_value = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull = false; /* 087 */ if (!deserializetoobject_isNull) { /* 088 */ Object deserializetoobject_funcResult = null; /* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray(); /* 090 */ if (deserializetoobject_funcResult == null) { /* 091 */ deserializetoobject_isNull = true; /* 092 */ } else { /* 093 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 094 */ } /* 095 */ /* 096 */ } /* 097 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 098 */ } /* 099 */ /* 100 */ boolean mapelements_isNull = true; /* 101 */ double[] mapelements_value = null; /* 102 */ if (!false) { /* 103 */ mapelements_resultIsNull = false; /* 104 */ /* 105 */ if (!mapelements_resultIsNull) { /* 106 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 107 */ mapelements_argValue = deserializetoobject_value; /* 108 */ } /* 109 */ /* 110 */ mapelements_isNull = mapelements_resultIsNull; /* 111 */ if (!mapelements_isNull) { /* 112 */ Object mapelements_funcResult = null; /* 113 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 114 */ if (mapelements_funcResult == null) { /* 115 */ mapelements_isNull = true; /* 116 */ } else { /* 117 */ mapelements_value = (double[]) mapelements_funcResult; /* 118 */ } /* 119 */ /* 120 */ } /* 121 */ mapelements_isNull = mapelements_value == null; /* 122 */ } /* 123 */ /* 124 */ serializefromobject_resultIsNull = false; /* 125 */ /* 126 */ if (!serializefromobject_resultIsNull) { /* 127 */ serializefromobject_resultIsNull = mapelements_isNull; /* 128 */ serializefromobject_argValue = mapelements_value; /* 129 */ } /* 130 */ /* 131 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 132 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 133 */ serializefromobject_isNull = serializefromobject_value == null; /* 134 */ serializefromobject_holder.reset(); /* 135 */ /* 136 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 137 */ /* 138 */ if (serializefromobject_isNull) { /* 139 */ serializefromobject_rowWriter.setNullAt(0); /* 140 */ } else { /* 141 */ // Remember the current cursor so that we can calculate how many bytes are /* 142 */ // written later. /* 143 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 144 */ /* 145 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 146 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 147 */ // grow the global buffer before writing data. /* 148 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 149 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 150 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 151 */ /* 152 */ } else { /* 153 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 154 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 155 */ /* 156 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 157 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 158 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 159 */ } else { /* 160 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 161 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 162 */ } /* 163 */ } /* 164 */ } /* 165 */ /* 166 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 167 */ } /* 168 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 169 */ append(serializefromobject_result); /* 170 */ if (shouldStop()) return; /* 171 */ } /* 172 */ } ``` With this PR (removed most of lines 90-97 in the above code) ```java /* 050 */ protected void processNext() throws java.io.IOException { /* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 055 */ /* 056 */ ArrayData deserializetoobject_value1 = null; /* 057 */ /* 058 */ if (!inputadapter_isNull) { /* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 060 */ /* 061 */ Double[] deserializetoobject_convertedArray = null; /* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength]; /* 063 */ /* 064 */ int deserializetoobject_loopIndex = 0; /* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex)); /* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 068 */ /* 069 */ if (MapObjects_loopIsNull2) { /* 070 */ throw new RuntimeException(((java.lang.String) references[0])); /* 071 */ } /* 072 */ if (false) { /* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 074 */ } else { /* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2; /* 076 */ } /* 077 */ /* 078 */ deserializetoobject_loopIndex += 1; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/ /* 082 */ } /* 083 */ boolean deserializetoobject_isNull = true; /* 084 */ double[] deserializetoobject_value = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull = false; /* 087 */ if (!deserializetoobject_isNull) { /* 088 */ Object deserializetoobject_funcResult = null; /* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray(); /* 090 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 091 */ /* 092 */ } /* 093 */ /* 094 */ } /* 095 */ /* 096 */ boolean mapelements_isNull = true; /* 097 */ double[] mapelements_value = null; /* 098 */ if (!false) { /* 099 */ mapelements_resultIsNull = false; /* 100 */ /* 101 */ if (!mapelements_resultIsNull) { /* 102 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 103 */ mapelements_argValue = deserializetoobject_value; /* 104 */ } /* 105 */ /* 106 */ mapelements_isNull = mapelements_resultIsNull; /* 107 */ if (!mapelements_isNull) { /* 108 */ Object mapelements_funcResult = null; /* 109 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 110 */ if (mapelements_funcResult == null) { /* 111 */ mapelements_isNull = true; /* 112 */ } else { /* 113 */ mapelements_value = (double[]) mapelements_funcResult; /* 114 */ } /* 115 */ /* 116 */ } /* 117 */ mapelements_isNull = mapelements_value == null; /* 118 */ } /* 119 */ /* 120 */ serializefromobject_resultIsNull = false; /* 121 */ /* 122 */ if (!serializefromobject_resultIsNull) { /* 123 */ serializefromobject_resultIsNull = mapelements_isNull; /* 124 */ serializefromobject_argValue = mapelements_value; /* 125 */ } /* 126 */ /* 127 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 128 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 129 */ serializefromobject_isNull = serializefromobject_value == null; /* 130 */ serializefromobject_holder.reset(); /* 131 */ /* 132 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 133 */ /* 134 */ if (serializefromobject_isNull) { /* 135 */ serializefromobject_rowWriter.setNullAt(0); /* 136 */ } else { /* 137 */ // Remember the current cursor so that we can calculate how many bytes are /* 138 */ // written later. /* 139 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 140 */ /* 141 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 142 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 143 */ // grow the global buffer before writing data. /* 144 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 145 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 146 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 147 */ /* 148 */ } else { /* 149 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 150 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 151 */ /* 152 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 153 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 154 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 155 */ } else { /* 156 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 157 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 158 */ } /* 159 */ } /* 160 */ } /* 161 */ /* 162 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 163 */ } /* 164 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 165 */ append(serializefromobject_result); /* 166 */ if (shouldStop()) return; /* 167 */ } /* 168 */ } ``` ## How was this patch tested? Add test suites to ``DatasetPrimitiveSuite`` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17569 from kiszk/SPARK-20253.
* [SPARK-20262][SQL] AssertNotNull should throw NullPointerExceptionReynold Xin2017-04-071-3/+3
| | | | | | | | | | | | ## What changes were proposed in this pull request? AssertNotNull currently throws RuntimeException. It should throw NullPointerException, which is more specific. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17573 from rxin/SPARK-20262.
* [SPARK-20246][SQL] should not push predicate down through aggregate with ↵Wenchen Fan2017-04-072-33/+68
| | | | | | | | | | | | | | | | non-deterministic expressions ## What changes were proposed in this pull request? Similar to `Project`, when `Aggregate` has non-deterministic expressions, we should not push predicate down through it, as it will change the number of input rows and thus change the evaluation result of non-deterministic expressions in `Aggregate`. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17562 from cloud-fan/filter.
* [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directlyWenchen Fan2017-04-071-4/+4
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently `LogicalRelation` has a `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17552 from cloud-fan/minor.
* [SPARK-19495][SQL] Make SQLConf slightly more extensible - addendumReynold Xin2017-04-061-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? This is a tiny addendum to SPARK-19495 to remove the private visibility for copy, which is the only package private method in the entire file. ## How was this patch tested? N/A - no semantic change. Author: Reynold Xin <rxin@databricks.com> Closes #17555 from rxin/SPARK-19495-2.
* [SPARK-20231][SQL] Refactor star schema code for the subsequent star join ↵Ioana Delaney2017-04-053-329/+354
| | | | | | | | | | | | | | | detection in CBO ## What changes were proposed in this pull request? This commit moves star schema code from ```join.scala``` to ```StarSchemaDetection.scala```. It also applies some minor fixes in ```StarJoinReorderSuite.scala```. ## How was this patch tested? Run existing ```StarJoinReorderSuite.scala```. Author: Ioana Delaney <ioanamdelaney@gmail.com> Closes #17544 from ioana-delaney/starSchemaCBOv2.
* [SPARK-20204][SQL][FOLLOWUP] SQLConf should react to change in default ↵Dilip Biswal2017-04-061-1/+1
| | | | | | | | | | | | | | timezone settings ## What changes were proposed in this pull request? Make sure SESSION_LOCAL_TIMEZONE reflects the change in JVM's default timezone setting. Currently several timezone related tests fail as the change to default timezone is not picked up by SQLConf. ## How was this patch tested? Added an unit test in ConfigEntrySuite Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17537 from dilipbiswal/timezone_debug.
* [SPARK-19716][SQL] support by-name resolution for struct type elements in arrayWenchen Fan2017-04-045-40/+131
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Previously when we construct deserializer expression for array type, we will first cast the corresponding field to expected array type and then apply `MapObjects`. However, by doing that, we lose the opportunity to do by-name resolution for struct type inside array type. In this PR, I introduce a `UnresolvedMapObjects` to hold the lambda function and the input array expression. Then during analysis, after the input array expression is resolved, we get the actual array element type and apply by-name resolution. Then we don't need to add `Cast` for array type when constructing the deserializer expression, as the element type is determined later at analyzer. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17398 from cloud-fan/dataset.
* [SPARK-20204][SQL] remove SimpleCatalystConf and CatalystConf type aliasWenchen Fan2017-04-0448-204/+148
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/17285 . ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17521 from cloud-fan/conf.
* [SPARK-20198][SQL] Remove the inconsistency in table/function name ↵Xiao Li2017-04-044-2/+31
| | | | | | | | | | | | | | | | | | | | | conventions in SparkSession.Catalog APIs ### What changes were proposed in this pull request? Observed by felixcheung , in `SparkSession`.`Catalog` APIs, we have different conventions/rules for table/function identifiers/names. Most APIs accept the qualified name (i.e., `databaseName`.`tableName` or `databaseName`.`functionName`). However, the following five APIs do not accept it. - def listColumns(tableName: String): Dataset[Column] - def getTable(tableName: String): Table - def getFunction(functionName: String): Function - def tableExists(tableName: String): Boolean - def functionExists(functionName: String): Boolean To make them consistent with the other Catalog APIs, this PR does the changes, updates the function/API comments and adds the `params` to clarify the inputs we allow. ### How was this patch tested? Added the test cases . Author: Xiao Li <gatorsmile@gmail.com> Closes #17518 from gatorsmile/tableIdentifier.
* [SPARK-20067][SQL] Unify and Clean Up Desc Commands Using Catalog InterfaceXiao Li2017-04-031-49/+87
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR is to unify and clean up the outputs of `DESC EXTENDED/FORMATTED` and `SHOW TABLE EXTENDED` by moving the logics into the Catalog interface. The output formats are improved. We also add the missing attributes. It impacts the DDL commands like `SHOW TABLE EXTENDED`, `DESC EXTENDED` and `DESC FORMATTED`. In addition, by following what we did in Dataset API `printSchema`, we can use `treeString` to show the schema in the more readable way. Below is the current way: ``` Schema: STRUCT<`a`: STRING (nullable = true), `b`: INT (nullable = true), `c`: STRING (nullable = true), `d`: STRING (nullable = true)> ``` After the change, it should look like ``` Schema: root |-- a: string (nullable = true) |-- b: integer (nullable = true) |-- c: string (nullable = true) |-- d: string (nullable = true) ``` ### How was this patch tested? `describe.sql` and `show-tables.sql` Author: Xiao Li <gatorsmile@gmail.com> Closes #17394 from gatorsmile/descFollowUp.
* [SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLISDilip Biswal2017-04-042-0/+28
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? **Description** from JIRA The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS. But unfortunately parquet-mr hasn't supported it yet. For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part to read values. For the write path, currently we are writing timestamps as INT96, similar to Impala and Hive. One alternative is that, we can have a separate SQL option to let users be able to write Spark SQL timestamp values as TIMESTAMP_MILLIS. Of course, in this way the microsecond part will be truncated. ## How was this patch tested? Added new tests in ParquetQuerySuite and ParquetIOSuite Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #15332 from dilipbiswal/parquet-time-millis.
* [SPARK-19408][SQL] filter estimation on two columns of same tableRon Hu2017-04-032-10/+363
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? In SQL queries, we also see predicate expressions involving two columns such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. Note that, if column-1 and column-2 belong to different tables, then it is a join operator's work, NOT a filter operator's work. This PR estimates filter selectivity on two columns of same table. For example, multiple tpc-h queries have this predicate "WHERE l_commitdate < l_receiptdate" ## How was this patch tested? We added 6 new test cases to test various logical predicates involving two columns of same table. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Ron Hu <ron.hu@huawei.com> Author: U-CHINA\r00754707 <r00754707@R00754707-SC04.china.huawei.com> Closes #17415 from ron8hu/filterTwoColumns.
* [SPARK-20145] Fix range case insensitive bug in SQLsamelamin2017-04-031-3/+1
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Range in SQL should be case insensitive ## How was this patch tested? unit test Author: samelamin <hussam.elamin@gmail.com> Author: samelamin <sam_elamin@discovery.com> Closes #17487 from samelamin/SPARK-20145.
* [SPARK-20194] Add support for partition pruning to in-memory catalogAdrian Ionescu2017-04-033-4/+78
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch implements `listPartitionsByFilter()` for `InMemoryCatalog` and thus resolves an outstanding TODO causing the `PruneFileSourcePartitions` optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default). The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's `getPartitionsByFilter()` out from `HiveExternalCatalog` into `ExternalCatalogUtils` and calls this new function from `InMemoryCatalog` on the whole list of partitions. Now that this method is implemented we can always pass the `CatalogTable` to the `DataSource` in `FindDataSourceTable`, so that the latter is resolved to a relation with a `CatalogFileIndex`, which is what the `PruneFileSourcePartitions` rule matches for. ## How was this patch tested? Ran existing tests and added new test for `listPartitionsByFilter` in `ExternalCatalogSuite`, which is subclassed by both `InMemoryCatalogSuite` and `HiveExternalCatalogSuite`. Author: Adrian Ionescu <adrian@databricks.com> Closes #17510 from adrian-ionescu/InMemoryCatalog.
* [SPARK-20166][SQL] Use XXX for ISO 8601 timezone instead of ZZ ↵hyukjinkwon2017-04-031-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | (FastDateFormat specific) in CSV/JSON timeformat options ## What changes were proposed in this pull request? This PR proposes to use `XXX` format instead of `ZZ`. `ZZ` seems a `FastDateFormat` specific. `ZZ` supports "ISO 8601 extended format time zones" but it seems `FastDateFormat` specific option. I misunderstood this is compatible format with `SimpleDateFormat` when this change is introduced. Please see [SimpleDateFormat documentation]( https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#iso8601timezone) and [FastDateFormat documentation](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/time/FastDateFormat.html). It seems we better replace `ZZ` to `XXX` because they look using the same strategy - [FastDateParser.java#L930](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L930), [FastDateParser.java#L932-L951 ](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L932-L951) and [FastDateParser.java#L596-L601](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L596-L601). I also checked the codes and manually debugged it for sure. It seems both cases use the same pattern `( Z|(?:[+-]\\d{2}(?::)\\d{2}))`. _Note that this should be rather a fix about documentation and not the behaviour change because `ZZ` seems invalid date format in `SimpleDateFormat` as documented in `DataFrameReader` and etc, and both `ZZ` and `XXX` look identically working with `FastDateFormat`_ Current documentation is as below: ``` * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> ``` ## How was this patch tested? Existing tests should cover this. Also, manually tested as below (BTW, I don't think these are worth being added as tests within Spark): **Parse** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res4: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res10: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000-11:00" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000Z" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided ``` ```scala scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res7: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res1: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") res8: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") res2: java.util.Date = Tue Mar 21 09:00:00 KST 2017 ``` **Format** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")) res6: String = 2017-03-21T20:00:00.000+09:00 ``` ```scala scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSZZ,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res1: String = 2017-03-21T20:00:00.000+09:00 scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSXXX,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res2: String = 2017-03-21T20:00:00.000+09:00 ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #17489 from HyukjinKwon/SPARK-20166.
* [SPARK-20143][SQL] DataType.fromJson should throw an exception with better ↵hyukjinkwon2017-04-022-1/+39
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | message ## What changes were proposed in this pull request? Currently, `DataType.fromJson` throws `scala.MatchError` or `java.util.NoSuchElementException` in some cases when the JSON input is invalid as below: ```scala DataType.fromJson(""""abcd"""") ``` ``` java.util.NoSuchElementException: key not found: abcd at ... ``` ```scala DataType.fromJson("""{"abcd":"a"}""") ``` ``` scala.MatchError: JObject(List((abcd,JString(a)))) (of class org.json4s.JsonAST$JObject) at ... ``` ```scala DataType.fromJson("""{"fields": [{"a":123}], "type": "struct"}""") ``` ``` scala.MatchError: JObject(List((a,JInt(123)))) (of class org.json4s.JsonAST$JObject) at ... ``` After this PR, ```scala DataType.fromJson(""""abcd"""") ``` ``` java.lang.IllegalArgumentException: Failed to convert the JSON string 'abcd' to a data type. at ... ``` ```scala DataType.fromJson("""{"abcd":"a"}""") ``` ``` java.lang.IllegalArgumentException: Failed to convert the JSON string '{"abcd":"a"}' to a data type. at ... ``` ```scala DataType.fromJson("""{"fields": [{"a":123}], "type": "struct"}""") at ... ``` ``` java.lang.IllegalArgumentException: Failed to convert the JSON string '{"a":123}' to a field. ``` ## How was this patch tested? Unit test added in `DataTypeSuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17468 from HyukjinKwon/fromjson_exception.
* [SPARK-20186][SQL] BroadcastHint should use child's statswangzhenhua2017-04-012-2/+21
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `BroadcastHint` should use child's statistics and set `isBroadcastable` to true. ## How was this patch tested? Added a new stats estimation test for `BroadcastHint`. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17504 from wzhfy/broadcastHintEstimation.
* [SPARK-20164][SQL] AnalysisException not tolerant of null query plan.Kunal Khamar2017-03-311-1/+1
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? The query plan in an `AnalysisException` may be `null` when an `AnalysisException` object is serialized and then deserialized, since `plan` is marked `transient`. Or when someone throws an `AnalysisException` with a null query plan (which should not happen). `def getMessage` is not tolerant of this and throws a `NullPointerException`, leading to loss of information about the original exception. The fix is to add a `null` check in `getMessage`. ## How was this patch tested? - Unit test Author: Kunal Khamar <kkhamar@outlook.com> Closes #17486 from kunalkhamar/spark-20164.
* [SPARK-20121][SQL] simplify NullPropagation with NullIntolerantWenchen Fan2017-03-306-73/+39
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Instead of iterating all expressions that can return null for null inputs, we can just check `NullIntolerant`. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17450 from cloud-fan/null.
* [DOCS] Docs-only improvementsJacek Laskowski2017-03-309-14/+16
| | | | | | | | | | | | | | | | …adoc ## What changes were proposed in this pull request? Use recommended values for row boundaries in Window's scaladoc, i.e. `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow` (that were introduced in 2.1.0). ## How was this patch tested? Local build Author: Jacek Laskowski <jacek@japila.pl> Closes #17417 from jaceklaskowski/window-expression-scaladoc.
* [SPARK-19088][SQL] Fix 2.10 build.Takuya UESHIN2017-03-291-1/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Commit 6c70a38 broke the build for scala 2.10. The commit uses some reflections which are not available in Scala 2.10. This PR fixes them. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #17473 from ueshin/issues/SPARK-19088.
* [SPARK-17075][SQL][FOLLOWUP] Add Estimation of Constant LiteralXiao Li2017-03-292-2/+124
| | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? `FalseLiteral` and `TrueLiteral` should have been eliminated by optimizer rule `BooleanSimplification`, but null literals might be added by optimizer rule `NullPropagation`. For safety, our filter estimation should handle all the eligible literal cases. Our optimizer rule BooleanSimplification is unable to remove the null literal in many cases. For example, `a < 0 or null`. Thus, we need to handle null literal in filter estimation. `Not` can be pushed down below `And` and `Or`. Then, we could see two consecutive `Not`, which need to be collapsed into one. Because of the limited expression support for filter estimation, we just need to handle the case `Not(null)` for avoiding incorrect error due to the boolean operation on null. For details, see below matrix. ``` not NULL = NULL NULL or false = NULL NULL or true = true NULL or NULL = NULL NULL and false = false NULL and true = NULL NULL and NULL = NULL ``` ### How was this patch tested? Added the test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #17446 from gatorsmile/constantFilterEstimation.
* [SPARK-20009][SQL] Support DDL strings for defining schema in ↵Takeshi Yamamuro2017-03-292-21/+70
| | | | | | | | | | | | | | functions.from_json ## What changes were proposed in this pull request? This pr added `StructType.fromDDL` to convert a DDL format string into `StructType` for defining schemas in `functions.from_json`. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17406 from maropu/SPARK-20009.
* [SPARK-20125][SQL] Dataset of type option of map does not workWenchen Fan2017-03-281-0/+5
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? When we build the deserializer expression for map type, we will use `StaticInvoke` to call `ArrayBasedMapData.toScalaMap`, and declare the return type as `scala.collection.immutable.Map`. If the map is inside an Option, we will wrap this `StaticInvoke` with `WrapOption`, which requires the input to be `scala.collect.Map`. Ideally this should be fine, as `scala.collection.immutable.Map` extends `scala.collect.Map`, but our `ObjectType` is too strict about this, this PR fixes it. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17454 from cloud-fan/map.
* [SPARK-20124][SQL] Join reorder should keep the same order of final project ↵wangzhenhua2017-03-283-10/+31
| | | | | | | | | | | | | | | | | attributes ## What changes were proposed in this pull request? Join reorder algorithm should keep exactly the same order of output attributes in the top project. For example, if user want to select a, b, c, after reordering, we should output a, b, c in the same order as specified by user, instead of b, a, c or other orders. ## How was this patch tested? A new test case is added in `JoinReorderSuite`. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17453 from wzhfy/keepOrderInProject.
* [SPARK-20094][SQL] Preventing push down of IN subquery to Join operatorwangzhenhua2017-03-282-0/+26
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? TPCDS q45 fails becuase: `ReorderJoin` collects all predicates and try to put them into join condition when creating ordered join. If a predicate with an IN subquery (`ListQuery`) is in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the subquery to an `ExistenceJoin`, and thus result in error. We should prevent push down of IN subquery to Join operator. ## How was this patch tested? Add a new test case in `FilterPushdownSuite`. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17428 from wzhfy/noSubqueryInJoinCond.
* [SPARK-19088][SQL] Optimize sequence type deserialization codegenMichal Senkyr2017-03-283-69/+54
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Optimization of arbitrary Scala sequence deserialization introduced by #16240. The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases. This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly. Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`): Before: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean deserializetoobject_resultIsNull; /* 010 */ private java.lang.Object[] deserializetoobject_argValue; /* 011 */ private boolean MapObjects_loopIsNull1; /* 012 */ private int MapObjects_loopValue0; /* 013 */ private boolean deserializetoobject_resultIsNull1; /* 014 */ private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1; /* 015 */ private UnsafeRow deserializetoobject_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 018 */ private scala.collection.immutable.List mapelements_argValue; /* 019 */ private UnsafeRow mapelements_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 022 */ private scala.collection.immutable.List serializefromobject_argValue; /* 023 */ private UnsafeRow serializefromobject_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 027 */ /* 028 */ public GeneratedIterator(Object[] references) { /* 029 */ this.references = references; /* 030 */ } /* 031 */ /* 032 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 033 */ partitionIndex = index; /* 034 */ this.inputs = inputs; /* 035 */ inputadapter_input = inputs[0]; /* 036 */ /* 037 */ deserializetoobject_result = new UnsafeRow(1); /* 038 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 039 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 040 */ /* 041 */ mapelements_result = new UnsafeRow(1); /* 042 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 043 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 044 */ /* 045 */ serializefromobject_result = new UnsafeRow(1); /* 046 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 047 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 048 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 054 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 055 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 056 */ /* 057 */ deserializetoobject_resultIsNull = false; /* 058 */ /* 059 */ if (!deserializetoobject_resultIsNull) { /* 060 */ ArrayData deserializetoobject_value3 = null; /* 061 */ /* 062 */ if (!false) { /* 063 */ Integer[] deserializetoobject_convertedArray = null; /* 064 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 065 */ deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength]; /* 066 */ /* 067 */ int deserializetoobject_loopIndex = 0; /* 068 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 069 */ MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 070 */ MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 071 */ /* 072 */ if (MapObjects_loopIsNull1) { /* 073 */ throw new RuntimeException(((java.lang.String) references[0])); /* 074 */ } /* 075 */ if (false) { /* 076 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 077 */ } else { /* 078 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_loopIndex += 1; /* 082 */ } /* 083 */ /* 084 */ deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /* 085 */ } /* 086 */ boolean deserializetoobject_isNull2 = true; /* 087 */ java.lang.Object[] deserializetoobject_value2 = null; /* 088 */ if (!false) { /* 089 */ deserializetoobject_isNull2 = false; /* 090 */ if (!deserializetoobject_isNull2) { /* 091 */ Object deserializetoobject_funcResult = null; /* 092 */ deserializetoobject_funcResult = deserializetoobject_value3.array(); /* 093 */ if (deserializetoobject_funcResult == null) { /* 094 */ deserializetoobject_isNull2 = true; /* 095 */ } else { /* 096 */ deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult; /* 097 */ } /* 098 */ /* 099 */ } /* 100 */ deserializetoobject_isNull2 = deserializetoobject_value2 == null; /* 101 */ } /* 102 */ deserializetoobject_resultIsNull = deserializetoobject_isNull2; /* 103 */ deserializetoobject_argValue = deserializetoobject_value2; /* 104 */ } /* 105 */ /* 106 */ boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull; /* 107 */ final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue); /* 108 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 109 */ boolean deserializetoobject_isNull = true; /* 110 */ scala.collection.immutable.List deserializetoobject_value = null; /* 111 */ if (!deserializetoobject_isNull1) { /* 112 */ deserializetoobject_resultIsNull1 = false; /* 113 */ /* 114 */ if (!deserializetoobject_resultIsNull1) { /* 115 */ boolean deserializetoobject_isNull6 = false; /* 116 */ final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom(); /* 117 */ deserializetoobject_isNull6 = deserializetoobject_value6 == null; /* 118 */ deserializetoobject_resultIsNull1 = deserializetoobject_isNull6; /* 119 */ deserializetoobject_argValue1 = deserializetoobject_value6; /* 120 */ } /* 121 */ /* 122 */ deserializetoobject_isNull = deserializetoobject_resultIsNull1; /* 123 */ if (!deserializetoobject_isNull) { /* 124 */ Object deserializetoobject_funcResult1 = null; /* 125 */ deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1); /* 126 */ if (deserializetoobject_funcResult1 == null) { /* 127 */ deserializetoobject_isNull = true; /* 128 */ } else { /* 129 */ deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1; /* 130 */ } /* 131 */ /* 132 */ } /* 133 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 134 */ } /* 135 */ /* 136 */ boolean mapelements_isNull = true; /* 137 */ scala.collection.immutable.List mapelements_value = null; /* 138 */ if (!false) { /* 139 */ mapelements_argValue = deserializetoobject_value; /* 140 */ /* 141 */ mapelements_isNull = false; /* 142 */ if (!mapelements_isNull) { /* 143 */ Object mapelements_funcResult = null; /* 144 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 145 */ if (mapelements_funcResult == null) { /* 146 */ mapelements_isNull = true; /* 147 */ } else { /* 148 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 149 */ } /* 150 */ /* 151 */ } /* 152 */ mapelements_isNull = mapelements_value == null; /* 153 */ } /* 154 */ /* 155 */ if (mapelements_isNull) { /* 156 */ throw new RuntimeException(((java.lang.String) references[2])); /* 157 */ } /* 158 */ serializefromobject_argValue = mapelements_value; /* 159 */ /* 160 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 161 */ serializefromobject_holder.reset(); /* 162 */ /* 163 */ // Remember the current cursor so that we can calculate how many bytes are /* 164 */ // written later. /* 165 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 166 */ /* 167 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 168 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 169 */ // grow the global buffer before writing data. /* 170 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 171 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 172 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 173 */ /* 174 */ } else { /* 175 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 176 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 177 */ /* 178 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 179 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 180 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 181 */ } else { /* 182 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 183 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 184 */ } /* 185 */ } /* 186 */ } /* 187 */ /* 188 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 189 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 190 */ append(serializefromobject_result); /* 191 */ if (shouldStop()) return; /* 192 */ } /* 193 */ } /* 194 */ } ``` After: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjects_loopIsNull1; /* 010 */ private int CollectObjects_loopValue0; /* 011 */ private UnsafeRow deserializetoobject_result; /* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 014 */ private scala.collection.immutable.List mapelements_argValue; /* 015 */ private UnsafeRow mapelements_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 018 */ private scala.collection.immutable.List serializefromobject_argValue; /* 019 */ private UnsafeRow serializefromobject_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 023 */ /* 024 */ public GeneratedIterator(Object[] references) { /* 025 */ this.references = references; /* 026 */ } /* 027 */ /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 029 */ partitionIndex = index; /* 030 */ this.inputs = inputs; /* 031 */ inputadapter_input = inputs[0]; /* 032 */ /* 033 */ deserializetoobject_result = new UnsafeRow(1); /* 034 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 035 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 036 */ /* 037 */ mapelements_result = new UnsafeRow(1); /* 038 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 039 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 040 */ /* 041 */ serializefromobject_result = new UnsafeRow(1); /* 042 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 043 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 044 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 045 */ /* 046 */ } /* 047 */ /* 048 */ protected void processNext() throws java.io.IOException { /* 049 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 052 */ /* 053 */ scala.collection.immutable.List deserializetoobject_value = null; /* 054 */ /* 055 */ if (!false) { /* 056 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 057 */ scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder(); /* 058 */ CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength); /* 059 */ /* 060 */ int deserializetoobject_loopIndex = 0; /* 061 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 062 */ CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 063 */ CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 064 */ /* 065 */ if (CollectObjects_loopIsNull1) { /* 066 */ throw new RuntimeException(((java.lang.String) references[0])); /* 067 */ } /* 068 */ if (false) { /* 069 */ CollectObjects_builderValue2.$plus$eq(null); /* 070 */ } else { /* 071 */ CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0); /* 072 */ } /* 073 */ /* 074 */ deserializetoobject_loopIndex += 1; /* 075 */ } /* 076 */ /* 077 */ deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result(); /* 078 */ } /* 079 */ /* 080 */ boolean mapelements_isNull = true; /* 081 */ scala.collection.immutable.List mapelements_value = null; /* 082 */ if (!false) { /* 083 */ mapelements_argValue = deserializetoobject_value; /* 084 */ /* 085 */ mapelements_isNull = false; /* 086 */ if (!mapelements_isNull) { /* 087 */ Object mapelements_funcResult = null; /* 088 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 089 */ if (mapelements_funcResult == null) { /* 090 */ mapelements_isNull = true; /* 091 */ } else { /* 092 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 093 */ } /* 094 */ /* 095 */ } /* 096 */ mapelements_isNull = mapelements_value == null; /* 097 */ } /* 098 */ /* 099 */ if (mapelements_isNull) { /* 100 */ throw new RuntimeException(((java.lang.String) references[2])); /* 101 */ } /* 102 */ serializefromobject_argValue = mapelements_value; /* 103 */ /* 104 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 105 */ serializefromobject_holder.reset(); /* 106 */ /* 107 */ // Remember the current cursor so that we can calculate how many bytes are /* 108 */ // written later. /* 109 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 110 */ /* 111 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 112 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 113 */ // grow the global buffer before writing data. /* 114 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 115 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 116 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 117 */ /* 118 */ } else { /* 119 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 120 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 121 */ /* 122 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 123 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 124 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 125 */ } else { /* 126 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 127 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 128 */ } /* 129 */ } /* 130 */ } /* 131 */ /* 132 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 133 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 134 */ append(serializefromobject_result); /* 135 */ if (shouldStop()) return; /* 136 */ } /* 137 */ } /* 138 */ } ``` Benchmark results before: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 269 / 370 0.0 269125.8 1.0X List 154 / 176 0.0 154453.5 1.7X mutable.Queue 210 / 233 0.0 209691.6 1.3X ``` Benchmark results after: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 255 / 316 0.0 254697.3 1.0X List 152 / 177 0.0 152410.0 1.7X mutable.Queue 213 / 235 0.0 213470.0 1.2X ``` ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Additionally in Spark Shell: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <mike.senkyr@gmail.com> Closes #16541 from michalsenkyr/dataset-seq-builder.
* [SPARK-20100][SQL] Refactor SessionState initializationHerman van Hovell2017-03-283-53/+31
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is one inside the SessionState class, and a part is done by passing functions. This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a place holder for the various Spark SQL internals. This also lays the ground work for two future improvements: 1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive. 2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi stable API on top of this. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17433 from hvanhovell/SPARK-20100.
* [SPARK-20104][SQL] Don't estimate IsNull or IsNotNull predicates for ↵wangzhenhua2017-03-272-4/+33
| | | | | | | | | | | | | | | | non-leaf node ## What changes were proposed in this pull request? In current stage, we don't have advanced statistics such as sketches or histograms. As a result, some operator can't estimate `nullCount` accurately. E.g. left outer join estimation does not accurately update `nullCount` currently. So for `IsNull` and `IsNotNull` predicates, we only estimate them when the child is a leaf node, whose `nullCount` is accurate. ## How was this patch tested? A new test case is added in `FilterEstimationSuite`. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17438 from wzhfy/nullEstimation.
* [SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windowsHerman van Hovell2017-03-262-3/+16
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `CollapseWindow` is currently to aggressive when collapsing adjacent windows. It also collapses windows in the which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime. This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule. ## How was this patch tested? Added a new test case to `CollapseWindowSuite` Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17432 from hvanhovell/SPARK-20086.
* [SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql coreWenchen Fan2017-03-252-83/+33
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `FailureSafeParser` is only used in sql core, it doesn't make sense to put it in catalyst module. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #17408 from cloud-fan/minor.
* [SPARK-19846][SQL] Add a flag to disable constraint propagationLiang-Chi Hsieh2017-03-2513-20/+158
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Constraint propagation can be computation expensive and block the driver execution for long time. For example, the below benchmark needs 30mins. Compared with previous PRs #16998, #16785, this is a much simpler option: add a flag to disable constraint propagation. ### Benchmark Run the following codes locally. import org.apache.spark.ml.{Pipeline, PipelineStage} import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler} import org.apache.spark.sql.internal.SQLConf spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false) val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0")) val indexers = df.columns.tail.map(c => new StringIndexer() .setInputCol(c) .setOutputCol(s"${c}_indexed") .setHandleInvalid("skip")) val encoders = indexers.map(indexer => new OneHotEncoder() .setInputCol(indexer.getOutputCol) .setOutputCol(s"${indexer.getOutputCol}_encoded") .setDropLast(true)) val stages: Array[PipelineStage] = indexers ++ encoders val pipeline = new Pipeline().setStages(stages) val startTime = System.nanoTime pipeline.fit(df).transform(df).show val runningTime = System.nanoTime - startTime Before this patch: 1786001 ms ~= 30 mins After this patch: 26392 ms = less than half of a minute Related PRs: #16998, #16785. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17186 from viirya/add-flag-disable-constraint-propagation.
* [SQL][MINOR] Fix for typo in AnalyzerJacek Laskowski2017-03-241-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix for typo in Analyzer ## How was this patch tested? local build Author: Jacek Laskowski <jacek@japila.pl> Closes #17409 from jaceklaskowski/analyzer-typo.
* [SPARK-19959][SQL] Fix to throw NullPointerException in ↵Kazuaki Ishizaki2017-03-241-1/+4
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | df[java.lang.Long].collect ## What changes were proposed in this pull request? This PR fixes `NullPointerException` in the generated code by Catalyst. When we run the following code, we get the following `NullPointerException`. This is because there is no null checks for `inputadapter_value` while `java.lang.Long inputadapter_value` at Line 30 may have `null`. This happen when a type of DataFrame is nullable primitive type such as `java.lang.Long` and the wholestage codegen is used. While the physical plan keeps `nullable=true` in `input[0, java.lang.Long, true].longValue`, `BoundReference.doGenCode` ignores `nullable=true`. Thus, nullcheck code will not be generated and `NullPointerException` will occur. This PR checks the nullability and correctly generates nullcheck if needed. ```java sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF.collect ``` ```java Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:37) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:393) ... ``` Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow serializefromobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 012 */ /* 013 */ public GeneratedIterator(Object[] references) { /* 014 */ this.references = references; /* 015 */ } /* 016 */ /* 017 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 018 */ partitionIndex = index; /* 019 */ this.inputs = inputs; /* 020 */ inputadapter_input = inputs[0]; /* 021 */ serializefromobject_result = new UnsafeRow(1); /* 022 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 023 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 024 */ /* 025 */ } /* 026 */ /* 027 */ protected void processNext() throws java.io.IOException { /* 028 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 029 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 030 */ java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null); /* 031 */ /* 032 */ boolean serializefromobject_isNull = true; /* 033 */ long serializefromobject_value = -1L; /* 034 */ if (!false) { /* 035 */ serializefromobject_isNull = false; /* 036 */ if (!serializefromobject_isNull) { /* 037 */ serializefromobject_value = inputadapter_value.longValue(); /* 038 */ } /* 039 */ /* 040 */ } /* 041 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 042 */ /* 043 */ if (serializefromobject_isNull) { /* 044 */ serializefromobject_rowWriter.setNullAt(0); /* 045 */ } else { /* 046 */ serializefromobject_rowWriter.write(0, serializefromobject_value); /* 047 */ } /* 048 */ append(serializefromobject_result); /* 049 */ if (shouldStop()) return; /* 050 */ } /* 051 */ } /* 052 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private UnsafeRow serializefromobject_result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 012 */ /* 013 */ public GeneratedIterator(Object[] references) { /* 014 */ this.references = references; /* 015 */ } /* 016 */ /* 017 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 018 */ partitionIndex = index; /* 019 */ this.inputs = inputs; /* 020 */ inputadapter_input = inputs[0]; /* 021 */ serializefromobject_result = new UnsafeRow(1); /* 022 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0); /* 023 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 024 */ /* 025 */ } /* 026 */ /* 027 */ protected void processNext() throws java.io.IOException { /* 028 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 029 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 030 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 031 */ java.lang.Long inputadapter_value = inputadapter_isNull ? null : ((java.lang.Long)inputadapter_row.get(0, null)); /* 032 */ /* 033 */ boolean serializefromobject_isNull = true; /* 034 */ long serializefromobject_value = -1L; /* 035 */ if (!inputadapter_isNull) { /* 036 */ serializefromobject_isNull = false; /* 037 */ if (!serializefromobject_isNull) { /* 038 */ serializefromobject_value = inputadapter_value.longValue(); /* 039 */ } /* 040 */ /* 041 */ } /* 042 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 043 */ /* 044 */ if (serializefromobject_isNull) { /* 045 */ serializefromobject_rowWriter.setNullAt(0); /* 046 */ } else { /* 047 */ serializefromobject_rowWriter.write(0, serializefromobject_value); /* 048 */ } /* 049 */ append(serializefromobject_result); /* 050 */ if (shouldStop()) return; /* 051 */ } /* 052 */ } /* 053 */ } ``` ## How was this patch tested? Added new test suites in `DataFrameSuites` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17302 from kiszk/SPARK-19959.
* [SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithStateTathagata Das2017-03-223-20/+24
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Since the state is tied a "group" in the "mapGroupsWithState" operations, its better to call the state "GroupState" instead of a key. This would make it more general if you extends this operation to RelationGroupedDataset and python APIs. ## How was this patch tested? Existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17385 from tdas/SPARK-20057.
* [SPARK-20018][SQL] Pivot with timestamp and count should not print internal ↵hyukjinkwon2017-03-221-2/+4
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | representation ## What changes were proposed in this pull request? Currently, when we perform count with timestamp types, it prints the internal representation as the column name as below: ```scala Seq(new java.sql.Timestamp(1)).toDF("a").groupBy("a").pivot("a").count().show() ``` ``` +--------------------+----+ | a|1000| +--------------------+----+ |1969-12-31 16:00:...| 1| +--------------------+----+ ``` This PR proposes to use external Scala value instead of the internal representation in the column names as below: ``` +--------------------+-----------------------+ | a|1969-12-31 16:00:00.001| +--------------------+-----------------------+ |1969-12-31 16:00:...| 1| +--------------------+-----------------------+ ``` ## How was this patch tested? Unit test in `DataFramePivotSuite` and manual tests. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17348 from HyukjinKwon/SPARK-20018.
* [SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related commentshyukjinkwon2017-03-226-61/+71
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to make `mode` options in both CSV and JSON to use `cass object` and fix some related comments related previous fix. Also, this PR modifies some tests related parse modes. ## How was this patch tested? Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17377 from HyukjinKwon/SPARK-19949.
* [SPARK-20030][SS] Event-time-based timeout for MapGroupsWithStateTathagata Das2017-03-214-44/+93
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp. ## How was this patch tested? Unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17361 from tdas/SPARK-20030.
* [SPARK-20017][SQL] change the nullability of function 'StringToMap' from ↵zhaorongsheng2017-03-212-1/+10
| | | | | | | | | | | | 'false' to 'true' ## What changes were proposed in this pull request? Change the nullability of function `StringToMap` from `false` to `true`. Author: zhaorongsheng <334362872@qq.com> Closes #17350 from zhaorongsheng/bug-fix_strToMap_NPE.
* [SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tablesXin Wu2017-03-213-1/+87
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Support` ALTER TABLE ADD COLUMNS (...) `syntax for Hive serde and some datasource tables. In this PR, we consider a few aspects: 1. View is not supported for `ALTER ADD COLUMNS` 2. Since tables created in SparkSQL with Hive DDL syntax will populate table properties with schema information, we need make sure the consistency of the schema before and after ALTER operation in order for future use. 3. For embedded-schema type of format, such as `parquet`, we need to make sure that the predicate on the newly-added columns can be evaluated properly, or pushed down properly. In case of the data file does not have the columns for the newly-added columns, such predicates should return as if the column values are NULLs. 4. For datasource table, this feature does not support the following: 4.1 TEXT format, since there is only one default column `value` is inferred for text format data. 4.2 ORC format, since SparkSQL native ORC reader does not support the difference between user-specified-schema and inferred schema from ORC files. 4.3 Third party datasource types that implements RelationProvider, including the built-in JDBC format, since different implementations by the vendors may have different ways to dealing with schema. 4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive` are supported. 5. Column names being added can not be duplicate of any existing data column or partition column names. Case sensitivity is taken into consideration according to the sql configuration. 6. This feature also supports In-Memory catalog, while Hive support is turned off. ## How was this patch tested? Add new test cases Author: Xin Wu <xinwu@us.ibm.com> Closes #16626 from xwu0226/alter_add_columns.
* [SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method ↵wangzhenhua2017-03-212-42/+68
| | | | | | | | | | | | | | | | | | structure and add a debug log ## What changes were proposed in this pull request? 1. Improve documentation for class `Cost` and `JoinReorderDP` and method `buildJoin()`. 2. Change code structure of `buildJoin()` to make the logic clearer. 3. Add a debug-level log to record information for join reordering, including time cost, the number of items and the number of plans in memo. ## How was this patch tested? Not related. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17353 from wzhfy/reorderFollow.
* [SPARK-20024][SQL][TEST-MAVEN] SessionCatalog reset need to set the current ↵Xiao Li2017-03-202-2/+1
| | | | | | | | | | | | | | | | database of ExternalCatalog ### What changes were proposed in this pull request? SessionCatalog API setCurrentDatabase does not set the current database of the underlying ExternalCatalog. Thus, weird errors could come in the test suites after we call reset. We need to fix it. So far, have not found the direct impact in the other code paths because we expect all the SessionCatalog APIs should always use the current database value we managed, unless some of code paths skip it. Thus, we fix it in the test-only function reset(). ### How was this patch tested? Multiple test case failures are observed in mvn and add a test case in SessionCatalogSuite. Author: Xiao Li <gatorsmile@gmail.com> Closes #17354 from gatorsmile/useDB.
* [SPARK-19949][SQL] unify bad record handling in CSV and JSONWenchen Fan2017-03-204-117/+91
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently JSON and CSV have exactly the same logic about handling bad records, this PR tries to abstract it and put it in a upper level to reduce code duplication. The overall idea is, we make the JSON and CSV parser to throw a BadRecordException, then the upper level, FailureSafeParser, handles bad records according to the parse mode. Behavior changes: 1. with PERMISSIVE mode, if the number of tokens doesn't match the schema, previously CSV parser will treat it as a legal record and parse as many tokens as possible. After this PR, we treat it as an illegal record, and put the raw record string in a special column, but we still parse as many tokens as possible. 2. all logging is removed as they are not very useful in practice. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Author: hyukjinkwon <gurwls223@gmail.com> Author: Wenchen Fan <cloud0fan@gmail.com> Closes #17315 from cloud-fan/bad-record2.
* [SPARK-19980][SQL] Add NULL checks in Bean serializerTakeshi Yamamuro2017-03-211-2/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A Bean serializer in `ExpressionEncoder` could change values when Beans having NULL. A concrete example is as follows; ``` scala> :paste class Outer extends Serializable { private var cls: Inner = _ def setCls(c: Inner): Unit = cls = c def getCls(): Inner = cls } class Inner extends Serializable { private var str: String = _ def setStr(s: String): Unit = str = str def getStr(): String = str } scala> Seq("""{"cls":null}""", """{"cls": {"str":null}}""").toDF().write.text("data") scala> val encoder = Encoders.bean(classOf[Outer]) scala> val schema = encoder.schema scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder) scala> df.show +------+ | cls| +------+ |[null]| | null| +------+ scala> df.map(x => x)(encoder).show() +------+ | cls| +------+ |[null]| |[null]| // <-- Value changed +------+ ``` This is because the Bean serializer does not have the NULL-check expressions that the serializer of Scala's product types has. Actually, this value change does not happen in Scala's product types; ``` scala> :paste case class Outer(cls: Inner) case class Inner(str: String) scala> val encoder = Encoders.product[Outer] scala> val schema = encoder.schema scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder) scala> df.show +------+ | cls| +------+ |[null]| | null| +------+ scala> df.map(x => x)(encoder).show() +------+ | cls| +------+ |[null]| | null| +------+ ``` This pr added the NULL-check expressions in Bean serializer along with the serializer of Scala's product types. ## How was this patch tested? Added tests in `JavaDatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17347 from maropu/SPARK-19980.
* [SPARK-20010][SQL] Sort information is lost after sort merge joinwangzhenhua2017-03-215-9/+24
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? After sort merge join for inner join, now we only keep left key ordering. However, after inner join, right key has the same value and order as left key. So if we need another smj on right key, we will unnecessarily add a sort which causes additional cost. As a more complicated example, A join B on A.key = B.key join C on B.key = C.key join D on A.key = D.key. We will unnecessarily add a sort on B.key when join {A, B} and C, and add a sort on A.key when join {A, B, C} and D. To fix this, we need to propagate all sorted information (equivalent expressions) from bottom up through `outputOrdering` and `SortOrder`. ## How was this patch tested? Test cases are added. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17339 from wzhfy/sortEnhance.
* [SPARK-19573][SQL] Make NaN/null handling consistent in approxQuantileZheng RuiFeng2017-03-203-21/+40
| | | | | | | | | | | | ## What changes were proposed in this pull request? update `StatFunctions.multipleApproxQuantiles` to handle NaN/null ## How was this patch tested? existing tests and added tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #16971 from zhengruifeng/quantiles_nan.
* [SPARK-17791][SQL] Join reordering using star schema detectionIoana Delaney2017-03-2010-36/+978
| | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Star schema consists of one or more fact tables referencing a number of dimension tables. In general, queries against star schema are expected to run fast because of the established RI constraints among the tables. This design proposes a join reordering based on natural, generally accepted heuristics for star schema queries: - Finds the star join with the largest fact table and places it on the driving arm of the left-deep join. This plan avoids large tables on the inner, and thus favors hash joins. - Applies the most selective dimensions early in the plan to reduce the amount of data flow. The design document was included in SPARK-17791. Link to the google doc: [StarSchemaDetection](https://docs.google.com/document/d/1UAfwbm_A6wo7goHlVZfYK99pqDMEZUumi7pubJXETEA/edit?usp=sharing) ## How was this patch tested? A new test suite StarJoinSuite.scala was implemented. Author: Ioana Delaney <ioanamdelaney@gmail.com> Closes #15363 from ioana-delaney/starJoinReord2.
* [SPARK-19849][SQL] Support ArrayType in to_json to produce JSON arrayhyukjinkwon2017-03-194-61/+113
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to support an array of struct type in `to_json` as below: ```scala import org.apache.spark.sql.functions._ val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a") df.select(to_json($"a").as("json")).show() ``` ``` +----------+ | json| +----------+ |[{"_1":1}]| +----------+ ``` Currently, it throws an exception as below (a newline manually inserted for readability): ``` org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type mismatch: structtojson requires that the expression is a struct expression.;; ``` This allows the roundtrip with `from_json` as below: ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array")) df.show() // Read back. df.select(to_json($"array").as("json")).show() ``` ``` +----------+ | array| +----------+ |[[1], [2]]| +----------+ +-----------------+ | json| +-----------------+ |[{"a":1},{"a":2}]| +-----------------+ ``` Also, this PR proposes to rename from `StructToJson` to `StructsToJson ` and `JsonToStruct` to `JsonToStructs`. ## How was this patch tested? Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, doctest for Python and test in `test_sparkSQL.R` for R. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17192 from HyukjinKwon/SPARK-19849.
* [SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithStateTathagata Das2017-03-195-47/+136
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.). Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this. ``` def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = { ... state.setTimeoutDuration(10000) ... } dataset // type is Dataset[T] .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T] .mapGroupsWithState[S, U]( func = stateFunction, timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U] ``` Note the following design aspects. - The timeout type is provided as a param in mapGroupsWithState as a parameter global to all the keys. This is so that the planner knows this at planning time, and accordingly optimize the execution based on whether to saves extra info in state or not (e.g. timeout durations or timestamps). - The exact timeout duration is provided inside the function call so that it can be customized on a per key basis. - When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true. - The timeout is reset for key every time the function is called on the key, that is, when the key has new data, or the key has timed out. So the user has to set the timeout duration everytime the function is called, otherwise there will not be any timeout set. Guarantees provided on timeout of key, when timeout duration is D ms: - Timeout will never be called before real clock time has advanced by D ms - Timeout will be called eventually when there is a trigger with any data in it (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur. For example, if there is no data in the stream (for any key) for a while, then the timeout will not be hit. Implementation details: - Added new param to `mapGroupsWithState` for timeout - Added new method to `StateStore` to filter data based on timeout timestamp - Changed the internal map type of `HDFSBackedStateStore` from Java's `HashMap` to `ConcurrentHashMap` as the latter allows weakly-consistent fail-safe iterators on the map data. See comments in code for more details. - Refactored logic of `MapGroupsWithStateExec` to - Save timeout info to state store for each key that has data. - Then, filter states that should be timed out based on the current batch processing timestamp. - Moved KeyedState for `o.a.s.sql` to `o.a.s.sql.streaming`. I remember that this was a feedback in the MapGroupsWithState PR that I had forgotten to address. ## How was this patch tested? New unit tests in - MapGroupsWithStateSuite for timeouts. - StateStoreSuite for new APIs in StateStore. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17179 from tdas/mapgroupwithstate-timeout.