path: root/sql
...
* [SPARK-18936][SQL] Infrastructure for session local timezone support. (Takuya UESHIN, 2017-01-26, 32 files, -674/+1182)

## What changes were proposed in this pull request?

As of Spark 2.1, Spark SQL assumes the machine timezone for datetime manipulation, which is bad if users are not in the same timezones as the machines, or if different users have different timezones. We should introduce a session local timezone setting that is used for execution. An explicit non-goal is locale handling.

### Semantics

Setting the session local timezone means that the timezone-aware expressions listed below should use the timezone to evaluate values, and it should also be used to convert (cast) between string and timestamp or between timestamp and date.

- `CurrentDate`
- `CurrentBatchTimestamp`
- `Hour`
- `Minute`
- `Second`
- `DateFormatClass`
- `ToUnixTimestamp`
- `UnixTimestamp`
- `FromUnixTime`

and the following are implicitly timezone-aware through the cast from timestamp to date:

- `DayOfYear`
- `Year`
- `Quarter`
- `Month`
- `DayOfMonth`
- `WeekOfYear`
- `LastDay`
- `NextDay`
- `TruncDate`

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values evaluated by some of the timezone-aware expressions are:

```scala
scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2016-01-01 00:00:00|2016                  |1                      |1                           |0       |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```

whereas with the session local timezone set to `"PST"`, they are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "PST")

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2015-12-31 16:00:00|2015                  |12                     |31                          |16      |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```

Notice that even if you set the session local timezone, it affects only `DataFrame` operations, not `Dataset` operations, `RDD` operations, or `ScalaUDF`s. You need to handle the timezone properly yourself in those cases.

### Design of the fix

I introduced an analyzer rule to pass the session local timezone to timezone-aware expressions and modified DateTimeUtils to take the timezone argument.

## How was this patch tested?

Existing tests and added tests for timezone-aware expressions.

Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #16308 from ueshin/issues/SPARK-18350.

* [TESTS][SQL] Setup testdata at the beginning for tests to run independently (Dilip Biswal, 2017-01-25, 4 files, -13/+10)

## What changes were proposed in this pull request?
In CachedTableSuite, we are not setting up the test data at the beginning. Some tests fail when run individually, although they pass when the entire suite is run. Here are some of the tests that fail:
- test("SELECT star from cached table")
- test("Self-join cached")

As part of this, a couple of tests were simplified by calling a support method to count the number of InMemoryRelations.

## How was this patch tested?
Ran the failing tests individually.

Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #16688 from dilipbiswal/cachetablesuite_simple.

* [SPARK-19311][SQL] fix UDT hierarchy issue (gmoehler, 2017-01-25, 2 files, -3/+110)

## What changes were proposed in this pull request?
acceptType() in UDT will now accept not only the same type but also all base types.

## How was this patch tested?
Manual test using a set of generated UDTs, after fixing acceptType() in my user-defined types.

Author: gmoehler <moehler@de.ibm.com>
Closes #16660 from gmoehler/master.

* [SPARK-18863][SQL] Output non-aggregate expressions without GROUP BY in a subquery does not yield an error (Nattavut Sutyanyong, 2017-01-25, 4 files, -51/+168)

## What changes were proposed in this pull request?
This PR reports proper error messages when a subquery expression contains an invalid plan. The problem is fixed by calling CheckAnalysis on the plan inside a subquery.

## How was this patch tested?
Existing tests and two new test cases on two forms of subquery, namely, scalar subquery and in/exists subquery.

````sql
-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
select t1a, t2b
from   t1, t2
where  t1b = t2c
and    t2b = (select max(avg)
              from   (select t2b, avg(t2b) avg
                      from   t2
                      where  t2a = t1.t1b
                     )
             )
;

-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
select *
from   t1
where  t1a in (select   min(t2a)
               from     t2
               group by t2c
               having   t2c in (select   max(t3c)
                                from     t3
                                group by t3b
                                having   t3b > t2b ))
;
````

Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes #16572 from nsyca/18863.

* [SPARK-19334][SQL] Fix the code injection vulnerability related to Generator functions. (Kousuke Saruta, 2017-01-24, 2 files, -2/+25)

## What changes were proposed in this pull request?
Similar to SPARK-15165, codegen is in danger of arbitrary code injection. The root cause is how variable names are created by codegen. In GenerateExec#codeGenAccessor, a variable name is created as follows:

```
val value = ctx.freshName(name)
```

The variable `value` is named based on the value of the variable `name`, and the value of `name` comes from the schema given by users, so an attacker can attack with queries like the following:

```
SELECT inline(array(cast(struct(1) AS struct<`=new Object() { {f();} public void f() {throw new RuntimeException("This exception is injected.");} public int x;}.x`:int>)))
```

In the example above, a RuntimeException is thrown, but an attacker can replace it with arbitrary code.

## How was this patch tested?
Added a new test case.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #16681 from sarutak/SPARK-19334.

* [SPARK-19017][SQL] NOT IN subquery with more than one column may return incorrect results (Nattavut Sutyanyong, 2017-01-24, 5 files, -6/+131)

## What changes were proposed in this pull request?
This PR fixes the code in the Optimizer phase where the NULL-aware expression of a NOT IN query is expanded in Rule `RewritePredicateSubquery`.

Example: the query

    select a1, b1 from t1 where (a1, b1) not in (select a2, b2 from t2);

has its (a1, b1) = (a2, b2) condition rewritten from (before this fix):

    Join LeftAnti, ((isnull((_1#2 = a2#16)) || isnull((_2#3 = b2#17))) || ((_1#2 = a2#16) && (_2#3 = b2#17)))

to (after this fix):

    Join LeftAnti, (((_1#2 = a2#16) || isnull((_1#2 = a2#16))) && ((_2#3 = b2#17) || isnull((_2#3 = b2#17))))

## How was this patch tested?
sql/test, catalyst/test and new test cases in SQLQueryTestSuite.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Closes #16467 from nsyca/19017.

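A runnable sketch of the query shape above, with hypothetical rows (including a NULL, which is what makes the NULL-aware rewrite tricky); this assumes a spark-shell session where `spark` and `spark.implicits._` are available and is not taken from the patch itself:

```scala
import spark.implicits._

// t2 contains a NULL in a2; NOT IN must treat comparisons against it as unknown.
Seq((1, 1), (2, 2), (3, 3)).toDF("a1", "b1").createOrReplaceTempView("t1")
Seq((Some(1), Some(1)), (None: Option[Int], Some(2))).toDF("a2", "b2").createOrReplaceTempView("t2")

// The multi-column NOT IN form whose rewrite is corrected by this fix.
spark.sql("select a1, b1 from t1 where (a1, b1) not in (select a2, b2 from t2)").show()
```
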
* [SPARK-17913][SQL] compare atomic and string type column may return confusing result (Wenchen Fan, 2017-01-24, 7 files, -7/+38)

## What changes were proposed in this pull request?
Spark SQL follows MySQL to do the implicit type conversion for binary comparison: http://dev.mysql.com/doc/refman/5.7/en/type-conversion.html

However, this may return confusing results, e.g. `1 = 'true'` will return true, `19157170390056973L = '19157170390056971'` will return true. I think it's more reasonable to follow postgres in this case, i.e. cast string to the type of the other side, but return null if the string is not castable, to keep hive compatibility.

## How was this patch tested?
newly added tests.

Author: Wenchen Fan <wenchen@databricks.com>
Closes #15880 from cloud-fan/compare.

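For reference, the two comparisons called out above can be reproduced directly from a spark-shell session (`spark` is the usual SparkSession); per the description, both returned true before this change:

```scala
// Both of these evaluated to true under the old MySQL-style implicit conversion.
spark.sql("SELECT 1 = 'true'").show()
spark.sql("SELECT 19157170390056973L = '19157170390056971'").show()
```
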
* [SPARK-19246][SQL] CatalogTable's partitionSchema order and existence check (windpiger, 2017-01-24, 1 file, -4/+9)

## What changes were proposed in this pull request?
CatalogTable's partitionSchema should check that each column name in partitionColumnNames matches one and only one field in the schema; if not, we should throw an exception. CatalogTable's partitionSchema should also keep the same order as partitionColumnNames.

## How was this patch tested?
N/A

Author: windpiger <songjun@outlook.com>
Closes #16606 from windpiger/checkPartionColNameWithSchema.

* [SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append (windpiger, 2017-01-24, 9 files, -38/+83)

## What changes were proposed in this pull request?
After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not complete; there are still some cases we do not support.

This PR implements: DataFrameWriter.saveAsTable working with the hive format in append mode.

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>
Closes #16552 from windpiger/saveAsTableWithHiveAppend.

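A minimal sketch of the newly supported path (the DataFrame and table names are hypothetical, and it assumes a Hive-enabled spark-shell session where treating "hive" as a format name is available, as this PR series enables):

```scala
import spark.implicits._

// Append rows to a Hive-format table through DataFrameWriter.saveAsTable.
val salesDf = Seq((1, "widget"), (2, "gadget")).toDF("id", "item")
salesDf.write.format("hive").mode("append").saveAsTable("hive_sales")
```
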
* [SPARK-16101][HOTFIX] Fix the build with Scala 2.10 by explicitly typed argument (hyukjinkwon, 2017-01-23, 1 file, -1/+1)

## What changes were proposed in this pull request?
I goofed in https://github.com/apache/spark/pull/16669, which introduced a break in Scala 2.10. This fixes:

```bash
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala:65: polymorphic expression cannot be instantiated to expected type;
[error]  found   : [B >: org.apache.spark.sql.types.StructField](B, Int) => Int
[error]  required: org.apache.spark.sql.types.StructField => ?
[error]     fields.map(schema.indexOf).toArray
[error]                       ^
[error] one error found
[error] (sql/compile:compileIncremental) Compilation failed
```

## How was this patch tested?
Manually via

```bash
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  2.719 s]
[INFO] Spark Project Tags ................................. SUCCESS [  3.441 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  3.411 s]
[INFO] Spark Project Networking ........................... SUCCESS [  5.088 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  5.131 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  5.813 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  6.567 s]
[INFO] Spark Project Core ................................. SUCCESS [01:39 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [  6.644 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 11.304 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 26.275 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [01:04 min]
[INFO] Spark Project SQL .................................. SUCCESS [02:07 min]
[INFO] Spark Project ML Library ........................... SUCCESS [01:20 min]
[INFO] Spark Project Tools ................................ SUCCESS [  8.755 s]
[INFO] Spark Project Hive ................................. SUCCESS [ 51.141 s]
[INFO] Spark Project REPL ................................. SUCCESS [ 13.688 s]
[INFO] Spark Project YARN Shuffle Service ................. SUCCESS [  7.211 s]
[INFO] Spark Project YARN ................................. SUCCESS [ 10.908 s]
[INFO] Spark Project Assembly ............................. SUCCESS [  2.940 s]
[INFO] Spark Project External Flume Sink .................. SUCCESS [  4.386 s]
[INFO] Spark Project External Flume ....................... SUCCESS [  8.589 s]
[INFO] Spark Project External Flume Assembly .............. SUCCESS [  1.891 s]
[INFO] Spark Integration for Kafka 0.8 .................... SUCCESS [  8.458 s]
[INFO] Spark Project Examples ............................. SUCCESS [ 17.706 s]
[INFO] Spark Project External Kafka Assembly .............. SUCCESS [  3.070 s]
[INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [ 11.227 s]
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [  2.982 s]
[INFO] Kafka 0.10 Source for Structured Streaming ......... SUCCESS [  7.494 s]
[INFO] Spark Project Java 8 Tests ......................... SUCCESS [  3.748 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
```

and manual test of `CSVSuite` with Scala 2.11 in my IDE.

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16684 from HyukjinKwon/hot-fix-type-ensurance.

* [SPARK-19268][SS] Disallow adaptive query execution for streaming queries (Shixiong Zhu, 2017-01-23, 2 files, -1/+17)

## What changes were proposed in this pull request?
As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming.

## How was this patch tested?
`test("SPARK-19268: Adaptive query execution should be disallowed")`.

Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16683 from zsxwing/SPARK-19268.

* [SPARK-9435][SQL] Reuse function in Java UDF to correctly support expressions that require equality comparison between ScalaUDF (hyukjinkwon, 2017-01-23, 2 files, -23/+68)

## What changes were proposed in this pull request?
Currently, running this code in Java

```java
spark.udf().register("inc", new UDF1<Long, Long>() {
  @Override
  public Long call(Long i) {
    return i + 1;
  }
}, DataTypes.LongType);

spark.range(10).toDF("x").createOrReplaceTempView("tmp");
Row result = spark.sql("SELECT inc(x) FROM tmp GROUP BY inc(x)").head();
Assert.assertEquals(7, result.getLong(0));
```

fails as below:

```
org.apache.spark.sql.AnalysisException: expression 'tmp.`x`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [UDF(x#19L)], [UDF(x#19L) AS UDF(x)#23L]
+- SubqueryAlias tmp, `tmp`
   +- Project [id#16L AS x#19L]
      +- Range (0, 10, step=1, splits=Some(8))

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
```

The root cause is that we were creating the function every time it needed to be built, as below:

```scala
scala> def inc(i: Int) = i + 1
inc: (i: Int)Int

scala> (inc(_: Int)).hashCode
res15: Int = 1231799381

scala> (inc(_: Int)).hashCode
res16: Int = 2109839984

scala> (inc(_: Int)) == (inc(_: Int))
res17: Boolean = false
```

This seems to lead to the comparison failure between `ScalaUDF`s created from the Java UDF API, for example, in `Expression.semanticEquals`. In case of the Scala one, it is already fine.

Both can be tested easily as below if any reviewer is more comfortable with Scala:

```scala
val df = Seq((1, 10), (2, 11), (3, 12)).toDF("x", "y")
val javaUDF = new UDF1[Int, Int] {
  override def call(i: Int): Int = i + 1
}
// spark.udf.register("inc", javaUDF, IntegerType) // Uncomment this for Java API
// spark.udf.register("inc", (i: Int) => i + 1)    // Uncomment this for Scala API
df.createOrReplaceTempView("tmp")
spark.sql("SELECT inc(y) FROM tmp GROUP BY inc(y)").show()
```

## How was this patch tested?
Unit test in `JavaUDFSuite.java` and `./dev/lint-java`.

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16553 from HyukjinKwon/SPARK-9435.

* [SPARK-19272][SQL] Remove the param `viewOriginalText` from `CatalogTable` (jiangxingbo, 2017-01-24, 9 files, -45/+39)

## What changes were proposed in this pull request?
Hive will expand the view text, so it needs two fields: originalText and viewText. Since we don't expand the view text, but only add table properties, a single field `viewText` should be enough in CatalogTable.

This PR brought in the following changes:
1. Remove the param `viewOriginalText` from `CatalogTable`;
2. Update the output of the command `DescribeTableCommand`.

## How was this patch tested?
Tested by existing test cases; also updated the failed test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>
Closes #16679 from jiangxb1987/catalogTable.

* [SPARK-19290][SQL] add a new extending interface in Analyzer for post-hoc resolution (Wenchen Fan, 2017-01-23, 6 files, -55/+30)

## What changes were proposed in this pull request?
To implement DDL commands, we added several analyzer rules in the sql/hive module to analyze DDL-related plans. However, our `Analyzer` currently has only one extending interface, `extendedResolutionRules`, which defines extra rules that will be run together with other rules in the resolution batch. It doesn't fit DDL rules well, because:
1. DDL rules may do some checking and normalization, but we may do it many times, as the resolution batch runs rules again and again until a fixed point, and it's hard to tell if a DDL rule has already done its checking and normalization. It's fine because DDL rules are idempotent, but it's bad for analysis performance.
2. Some DDL rules may depend on others, and it's pretty hard to write `if` conditions to guarantee the dependencies. It would be good to have a batch which runs rules in one pass, so that we can guarantee the dependencies by rule order.

This PR adds a new extending interface in `Analyzer`: `postHocResolutionRules`, which defines rules that will be run only once, in a batch that runs right after the resolution batch.

## How was this patch tested?
existing tests

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16645 from cloud-fan/analyzer.

* [SPARK-19284][SQL] append to partitioned datasource table should not use custom partition location (windpiger, 2017-01-23, 3 files, -5/+36)

## What changes were proposed in this pull request?
When we append data to an existing partitioned datasource table, `InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations` currently returns the same location as the Hive default; it should return None.

## How was this patch tested?

Author: windpiger <songjun@outlook.com>
Closes #16642 from windpiger/appendSchema.

* [SPARK-19218][SQL] Fix SET command to show a result correctly and in a sorted order (Dongjoon Hyun, 2017-01-23, 3 files, -3/+36)

## What changes were proposed in this pull request?
This PR aims to fix the following two things.

1. `sql("SET -v").collect()` or `sql("SET -v").show()` raises the following exceptions for String configurations whose default value is `null`. For the test, please see the [Jenkins result](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71539/testReport/) and https://github.com/apache/spark/commit/60953bf1f1ba144e709fdae3903a390ff9479fd0 in #16624.

```
sbt.ForkMain$ForkError: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
createexternalrow(input[0, string, false].toString, input[1, string, false].toString, input[2, string, false].toString, StructField(key,StringType,false), StructField(value,StringType,false), StructField(meaning,StringType,false))
:- input[0, string, false].toString
:  +- input[0, string, false]
:- input[1, string, false].toString
:  +- input[1, string, false]
+- input[2, string, false].toString
   +- input[2, string, false]
```

2. Currently, the `SET` and `SET -v` commands show an unsorted result. We had better show a sorted result for UX. Also, this is compatible with Hive.

**BEFORE**

```
scala> sql("set").show(false)
...
|spark.driver.host              |10.22.16.140                       |
|spark.driver.port              |63893                              |
|spark.repl.class.uri           |spark://10.22.16.140:63893/classes |
...
|spark.app.name                 |Spark shell                        |
|spark.driver.memory            |4G                                 |
|spark.executor.id              |driver                             |
|spark.submit.deployMode        |client                             |
|spark.master                   |local[*]                           |
|spark.home                     |/Users/dhyun/spark                 |
|spark.sql.catalogImplementation|hive                               |
|spark.app.id                   |local-1484333618945                |
```

**AFTER**

```
scala> sql("set").show(false)
...
|spark.app.id                   |local-1484333925649                |
|spark.app.name                 |Spark shell                        |
|spark.driver.host              |10.22.16.140                       |
|spark.driver.memory            |4G                                 |
|spark.driver.port              |64994                              |
|spark.executor.id              |driver                             |
|spark.jars                     |                                   |
|spark.master                   |local[*]                           |
|spark.repl.class.uri           |spark://10.22.16.140:64994/classes |
|spark.sql.catalogImplementation|hive                               |
|spark.submit.deployMode        |client                             |
```

## How was this patch tested?
Jenkins with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #16579 from dongjoon-hyun/SPARK-19218.

* [SPARK-19309][SQL] disable common subexpression elimination for conditional expressions (Wenchen Fan, 2017-01-23, 7 files, -171/+84)

## What changes were proposed in this pull request?
As I pointed out in https://github.com/apache/spark/pull/15807#issuecomment-259143655, the current subexpression elimination framework has a problem: it always evaluates all common subexpressions at the beginning, even when they are inside conditional expressions and may not be accessed. Ideally we should implement it like a Scala lazy val, so we only evaluate it when it gets accessed at least once. https://github.com/apache/spark/issues/15837 tries this approach, but it seems too complicated and may introduce performance regressions.

This PR simply stops common subexpression elimination for conditional expressions, with some cleanup.

## How was this patch tested?
regression test

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16659 from cloud-fan/codegen.

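A minimal illustration of the hazard described above, using a hypothetical UDF and data (not taken from the patch): `strictSqrt(x)` is a common subexpression of two conditional branches and must only run when its guarding condition holds.

```scala
import spark.implicits._

// A UDF that is only safe to call on non-negative input.
spark.udf.register("strictSqrt", (d: Double) => { require(d >= 0, "negative input"); math.sqrt(d) })

Seq(4.0, -1.0).toDF("x").createOrReplaceTempView("vals")

// strictSqrt(x) appears in both CASE expressions below. If it were hoisted out
// and evaluated eagerly for every row, the row with x = -1.0 could fail even
// though neither WHEN branch needs the value for that row.
spark.sql("""
  SELECT CASE WHEN x >= 0 THEN strictSqrt(x)       ELSE 0.0 END AS s,
         CASE WHEN x >= 0 THEN strictSqrt(x) + 1.0 ELSE 0.0 END AS s_plus_one
  FROM vals
""").show()
```
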
* [SPARK-19229][SQL] Disallow Creating Hive Source Tables when Hive Support is Not Enabled (gatorsmile, 2017-01-22, 17 files, -89/+68)

### What changes were proposed in this pull request?
It is weird to create Hive source tables when using InMemoryCatalog, since we are unable to operate on them. This PR blocks users from creating Hive source tables in that case.

### How was this patch tested?
Fixed the test cases

Author: gatorsmile <gatorsmile@gmail.com>
Closes #16587 from gatorsmile/blockHiveTable.

* [SPARK-16101][SQL] Refactoring CSV read path to be consistent with JSON data source (hyukjinkwon, 2017-01-23, 7 files, -316/+293)

## What changes were proposed in this pull request?
This PR refactors the CSV read path to be consistent with the JSON data source. It makes the methods in the classes have consistent arguments with the JSON ones: `UnivocityParser` and `JacksonParser`.

```scala
private[csv] class UnivocityParser(
    schema: StructType,
    requiredSchema: StructType,
    options: CSVOptions) extends Logging {
  ...
  def parse(input: String): Seq[InternalRow] = {
  ...
```

```scala
class JacksonParser(
    schema: StructType,
    columnNameOfCorruptRecord: String,
    options: JSONOptions) extends Logging {
  ...
  def parse(input: String): Option[InternalRow] = {
  ...
```

These allow parsing an iterator (`String` to `InternalRow`) as below for both JSON and CSV:

```scala
iter.flatMap(parser.parse)
```

## How was this patch tested?
Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16669 from HyukjinKwon/SPARK-16101-read.

* [SPARK-19153][SQL] DataFrameWriter.saveAsTable work with create partitioned table (windpiger, 2017-01-22, 3 files, -19/+26)

## What changes were proposed in this pull request?
After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not complete; there are still some cases we do not support.

This PR makes DataFrameWriter.saveAsTable work with the hive format to create partitioned tables.

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>
Closes #16593 from windpiger/saveAsTableWithPartitionedTable.

* [SPARK-19117][SPARK-18922][TESTS] Fix the rest of flaky, newly introduced and missed test failures on Windows (hyukjinkwon, 2017-01-21, 4 files, -8/+12)

## What changes were proposed in this pull request?

**Failed tests**

```
org.apache.spark.sql.hive.execution.HiveQuerySuite:
 - transform with SerDe3 *** FAILED ***
 - transform with SerDe4 *** FAILED ***
```

```
org.apache.spark.sql.hive.execution.HiveDDLSuite:
 - create hive serde table with new syntax *** FAILED ***
 - add/drop partition with location - managed table *** FAILED ***
```

```
org.apache.spark.sql.hive.ParquetMetastoreSuite:
 - Explicitly added partitions should be readable after load *** FAILED ***
 - Non-partitioned table readable after load *** FAILED ***
```

**Aborted tests**

```
Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.execution.HiveSerDeSuite *** ABORTED *** (157 milliseconds)
   org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive argetscala-2.11 est-classesdatafilessales.txt;
```

**Flaky tests (failed 9ish out of 10)**

```
org.apache.spark.scheduler.SparkListenerSuite:
 - local metrics *** FAILED ***
```

## How was this patch tested?

Manually tested via AppVeyor.

**Failed tests**

```
org.apache.spark.sql.hive.execution.HiveQuerySuite:
 - transform with SerDe3 !!! CANCELED !!! (0 milliseconds)
 - transform with SerDe4 !!! CANCELED !!! (0 milliseconds)
```

```
org.apache.spark.sql.hive.execution.HiveDDLSuite:
 - create hive serde table with new syntax (1 second, 672 milliseconds)
 - add/drop partition with location - managed table (2 seconds, 391 milliseconds)
```

```
org.apache.spark.sql.hive.ParquetMetastoreSuite:
 - Explicitly added partitions should be readable after load (609 milliseconds)
 - Non-partitioned table readable after load (344 milliseconds)
```

**Aborted tests**

```
spark.sql.hive.execution.HiveSerDeSuite:
 - Read with RegexSerDe (2 seconds, 142 milliseconds)
 - Read and write with LazySimpleSerDe (tab separated) (2 seconds)
 - Read with AvroSerDe (1 second, 47 milliseconds)
 - Read Partitioned with AvroSerDe (1 second, 422 milliseconds)
```

**Flaky tests (failed 9ish out of 10)**

```
org.apache.spark.scheduler.SparkListenerSuite:
 - local metrics (4 seconds, 562 milliseconds)
```

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16586 from HyukjinKwon/set-path-appveyor.

* [SPARK-19305][SQL] partitioned table should always put partition columns at the end of table schema (Wenchen Fan, 2017-01-21, 2 files, -18/+72)

## What changes were proposed in this pull request?
For data source tables, we always reorder the specified table schema, or the query in CTAS, to put partition columns at the end. E.g. `CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)` will create a table with schema `<a, c, d, b>`.

Hive serde tables didn't have this problem before, because their CREATE TABLE syntax specifies the data schema and partition schema individually. However, after we unified the CREATE TABLE syntax, Hive serde tables also need to do the reorder. This PR puts the reorder logic in an analyzer rule, which works with both data source tables and Hive serde tables.

## How was this patch tested?
new regression test

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16655 from cloud-fan/schema.

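A runnable sketch of the reordering, using the example table from the description (run in a spark-shell session; the expected column order in the comment is the one documented above):

```scala
// Partition columns d and b are moved to the end of the table schema.
spark.sql("CREATE TABLE t (a INT, b INT, c INT, d INT) USING parquet PARTITIONED BY (d, b)")
spark.table("t").printSchema()
// Expected column order per the description above: a, c, d, b
```
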
* [SPARK-14536][SQL] fix to handle null value in array type column for postgres (sureshthalamati, 2017-01-20, 1 file, -3/+3)

## What changes were proposed in this pull request?
JDBC read is failing with an NPE due to a missing null check for the array data type when the source table has null values in an array-type column. For null values, ResultSet.getArray() returns null. This PR adds a null-safe check on the ResultSet.getArray() value before invoking methods on the Array object.

## How was this patch tested?
Updated the PostgresIntegration test suite to test null values. Ran docker integration tests on my laptop.

Author: sureshthalamati <suresh.thalamati@gmail.com>
Closes #15192 from sureshthalamati/jdbc_array_null_fix-SPARK-14536.

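A minimal sketch of the null-safe pattern described above, as a standalone helper (an illustration, not the actual Spark JDBC code):

```scala
import java.sql.ResultSet

// Guard ResultSet.getArray before dereferencing it: for a NULL array column the
// JDBC driver returns null, so calling getArray on the result directly would NPE.
def readArrayColumn(rs: ResultSet, pos: Int): Option[AnyRef] =
  Option(rs.getArray(pos)).map(_.getArray)
```
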
* [SPARK-16101][SQL] Refactoring CSV write path to be consistent with JSON data source (hyukjinkwon, 2017-01-21, 5 files, -115/+135)

## What changes were proposed in this pull request?
This PR refactors the CSV write path to be consistent with the JSON data source. It makes the methods in the classes have consistent arguments with the JSON ones.

- `UnivocityGenerator` and `JacksonGenerator`

```scala
private[csv] class UnivocityGenerator(
    schema: StructType,
    writer: Writer,
    options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
  ...
  def write ...
  def close ...
  def flush ...
```

```scala
private[sql] class JacksonGenerator(
    schema: StructType,
    writer: Writer,
    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
  ...
  def write ...
  def close ...
  def flush ...
```

- This PR also puts the classes together in a manner consistent with JSON.

`CsvFileFormat`:

```scala
CsvFileFormat
CsvOutputWriter
```

`JsonFileFormat`:

```scala
JsonFileFormat
JsonOutputWriter
```

## How was this patch tested?
Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16496 from HyukjinKwon/SPARK-16101-write.

* [SPARK-19267][SS] Fix a race condition when stopping StateStore (Shixiong Zhu, 2017-01-20, 1 file, -27/+61)

## What changes were proposed in this pull request?
There is a race condition when stopping StateStore which makes `StateStoreSuite.maintenance` flaky. `StateStore.stop` doesn't wait for the running task to finish, and an out-of-date task may fail `doMaintenance` and cancel the new task. Here is a reproducer: https://github.com/zsxwing/spark/commit/dde1b5b106ba034861cf19e16883cfe181faa6f3

This PR adds MaintenanceTask to eliminate the race condition.

## How was this patch tested?
Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16627 from zsxwing/SPARK-19267.

* [SPARK-18589][SQL] Fix Python UDF accessing attributes from both sides of a join (Davies Liu, 2017-01-20, 4 files, -13/+21)

## What changes were proposed in this pull request?
PythonUDF is unevaluable, so it cannot be used inside a join condition. Currently the optimizer will push a PythonUDF that accesses both sides of a join into the join condition, and then the query will fail to plan. This PR fixes the issue by checking whether the expression is evaluable before pushing it into a Join.

## How was this patch tested?
Add a regression test.

Author: Davies Liu <davies@databricks.com>
Closes #16581 from davies/pyudf_join.

* [SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan (Tathagata Das, 2017-01-20, 2 files, -3/+8)

## What changes were proposed in this pull request?
Sort in a streaming plan should be allowed only after an aggregation in complete mode. Currently it is incorrectly allowed when present anywhere in the plan, which gives unpredictable and potentially incorrect results.

## How was this patch tested?
New test

Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16662 from tdas/SPARK-19314.

* [SPARK-19271][SQL] Change non-cbo estimation of aggregate (wangzhenhua, 2017-01-19, 7 files, -11/+38)

## What changes were proposed in this pull request?
Change the non-cbo estimation behavior of aggregate:
- If groupExpression is empty, we know the row count (= 1) and the corresponding size;
- otherwise, estimation falls back to UnaryNode's computeStats method, which should not propagate rowCount and attributeStats in Statistics because they are not estimated in that method.

## How was this patch tested?
Added test case

Author: wangzhenhua <wangzhenhua@huawei.com>
Closes #16631 from wzhfy/aggNoCbo.

* [SPARK-19292][SQL] filter with partition columns should be case-insensitive on Hive tables (Wenchen Fan, 2017-01-19, 3 files, -2/+25)

## What changes were proposed in this pull request?
When we query a table with a filter on partitioned columns, we push the partition filter to the metastore to get matched partitions directly. In `HiveExternalCatalog.listPartitionsByFilter`, we assume the column names in the partition filter are already normalized and we don't need to consider case sensitivity. However, `HiveTableScanExec` doesn't follow this assumption. This PR fixes it.

## How was this patch tested?
new regression test

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16647 from cloud-fan/bug.

* [SPARK-17912] [SQL] Refactor code generation to get data for ColumnVector/ColumnarBatch (Kazuaki Ishizaki, 2017-01-19, 2 files, -84/+135)

## What changes were proposed in this pull request?
This PR refactors the code generation part to get data from `ColumnarVector` and `ColumnarBatch` by using a trait `ColumnarBatchScan` for ease of reuse. This is because this part will be reused by several components (e.g. parquet reader, Dataset.cache, and others) since `ColumnarBatch` will be a first-class citizen.

This PR is a part of https://github.com/apache/spark/pull/15219. In advance, this PR makes the code generation for `ColumnarVector` and `ColumnarBatch` reusable as a trait. In general, this is very useful for other components from the reusability point of view, too.

## How was this patch tested?
tested existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #15467 from kiszk/columnarrefactor.

* [SPARK-19295][SQL] IsolatedClientLoader's downloadVersion should log the location of downloaded metastore client jars (Yin Huai, 2017-01-19, 1 file, -0/+1)

## What changes were proposed in this pull request?
This will help the users to know the location of those downloaded jars when `spark.sql.hive.metastore.jars` is set to `maven`.

## How was this patch tested?
jenkins

Author: Yin Huai <yhuai@databricks.com>
Closes #16649 from yhuai/SPARK-19295.

* [SPARK-19059][SQL] Unable to retrieve data from parquet table whose name starts with underscore (jayadevanmurali, 2017-01-19, 2 files, -46/+53)

## What changes were proposed in this pull request?
The initial shouldFilterOut() invocation filtered on the root path name (the table name in the initial call) and removed it if it contained an underscore. This change moves the check one level below: it first lists the files/directories in the given root path and then applies the filter.

## How was this patch tested?
Added a new test case for this scenario.

Author: jayadevanmurali <jayadevan.m@tcs.com>
Author: jayadevan <jayadevan.m@tcs.com>
Closes #16635 from jayadevanmurali/branch-0.1-SPARK-19059.

* [SPARK-19265][SQL] make table relation cache general and does not depend on hive (Wenchen Fan, 2017-01-19, 20 files, -198/+144)

## What changes were proposed in this pull request?
We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc. However, it doesn't make sense to limit this cache to Hive support; we should move it to the SQL core module so that users can use this cache without Hive support. It also reduces the size of `HiveMetastoreCatalog`, so that it's easier to remove it eventually.

Main changes:
1. Move the table relation cache to `SessionCatalog`.
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation` and the analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, so `HiveSessionCatalog` doesn't need to override `lookupRelation` anymore.
3. `FindDataSourceTable` will read/write the table relation cache.

## How was this patch tested?
existing tests.

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16621 from cloud-fan/plan-cache.

* [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error (Liwei Lin, 2017-01-18, 3 files, -2/+10)

## What changes were proposed in this pull request?
We should call `StateStore.abort()` when any error occurs before the store is committed.

## How was this patch tested?
Manually.

Author: Liwei Lin <lwlin7@gmail.com>
Closes #16547 from lw-lin/append-filter.

* [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests (Shixiong Zhu, 2017-01-18, 1 file, -1/+6)

## What changes were proposed in this pull request?
#16492 missed one race condition: `StreamExecution.awaitInitialization` may throw fatal errors and fail the test. This PR just ignores `StreamingQueryException` thrown from `awaitInitialization` so that we can verify the exception in the `ExpectFailure` action later. It's fine since `StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well.

## How was this patch tested?
Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16567 from zsxwing/SPARK-19113-2.

* [SPARK-19024][SQL] Implement new approach to write a permanent view (jiangxingbo, 2017-01-18, 5 files, -92/+146)

## What changes were proposed in this pull request?
On CREATE/ALTER a view, it's no longer needed to generate a SQL text string from the LogicalPlan; instead we store the SQL query text, the output column names of the query plan, and the current database in CatalogTable. Permanent views created by this approach can be resolved by the current view resolution approach.

The main advantages include:
1. If you update an underlying view, the current view also gets updated;
2. It gives us a chance to get rid of SQL generation for operators.

Major changes of this PR:
1. Generate the view-specific properties (e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable;
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand`, getting rid of SQL generation from them.

## How was this patch tested?
Existing tests.

Author: jiangxingbo <jiangxb1987@gmail.com>
Closes #16613 from jiangxb1987/view-write-path.

* [SPARK-19227][SPARK-19251] remove unused imports and outdated comments (uncleGen, 2017-01-18, 10 files, -14/+2)

## What changes were proposed in this pull request?
Remove unused imports and outdated comments, and fix some minor code style issues.

## How was this patch tested?
existing ut

Author: uncleGen <hustyugm@gmail.com>
Closes #16591 from uncleGen/SPARK-19227.

* [SPARK-18243][SQL] Port Hive writing to use FileFormat interface (Wenchen Fan, 2017-01-17, 14 files, -532/+317)

## What changes were proposed in this pull request?
Inserting data into Hive tables has its own implementation that is distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

Note that one other major difference is that data source tables write directly to the final destination without using a staging directory, and then Spark itself adds the partitions/tables to the catalog. Hive tables actually write to a staging directory and then call the Hive metastore's loadPartition/loadTable function to load the data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should think about writing to the Hive table location directly, so that we don't need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?
existing tests

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16517 from cloud-fan/insert-hive.

* [SPARK-13721][SQL] Support outer generators in DataFrame API (Bogdan Raducanu, 2017-01-17, 9 files, -16/+150)

## What changes were proposed in this pull request?
Added outer_explode, outer_posexplode, outer_inline functions and expressions. Also some bug fixing in GenerateExec.scala for CollectionGenerator: previously it was not correctly handling the case of outer with empty collections, only with nulls.

## How was this patch tested?
New tests added to GeneratorFunctionSuite

Author: Bogdan Raducanu <bogdan.rdc@gmail.com>
Closes #16608 from bogdanrdc/SPARK-13721.

* [SPARK-18917][SQL] Remove schema check in appending data (Reynold Xin, 2017-01-17, 1 file, -33/+3)

## What changes were proposed in this pull request?
In append mode, we check whether the schema of the write is compatible with the schema of the existing data. Finding the existing schema for files can be a significant performance issue in cloud environments. This patch removes the check.

Note that for catalog tables, we always do the check, as discussed in https://github.com/apache/spark/pull/16339#discussion_r96208357

## How was this patch tested?
N/A

Closes #16339.

Author: Reynold Xin <rxin@databricks.com>
Closes #16622 from rxin/SPARK-18917.

* [MINOR][SQL] Remove duplicate call of reset() function in CurrentOrigin.withOrigin() (jiangxingbo, 2017-01-17, 1 file, -1/+0)

## What changes were proposed in this pull request?
Remove the duplicate call of the reset() function in CurrentOrigin.withOrigin().

## How was this patch tested?
Existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>
Closes #16615 from jiangxb1987/dummy-code.

* [SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec (gatorsmile, 2017-01-18, 5 files, -8/+106)

### What changes were proposed in this pull request?
Empty partition column values are not valid for a partition specification. Before this PR, we accepted users doing it, and the Hive metastore does not detect and disallow it either. Thus, users hit the following strange error:

```scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
spark.sql("alter table partitionedTable drop partition(partCol1='')")
spark.table("partitionedTable").show()
```

In the above example, the WHOLE table is DROPPED when users specify a partition spec containing only one partition column with empty values.

When the partition columns contain more than one column, the Hive metastore APIs simply ignore the columns with empty values and treat it as a partial spec. This is also not expected and does not follow the actual Hive behaviors. This PR disallows users from specifying such an invalid partition spec in the `SessionCatalog` APIs.

### How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>
Closes #16583 from gatorsmile/disallowEmptyPartColValue.

* [SPARK-19065][SQL] Don't inherit expression id in dropDuplicates (Shixiong Zhu, 2017-01-18, 3 files, -11/+27)

## What changes were proposed in this pull request?
`dropDuplicates` will create an Alias using the same exprId, so `StateStoreSaveExec` and `StreamExecution` should also replace the Alias if necessary.

## How was this patch tested?
test("SPARK-19065: dropDuplicates should not create expressions using the same id")

Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16564 from zsxwing/SPARK-19065.

* [SPARK-3249][DOC] Fix links in ScalaDoc that cause warning messages in `sbt/sbt unidoc` (hyukjinkwon, 2017-01-17, 2 files, -5/+5)

## What changes were proposed in this pull request?
This PR proposes to fix ambiguous link warnings by simply making them code blocks for both javadoc and scaladoc.

```
[warn] .../spark/core/src/main/scala/org/apache/spark/Accumulator.scala:20: The link target "SparkContext#accumulator" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala:281: The link target "runMiniBatchSGD" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala:83: The link target "run" is ambiguous. Several members fit the target:
...
```

This PR also fixes a javadoc8 break as below:

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]    ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]    ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]    ^
[info] 3 errors
```

## How was this patch tested?
Manually via `sbt unidoc > output.txt`, then checked via `cat output.txt | grep ambiguous` and `sbt unidoc | grep error`.

Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16604 from HyukjinKwon/SPARK-3249.

* [SPARK-19219][SQL] Fix Parquet log output defaults (Nick Lavers, 2017-01-17, 2 files, -4/+4)

## What changes were proposed in this pull request?
Changing the default parquet logging levels to reflect the changes made in PR [#15538](https://github.com/apache/spark/pull/15538), in order to prevent the flood of log messages by default.

## How was this patch tested?
Default log output when reading from parquet 1.6 files was compared with and without this change. The change eliminates the extraneous logging and makes the output readable.

Author: Nick Lavers <nick.lavers@videoamp.com>
Closes #16580 from nicklavers/spark-19219-set_default_parquet_log_level.

* [SPARK-19240][SQL][TEST] add test for setting location for managed table (Wenchen Fan, 2017-01-17, 1 file, -0/+28)

## What changes were proposed in this pull request?
SET LOCATION also works on a managed table (or a table created without a custom path). The behavior is a little weird, but as we already support it, we should add a test to explicitly show the behavior.

## How was this patch tested?
N/A

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16597 from cloud-fan/set-location.

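A short sketch of the statement sequence being covered (table name and location are hypothetical; the exact behavior is what the new test documents):

```scala
// Create a managed table (no custom path), then point it at a new location.
spark.sql("CREATE TABLE managed_tbl (i INT) USING parquet")
spark.sql("ALTER TABLE managed_tbl SET LOCATION '/tmp/managed_tbl_new_location'")
```
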
* [SPARK-19148][SQL] do not expose the external table concept in Catalog (Wenchen Fan, 2017-01-17, 4 files, -99/+183)

## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/16296, we reached a consensus that we should hide the external/managed table concept from users and only expose custom table paths.

This PR renames `Catalog.createExternalTable` to `createTable` (still keeping the old versions for backward compatibility), and only sets the table type to EXTERNAL if `path` is specified in options.

## How was this patch tested?
new tests in `CatalogSuite`

Author: Wenchen Fan <wenchen@databricks.com>
Closes #16528 from cloud-fan/create-table.

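A sketch of how the renamed API behaves per the description above (table name, schema, and path are hypothetical, and this assumes the Scala `Map`-based overload kept alongside the rename):

```scala
import org.apache.spark.sql.types._

// With an explicit "path" option the created table is EXTERNAL;
// without a path it would be created as a managed table.
val schema = new StructType().add("id", LongType).add("name", StringType)
spark.catalog.createTable("events_ext", "parquet", schema, Map("path" -> "/tmp/events_data"))
```
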
* [SPARK-18801][SQL][FOLLOWUP] Alias the view with its child (jiangxingbo, 2017-01-16, 5 files, -57/+214)

## What changes were proposed in this pull request?
This PR is a follow-up to address the comments https://github.com/apache/spark/pull/16233/files#r95669988 and https://github.com/apache/spark/pull/16233/files#r95662299.

We try to wrap the child by:
1. Generate the `queryOutput` by:
   1.1. If the query column names are defined, map the column names to attributes in the child output by name;
   1.2. Else set the child output attributes to `queryOutput`.
2. Map the `queryOutput` to the view output by index; if the corresponding attributes don't match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
3. Add a Project over the child, with the new output generated by the previous steps.

If the view output doesn't have the same number of columns as either the child output or the query column names, throw an AnalysisException.

## How was this patch tested?
Add new test cases in `SQLViewSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>
Closes #16561 from jiangxb1987/alias-view.

* [SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet (Liang-Chi Hsieh, 2017-01-16, 4 files, -7/+139)

## What changes were proposed in this pull request?
We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and can't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. Actually, we begin to read those files as early as inferring the data schema from the files. For corrupt files, we can't read the schema and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, it is possible that the files are read before that. In this case, the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is ok, we can address the same issue for other datasources like Orc.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the logic to read footers in a multi-threaded manner. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements the logic to do the similar thing in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we need to ignore corrupt files too when we call `readFunction` to return the iterator.

One thing to notice: we read the schema from a Parquet file's footer. The method to read the footer, `ParquetFileReader.readFooter`, throws `RuntimeException`, instead of `IOException`, if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow other runtime exceptions besides those from reading corrupt files.

## How was this patch tested?
Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

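For reference, a minimal sketch of turning the option on when reading Parquet (the path is hypothetical; the config key is the one named above):

```scala
// Skip files that cannot be read instead of failing the whole query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.parquet("/data/events")   // hypothetical path
df.count()
```
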
* [SPARK-19120] Refresh Metadata Cache After Loading Hive Tables (gatorsmile, 2017-01-15, 3 files, -14/+75)

### What changes were proposed in this pull request?

```scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the above example, the returned result is empty after table loading. The metadata cache could be out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face such issues, because Hive serde tables in parquet/orc format can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on. This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing partitioned parquet/orc tables still uses `InsertIntoHiveTable`, instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note, it does not need to refresh the cache for non-partitioned parquet/orc tables, because it does not call `InsertIntoHiveTable` at all. Based on the comments, this PR keeps the existing logic unchanged; that means we always refresh the table no matter whether it is partitioned or not.

### How was this patch tested?
Added test cases in parquetSuites.scala

Author: gatorsmile <gatorsmile@gmail.com>
Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.