aboutsummaryrefslogtreecommitdiff
path: root/sql/core
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-20439][SQL] Fix Catalog API listTables and getTable when failed to ↵Xiao Li2017-04-241-5/+23
| | | | | | | | | | | | | | | fetch table metadata ### What changes were proposed in this pull request? `spark.catalog.listTables` and `spark.catalog.getTable` does not work if we are unable to retrieve table metadata due to any reason (e.g., table serde class is not accessible or the table type is not accepted by Spark SQL). After this PR, the APIs still return the corresponding Table without the description and tableType) ### How was this patch tested? Added a test case Author: Xiao Li <gatorsmile@gmail.com> Closes #17730 from gatorsmile/listTables.
* [SPARK-20430][SQL] Initialise RangeExec parameters in a driver sideTakeshi Yamamuro2017-04-222-5/+11
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This pr initialised `RangeExec` parameters in a driver side. In the current master, a query below throws `NullPointerException`; ``` sql("SET spark.sql.codegen.wholeStage=false") sql("SELECT * FROM range(1)").show 17/04/20 17:11:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:54) at org.apache.spark.sql.execution.RangeExec.numSlices(basicPhysicalOperators.scala:343) at org.apache.spark.sql.execution.RangeExec$$anonfun$20.apply(basicPhysicalOperators.scala:506) at org.apache.spark.sql.execution.RangeExec$$anonfun$20.apply(basicPhysicalOperators.scala:505) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ``` ## How was this patch tested? Added a test in `DataFrameRangeSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17717 from maropu/SPARK-20430.
* [SPARK-20412] Throw ParseException from visitNonOptionalPartitionSpec ↵Juliusz Sompolski2017-04-211-4/+12
| | | | | | | | | | | | | | | | | | instead of returning null values. ## What changes were proposed in this pull request? If a partitionSpec is supposed to not contain optional values, a ParseException should be thrown, and not nulls returned. The nulls can later cause NullPointerExceptions in places not expecting them. ## How was this patch tested? A query like "SHOW PARTITIONS tbl PARTITION(col1='val1', col2)" used to throw a NullPointerException. Now it throws a ParseException. Author: Juliusz Sompolski <julek@databricks.com> Closes #17707 from juliuszsompolski/SPARK-20412.
* [SPARK-20420][SQL] Add events to the external catalogHerman van Hovell2017-04-211-0/+7
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change. The following events are fired per object: - Database - CreateDatabasePreEvent: event fired before the database is created. - CreateDatabaseEvent: event fired after the database has been created. - DropDatabasePreEvent: event fired before the database is dropped. - DropDatabaseEvent: event fired after the database has been dropped. - Table - CreateTablePreEvent: event fired before the table is created. - CreateTableEvent: event fired after the table has been created. - RenameTablePreEvent: event fired before the table is renamed. - RenameTableEvent: event fired after the table has been renamed. - DropTablePreEvent: event fired before the table is dropped. - DropTableEvent: event fired after the table has been dropped. - Function - CreateFunctionPreEvent: event fired before the function is created. - CreateFunctionEvent: event fired after the function has been created. - RenameFunctionPreEvent: event fired before the function is renamed. - RenameFunctionEvent: event fired after the function has been renamed. - DropFunctionPreEvent: event fired before the function is dropped. - DropFunctionPreEvent: event fired after the function has been dropped. The current events currently only contain the names of the object modified. We add more events, and more details at a later point. A user can monitor changes to the external catalog by adding a listener to the Spark listener bus checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is add listener directly to the `ExternalCatalog`. ## How was this patch tested? Added the `ExternalCatalogEventSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17710 from hvanhovell/SPARK-20420.
* [SPARK-20281][SQL] Print the identical Range parameters of SparkContext APIs ↵Takeshi Yamamuro2017-04-203-3/+4
| | | | | | | | | | | | | | | | | | | | | | | | and SQL in explain ## What changes were proposed in this pull request? This pr modified code to print the identical `Range` parameters of SparkContext APIs and SQL in `explain` output. In the current master, they internally use `defaultParallelism` for `splits` by default though, they print different strings in explain output; ``` scala> spark.range(4).explain == Physical Plan == *Range (0, 4, step=1, splits=Some(8)) scala> sql("select * from range(4)").explain == Physical Plan == *Range (0, 4, step=1, splits=None) ``` ## How was this patch tested? Added tests in `SQLQuerySuite` and modified some results in the existing tests. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17670 from maropu/SPARK-20281.
* [SPARK-20329][SQL] Make timezone aware expression without timezone unresolvedHerman van Hovell2017-04-218-21/+42
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? A cast expression with a resolved time zone is not equal to a cast expression without a resolved time zone. The `ResolveAggregateFunction` assumed that these expression were the same, and would fail to resolve `HAVING` clauses which contain a `Cast` expression. This is in essence caused by the fact that a `TimeZoneAwareExpression` can be resolved without a set time zone. This PR fixes this, and makes a `TimeZoneAwareExpression` unresolved as long as it has no TimeZone set. ## How was this patch tested? Added a regression test to the `SQLQueryTestSuite.having` file. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17641 from hvanhovell/SPARK-20329.
* [SPARK-20367] Properly unescape column names of partitioning columns parsed ↵Juliusz Sompolski2017-04-212-1/+13
| | | | | | | | | | | | | | | | from paths. ## What changes were proposed in this pull request? When infering partitioning schema from paths, the column in parsePartitionColumn should be unescaped with unescapePathName, just like it is being done in e.g. parsePathFragmentAsSeq. ## How was this patch tested? Added a test to FileIndexSuite. Author: Juliusz Sompolski <julek@databricks.com> Closes #17703 from juliuszsompolski/SPARK-20367.
* [SPARK-20410][SQL] Make sparkConf a def in SharedSQLContextHerman van Hovell2017-04-207-43/+32
| | | | | | | | | | | | ## What changes were proposed in this pull request? It is kind of annoying that `SharedSQLContext.sparkConf` is a val when overriding test cases, because you cannot call `super` on it. This PR makes it a function. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17705 from hvanhovell/SPARK-20410.
* [SPARK-20334][SQL] Return a better error message when correlated predicates ↵Dilip Biswal2017-04-203-50/+143
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | contain aggregate expression that has mixture of outer and local references. ## What changes were proposed in this pull request? Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880) Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following : ```SQL SELECT t1a FROM t1 GROUP BY 1 HAVING EXISTS (SELECT 1 FROM t2 WHERE t2a < min(t1a + t2a)); ``` Exception snippet. ``` Cannot evaluate expression: min((input[0, int, false] + input[4, int, false])) at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226) at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103) ``` After this PR, a better error message is issued. ``` org.apache.spark.sql.AnalysisException Error in query: Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.; ``` ## How was this patch tested? Added tests in SQLQueryTestSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17636 from dilipbiswal/subquery_followup1.
* [SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ↵Bogdan Raducanu2017-04-203-7/+60
| | | | | | | | | | | | | | | | ignoreCorruptFiles' flaky test ## What changes were proposed in this pull request? SharedSQLContext.afterEach now calls DebugFilesystem.assertNoOpenStreams inside eventually. SQLTestUtils withTempDir calls waitForTasksToFinish before deleting the directory. ## How was this patch tested? Added new test in ParquetQuerySuite based on the flaky test Author: Bogdan Raducanu <bogdan@databricks.com> Closes #17701 from bogdanrdc/SPARK-20407.
* [SPARK-20409][SQL] fail early if aggregate function in GROUP BYWenchen Fan2017-04-202-2/+9
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? It's illegal to have aggregate function in GROUP BY, and we should fail at analysis phase, if this happens. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17704 from cloud-fan/minor.
* [SPARK-20405][SQL] Dataset.withNewExecutionId should be privateReynold Xin2017-04-201-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? Dataset.withNewExecutionId is only used in Dataset itself and should be private. ## How was this patch tested? N/A - this is a simple visibility change. Author: Reynold Xin <rxin@databricks.com> Closes #17699 from rxin/SPARK-20405.
* [SPARK-20156][SQL][FOLLOW-UP] Java String toLowerCase "Turkish locale bug" ↵Xiao Li2017-04-203-2/+49
| | | | | | | | | | | | | | | | in Database and Table DDLs ### What changes were proposed in this pull request? Database and Table names conform the Hive standard ("[a-zA-z_0-9]+"), i.e. if this name only contains characters, numbers, and _. When calling `toLowerCase` on the names, we should add `Locale.ROOT` to the `toLowerCase`for avoiding inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). ### How was this patch tested? Added a test case Author: Xiao Li <gatorsmile@gmail.com> Closes #17655 from gatorsmile/locale.
* [SPARK-20398][SQL] range() operator should include cancellation reason when ↵Eric Liang2017-04-191-3/+1
| | | | | | | | | | | | | | | | | killed ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-19820 adds a reason field for why tasks were killed. However, for backwards compatibility it left the old TaskKilledException constructor which defaults to "unknown reason". The range() operator should use the constructor that fills in the reason rather than dropping it on task kill. ## How was this patch tested? Existing tests, and I tested this manually. Author: Eric Liang <ekl@databricks.com> Closes #17692 from ericl/fix-kill-reason-in-range.
* [SPARK-20356][SQL] Pruned InMemoryTableScanExec should have correct output ↵Liang-Chi Hsieh2017-04-192-1/+18
| | | | | | | | | | | | | | | | | | partitioning and ordering ## What changes were proposed in this pull request? The output of `InMemoryTableScanExec` can be pruned and mismatch with `InMemoryRelation` and its child plan's output. This causes wrong output partitioning and ordering. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17679 from viirya/SPARK-20356.
* [SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin ↵Koert Kuipers2017-04-191-0/+10
| | | | | | | | | | | | | | | | | | | optimization that can lead to NPE Avoid necessary execution that can lead to NPE in EliminateOuterJoin and add test in DataFrameSuite to confirm NPE is no longer thrown ## What changes were proposed in this pull request? Change leftHasNonNullPredicate and rightHasNonNullPredicate to lazy so they are only executed when needed. ## How was this patch tested? Added test in DataFrameSuite that failed before this fix and now succeeds. Note that a test in catalyst project would be better but i am unsure how to do this. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Koert Kuipers <koert@tresata.com> Closes #17660 from koertkuipers/feat-catch-npe-in-eliminate-outer-join.
* [SPARK-20349][SQL] ListFunctions returns duplicate functions after using ↵Xiao Li2017-04-171-3/+1
| | | | | | | | | | | | | | | | persistent functions ### What changes were proposed in this pull request? The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle it. It would be better if `SessionCatalog` API can de-duplciate the records, instead of doing it by each API caller. In `FunctionRegistry`, our functions are identified by the unquoted string. Thus, this PR is try to parse it using our parser interface and then de-duplicate the names. ### How was this patch tested? Added test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #17646 from gatorsmile/showFunctions.
* [SPARK-20318][SQL] Use Catalyst type for min/max in ColumnStat for ease of ↵wangzhenhua2017-04-142-12/+15
| | | | | | | | | | | | | | | | | | | | | estimation ## What changes were proposed in this pull request? Currently when estimating predicates like col > literal or col = literal, we will update min or max in column stats based on literal value. However, literal value is of Catalyst type (internal type), while min/max is of external type. Then for the next predicate, we again need to do type conversion to compare and update column stats. This is awkward and causes many unnecessary conversions in estimation. To solve this, we use Catalyst type for min/max in `ColumnStat`. Note that the persistent format in metastore is still of external type, so there's no inconsistency for statistics in metastore. This pr also fixes a bug for boolean type in `IN` condition. ## How was this patch tested? The changes for ColumnStat are covered by existing tests. For bug fix, a new test for boolean type in IN condition is added Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17630 from wzhfy/refactorColumnStat.
* [SPARK-20038][SQL] FileFormatWriter.ExecuteWriteTask.releaseResources() ↵Steve Loughran2017-04-131-4/+10
| | | | | | | | | | | | | | | | | | implementations to be re-entrant ## What changes were proposed in this pull request? have the`FileFormatWriter.ExecuteWriteTask.releaseResources()` implementations set `currentWriter=null` in a finally clause. This guarantees that if the first call to `currentWriter()` throws an exception, the second releaseResources() call made during the task cancel process will not trigger a second attempt to close the stream. ## How was this patch tested? Tricky. I've been fixing the underlying cause when I saw the problem [HADOOP-14204](https://issues.apache.org/jira/browse/HADOOP-14204), but SPARK-10109 shows I'm not the first to have seen this. I can't replicate it locally any more, my code no longer being broken. code review, however, should be straightforward Author: Steve Loughran <stevel@hortonworks.com> Closes #17364 from steveloughran/stevel/SPARK-20038-close.
* [SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in ↵Burak Yavuz2017-04-122-30/+32
| | | | | | | | | | | | | | | | | | | | | | Structured Streaming tests ## What changes were proposed in this pull request? Some Structured Streaming tests show flakiness such as: ``` [info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds) [info] Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds. ``` This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`. ## How was this patch tested? Thousand retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests Author: Burak Yavuz <brkyvz@gmail.com> Closes #17613 from brkyvz/flaky-stream-agg.
* [SPARK-20303][SQL] Rename createTempFunction to registerFunctionXiao Li2017-04-122-8/+6
| | | | | | | | | | | | | | ### What changes were proposed in this pull request? Session catalog API `createTempFunction` is being used by Hive build-in functions, persistent functions, and temporary functions. Thus, the name is confusing. This PR is to rename it by `registerFunction`. Also we can move construction of `FunctionBuilder` and `ExpressionInfo` into the new `registerFunction`, instead of duplicating the logics everywhere. In the next PRs, the remaining Function-related APIs also need cleanups. ### How was this patch tested? Existing test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #17615 from gatorsmile/cleanupCreateTempFunction.
* [SPARK-18692][BUILD][DOCS] Test Java 8 unidoc build on Jenkinshyukjinkwon2017-04-127-19/+27
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to run Spark unidoc to test Javadoc 8 build as Javadoc 8 is easily re-breakable. There are several problems with it: - It introduces little extra bit of time to run the tests. In my case, it took 1.5 mins more (`Elapsed :[94.8746569157]`). How it was tested is described in "How was this patch tested?". - > One problem that I noticed was that Unidoc appeared to be processing test sources: if we can find a way to exclude those from being processed in the first place then that might significantly speed things up. (see joshrosen's [comment](https://issues.apache.org/jira/browse/SPARK-18692?focusedCommentId=15947627&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15947627)) To complete this automated build, It also suggests to fix existing Javadoc breaks / ones introduced by test codes as described above. There fixes are similar instances that previously fixed. Please refer https://github.com/apache/spark/pull/15999 and https://github.com/apache/spark/pull/16013 Note that this only fixes **errors** not **warnings**. Please see my observation https://github.com/apache/spark/pull/17389#issuecomment-288438704 for spurious errors by warnings. ## How was this patch tested? Manually via `jekyll build` for building tests. Also, tested via running `./dev/run-tests`. This was tested via manually adding `time.time()` as below: ```diff profiles_and_goals = build_profiles + sbt_goals print("[info] Building Spark unidoc (w/Hive 1.2.1) using SBT with these arguments: ", " ".join(profiles_and_goals)) + import time + st = time.time() exec_sbt(profiles_and_goals) + print("Elapsed :[%s]" % str(time.time() - st)) ``` produces ``` ... ======================================================================== Building Unidoc API Documentation ======================================================================== ... [info] Main Java API documentation successful. ... Elapsed :[94.8746569157] ... Author: hyukjinkwon <gurwls223@gmail.com> Closes #17477 from HyukjinKwon/SPARK-18692.
* [MINOR][DOCS] JSON APIs related documentation fixeshyukjinkwon2017-04-121-2/+2
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes corrections related to JSON APIs as below: - Rendering links in Python documentation - Replacing `RDD` to `Dataset` in programing guide - Adding missing description about JSON Lines consistently in `DataFrameReader.json` in Python API - De-duplicating little bit of `DataFrameReader.json` in Scala/Java API ## How was this patch tested? Manually build the documentation via `jekyll build`. Corresponding snapstops will be left on the codes. Note that currently there are Javadoc8 breaks in several places. These are proposed to be handled in https://github.com/apache/spark/pull/17477. So, this PR does not fix those. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17602 from HyukjinKwon/minor-json-documentation.
* [SPARK-19993][SQL] Caching logical plans containing subquery expressions ↵Dilip Biswal2017-04-122-4/+146
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | does not work. ## What changes were proposed in this pull request? The sameResult() method does not work when the logical plan contains subquery expressions. **Before the fix** ```SQL scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)") ds: org.apache.spark.sql.DataFrame = [c1: int] scala> ds.cache res13: ds.type = [c1: int] scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true) == Analyzed Logical Plan == c1: int Project [c1#86] +- Filter c1#86 IN (list#78 [c1#86]) : +- Project [c1#87] : +- Filter (outer(c1#86) = c1#87) : +- SubqueryAlias s2 : +- Relation[c1#87] parquet +- SubqueryAlias s1 +- Relation[c1#86] parquet == Optimized Logical Plan == Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87)) :- Relation[c1#86] parquet +- Relation[c1#87] parquet ``` **Plan after fix** ```SQL == Analyzed Logical Plan == c1: int Project [c1#22] +- Filter c1#22 IN (list#14 [c1#22]) : +- Project [c1#23] : +- Filter (outer(c1#22) = c1#23) : +- SubqueryAlias s2 : +- Relation[c1#23] parquet +- SubqueryAlias s1 +- Relation[c1#22] parquet == Optimized Logical Plan == InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight :- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295)))) +- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> ``` ## How was this patch tested? New tests are added to CachedTableSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17330 from dilipbiswal/subquery_cache_final.
* [SPARK-20291][SQL] NaNvl(FloatType, NullType) should not be cast to ↵DB Tsai2017-04-121-2/+1
| | | | | | | | | | | | | | | | | | | | | | NaNvl(DoubleType, DoubleType) ## What changes were proposed in this pull request? `NaNvl(float value, null)` will be converted into `NaNvl(float value, Cast(null, DoubleType))` and finally `NaNvl(Cast(float value, DoubleType), Cast(null, DoubleType))`. This will cause mismatching in the output type when the input type is float. By adding extra rule in TypeCoercion can resolve this issue. ## How was this patch tested? unite tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: DB Tsai <dbt@netflix.com> Closes #17606 from dbtsai/fixNaNvl.
* [SPARK-20175][SQL] Exists should not be evaluated in Join operatorLiang-Chi Hsieh2017-04-111-0/+10
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Similar to `ListQuery`, `Exists` should not be evaluated in `Join` operator too. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17491 from viirya/dont-push-exists-to-join.
* [SPARK-20283][SQL] Add preOptimizationBatchesReynold Xin2017-04-101-1/+7
| | | | | | | | | | | | ## What changes were proposed in this pull request? We currently have postHocOptimizationBatches, but not preOptimizationBatches. This patch adds preOptimizationBatches so the optimizer debugging extensions are symmetric. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17595 from rxin/SPARK-20283.
* [SPARK-20282][SS][TESTS] Write the commit log first to fix a race contion in ↵Shixiong Zhu2017-04-101-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | tests ## What changes were proposed in this pull request? This PR fixes the following failure: ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: Assert on query failed: == Progress == AssertOnQuery(<condition>, ) StopStream AddData to MemoryStream[value#30891]: 1,2 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock35cdc93a,Map()) CheckAnswer: [6],[3] StopStream => AssertOnQuery(<condition>, ) AssertOnQuery(<condition>, ) StartStream(OneTimeTrigger,org.apache.spark.util.SystemClockcdb247d,Map()) CheckAnswer: [6],[3] StopStream AddData to MemoryStream[value#30891]: 3 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock55394e4d,Map()) CheckLastBatch: [2] StopStream AddData to MemoryStream[value#30891]: 0 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock749aa997,Map()) ExpectFailure[org.apache.spark.SparkException, isFatalError: false] AssertOnQuery(<condition>, ) AssertOnQuery(<condition>, incorrect start offset or end offset on exception) == Stream == Output Mode: Append Stream state: not started Thread state: dead == Sink == 0: [6] [3] == Plan == at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) at org.scalatest.Assertions$class.fail(Assertions.scala:1328) at org.scalatest.FunSuite.fail(FunSuite.scala:1555) at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347) at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318) at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483) at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357) at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356) at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161) at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200) at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241) at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502) at sbt.ForkMain$Run$2.call(ForkMain.java:296) at sbt.ForkMain$Run$2.call(ForkMain.java:286) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` The failure is because `CheckAnswer` will run once `committedOffsets` is updated. Then writing the commit log may be interrupted by the following `StopStream`. This PR just change the order to write the commit log first. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #17594 from zsxwing/SPARK-20282.
* [SPARK-20280][CORE] FileStatusCache Weigher integer overflowBogdan Raducanu2017-04-102-13/+50
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Weigher.weigh needs to return Int but it is possible for an Array[FileStatus] to have size > Int.maxValue. To avoid this, the size is scaled down by a factor of 32. The maximumWeight of the cache is also scaled down by the same factor. ## How was this patch tested? New test in FileIndexSuite Author: Bogdan Raducanu <bogdan@databricks.com> Closes #17591 from bogdanrdc/SPARK-20280.
* [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish ↵Sean Owen2017-04-1035-86/+134
| | | | | | | | | | | | | | | | | | locale bug" causes Spark problems ## What changes were proposed in this pull request? Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #17527 from srowen/SPARK-20156.
* [SPARK-20229][SQL] add semanticHash to QueryPlanWenchen Fan2017-04-109-78/+39
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Like `Expression`, `QueryPlan` should also have a `semanticHash` method, then we can put plans to a hash map and look it up fast. This PR refactors `QueryPlan` to follow `Expression` and put all the normalization logic in `QueryPlan.canonicalized`, so that it's very natural to implement `semanticHash`. follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating all cached plans. ## How was this patch tested? existing tests. Note that we don't need to test the `semanticHash` method, once the existing tests prove `sameResult` is correct, we are good. Author: Wenchen Fan <wenchen@databricks.com> Closes #17541 from cloud-fan/plan-semantic.
* [SPARK-20270][SQL] na.fill should not change the values in long or integer ↵DB Tsai2017-04-102-2/+17
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | when the default value is in double ## What changes were proposed in this pull request? This bug was partially addressed in SPARK-18555 https://github.com/apache/spark/pull/15994, but the root cause isn't completely solved. This bug is pretty critical since it changes the member id in Long in our application if the member id can not be represented by Double losslessly when the member id is very big. Here is an example how this happens, with ``` Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null), (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2), ``` the logical plan will be ``` == Analyzed Logical Plan == a: bigint, b: double Project [cast(coalesce(cast(a#232L as double), cast(0.2 as double)) as bigint) AS a#240L, cast(coalesce(nanvl(b#233, cast(null as double)), 0.2) as double) AS b#241] +- Project [_1#229L AS a#232L, _2#230 AS b#233] +- LocalRelation [_1#229L, _2#230] ``` Note that even the value is not null, Spark will cast the Long into Double first. Then if it's not null, Spark will cast it back to Long which results in losing precision. The behavior should be that the original value should not be changed if it's not null, but Spark will change the value which is wrong. With the PR, the logical plan will be ``` == Analyzed Logical Plan == a: bigint, b: double Project [coalesce(a#232L, cast(0.2 as bigint)) AS a#240L, coalesce(nanvl(b#233, cast(null as double)), cast(0.2 as double)) AS b#241] +- Project [_1#229L AS a#232L, _2#230 AS b#233] +- LocalRelation [_1#229L, _2#230] ``` which behaves correctly without changing the original Long values and also avoids extra cost of unnecessary casting. ## How was this patch tested? unit test added. +cc srowen rxin cloud-fan gatorsmile Thanks. Author: DB Tsai <dbt@netflix.com> Closes #17577 from dbtsai/fixnafill.
* [SPARK-20264][SQL] asm should be non-test dependency in sql/coreReynold Xin2017-04-091-5/+4
| | | | | | | | | | | | ## What changes were proposed in this pull request? sq/core module currently declares asm as a test scope dependency. Transitively it should actually be a normal dependency since the actual core module defines it. This occasionally confuses IntelliJ. ## How was this patch tested? N/A - This is a build change. Author: Reynold Xin <rxin@databricks.com> Closes #17574 from rxin/SPARK-20264.
* [SPARK-20253][SQL] Remove unnecessary nullchecks of a return value from ↵Kazuaki Ishizaki2017-04-101-0/+10
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Spark runtime routines in generated Java code ## What changes were proposed in this pull request? This PR elminates unnecessary nullchecks of a return value from known Spark runtime routines. We know whether a given Spark runtime routine returns ``null`` or not (e.g. ``ArrayData.toDoubleArray()`` never returns ``null``). Thus, we can eliminate a null check for the return value from the Spark runtime routine. When we run the following example program, now we get the Java code "Without this PR". In this code, since we know ``ArrayData.toDoubleArray()`` never returns ``null```, we can eliminate null checks at lines 90-92, and 97. ```java val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache ds.count ds.map(e => e).show ``` Without this PR ```java /* 050 */ protected void processNext() throws java.io.IOException { /* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 055 */ /* 056 */ ArrayData deserializetoobject_value1 = null; /* 057 */ /* 058 */ if (!inputadapter_isNull) { /* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 060 */ /* 061 */ Double[] deserializetoobject_convertedArray = null; /* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength]; /* 063 */ /* 064 */ int deserializetoobject_loopIndex = 0; /* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex)); /* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 068 */ /* 069 */ if (MapObjects_loopIsNull2) { /* 070 */ throw new RuntimeException(((java.lang.String) references[0])); /* 071 */ } /* 072 */ if (false) { /* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 074 */ } else { /* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2; /* 076 */ } /* 077 */ /* 078 */ deserializetoobject_loopIndex += 1; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/ /* 082 */ } /* 083 */ boolean deserializetoobject_isNull = true; /* 084 */ double[] deserializetoobject_value = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull = false; /* 087 */ if (!deserializetoobject_isNull) { /* 088 */ Object deserializetoobject_funcResult = null; /* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray(); /* 090 */ if (deserializetoobject_funcResult == null) { /* 091 */ deserializetoobject_isNull = true; /* 092 */ } else { /* 093 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 094 */ } /* 095 */ /* 096 */ } /* 097 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 098 */ } /* 099 */ /* 100 */ boolean mapelements_isNull = true; /* 101 */ double[] mapelements_value = null; /* 102 */ if (!false) { /* 103 */ mapelements_resultIsNull = false; /* 104 */ /* 105 */ if (!mapelements_resultIsNull) { /* 106 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 107 */ mapelements_argValue = deserializetoobject_value; /* 108 */ } /* 109 */ /* 110 */ mapelements_isNull = mapelements_resultIsNull; /* 111 */ if (!mapelements_isNull) { /* 112 */ Object mapelements_funcResult = null; /* 113 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 114 */ if (mapelements_funcResult == null) { /* 115 */ mapelements_isNull = true; /* 116 */ } else { /* 117 */ mapelements_value = (double[]) mapelements_funcResult; /* 118 */ } /* 119 */ /* 120 */ } /* 121 */ mapelements_isNull = mapelements_value == null; /* 122 */ } /* 123 */ /* 124 */ serializefromobject_resultIsNull = false; /* 125 */ /* 126 */ if (!serializefromobject_resultIsNull) { /* 127 */ serializefromobject_resultIsNull = mapelements_isNull; /* 128 */ serializefromobject_argValue = mapelements_value; /* 129 */ } /* 130 */ /* 131 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 132 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 133 */ serializefromobject_isNull = serializefromobject_value == null; /* 134 */ serializefromobject_holder.reset(); /* 135 */ /* 136 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 137 */ /* 138 */ if (serializefromobject_isNull) { /* 139 */ serializefromobject_rowWriter.setNullAt(0); /* 140 */ } else { /* 141 */ // Remember the current cursor so that we can calculate how many bytes are /* 142 */ // written later. /* 143 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 144 */ /* 145 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 146 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 147 */ // grow the global buffer before writing data. /* 148 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 149 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 150 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 151 */ /* 152 */ } else { /* 153 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 154 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 155 */ /* 156 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 157 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 158 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 159 */ } else { /* 160 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 161 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 162 */ } /* 163 */ } /* 164 */ } /* 165 */ /* 166 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 167 */ } /* 168 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 169 */ append(serializefromobject_result); /* 170 */ if (shouldStop()) return; /* 171 */ } /* 172 */ } ``` With this PR (removed most of lines 90-97 in the above code) ```java /* 050 */ protected void processNext() throws java.io.IOException { /* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 055 */ /* 056 */ ArrayData deserializetoobject_value1 = null; /* 057 */ /* 058 */ if (!inputadapter_isNull) { /* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 060 */ /* 061 */ Double[] deserializetoobject_convertedArray = null; /* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength]; /* 063 */ /* 064 */ int deserializetoobject_loopIndex = 0; /* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex)); /* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 068 */ /* 069 */ if (MapObjects_loopIsNull2) { /* 070 */ throw new RuntimeException(((java.lang.String) references[0])); /* 071 */ } /* 072 */ if (false) { /* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 074 */ } else { /* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2; /* 076 */ } /* 077 */ /* 078 */ deserializetoobject_loopIndex += 1; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/ /* 082 */ } /* 083 */ boolean deserializetoobject_isNull = true; /* 084 */ double[] deserializetoobject_value = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull = false; /* 087 */ if (!deserializetoobject_isNull) { /* 088 */ Object deserializetoobject_funcResult = null; /* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray(); /* 090 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 091 */ /* 092 */ } /* 093 */ /* 094 */ } /* 095 */ /* 096 */ boolean mapelements_isNull = true; /* 097 */ double[] mapelements_value = null; /* 098 */ if (!false) { /* 099 */ mapelements_resultIsNull = false; /* 100 */ /* 101 */ if (!mapelements_resultIsNull) { /* 102 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 103 */ mapelements_argValue = deserializetoobject_value; /* 104 */ } /* 105 */ /* 106 */ mapelements_isNull = mapelements_resultIsNull; /* 107 */ if (!mapelements_isNull) { /* 108 */ Object mapelements_funcResult = null; /* 109 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 110 */ if (mapelements_funcResult == null) { /* 111 */ mapelements_isNull = true; /* 112 */ } else { /* 113 */ mapelements_value = (double[]) mapelements_funcResult; /* 114 */ } /* 115 */ /* 116 */ } /* 117 */ mapelements_isNull = mapelements_value == null; /* 118 */ } /* 119 */ /* 120 */ serializefromobject_resultIsNull = false; /* 121 */ /* 122 */ if (!serializefromobject_resultIsNull) { /* 123 */ serializefromobject_resultIsNull = mapelements_isNull; /* 124 */ serializefromobject_argValue = mapelements_value; /* 125 */ } /* 126 */ /* 127 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 128 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 129 */ serializefromobject_isNull = serializefromobject_value == null; /* 130 */ serializefromobject_holder.reset(); /* 131 */ /* 132 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 133 */ /* 134 */ if (serializefromobject_isNull) { /* 135 */ serializefromobject_rowWriter.setNullAt(0); /* 136 */ } else { /* 137 */ // Remember the current cursor so that we can calculate how many bytes are /* 138 */ // written later. /* 139 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 140 */ /* 141 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 142 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 143 */ // grow the global buffer before writing data. /* 144 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 145 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 146 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 147 */ /* 148 */ } else { /* 149 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 150 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 151 */ /* 152 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 153 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 154 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 155 */ } else { /* 156 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 157 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 158 */ } /* 159 */ } /* 160 */ } /* 161 */ /* 162 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 163 */ } /* 164 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 165 */ append(serializefromobject_result); /* 166 */ if (shouldStop()) return; /* 167 */ } /* 168 */ } ``` ## How was this patch tested? Add test suites to ``DatasetPrimitiveSuite`` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17569 from kiszk/SPARK-20253.
* [SPARK-20255] Move listLeafFiles() to InMemoryFileIndexAdrian Ionescu2017-04-073-231/+236
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request Trying to get a grip on the `FileIndex` hierarchy, I was confused by the following inconsistency: On the one hand, `PartitioningAwareFileIndex` defines `leafFiles` and `leafDirToChildrenFiles` as abstract, but on the other it fully implements `listLeafFiles` which does all the listing of files. However, the latter is only used by `InMemoryFileIndex`. I'm hereby proposing to move this method (and all its dependencies) to the implementation class that actually uses it, and thus unclutter the `PartitioningAwareFileIndex` interface. ## How was this patch tested? `./build/sbt sql/test` Author: Adrian Ionescu <adrian@databricks.com> Closes #17570 from adrian-ionescu/list-leaf-files.
* [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directlyWenchen Fan2017-04-074-45/+32
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently `LogicalRelation` has a `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17552 from cloud-fan/minor.
* [SPARK-20196][PYTHON][SQL] update doc for catalog functions for all ↵Felix Cheung2017-04-062-16/+23
| | | | | | | | | | | | | | | | languages, add pyspark refreshByPath API ## What changes were proposed in this pull request? Update doc to remove external for createTable, add refreshByPath in python ## How was this patch tested? manual Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17512 from felixcheung/catalogdoc.
* [SPARK-20224][SS] Updated docs for streaming dropDuplicates and ↵Tathagata Das2017-04-052-3/+14
| | | | | | | | | | | | | | | | | | mapGroupsWithState ## What changes were proposed in this pull request? - Fixed bug in Java API not passing timeout conf to scala API - Updated markdown docs - Updated scala docs - Added scala and Java example ## How was this patch tested? Manually ran examples. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17539 from tdas/SPARK-20224.
* [SPARK-20223][SQL] Fix typo in tpcds q77.sqlwangzhenhua2017-04-051-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix typo in tpcds q77.sql ## How was this patch tested? N/A Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17538 from wzhfy/typoQ77.
* [SPARK-20209][SS] Execute next trigger immediately if previous batch took ↵Tathagata Das2017-04-0410-72/+193
| | | | | | | | | | | | | | | | | longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17525 from tdas/SPARK-20209.
* Small doc fix for ReuseSubquery.Reynold Xin2017-04-041-2/+2
|
* [SPARK-19716][SQL] support by-name resolution for struct type elements in arrayWenchen Fan2017-04-042-1/+10
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Previously when we construct deserializer expression for array type, we will first cast the corresponding field to expected array type and then apply `MapObjects`. However, by doing that, we lose the opportunity to do by-name resolution for struct type inside array type. In this PR, I introduce a `UnresolvedMapObjects` to hold the lambda function and the input array expression. Then during analysis, after the input array expression is resolved, we get the actual array element type and apply by-name resolution. Then we don't need to add `Cast` for array type when constructing the deserializer expression, as the element type is determined later at analyzer. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17398 from cloud-fan/dataset.
* [SPARK-20204][SQL] remove SimpleCatalystConf and CatalystConf type aliasWenchen Fan2017-04-046-16/+16
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/17285 . ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17521 from cloud-fan/conf.
* [SPARK-20198][SQL] Remove the inconsistency in table/function name ↵Xiao Li2017-04-044-55/+155
| | | | | | | | | | | | | | | | | | | | | conventions in SparkSession.Catalog APIs ### What changes were proposed in this pull request? Observed by felixcheung , in `SparkSession`.`Catalog` APIs, we have different conventions/rules for table/function identifiers/names. Most APIs accept the qualified name (i.e., `databaseName`.`tableName` or `databaseName`.`functionName`). However, the following five APIs do not accept it. - def listColumns(tableName: String): Dataset[Column] - def getTable(tableName: String): Table - def getFunction(functionName: String): Function - def tableExists(tableName: String): Boolean - def functionExists(functionName: String): Boolean To make them consistent with the other Catalog APIs, this PR does the changes, updates the function/API comments and adds the `params` to clarify the inputs we allow. ### How was this patch tested? Added the test cases . Author: Xiao Li <gatorsmile@gmail.com> Closes #17518 from gatorsmile/tableIdentifier.
* [SPARK-20067][SQL] Unify and Clean Up Desc Commands Using Catalog InterfaceXiao Li2017-04-0312-375/+523
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR is to unify and clean up the outputs of `DESC EXTENDED/FORMATTED` and `SHOW TABLE EXTENDED` by moving the logics into the Catalog interface. The output formats are improved. We also add the missing attributes. It impacts the DDL commands like `SHOW TABLE EXTENDED`, `DESC EXTENDED` and `DESC FORMATTED`. In addition, by following what we did in Dataset API `printSchema`, we can use `treeString` to show the schema in the more readable way. Below is the current way: ``` Schema: STRUCT<`a`: STRING (nullable = true), `b`: INT (nullable = true), `c`: STRING (nullable = true), `d`: STRING (nullable = true)> ``` After the change, it should look like ``` Schema: root |-- a: string (nullable = true) |-- b: integer (nullable = true) |-- c: string (nullable = true) |-- d: string (nullable = true) ``` ### How was this patch tested? `describe.sql` and `show-tables.sql` Author: Xiao Li <gatorsmile@gmail.com> Closes #17394 from gatorsmile/descFollowUp.
* [SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLISDilip Biswal2017-04-0410-20/+193
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? **Description** from JIRA The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS. But unfortunately parquet-mr hasn't supported it yet. For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part to read values. For the write path, currently we are writing timestamps as INT96, similar to Impala and Hive. One alternative is that, we can have a separate SQL option to let users be able to write Spark SQL timestamp values as TIMESTAMP_MILLIS. Of course, in this way the microsecond part will be truncated. ## How was this patch tested? Added new tests in ParquetQuerySuite and ParquetIOSuite Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #15332 from dilipbiswal/parquet-time-millis.
* [SPARK-20145] Fix range case insensitive bug in SQLsamelamin2017-04-032-1/+25
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Range in SQL should be case insensitive ## How was this patch tested? unit test Author: samelamin <hussam.elamin@gmail.com> Author: samelamin <sam_elamin@discovery.com> Closes #17487 from samelamin/SPARK-20145.
* [SPARK-20194] Add support for partition pruning to in-memory catalogAdrian Ionescu2017-04-031-4/+1
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch implements `listPartitionsByFilter()` for `InMemoryCatalog` and thus resolves an outstanding TODO causing the `PruneFileSourcePartitions` optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default). The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's `getPartitionsByFilter()` out from `HiveExternalCatalog` into `ExternalCatalogUtils` and calls this new function from `InMemoryCatalog` on the whole list of partitions. Now that this method is implemented we can always pass the `CatalogTable` to the `DataSource` in `FindDataSourceTable`, so that the latter is resolved to a relation with a `CatalogFileIndex`, which is what the `PruneFileSourcePartitions` rule matches for. ## How was this patch tested? Ran existing tests and added new test for `listPartitionsByFilter` in `ExternalCatalogSuite`, which is subclassed by both `InMemoryCatalogSuite` and `HiveExternalCatalogSuite`. Author: Adrian Ionescu <adrian@databricks.com> Closes #17510 from adrian-ionescu/InMemoryCatalog.
* [SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces ↵hyukjinkwon2017-04-032-33/+78
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | incorrect schema for non-array/object JSONs ## What changes were proposed in this pull request? Currently, when we infer the types for vaild JSON strings but object or array, we are producing empty schemas regardless of parse modes as below: ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` ```scala scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` This PR proposes to handle parse modes in type inference. After this PR, ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root |-- a: long (nullable = true) ``` ``` scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found. ``` This PR is based on https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae and I and NathanHowell talked about this in https://issues.apache.org/jira/browse/SPARK-19641 ## How was this patch tested? Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17492 from HyukjinKwon/SPARK-19641.
* [SPARK-20166][SQL] Use XXX for ISO 8601 timezone instead of ZZ ↵hyukjinkwon2017-04-035-8/+8
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | (FastDateFormat specific) in CSV/JSON timeformat options ## What changes were proposed in this pull request? This PR proposes to use `XXX` format instead of `ZZ`. `ZZ` seems a `FastDateFormat` specific. `ZZ` supports "ISO 8601 extended format time zones" but it seems `FastDateFormat` specific option. I misunderstood this is compatible format with `SimpleDateFormat` when this change is introduced. Please see [SimpleDateFormat documentation]( https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#iso8601timezone) and [FastDateFormat documentation](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/time/FastDateFormat.html). It seems we better replace `ZZ` to `XXX` because they look using the same strategy - [FastDateParser.java#L930](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L930), [FastDateParser.java#L932-L951 ](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L932-L951) and [FastDateParser.java#L596-L601](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L596-L601). I also checked the codes and manually debugged it for sure. It seems both cases use the same pattern `( Z|(?:[+-]\\d{2}(?::)\\d{2}))`. _Note that this should be rather a fix about documentation and not the behaviour change because `ZZ` seems invalid date format in `SimpleDateFormat` as documented in `DataFrameReader` and etc, and both `ZZ` and `XXX` look identically working with `FastDateFormat`_ Current documentation is as below: ``` * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> ``` ## How was this patch tested? Existing tests should cover this. Also, manually tested as below (BTW, I don't think these are worth being added as tests within Spark): **Parse** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res4: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res10: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000-11:00" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000Z" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided ``` ```scala scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res7: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res1: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") res8: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") res2: java.util.Date = Tue Mar 21 09:00:00 KST 2017 ``` **Format** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")) res6: String = 2017-03-21T20:00:00.000+09:00 ``` ```scala scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSZZ,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res1: String = 2017-03-21T20:00:00.000+09:00 scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSXXX,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res2: String = 2017-03-21T20:00:00.000+09:00 ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #17489 from HyukjinKwon/SPARK-20166.