aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache
Commit message (Collapse)AuthorAgeFilesLines
...
* [SPARK-19611][SQL] Introduce configurable table schema inferenceBudde2017-03-092-65/+22
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## Summary of changes Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) ## How was this patch tested? The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde <budde@amazon.com> Closes #16944 from budde/SPARK-19611.
* [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc ↵Jeff Zhang2017-03-091-3/+3
| | | | | | | | | | | | file in DataFrameReader.orc Beside the issue in spark api, also fix 2 minor issues in pyspark - support read from multiple input paths for orc - support read from multiple input paths for text Author: Jeff Zhang <zjffdu@apache.org> Closes #10307 from zjffdu/SPARK-12334.
* [SPARK-19861][SS] watermark should not be a negative time.uncleGen2017-03-091-1/+3
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `watermark` should not be negative. This behavior is invalid, check it before real run. ## How was this patch tested? add new unit test. Author: uncleGen <hustyugm@gmail.com> Author: dylon <hustyugm@gmail.com> Closes #17202 from uncleGen/SPARK-19861.
* [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSourceLiwei Lin2017-03-092-17/+42
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log). ## Usage ```scala spark .readStream .option("fileNameOnly", true) .text("s3n://bucket/dir1/dir2") .writeStream ... ``` ## How was this patch tested? Added a test case Author: Liwei Lin <lwlin7@gmail.com> Closes #17120 from lw-lin/filename-only.
* [SPARK-19561][SQL] add int case handling for TimestampTypeJason White2017-03-091-0/+2
| | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int. These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range. Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3. ## How was this patch tested? Added a new PySpark-side test that fails without the change. The contribution is my original work and I license the work to the project under the project’s open source license. Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks. Author: Jason White <jason.white@shopify.com> Closes #17200 from JasonMWhite/SPARK-19561.
* [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.uncleGen2017-03-081-8/+11
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17221 from uncleGen/SPARK-19859.
* [MINOR][SQL] The analyzer rules are fired twice for cases when ↵Dilip Biswal2017-03-081-2/+7
| | | | | | | | | | | | | | | | | AnalysisException is raised from analyzer. ## What changes were proposed in this pull request? In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery. I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when i was not expecting it. ## How was this patch tested? Tested manually. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17214 from dilipbiswal/analyis_twice.
* [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in ↵Burak Yavuz2017-03-082-5/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | combination with maxFileAge in FileStreamSource ## What changes were proposed in this pull request? **The Problem** There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days. This causes a problem when both latestFirst = true maxFilesPerTrigger > total files to be processed. Here is what happens in all combinations 1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed. 2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind. 3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing. The bug is with case 3. **The Solution** Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set. ## How was this patch tested? Regression test in `FileStreamSourceSuite` Author: Burak Yavuz <brkyvz@gmail.com> Closes #17153 from brkyvz/maxFileAge.
* [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSVhyukjinkwon2017-03-084-30/+94
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to add an API that loads `DataFrame` from `Dataset[String]` storing csv. It allows pre-processing before loading into CSV, which means allowing a lot of workarounds for many narrow cases, for example, as below: - Case 1 - pre-processing ```scala val df = spark.read.text("...") // Pre-processing with this. spark.read.csv(df.as[String]) ``` - Case 2 - use other input formats ```scala val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength)) spark.read.csv(stringRdd.toDS) ``` ## How was this patch tested? Added tests in `CSVSuite` and build with Scala 2.10. ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16854 from HyukjinKwon/SPARK-15463.
* [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session ↵Kunal Khamar2017-03-085-99/+212
| | | | | | | | | | | | | | | | has an identical copy of the SessionState Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.) This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState. Subsequent changes to base session are not propagated to cloned session, clone is independent after creation. If the base is changed after clone has been created, say user registers new UDF, then the new UDF will not be available inside the clone. Same goes for configs and temp tables. Unit tests Author: Kunal Khamar <kkhamar@outlook.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #16826 from kunalkhamar/fork-sparksession.
* [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow ↵Shixiong Zhu2017-03-085-30/+106
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | invalid cases ## What changes were proposed in this pull request? Add a output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`. `UnsupportedOperationChecker` is modified to disallow unsupported cases. - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed. - For streaming (map/flatMap)GroupsWithState, see the following table: | Operators | Supported Query Output Mode | | ------------- | ------------- | | flatMapGroupsWithState(Update) without aggregation | Update | | flatMapGroupsWithState(Update) with aggregation | None | | flatMapGroupsWithState(Append) without aggregation | Append | | flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete | | flatMapGroupsWithState(Append) after aggregation | None | | Multiple flatMapGroupsWithState(Append)s | Append | | Multiple mapGroupsWithStates | None | | Mxing mapGroupsWithStates and flatMapGroupsWithStates | None | | Other cases of multiple flatMapGroupsWithState | None | ## How was this patch tested? The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #17197 from zsxwing/mapgroups-check.
* [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled ↵Xiao Li2017-03-081-5/+5
| | | | | | | | | | | | | | | | | | | | | | | | | | | Repartition ### What changes were proposed in this pull request? Observed by felixcheung in https://github.com/apache/spark/pull/16739, when users use the shuffle-enabled `repartition` API, they expect the partition they got should be the exact number they provided, even if they call shuffle-disabled `coalesce` later. Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result. ```Scala val df = spark.range(0, 10000, 1, 5) val df2 = df.repartition(10) assert(df2.coalesce(13).rdd.getNumPartitions == 5) assert(df2.coalesce(7).rdd.getNumPartitions == 5) assert(df2.coalesce(3).rdd.getNumPartitions == 3) ``` This PR is to fix the issue. We preserve shuffle-enabled Repartition. ### How was this patch tested? Added a test case Author: Xiao Li <gatorsmile@gmail.com> Closes #16933 from gatorsmile/CollapseRepartition.
* [SPARK-19865][SQL] remove the view identifier in SubqueryAliasjiangxingbo2017-03-081-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before. ## How was this patch tested? Update the related test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17210 from jiangxb1987/SubqueryAlias.
* [SPARK-17080][SQL] join reorderwangzhenhua2017-03-081-0/+16
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Reorder the joins using a dynamic programming algorithm (Selinger paper): First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them. When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows: ``` level 1: p({A}), p({B}), p({C}), p({D}) level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D}) level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D}) level 4: p({A, B, C, D}) ``` where p({A, B, C, D}) is the final output plan. For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs. ## How was this patch tested? add test cases Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17138 from wzhfy/joinReorder.
* [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically ↵Yuming Wang2017-03-082-0/+21
| | | | | | | | | | | | | | | converted to spark.sql.shuffle.partitions ## What changes were proposed in this pull request? Make the `SET mapreduce.job.reduces` automatically converted to `spark.sql.shuffle.partitions`, it's similar to `SET mapred.reduce.tasks`. ## How was this patch tested? unit tests Author: Yuming Wang <wgyumg@gmail.com> Closes #17020 from wangyum/SPARK-19693.
* [SPARK-19841][SS] watermarkPredicate should filter based on keysShixiong Zhu2017-03-071-8/+20
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `StreamingDeduplicateExec.watermarkPredicate` should filter based on keys. Otherwise, it may generate a wrong answer if the watermark column in `keyExpression` has a different position in the row. `StateStoreSaveExec` has the same codes but its parent can makes sure the watermark column positions in `keyExpression` and `row` are the same. ## How was this patch tested? The added test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #17183 from zsxwing/SPARK-19841.
* [SPARK-18389][SQL] Disallow cyclic view referencejiangxingbo2017-03-071-2/+59
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Disallow cyclic view references, a cyclic view reference may be created by the following queries: ``` CREATE VIEW testView AS SELECT id FROM tbl CREATE VIEW testView2 AS SELECT id FROM testView ALTER VIEW testView AS SELECT * FROM testView2 ``` In the above example, a reference cycle (testView -> testView2 -> testView) exsits. We disallow cyclic view references by checking that in ALTER VIEW command, when the `analyzedPlan` contains the same `View` node with the altered view, we should prevent the behavior and throw an AnalysisException. ## How was this patch tested? Test by `SQLViewSuite.test("correctly handle a cyclic view reference")`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17152 from jiangxb1987/cyclic-view.
* [SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached ↵Wenchen Fan2017-03-075-76/+79
| | | | | | | | | | | | | | | | | | | | | plans that refer to this table ## What changes were proposed in this pull request? When un-cache a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table. This PR also includes some refactors: 1. use `java.util.LinkedList` to store the cache entries, so that it's safer to remove elements while iterating 2. rename `invalidateCache` to `recacheByPlan`, which is more obvious about what it does. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17097 from cloud-fan/cache.
* [SPARK-19832][SQL] DynamicPartitionWriteTask get partitionPath should escape ↵windpiger2017-03-061-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | the partition name ## What changes were proposed in this pull request? Currently in DynamicPartitionWriteTask, when we get the paritionPath of a parition, we just escape the partition value, not escape the partition name. this will cause some problems for some special partition name situation, for example : 1) if the partition name contains '%' etc, there will be two partition path created in the filesytem, one is for escaped path like '/path/a%25b=1', another is for unescaped path like '/path/a%b=1'. and the data inserted stored in unescaped path, while the show partitions table will return 'a%25b=1' which the partition name is escaped. So here it is not consist. And I think the data should be stored in the escaped path in filesystem, which Hive2.0.0 also have the same action. 2) if the partition name contains ':', there will throw exception that new Path("/path","a:b"), this is illegal which has a colon in the relative path. ``` java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: a:b at org.apache.hadoop.fs.Path.initialize(Path.java:205) at org.apache.hadoop.fs.Path.<init>(Path.java:171) at org.apache.hadoop.fs.Path.<init>(Path.java:88) ... 48 elided Caused by: java.net.URISyntaxException: Relative path in absolute URI: a:b at java.net.URI.checkPath(URI.java:1823) at java.net.URI.<init>(URI.java:745) at org.apache.hadoop.fs.Path.initialize(Path.java:202) ... 50 more ``` ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17173 from windpiger/fixDatasourceSpecialCharPartitionName.
* [SPARK-19709][SQL] Read empty file with CSV data sourceWojtek Szymanski2017-03-061-32/+36
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Bugfix for reading empty file with CSV data source. Instead of throwing `NoSuchElementException`, an empty data frame is returned. ## How was this patch tested? Added new unit test in `org.apache.spark.sql.execution.datasources.csv.CSVSuite` Author: Wojtek Szymanski <wk.szymanski@gmail.com> Closes #17068 from wojtek-szymanski/SPARK-19709.
* [SPARK-19211][SQL] Explicitly prevent Insert into View or Create View As Insertjiangxingbo2017-03-061-0/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we don't explicitly forbid the following behaviors: 1. The statement CREATE VIEW AS INSERT INTO throws the following exception: ``` scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: at least one column must be specified for the table; scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: The number of columns produced by the SELECT clause (num: `0`) does not match the number of column names specified by CREATE VIEW (num: `2`).; ``` 2. The statement INSERT INTO view VALUES throws the following exception from checkAnalysis: ``` scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;; 'InsertIntoTable View (`default`.`testView`, [a#16,b#17]), false, false +- LocalRelation [col1#14, col2#15] ``` After this PR, the behavior changes to: ``` scala> spark.sql("CREATE VIEW testView AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO; scala> spark.sql("CREATE VIEW testView(a, b) AS INSERT INTO tab VALUES (1, \"a\")") org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: CREATE VIEW ... AS INSERT INTO; scala> spark.sql("INSERT INTO testView VALUES (1, \"a\")") org.apache.spark.sql.AnalysisException: `default`.`testView` is a view, inserting into a view is not allowed; ``` ## How was this patch tested? Add a new test case in `SparkSqlParserSuite`; Update the corresponding test case in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17125 from jiangxb1987/insert-with-view.
* [SPARK-19257][SQL] location for table/partition/database should be java.net.URIwindpiger2017-03-069-23/+45
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we treat the location of table/partition/database as URI string. It will be safer if we can make the type of location as java.net.URI. In this PR, there are following classes changes: **1. CatalogDatabase** ``` case class CatalogDatabase( name: String, description: String, locationUri: String, properties: Map[String, String]) ---> case class CatalogDatabase( name: String, description: String, locationUri: URI, properties: Map[String, String]) ``` **2. CatalogStorageFormat** ``` case class CatalogStorageFormat( locationUri: Option[String], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ----> case class CatalogStorageFormat( locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ``` Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally. Here list some operation related location: **1. whitespace in the location** e.g. `/a/b c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` , then `DESC EXTENDED t ` show the location is `/a/b c/d`, and the real path in the FileSystem also show `/a/b c/d` **2. colon(:) in the location** e.g. `/a/b:c/d` For both table location and partition location, when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` , **In linux file system** `DESC EXTENDED t ` show the location is `/a/b:c/d`, and the real path in the FileSystem also show `/a/b:c/d` **in HDFS** throw exception: `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.` **while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`, and the real path in the FileSystem also show `/xxx/a=a%3Ab` **3. percent sign(%) in the location** e.g. `/a/b%c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%c/d`, and the real path in the FileSystem also show `/a/b%c/d` **4. encoded(%25) in the location** e.g. `/a/b%25c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%25c/d`, and the real path in the FileSystem also show `/a/b%25c/d` **while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`, and the real path in the FileSystem also show `/xxx/a=%2525` **Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173) ### Summary: After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`, the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ). `DataBase` also have the same logic with `CREATE TABLE` while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem` In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString` which transfrom `str to uri `or `uri to str`. for example: ``` val str = '/a/b c/d' val uri = new Path(str).toUri --> '/a/b%20c/d' val strFromUri = new Path(uri).toString -> '/a/b c/d' ``` when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri ` ## How was this patch tested? unit test added. The `current master branch` also `passed all the test cases` added in this PR by a litter change. https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764 here `toURI` -> `toString` when test in master branch. This can show that this PR is transparent for user. Author: windpiger <songjun@outlook.com> Closes #17149 from windpiger/changeStringToURI.
* [SPARK-19595][SQL] Support json array in from_jsonhyukjinkwon2017-03-051-5/+47
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to both, **Do not allow json arrays with multiple elements and return null in `from_json` with `StructType` as the schema.** Currently, it only reads the single row when the input is a json array. So, the codes below: ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = StructType(StructField("a", IntegerType) :: Nil) Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show() ``` prints ``` +--------------------+ |jsontostruct(struct)| +--------------------+ | [1]| +--------------------+ ``` This PR simply suggests to print this as `null` if the schema is `StructType` and input is json array.with multiple elements ``` +--------------------+ |jsontostruct(struct)| +--------------------+ | null| +--------------------+ ``` **Support json arrays in `from_json` with `ArrayType` as the schema.** ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show() ``` prints ``` +-------------------+ |jsontostruct(array)| +-------------------+ | [[1], [2]]| +-------------------+ ``` ## How was this patch tested? Unit test in `JsonExpressionsSuite`, `JsonFunctionsSuite`, Python doctests and manual test. Author: hyukjinkwon <gurwls223@gmail.com> Closes #16929 from HyukjinKwon/disallow-array.
* [SPARK-19254][SQL] Support Seq, Map, and Struct in functions.litTakeshi Yamamuro2017-03-051-8/+17
| | | | | | | | | | | | | ## What changes were proposed in this pull request? This pr is to support Seq, Map, and Struct in functions.lit; it adds a new IF named `lit2` with `TypeTag` for avoiding type erasure. ## How was this patch tested? Added tests in `LiteralExpressionSuite` Author: Takeshi Yamamuro <yamamuro@apache.org> Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16610 from maropu/SPARK-19254.
* [SPARK-19718][SS] Handle more interrupt cases properly for HadoopShixiong Zhu2017-03-031-5/+15
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? [SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to enable interrupts when using the local file system. However, now we hit [HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the Hadoop patch to fix HADOOP-1207: https://github.com/apache/hadoop/commit/95c73d49b1bb459b626a9ac52acadb8f5fa724de This PR adds new logic to handle the following cases related to `InterruptedException`. - Check if the message of IOException starts with `java.lang.InterruptedException`. If so, treat it as `InterruptedException`. This is for pre-Hadoop 2.8. - Treat `InterruptedIOException` as `InterruptedException`. This is for Hadoop 2.8+ and other places that may throw `InterruptedIOException` when the thread is interrupted. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #17044 from zsxwing/SPARK-19718.
* [SPARK-18939][SQL] Timezone support in partition values.Takuya UESHIN2017-03-035-27/+62
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up pr of #16308 and #16750. This pr enables timezone support in partition values. We should use `timeZone` option introduced at #16750 to parse/format partition values of the `TimestampType`. For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written by the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are: ```scala scala> spark.conf.set("spark.sql.session.timeZone", "GMT") scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts") df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp] scala> df.show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ scala> df.write.partitionBy("ts").save("/path/to/gmtpartition") ``` ```sh $ ls /path/to/gmtpartition/ _SUCCESS ts=2016-01-01 00%3A00%3A00 ``` whereas setting the option to `"PST"`, they are: ```scala scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition") ``` ```sh $ ls /path/to/pstpartition/ _SUCCESS ts=2015-12-31 16%3A00%3A00 ``` We can properly read the partition values if the session local timezone and the timezone of the partition values are the same: ```scala scala> spark.read.load("/path/to/gmtpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ ``` And even if the timezones are different, we can properly read the values with setting corrent timezone option: ```scala // wrong result scala> spark.read.load("/path/to/pstpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2015-12-31 16:00:00| +---+-------------------+ // correct result scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show() +---+-------------------+ | i| ts| +---+-------------------+ | 1|2016-01-01 00:00:00| +---+-------------------+ ``` ## How was this patch tested? Existing tests and added some tests. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #17053 from ueshin/issues/SPARK-18939.
* [SPARK-19774] StreamExecution should call stop() on sources when a stream failsBurak Yavuz2017-03-031-1/+13
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka. ## How was this patch tested? Unit tests in `StreamingQuerySuite`. Author: Burak Yavuz <brkyvz@gmail.com> Closes #17107 from brkyvz/close-source.
* [SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanuphyukjinkwon2017-03-031-29/+68
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR suggests adding some comments in `UnivocityParser` logics to explain what happens. Also, it proposes, IMHO, a little bit cleaner (at least easy for me to explain). ## How was this patch tested? Unit tests in `CSVSuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17142 from HyukjinKwon/SPARK-18699.
* [SPARK-18726][SQL] resolveRelation for FileFormat DataSource don't need to ↵windpiger2017-03-021-4/+9
| | | | | | | | | | | | | | | | | | | | | | listFiles twice ## What changes were proposed in this pull request? Currently when we resolveRelation for a `FileFormat DataSource` without providing user schema, it will execute `listFiles` twice in `InMemoryFileIndex` during `resolveRelation`. This PR add a `FileStatusCache` for DataSource, this can avoid listFiles twice. But there is a bug in `InMemoryFileIndex` see: [SPARK-19748](https://github.com/apache/spark/pull/17079) [SPARK-19761](https://github.com/apache/spark/pull/17093), so this pr should be after SPARK-19748/ SPARK-19761. ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
* [SPARK-19779][SS] Delete needless tmp file after restart structured ↵guifeng2017-03-021-1/+3
| | | | | | | | | | | | | | | | | streaming job ## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (https://github.com/apache/spark/pull/17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future. ## How was this patch tested? unit tests Author: guifeng <guifengleaf@gmail.com> Closes #17124 from gf53520/SPARK-19779.
* [SPARK-18352][DOCS] wholeFile JSON update doc and programming guideFelix Cheung2017-03-022-4/+4
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Update doc for R, programming guide. Clarify default behavior for all languages. ## How was this patch tested? manually Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17128 from felixcheung/jsonwholefiledoc.
* [SPARK-19583][SQL] CTAS for data source table with a created location should ↵windpiger2017-03-011-2/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | succeed ## What changes were proposed in this pull request? ``` spark.sql( s""" |CREATE TABLE t |USING parquet |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) ``` Failed with the error message: ``` path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102) ``` while hive table is ok ,so we should fix it for datasource table. The reason is that the SaveMode check is put in `InsertIntoHadoopFsRelationCommand` , and the SaveMode check actually use `path`, this is fine when we use `DataFrameWriter.save()`, because this situation of SaveMode act on `path`. While when we use `CreateDataSourceAsSelectCommand`, the situation of SaveMode act on table, and we have already do SaveMode check in `CreateDataSourceAsSelectCommand` for table , so we should not do SaveMode check in the following logic in `InsertIntoHadoopFsRelationCommand` for path, this is redundant and wrong logic for `CreateDataSourceAsSelectCommand` After this PR, the following DDL will succeed, when the location has been created we will append it or overwrite it. ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ... ``` ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #16938 from windpiger/CTASDataSourceWitLocation.
* [SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set ↵windpiger2017-03-012-3/+5
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed ## What changes were proposed in this pull request? If we create a InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero, it will throw an exception: ``` Positive number of slices required java.lang.IllegalArgumentException: Positive number of slices required at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.collect(RDD.scala:935) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186) at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105) at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) ``` ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.
* [SPARK-19736][SQL] refreshByPath should clear all cached plans with the ↵Liang-Chi Hsieh2017-03-011-9/+10
| | | | | | | | | | | | | | | | | | | | specified path ## What changes were proposed in this pull request? `Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path. However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17064 from viirya/fix-refreshByPath.
* [SPARK-19633][SS] FileSource read from FileSinkLiwei Lin2017-02-283-29/+87
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Right now file source always uses `InMemoryFileIndex` to scan files from a given path. But when reading the outputs from another streaming query, the file source should use `MetadataFileIndex` to list files from the sink log. This patch adds this support. ## `MetadataFileIndex` or `InMemoryFileIndex` ```scala spark .readStream .format(...) .load("/some/path") // for a non-glob path: // - use `MetadataFileIndex` when `/some/path/_spark_meta` exists // - fall back to `InMemoryFileIndex` otherwise ``` ```scala spark .readStream .format(...) .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex` ``` ## How was this patch tested? two newly added tests Author: Liwei Lin <lwlin7@gmail.com> Closes #16987 from lw-lin/source-read-from-sink.
* [SPARK-19572][SPARKR] Allow to disable hive in sparkR shellJeff Zhang2017-02-281-2/+4
| | | | | | | | | | | | | ## What changes were proposed in this pull request? SPARK-15236 do this for scala shell, this ticket is for sparkR shell. This is not only for sparkR itself, but can also benefit downstream project like livy which use shell.R for its interactive session. For now, livy has no control of whether enable hive or not. ## How was this patch tested? Tested it manually, run `bin/sparkR --master local --conf spark.sql.catalogImplementation=in-memory` and verify hive is not enabled. Author: Jeff Zhang <zjffdu@apache.org> Closes #16907 from zjffdu/SPARK-19572.
* [SPARK-19610][SQL] Support parsing multiline CSV fileshyukjinkwon2017-02-289-132/+371
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file). So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory. ## How was this patch tested? Unit tests in `CSVSuite` and `tests.py` Manual tests with a single 9GB CSV file in local file system, for example, ```scala spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count() ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16976 from HyukjinKwon/SPARK-19610.
* [SPARK-19463][SQL] refresh cache after the InsertIntoHadoopFsRelationCommandwindpiger2017-02-281-0/+3
| | | | | | | | | | | | | ## What changes were proposed in this pull request? If we first cache a DataSource table, then we insert some data into the table, we should refresh the data in the cache after the insert command. ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #16809 from windpiger/refreshCacheAfterInsert.
* [SPARK-19677][SS] Committing a delta file atop an existing one should not ↵Roberto Agostino Vitillo2017-02-281-1/+10
| | | | | | | | | | | | | | | | | | | | | | | fail on HDFS ## What changes were proposed in this pull request? HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies: > Destination exists and is a file > Renaming a file atop an existing file is specified as failing, raising an exception. > - Local FileSystem : the rename succeeds; the destination file is replaced by the source file. > - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false. This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output. ## How was this patch tested? This patch was tested by running `StateStoreSuite`. Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com> Closes #17012 from vitillo/fix_rename.
* [SPARK-19678][SQL] remove MetastoreRelationWenchen Fan2017-02-286-103/+71
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `MetastoreRelation` is used to represent table relation for hive tables, and provides some hive related information. We will resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as these 2 are the same essentially. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17015 from cloud-fan/table-relation.
* [SPARK-19660][CORE][SQL] Replace the configuration property names that are ↵Yuming Wang2017-02-282-7/+7
| | | | | | | | | | | | | | | | | | | | | | deprecated in the version of Hadoop 2.6 ## What changes were proposed in this pull request? Replace all the Hadoop deprecated configuration property names according to [DeprecatedProperties](https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/DeprecatedProperties.html). except: https://github.com/apache/spark/blob/v2.1.0/python/pyspark/sql/tests.py#L1533 https://github.com/apache/spark/blob/v2.1.0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala#L987 https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala#L45 https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L614 ## How was this patch tested? Existing tests Author: Yuming Wang <wgyumg@gmail.com> Closes #16990 from wangyum/HadoopDeprecatedProperties.
* [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate ↵windpiger2017-02-281-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache ## What changes were proposed in this pull request? If we refresh a InMemoryFileIndex with a FileStatusCache, it will first use the FileStatusCache to re-generate the cachedLeafFiles etc, then call FileStatusCache.invalidateAll. While the order to do these two actions is wrong, this lead to the refresh action does not take effect. ``` override def refresh(): Unit = { refresh0() fileStatusCache.invalidateAll() } private def refresh0(): Unit = { val files = listLeafFiles(rootPaths) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) cachedPartitionSpec = null } ``` ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.
* [SPARK-19749][SS] Name socket source with a meaningful nameuncleGen2017-02-271-2/+4
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Name socket source with a meaningful name ## How was this patch tested? Jenkins Author: uncleGen <hustyugm@gmail.com> Closes #17082 from uncleGen/SPARK-19749.
* [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle ↵Eyal Zituny2017-02-261-1/+13
| | | | | | | | | | | | | | | | | | QueryTerminatedEvent if more then one listeners exists ## What changes were proposed in this pull request? currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event. this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set. in this PR, the query id will be removed from the set only after all the listeners handles the event ## How was this patch tested? a test with multiple listeners has been added to StreamingQueryListenerSuite Author: Eyal Zituny <eyal.zituny@equalum.io> Closes #16991 from eyalzit/master.
* [SPARK-19650] Commands should not trigger a Spark jobHerman van Hovell2017-02-243-17/+8
| | | | | | | | | | | | Spark executes SQL commands eagerly. It does this by creating an RDD which contains the command's results. The downside to this is that any action on this RDD triggers a Spark job which is expensive and is unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it just materializes the result and puts them in a `LocalRelation`. Added a regression test to `SQLQuerySuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17027 from hvanhovell/no-job-command.
* [SPARK-19735][SQL] Remove HOLD_DDLTIME from Catalog APIsXiao Li2017-02-241-2/+0
| | | | | | | | | | | | ### What changes were proposed in this pull request? As explained in Hive JIRA https://issues.apache.org/jira/browse/HIVE-12224, HOLD_DDLTIME was broken as soon as it landed. Hive 2.0 removes HOLD_DDLTIME from the API. In Spark SQL, we always set it to FALSE. Like Hive, we should also remove it from our Catalog APIs. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17063 from gatorsmile/removalHoldDDLTime.
* [SPARK-17078][SQL] Show stats when explainwangzhenhua2017-02-244-6/+28
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently we can only check the estimated stats in logical plans by debugging. We need to provide an easier and more efficient way for developers/users. In this pr, we add EXPLAIN COST command to show stats in the optimized logical plan. E.g. ``` spark-sql> EXPLAIN COST select count(1) from store_returns; ... == Optimized Logical Plan == Aggregate [count(1) AS count(1)#24L], Statistics(sizeInBytes=16.0 B, rowCount=1, isBroadcastable=false) +- Project, Statistics(sizeInBytes=4.3 GB, rowCount=5.76E+8, isBroadcastable=false) +- Relation[sr_returned_date_sk#3,sr_return_time_sk#4,sr_item_sk#5,sr_customer_sk#6,sr_cdemo_sk#7,sr_hdemo_sk#8,sr_addr_sk#9,sr_store_sk#10,sr_reason_sk#11,sr_ticket_number#12,sr_return_quantity#13,sr_return_amt#14,sr_return_tax#15,sr_return_amt_inc_tax#16,sr_fee#17,sr_return_ship_cost#18,sr_refunded_cash#19,sr_reversed_charge#20,sr_store_credit#21,sr_net_loss#22] parquet, Statistics(sizeInBytes=28.6 GB, rowCount=5.76E+8, isBroadcastable=false) ... ``` ## How was this patch tested? Add test cases. Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #16594 from wzhfy/showStats.
* [SPARK-19664][SQL] put hive.metastore.warehouse.dir in hadoopconf to ↵windpiger2017-02-231-2/+6
| | | | | | | | | | | | | | | | | overwrite its original value ## What changes were proposed in this pull request? In [SPARK-15959](https://issues.apache.org/jira/browse/SPARK-15959), we bring back the `hive.metastore.warehouse.dir` , while in the logic, when use the value of `spark.sql.warehouse.dir` to overwrite `hive.metastore.warehouse.dir` , it set it to `sparkContext.conf` which does not overwrite the value is hadoopConf, I think it should put in `sparkContext.hadoopConfiguration` and overwrite the original value of hadoopConf https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala#L64 ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #16996 from windpiger/hivemetawarehouseConf.
* [SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the ↵Carson Wang2017-02-231-2/+5
| | | | | | | | | | | | | | execution when merging all accumulator updates ## What changes were proposed in this pull request? In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException. ## How was this patch tested? Updated unit test. Author: Carson Wang <carson.wang@intel.com> Closes #17009 from carsonwang/FixSQLMetrics.
* [SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV dataTakeshi Yamamuro2017-02-235-33/+114
| | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails); ``` Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at scala.util.Try.getOrElse(Try.scala:79) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269) at ``` In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field. ## How was this patch tested? Added tests in `CSVSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #16928 from maropu/SPARK-18699-2.