aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
Commit message (Collapse)AuthorAgeFilesLines
...
* [SPARK-17354] [SQL] Partitioning by dates/timestamps should work with ↵hyukjinkwon2016-09-091-1/+48
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Parquet vectorized reader ## What changes were proposed in this pull request? This PR fixes `ColumnVectorUtils.populate` so that Parquet vectorized reader can read partitioned table with dates/timestamps. This works fine with Parquet normal reader. This is being only called within [VectorizedParquetRecordReader.java#L185](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L185). When partition column types are explicitly given to `DateType` or `TimestampType` (rather than inferring the type of partition column), this fails with the exception below: ``` 16/09/01 10:30:07 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 6) java.lang.ClassCastException: java.lang.Integer cannot be cast to java.sql.Date at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:89) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:185) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:204) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:362) ... ``` ## How was this patch tested? Unit tests in `SQLQuerySuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14919 from HyukjinKwon/SPARK-17354.
* [SPARK-17432][SQL] PreprocessDDL should respect case sensitivity when ↵Wenchen Fan2016-09-081-0/+7
| | | | | | | | | | | | | | | | | | checking duplicated columns ## What changes were proposed in this pull request? In `PreprocessDDL` we will check if table columns are duplicated. However, this checking ignores case sensitivity config(it's always case-sensitive) and lead to different result between `HiveExternalCatalog` and `InMemoryCatalog`. `HiveExternalCatalog` will throw exception because hive metastore is always case-nonsensitive, and `InMemoryCatalog` is fine. This PR fixes it. ## How was this patch tested? a new test in DDLSuite Author: Wenchen Fan <wenchen@databricks.com> Closes #14994 from cloud-fan/check-dup.
* [SPARK-17427][SQL] function SIZE should return -1 when parameter is nullDaoyuan Wang2016-09-071-6/+8
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? `select size(null)` returns -1 in Hive. In order to be compatible, we should return `-1`. ## How was this patch tested? unit test in `CollectionFunctionsSuite` and `DataFrameFunctionsSuite`. Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #14991 from adrian-wang/size.
* [SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to ↵Tathagata Das2016-09-062-10/+50
| | | | | | | | | | | | | | | | | | | | | | | | | | save file names in FileStreamSource ## What changes were proposed in this pull request? When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where a LinkedHashSet.values.toSeq returns Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir. Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires *recursively* going through linked list, thus resulting in StackOverflowError. In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true. - file stream defined on a partitioned directory - directory has 10k+ files The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways. - Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq - Added a `require` in HDFSMetadataLog such that it is never used with type Seq ## How was this patch tested? Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14987 from tdas/SPARK-17372.
* [SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in ↵Davies Liu2016-09-061-0/+56
| | | | | | | | | | | | | | | | | | LongToUnsafeRowMap ## What changes were proposed in this pull request? In LongToUnsafeRowMap, we use offset of a value as pointer, stored in a array also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM Heap size, then the deserialized LongToUnsafeRowMap will be corrupt. This PR will change to use portable address (without Platform.LONG_ARRAY_OFFSET). ## How was this patch tested? Added a test case with random generated keys, to improve the coverage. But this test is not a regression test, that could require a Spark cluster that have at least 32G heap in driver or executor. Author: Davies Liu <davies@databricks.com> Closes #14927 from davies/longmap.
* [SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReaderSean Zhong2016-09-061-1/+28
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR adds better error messages for malformed record when reading a JSON file using DataFrameReader. For example, for query: ``` import org.apache.spark.sql.types._ val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil) val schema = StructType(StructField("a", StringType, true) :: Nil) val jsonDF = spark.read.schema(schema).json(corruptRecords) ``` **Before change:** We silently replace corrupted line with null ``` scala> jsonDF.show +----+ | a| +----+ |null| +----+ ``` **After change:** Add an explicit warning message: ``` scala> jsonDF.show 16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records (sample: {"a":{, b:3}). The JSON reader will replace all malformed records with placeholder null in current PERMISSIVE parser mode. To find out which corrupted records have been replaced with null, please use the default inferred schema instead of providing a custom schema. Code example to print all malformed records (scala): =================================================== // The corrupted record exists in column _corrupt_record. val parsedJson = spark.read.json("/path/to/json/file/test.json") +----+ | a| +----+ |null| +----+ ``` ### ## How was this patch tested? Unit test. Author: Sean Zhong <seanzhong@databricks.com> Closes #14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.
* [SPARK-17356][SQL] Fix out of memory issue when generating JSON for TreeNodeSean Zhong2016-09-061-1/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? class `org.apache.spark.sql.types.Metadata` is widely used in mllib to store some ml attributes. `Metadata` is commonly stored in `Alias` expression. ``` case class Alias(child: Expression, name: String)( val exprId: ExprId = NamedExpression.newExprId, val qualifier: Option[String] = None, val explicitMetadata: Option[Metadata] = None, override val isGenerated: java.lang.Boolean = false) ``` The `Metadata` can take a big memory footprint since the number of attributes is big ( in scale of million). When `toJSON` is called on `Alias` expression, the `Metadata` will also be converted to a big JSON string. If a plan contains many such kind of `Alias` expressions, it may trigger out of memory error when `toJSON` is called, since converting all `Metadata` references to JSON will take huge memory. With this PR, we will skip scanning Metadata when doing JSON conversion. For a reproducer of the OOM, and analysis, please look at jira https://issues.apache.org/jira/browse/SPARK-17356. ## How was this patch tested? Existing tests. Author: Sean Zhong <seanzhong@databricks.com> Closes #14915 from clockfly/json_oom.
* [SPARK-17361][SQL] file-based external table without path should not be createdWenchen Fan2016-09-061-0/+17
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Using the public `Catalog` API, users can create a file-based data source table, without giving the path options. For this case, currently we can create the table successfully, but fail when we read it. Ideally we should fail during creation. This is because when we create data source table, we resolve the data source relation without validating path: `resolveRelation(checkPathExist = false)`. Looking back to why we add this trick(`checkPathExist`), it's because when we call `resolveRelation` for managed table, we add the path to data source options but the path is not created yet. So why we add this not-yet-created path to data source options? This PR fix the problem by adding path to options after we call `resolveRelation`. Then we can remove the `checkPathExist` parameter in `DataSource.resolveRelation` and do some related cleanups. ## How was this patch tested? existing tests and new test in `CatalogSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #14921 from cloud-fan/check-path.
* [SPARK-17358][SQL] Cached table(parquet/orc) should be shard between beelinesYadong Qi2016-09-061-1/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Cached table(parquet/orc) couldn't be shard between beelines, because the `sameResult` method used by `CacheManager` always return false(`sparkSession` are different) when compare two `HadoopFsRelation` in different beelines. So we make `sparkSession` a curry parameter. ## How was this patch tested? Beeline1 ``` 1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt; +---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (5.143 seconds) 1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt; +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ | plan | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ | == Physical Plan == InMemoryTableScan [key#49, value#50] +- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt` +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string> | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ ``` Beeline2 ``` 0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt; +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ | plan | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ | == Physical Plan == InMemoryTableScan [key#68, value#69] +- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt` +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string> | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+ ``` Author: Yadong Qi <qiyadong2010@gmail.com> Closes #14913 from watermen/SPARK-17358.
* [SPARK-17072][SQL] support table-level statistics generation and storing ↵wangzhenhua2016-09-051-0/+26
| | | | | | | | | | | | | | | | | | | | | | | into/loading from metastore ## What changes were proposed in this pull request? 1. Support generation table-level statistics for - hive tables in HiveExternalCatalog - data source tables in HiveExternalCatalog - data source tables in InMemoryCatalog. 2. Add a property "catalogStats" in CatalogTable to hold statistics in Spark side. 3. Put logics of statistics transformation between Spark and Hive in HiveClientImpl. 4. Extend Statistics class by adding rowCount (will add estimatedSize when we have column stats). ## How was this patch tested? add unit tests Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wangzhenhua@huawei.com> Closes #14712 from wzhfy/tableStats.
* [SPARK-17394][SQL] should not allow specify database in table/view name ↵Wenchen Fan2016-09-052-34/+10
| | | | | | | | | | | | | | | | | | | after RENAME TO ## What changes were proposed in this pull request? It's really weird that we allow users to specify database in both from table name and to table name in `ALTER TABLE RENAME TO`, while logically we can't support rename a table to a different database. Both postgres and MySQL disallow this syntax, it's reasonable to follow them and simply our code. ## How was this patch tested? new test in `DDLCommandSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #14955 from cloud-fan/rename.
* [SPARK-17298][SQL] Require explicit CROSS join for cartesian productsSrinath Shankar2016-09-036-16/+63
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Require the use of CROSS join syntax in SQL (and a new crossJoin DataFrame API) to specify explicit cartesian products between relations. By cartesian product we mean a join between relations R and S where there is no join condition involving columns from both R and S. If a cartesian product is detected in the absence of an explicit CROSS join, an error must be thrown. Turning on the "spark.sql.crossJoin.enabled" configuration flag will disable this check and allow cartesian products without an explicit CROSS join. The new crossJoin DataFrame API must be used to specify explicit cross joins. The existing join(DataFrame) method will produce a INNER join that will require a subsequent join condition. That is df1.join(df2) is equivalent to select * from df1, df2. ## How was this patch tested? Added cross-join.sql to the SQLQueryTestSuite to test the check for cartesian products. Added a couple of tests to the DataFrameJoinSuite to test the crossJoin API. Modified various other test suites to explicitly specify a cross join where an INNER join or a comma-separated list was previously used. Author: Srinath Shankar <srinath@databricks.com> Closes #14866 from srinathshankar/crossjoin.
* [SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in ↵Davies Liu2016-09-021-0/+8
| | | | | | | | | | | | | | | | | | | | | | | | DataFrameWriter ## What changes were proposed in this pull request? Some analyzer rules have assumptions on logical plans, optimizer may break these assumption, we should not pass an optimized query plan into QueryExecution (will be analyzed again), otherwise we may some weird bugs. For example, we have a rule for decimal calculation to promote the precision before binary operations, use PromotePrecision as placeholder to indicate that this rule should not apply twice. But a Optimizer rule will remove this placeholder, that break the assumption, then the rule applied twice, cause wrong result. Ideally, we should make all the analyzer rules all idempotent, that may require lots of effort to double checking them one by one (may be not easy). An easier approach could be never feed a optimized plan into Analyzer, this PR fix the case for RunnableComand, they will be optimized, during execution, the passed `query` will also be passed into QueryExecution again. This PR make these `query` not part of the children, so they will not be optimized and analyzed again. Right now, we did not know a logical plan is optimized or not, we could introduce a flag for that, and make sure a optimized logical plan will not be analyzed again. ## How was this patch tested? Added regression tests. Author: Davies Liu <davies@databricks.com> Closes #14797 from davies/fix_writer.
* [SPARK-16525] [SQL] Enable Row Based HashMap in HashAggregateExecQifan Pu2016-09-013-10/+102
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR is the second step for the following feature: For hash aggregation in Spark SQL, we use a fast aggregation hashmap to act as a "cache" in order to boost aggregation performance. Previously, the hashmap is backed by a `ColumnarBatch`. This has performance issues when we have wide schema for the aggregation table (large number of key fields or value fields). In this JIRA, we support another implementation of fast hashmap, which is backed by a `RowBatch`. We then automatically pick between the two implementations based on certain knobs. In this second-step PR, we enable `RowBasedHashMapGenerator` in `HashAggregateExec`. ## How was this patch tested? Added tests: `RowBasedAggregateHashMapSuite` and ` VectorizedAggregateHashMapSuite` Additional micro-benchmarks tests and TPCDS results will be added in a separate PR in the series. Author: Qifan Pu <qifan.pu@gmail.com> Author: ooq <qifan.pu@gmail.com> Closes #14176 from ooq/rowbasedfastaggmap-pr2.
* [SPARK-16461][SQL] Support partition batch pruning with `<=>` predicate in ↵hyukjinkwon2016-09-011-0/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | InMemoryTableScanExec ## What changes were proposed in this pull request? It seems `EqualNullSafe` filter was missed for batch pruneing partitions in cached tables. It seems supporting this improves the performance roughly 5 times faster. Running the codes below: ```scala test("Null-safe equal comparison") { val N = 20000000 val df = spark.range(N).repartition(20) val benchmark = new Benchmark("Null-safe equal comparison", N) df.createOrReplaceTempView("t") spark.catalog.cacheTable("t") sql("select id from t where id <=> 1").collect() benchmark.addCase("Null-safe equal comparison", 10) { _ => sql("select id from t where id <=> 1").collect() } benchmark.run() } ``` produces the results below: **Before:** ``` Running benchmark: Null-safe equal comparison Running case: Null-safe equal comparison Stopped after 10 iterations, 2098 ms Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14 on Mac OS X 10.11.5 Intel(R) Core(TM) i7-4850HQ CPU 2.30GHz Null-safe equal comparison: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Null-safe equal comparison 204 / 210 98.1 10.2 1.0X ``` **After:** ``` Running benchmark: Null-safe equal comparison Running case: Null-safe equal comparison Stopped after 10 iterations, 478 ms Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14 on Mac OS X 10.11.5 Intel(R) Core(TM) i7-4850HQ CPU 2.30GHz Null-safe equal comparison: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Null-safe equal comparison 42 / 48 474.1 2.1 1.0X ``` ## How was this patch tested? Unit tests in `PartitionBatchPruningSuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14117 from HyukjinKwon/SPARK-16461.
* [SPARK-16283][SQL] Implements percentile_approx aggregation function which ↵Sean Zhong2016-09-011-0/+226
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | supports partial aggregation. ## What changes were proposed in this pull request? This PR implements aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`. ### Syntax: ``` # Returns percentile at a given percentage value. The approximation error can be reduced by increasing parameter accuracy, at the cost of memory. percentile_approx(col, percentage [, accuracy]) # Returns percentile value array at given percentage value array percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy]) ``` ### Features: 1. This function supports partial aggregation. 2. The memory consumption is bounded. The larger `accuracy` parameter we choose, we smaller error we get. The default accuracy value is 10000, to match with Hive default setting. Choose a smaller value for smaller memory footprint. 3. This function supports window function aggregation. ### Example usages: ``` ## Returns the 25th percentile value, with default accuracy SELECT percentile_approx(col, 0.25) FROM table ## Returns an array of percentile value (25th, 50th, 75th), with default accuracy SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table ## Returns 25th percentile value, with custom accuracy value 100, larger accuracy parameter yields smaller approximation error SELECT percentile_approx(col, 0.25, 100) FROM table ## Returns the 25th, and 50th percentile values, with custom accuracy value 100 SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table ``` ### NOTE: 1. The `percentile_approx` implementation is different from Hive, so the result returned on same query maybe slightly different with Hive. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)` 2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal. ## How was this patch tested? Unit test, and Sql query test. ## Acknowledgement 1. This PR's work in based on lw-lin's PR https://github.com/apache/spark/pull/14298, with improvements like supporting partial aggregation, fixing out of memory issue. Author: Sean Zhong <seanzhong@databricks.com> Closes #14868 from clockfly/appro_percentile_try_2.
* revert PR#10896 and PR#14865Wenchen Fan2016-09-012-71/+21
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? according to the discussion in the original PR #10896 and the new approach PR #14876 , we decided to revert these 2 PRs and go with the new approach. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #14909 from cloud-fan/revert.
* [SPARK-17289][SQL] Fix a bug to satisfy sort requirements in partial ↵Takeshi YAMAMURO2016-08-301-1/+21
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | aggregations ## What changes were proposed in this pull request? Partial aggregations are generated in `EnsureRequirements`, but the planner fails to check if partial aggregation satisfies sort requirements. For the following query: ``` val df2 = (0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").createOrReplaceTempView("t2") spark.sql("select max(b) from t2 group by a").explain(true) ``` Now, the SortAggregator won't insert Sort operator before partial aggregation, this will break sort-based partial aggregation. ``` == Physical Plan == SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17]) +- *Sort [a#5 ASC], false, 0 +- Exchange hashpartitioning(a#5, 200) +- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19]) +- LocalTableScan [a#5, b#6] ``` Actually, a correct plan is: ``` == Physical Plan == SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17]) +- *Sort [a#5 ASC], false, 0 +- Exchange hashpartitioning(a#5, 200) +- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19]) +- *Sort [a#5 ASC], false, 0 +- LocalTableScan [a#5, b#6] ``` ## How was this patch tested? Added tests in `PlannerSuite`. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #14865 from maropu/SPARK-17289.
* [SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastoreDavies Liu2016-08-291-2/+11
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR split the the single `createPartitions()` call into smaller batches, which could prevent Hive metastore from OOM (caused by millions of partitions). It will also try to gather all the fast stats (number of files and total size of all files) in parallel to avoid the bottle neck of listing the files in metastore sequential, which is controlled by spark.sql.gatherFastStats (enabled by default). ## How was this patch tested? Tested locally with 10000 partitions and 100 files with embedded metastore, without gathering fast stats in parallel, adding partitions took 153 seconds, after enable that, gathering the fast stats took about 34 seconds, adding these partitions took 25 seconds (most of the time spent in object store), 59 seconds in total, 2.5X faster (with larger cluster, gathering will much faster). Author: Davies Liu <davies@databricks.com> Closes #14607 from davies/repair_batch.
* [SPARK-17271][SQL] Planner adds un-necessary Sort even if child ordering is ↵Tejas Patil2016-08-281-1/+39
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | semantically same as required ordering ## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-17271 Planner is adding un-needed SORT operation due to bug in the way comparison for `SortOrder` is done at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253 `SortOrder` needs to be compared semantically because `Expression` within two `SortOrder` can be "semantically equal" but not literally equal objects. eg. In case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")` Expression in required SortOrder: ``` AttributeReference( name = "col1", dataType = LongType, nullable = false ) (exprId = exprId, qualifier = Some("a") ) ``` Expression in child SortOrder: ``` AttributeReference( name = "col1", dataType = LongType, nullable = false ) (exprId = exprId) ``` Notice that the output column has a qualifier but the child attribute does not but the inherent expression is the same and hence in this case we can say that the child satisfies the required sort order. This PR includes following changes: - Added a `semanticEquals` method to `SortOrder` so that it can compare underlying child expressions semantically (and not using default Object.equals) - Fixed `EnsureRequirements` to use semantic comparison of SortOrder ## How was this patch tested? - Added a test case to `PlannerSuite`. Ran rest tests in `PlannerSuite` Author: Tejas Patil <tejasp@fb.com> Closes #14841 from tejasapatil/SPARK-17271_sort_order_equals_bug.
* [SPARK-15382][SQL] Fix a bug in sampling with replacementTakeshi YAMAMURO2016-08-271-0/+7
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This pr to fix a bug below in sampling with replacement ``` val df = Seq((1, 0), (2, 0), (3, 0)).toDF("a", "b") df.sample(true, 2.0).withColumn("c", monotonically_increasing_id).select($"c").show +---+ | c| +---+ | 0| | 1| | 1| | 1| | 2| +---+ ``` ## How was this patch tested? Added a test in `DataFrameSuite`. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #14800 from maropu/FixSampleBug.
* [SPARK-17235][SQL] Support purging of old logs in MetadataLogpetermaxlee2016-08-261-4/+23
| | | | | | | | | | | | ## What changes were proposed in this pull request? This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time. ## How was this patch tested? Added a unit test case in HDFSMetadataLogSuite. Author: petermaxlee <petermaxlee@gmail.com> Closes #14802 from petermaxlee/SPARK-17235.
* [SPARK-17165][SQL] FileStreamSource should not track the list of seen files ↵petermaxlee2016-08-262-3/+113
| | | | | | | | | | | | | | | | indefinitely ## What changes were proposed in this pull request? Before this change, FileStreamSource uses an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set. This patch introduces a new user-defined option called "maxFileAge", default to 24 hours. If a file is older than this age, FileStreamSource will purge it from the in-memory map that was used to track the list of files that have been processed. ## How was this patch tested? Added unit tests for the underlying utility, and also added an end-to-end test to validate the purge in FileStreamSourceSuite. Also verified the new test cases would fail when the timeout was set to a very large number. Author: petermaxlee <petermaxlee@gmail.com> Closes #14728 from petermaxlee/SPARK-17165.
* [SPARK-17192][SQL] Issue Exception when Users Specify the Partitioning ↵gatorsmile2016-08-261-5/+12
| | | | | | | | | | | | | | | | | | Columns without a Given Schema ### What changes were proposed in this pull request? Address the comments by yhuai in the original PR: https://github.com/apache/spark/pull/14207 First, issue an exception instead of logging a warning when users specify the partitioning columns without a given schema. Second, refactor the codes a little. ### How was this patch tested? Fixed the test cases. Author: gatorsmile <gatorsmile@gmail.com> Closes #14572 from gatorsmile/followup16552.
* [SPARK-16216][SQL][FOLLOWUP] Enable timestamp type tests for JSON and verify ↵hyukjinkwon2016-08-261-2/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | all unsupported types in CSV ## What changes were proposed in this pull request? This PR enables the tests for `TimestampType` for JSON and unifies the logics for verifying schema when writing in CSV. In more details, this PR, - Enables the tests for `TimestampType` for JSON and This was disabled due to an issue in `DatatypeConverter.parseDateTime` which parses dates incorrectly, for example as below: ```scala val d = javax.xml.bind.DatatypeConverter.parseDateTime("0900-01-01T00:00:00.000").getTime println(d.toString) ``` ``` Fri Dec 28 00:00:00 KST 899 ``` However, since we use `FastDateFormat`, it seems we are safe now. ```scala val d = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS").parse("0900-01-01T00:00:00.000") println(d) ``` ``` Tue Jan 01 00:00:00 PST 900 ``` - Verifies all unsupported types in CSV There is a separate logics to verify the schemas in `CSVFileFormat`. This is actually not quite correct enough because we don't support `NullType` and `CalanderIntervalType` as well `StructType`, `ArrayType`, `MapType`. So, this PR adds both types. ## How was this patch tested? Tests in `JsonHadoopFsRelation` and `CSVSuite` Author: hyukjinkwon <gurwls223@gmail.com> Closes #14829 from HyukjinKwon/SPARK-16216-followup.
* [SPARK-17187][SQL] Supports using arbitrary Java object as internal ↵Sean Zhong2016-08-251-0/+300
| | | | | | | | | | | | | | | | | | | | | | | | | aggregation buffer object ## What changes were proposed in this pull request? This PR introduces an abstract class `TypedImperativeAggregate` so that an aggregation function of TypedImperativeAggregate can use **arbitrary** user-defined Java object as intermediate aggregation buffer object. **This has advantages like:** 1. It now can support larger category of aggregation functions. For example, it will be much easier to implement aggregation function `percentile_approx`, which has a complex aggregation buffer definition. 2. It can be used to avoid doing serialization/de-serialization for every call of `update` or `merge` when converting domain specific aggregation object to internal Spark-Sql storage format. 3. It is easier to integrate with other existing monoid libraries like algebird, and supports more aggregation functions with high performance. Please see `org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMaxAggregate` to find an example of how to defined a `TypedImperativeAggregate` aggregation function. Please see Java doc of `TypedImperativeAggregate` and Jira ticket SPARK-17187 for more information. ## How was this patch tested? Unit tests. Author: Sean Zhong <seanzhong@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #14753 from clockfly/object_aggregation_buffer_try_2.
* [SPARK-16991][SPARK-17099][SPARK-17120][SQL] Fix Outer Join Elimination when ↵gatorsmile2016-08-251-0/+8
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Filter's isNotNull Constraints Unable to Filter Out All Null-supplying Rows ### What changes were proposed in this pull request? This PR is to fix an incorrect outer join elimination when filter's `isNotNull` constraints is unable to filter out all null-supplying rows. For example, `isnotnull(coalesce(b#227, c#238))`. Users can hit this error when they try to use `using/natural outer join`, which is converted to a normal outer join with a `coalesce` expression on the `using columns`. For example, ```Scala val a = Seq((1, 2), (2, 3)).toDF("a", "b") val b = Seq((2, 5), (3, 4)).toDF("a", "c") val c = Seq((3, 1)).toDF("a", "d") val ab = a.join(b, Seq("a"), "fullouter") ab.join(c, "a").explain(true) ``` The dataframe `ab` is doing `using full-outer join`, which is converted to a normal outer join with a `coalesce` expression. Constraints inference generates a `Filter` with constraints `isnotnull(coalesce(b#227, c#238))`. Then, it triggers a wrong outer join elimination and generates a wrong result. ``` Project [a#251, b#227, c#237, d#247] +- Join Inner, (a#251 = a#246) :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237] : +- Join FullOuter, (a#226 = a#236) : :- Project [_1#223 AS a#226, _2#224 AS b#227] : : +- LocalRelation [_1#223, _2#224] : +- Project [_1#233 AS a#236, _2#234 AS c#237] : +- LocalRelation [_1#233, _2#234] +- Project [_1#243 AS a#246, _2#244 AS d#247] +- LocalRelation [_1#243, _2#244] == Optimized Logical Plan == Project [a#251, b#227, c#237, d#247] +- Join Inner, (a#251 = a#246) :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237] : +- Filter isnotnull(coalesce(a#226, a#236)) : +- Join FullOuter, (a#226 = a#236) : :- LocalRelation [a#226, b#227] : +- LocalRelation [a#236, c#237] +- LocalRelation [a#246, d#247] ``` **A note to the `Committer`**, please also give the credit to dongjoon-hyun who submitted another PR for fixing this issue. https://github.com/apache/spark/pull/14580 ### How was this patch tested? Added test cases Author: gatorsmile <gatorsmile@gmail.com> Closes #14661 from gatorsmile/fixOuterJoinElimination.
* [SPARK-12978][SQL] Skip unnecessary final group-by when input data already ↵Takeshi YAMAMURO2016-08-252-22/+52
| | | | | | | | | | | | | | | | | | | | | | | | | | | clustered with group-by keys This ticket targets the optimization to skip an unnecessary group-by operation below; Without opt.: ``` == Physical Plan == TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Final,isDistinct=false),(avg(col2#161),mode=Final,isDistinct=false)], output=[col0#159,sum(col1)#177,avg(col2)#178]) +- TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Partial,isDistinct=false),(avg(col2#161),mode=Partial,isDistinct=false)], output=[col0#159,sum#200,sum#201,count#202L]) +- TungstenExchange hashpartitioning(col0#159,200), None +- InMemoryColumnarTableScan [col0#159,col1#160,col2#161], InMemoryRelation [col0#159,col1#160,col2#161], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None ``` With opt.: ``` == Physical Plan == TungstenAggregate(key=[col0#159], functions=[(sum(col1#160),mode=Complete,isDistinct=false),(avg(col2#161),mode=Final,isDistinct=false)], output=[col0#159,sum(col1)#177,avg(col2)#178]) +- TungstenExchange hashpartitioning(col0#159,200), None +- InMemoryColumnarTableScan [col0#159,col1#160,col2#161], InMemoryRelation [col0#159,col1#160,col2#161], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None ``` Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #10896 from maropu/SkipGroupbySpike.
* [SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and ↵hyukjinkwon2016-08-245-15/+236
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | dateFormat/timestampFormat option for CSV and JSON ## What changes were proposed in this pull request? ### Default - ISO 8601 Currently, CSV datasource is writing `Timestamp` and `Date` as numeric form and JSON datasource is writing both as below: - CSV ``` // TimestampType 1414459800000000 // DateType 16673 ``` - Json ``` // TimestampType 1970-01-01 11:46:40.0 // DateType 1970-01-01 ``` So, for CSV we can't read back what we write and for JSON it becomes ambiguous because the timezone is being missed. So, this PR make both **write** `Timestamp` and `Date` in ISO 8601 formatted string (please refer the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)). - For `Timestamp` it becomes as below: (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`) ``` 1970-01-01T02:00:01.000-01:00 ``` - For `Date` it becomes as below (`yyyy-MM-dd`) ``` 1970-01-01 ``` ### Custom date format option - `dateFormat` This PR also adds the support to write and read dates and timestamps in a formatted string as below: - **DateType** - With `dateFormat` option (e.g. `yyyy/MM/dd`) ``` +----------+ | date| +----------+ |2015/08/26| |2014/10/27| |2016/01/28| +----------+ ``` ### Custom date format option - `timestampFormat` - **TimestampType** - With `dateFormat` option (e.g. `dd/MM/yyyy HH:mm`) ``` +----------------+ | date| +----------------+ |2015/08/26 18:00| |2014/10/27 18:30| |2016/01/28 20:00| +----------------+ ``` ## How was this patch tested? Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
* [SPARK-17188][SQL] Moves class QuantileSummaries to project catalyst for ↵Sean Zhong2016-08-231-129/+0
| | | | | | | | | | | | | | | | implementing percentile_approx ## What changes were proposed in this pull request? This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves class QuantileSummaries to project catalyst so that it can be reused when implementing aggregation function `percentile_approx`. ## How was this patch tested? This PR only does class relocation, class implementation is not changed. Author: Sean Zhong <seanzhong@databricks.com> Closes #14754 from clockfly/move_QuantileSummaries_to_catalyst.
* [MINOR][SQL] Fix some typos in comments and test hintsSean Zhong2016-08-221-3/+3
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix some typos in comments and test hints ## How was this patch tested? N/A. Author: Sean Zhong <seanzhong@databricks.com> Closes #14755 from clockfly/fix_minor_typo.
* [SPARK-17115][SQL] decrease the threshold when split expressionsDavies Liu2016-08-221-0/+53
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? In 2.0, we change the threshold of splitting expressions from 16K to 64K, which cause very bad performance on wide table, because the generated method can't be JIT compiled by default (above the limit of 8K bytecode). This PR will decrease it to 1K, based on the benchmark results for a wide table with 400 columns of LongType. It also fix a bug around splitting expression in whole-stage codegen (it should not split them). ## How was this patch tested? Added benchmark suite. Author: Davies Liu <davies@databricks.com> Closes #14692 from davies/split_exprs.
* [SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalogWenchen Fan2016-08-213-97/+24
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Spark SQL doesn't have its own meta store yet, and use hive's currently. However, hive's meta store has some limitations(e.g. columns can't be too many, not case-preserving, bad decimal type support, etc.), so we have some hacks to successfully store data source table metadata into hive meta store, i.e. put all the information in table properties. This PR moves these hacks to `HiveExternalCatalog`, tries to isolate hive specific logic in one place. changes overview: 1. **before this PR**: we need to put metadata(schema, partition columns, etc.) of data source tables to table properties before saving it to external catalog, even the external catalog doesn't use hive metastore(e.g. `InMemoryCatalog`) **after this PR**: the table properties tricks are only in `HiveExternalCatalog`, the caller side doesn't need to take care of it anymore. 2. **before this PR**: because the table properties tricks are done outside of external catalog, so we also need to revert these tricks when we read the table metadata from external catalog and use it. e.g. in `DescribeTableCommand` we will read schema and partition columns from table properties. **after this PR**: The table metadata read from external catalog is exactly the same with what we saved to it. bonus: now we can create data source table using `SessionCatalog`, if schema is specified. breaks: `schemaStringLengthThreshold` is not configurable anymore. `hive.default.rcfile.serde` is not configurable anymore. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #14155 from cloud-fan/catalog-table.
* [SPARK-17124][SQL] RelationalGroupedDataset.agg should preserve order and ↵petermaxlee2016-08-211-0/+10
| | | | | | | | | | | | | | | | | | | allow multiple aggregates per column ## What changes were proposed in this pull request? This patch fixes a longstanding issue with one of the RelationalGroupedDataset.agg function. Even though the signature accepts vararg of pairs, the underlying implementation turns the seq into a map, and thus not order preserving nor allowing multiple aggregates per column. This change also allows users to use this function to run multiple different aggregations for a single column, e.g. ``` agg("age" -> "max", "age" -> "count") ``` ## How was this patch tested? Added a test case in DataFrameAggregateSuite. Author: petermaxlee <petermaxlee@gmail.com> Closes #14697 from petermaxlee/SPARK-17124.
* [SPARK-17149][SQL] array.sql for testing array related functionspetermaxlee2016-08-192-16/+10
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch creates array.sql in SQLQueryTestSuite for testing array related functions, including: - indexing - array creation - size - array_contains - sort_array ## How was this patch tested? The patch itself is about adding tests. Author: petermaxlee <petermaxlee@gmail.com> Closes #14708 from petermaxlee/SPARK-17149.
* [SPARK-16391][SQL] Support partial aggregation for reduceGroupsReynold Xin2016-08-181-0/+73
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch introduces a new private ReduceAggregator interface that is a subclass of Aggregator. ReduceAggregator only requires a single associative and commutative reduce function. ReduceAggregator is also used to implement KeyValueGroupedDataset.reduceGroups in order to support partial aggregation. Note that the pull request was initially done by viirya. ## How was this patch tested? Covered by original tests for reduceGroups, as well as a new test suite for ReduceAggregator. Author: Reynold Xin <rxin@databricks.com> Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #14576 from rxin/reduceAggregator.
* [SPARK-16995][SQL] TreeNodeException when flat mapping ↵Liang-Chi Hsieh2016-08-181-0/+13
| | | | | | | | | | | | | | | | | | | | | | | | | | | RelationalGroupedDataset created from DataFrame containing a column created with lit/expr ## What changes were proposed in this pull request? A TreeNodeException is thrown when executing the following minimal example in Spark 2.0. import spark.implicits._ case class test (x: Int, q: Int) val d = Seq(1).toDF("x") d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show The problem is at `FoldablePropagation`. The rule will do `transformExpressions` on `LogicalPlan`. The query above contains a `MapGroups` which has a parameter `dataAttributes:Seq[Attribute]`. One attributes in `dataAttributes` will be transformed to an `Alias(literal(0), _)` in `FoldablePropagation`. `Alias` is not an `Attribute` and causes the error. We can't easily detect such type inconsistency during transforming expressions. A direct approach to this problem is to skip doing `FoldablePropagation` on object operators as they should not contain such expressions. ## How was this patch tested? Jenkins tests. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #14648 from viirya/flat-mapping.
* [SPARK-17096][SQL][STREAMING] Improve exception string reported through the ↵Tathagata Das2016-08-171-7/+6
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | StreamingQueryListener ## What changes were proposed in this pull request? Currently, the stackTrace (as `Array[StackTraceElements]`) reported through StreamingQueryListener.onQueryTerminated is useless as it has the stack trace of where StreamingQueryException is defined, not the stack trace of underlying exception. For example, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.stackTrace` will have ``` org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` This is basically useless, as it is location where the StreamingQueryException was defined. What we want is Here is the right way to reason about what should be posted as through StreamingQueryListener.onQueryTerminated - The actual exception could either be a SparkException, or an arbitrary exception. - SparkException reports the relevant executor stack trace of a failed task as a string in the the exception message. The `Array[StackTraceElements]` returned by `SparkException.stackTrace()` is mostly irrelevant. - For any arbitrary exception, the `Array[StackTraceElements]` returned by `exception.stackTrace()` may be relevant. - When there is an error in a streaming query, it's hard to reason whether the `Array[StackTraceElements]` is useful or not. In fact, it is not clear whether it is even useful to report the stack trace as this array of Java objects. It may be sufficient to report the strack trace as a string, along with the message. This is how Spark reported executor stra - Hence, this PR simplifies the API by removing the array `stackTrace` from `QueryTerminated`. Instead the `exception` returns a string containing the message and the stack trace of the actual underlying exception that failed the streaming query (i.e. not that of the StreamingQueryException). If anyone is interested in the actual stack trace as an array, can always access them through `streamingQuery.exception` which returns the exception object. With this change, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.exception` will be ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ArithmeticException: / by zero at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcII$sp(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:226) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1429) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1416) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1416) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) ... ``` It contains the relevant executor stack trace. In a case non-SparkException, if the streaming source MemoryStream throws an exception, exception message will have the relevant stack trace. ``` java.lang.RuntimeException: this is the exception message at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:103) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:316) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:313) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:313) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` Note that this change in the public `QueryTerminated` class is okay as the APIs are still experimental. ## How was this patch tested? Unit tests that test whether the right information is present in the exception message reported through QueryTerminated object. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14675 from tdas/SPARK-17096.
* [SPARK-17102][SQL] bypass UserDefinedGenerator for json format checkWenchen Fan2016-08-171-1/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We use reflection to convert `TreeNode` to json string, and currently don't support arbitrary object. `UserDefinedGenerator` takes a function object, so we should skip json format test for it, or the tests can be flacky, e.g. `DataFrameSuite.simple explode`, this test always fail with scala 2.10(branch 1.6 builds with scala 2.10 by default), but pass with scala 2.11(master branch builds with scala 2.11 by default). ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #14679 from cloud-fan/json.
* [SPARK-17106] [SQL] Simplify the SubqueryExpression interfaceHerman van Hovell2016-08-172-3/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The current subquery expression interface contains a little bit of technical debt in the form of a few different access paths to get and set the query contained by the expression. This is confusing to anyone who goes over this code. This PR unifies these access paths. ## How was this patch tested? (Existing tests) Author: Herman van Hovell <hvanhovell@databricks.com> Closes #14685 from hvanhovell/SPARK-17106.
* [SPARK-15285][SQL] Generated SpecificSafeProjection.apply method grows ↵Kazuaki Ishizaki2016-08-171-0/+40
| | | | | | | | | | | | | | | | | | beyond 64 KB ## What changes were proposed in this pull request? This PR splits the generated code for ```SafeProjection.apply``` by using ```ctx.splitExpressions()```. This is because the large code body for ```NewInstance``` may grow beyond 64KB bytecode size for ```apply()``` method. Here is [the original PR](https://github.com/apache/spark/pull/13243) for SPARK-15285. However, it breaks a build with Scala 2.10 since Scala 2.10 does not a case class with large number of members. Thus, it was reverted by [this commit](https://github.com/apache/spark/commit/fa244e5a90690d6a31be50f2aa203ae1a2e9a1cf). ## How was this patch tested? Added new tests by using `DefinedByConstructorParams` instead of case class for scala-2.10 Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #14670 from kiszk/SPARK-15285-2.
* [SPARK-16916][SQL] serde/storage properties should not have limitationsWenchen Fan2016-08-151-7/+0
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `CatalogStorageFormat.properties` can be used in 2 ways: 1. for hive tables, it stores the serde properties. 2. for data source tables, it stores the data source options, e.g. `path`, `skipHiveMetadata`, etc. however, both of them have nothing to do with data source properties, e.g. `spark.sql.sources.provider`, so they should not have limitations about data source properties. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #14506 from cloud-fan/table-prop.
* [SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.Marcelo Vanzin2016-08-151-18/+0
| | | | | | | | | | | | | | | | Both core and sql have slightly different code that does variable substitution of config values. This change refactors that code and encapsulates the logic of reading config values and expading variables in a new helper class, which can be configured so that both core and sql can use it without losing existing functionality, and allows for easier testing and makes it easier to add more features in the future. Tested with existing and new unit tests, and by running spark-shell with some configs referencing variables and making sure it behaved as expected. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #14468 from vanzin/SPARK-16671.
* [SPARK-16966][SQL][CORE] App Name is a randomUUID even when "spark.app.name" ↵Sean Owen2016-08-131-0/+1
| | | | | | | | | | | | | | | | | | | exists ## What changes were proposed in this pull request? Don't override app name specified in `SparkConf` with a random app name. Only set it if the conf has no app name even after options have been applied. See also https://github.com/apache/spark/pull/14602 This is similar to Sherry302 's original proposal in https://github.com/apache/spark/pull/14556 ## How was this patch tested? Jenkins test, with new case reproducing the bug Author: Sean Owen <sowen@cloudera.com> Closes #14630 from srowen/SPARK-16966.2.
* [SPARK-16968] Add additional options in jdbc when creating a new tableGraceH2016-08-131-0/+12
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? In the PR, we just allow the user to add additional options when create a new table in JDBC writer. The options can be table_options or partition_options. E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8" Here is the usage example: ``` df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8").jdbc(...) ``` ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) will apply test result soon. Author: GraceH <93113783@qq.com> Closes #14559 from GraceH/jdbc_options.
* [SPARK-16975][SQL] Column-partition path starting '_' should be handled ↵Dongjoon Hyun2016-08-121-0/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | correctly ## What changes were proposed in this pull request? Currently, Spark ignores path names starting with underscore `_` and `.`. This causes read-failures for the column-partitioned file data sources whose partition column names starts from '_', e.g. `_col`. **Before** ```scala scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet") scala> spark.read.parquet("/tmp/parquet") org.apache.spark.sql.AnalysisException: Unable to infer schema for ParquetFormat at /tmp/parquet20. It must be specified manually; ``` **After** ```scala scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet") scala> spark.read.parquet("/tmp/parquet") res2: org.apache.spark.sql.DataFrame = [id: bigint, _locality_code: int] ``` ## How was this patch tested? Pass the Jenkins with a new test case. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #14585 from dongjoon-hyun/SPARK-16975-PARQUET.
* [SPARK-16434][SQL] Avoid per-record type dispatch in JSON when readinghyukjinkwon2016-08-121-3/+8
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently, `JacksonParser.parse` is doing type-based dispatch for each row to convert the tokens to appropriate values for Spark. It might not have to be done like this because the schema is already kept. So, appropriate converters can be created first according to the schema once, and then apply them to each row. This PR corrects `JacksonParser` so that it creates all converters for the schema once and then applies them to each row rather than type dispatching for every row. Benchmark was proceeded with the codes below: #### Parser tests **Before** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String]) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), "", dummyOption) val factory = new JsonFactory() val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => data.foreach { input => val parser = factory.createParser(input) parser.nextToken() JacksonParser.convertRootField(factory, parser, schema) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1697 / 1807 0.1 13256.9 1.0X ``` **After** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String], new SQLConf()) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), dummyOption) val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => val parser = new JacksonParser(schema, dummyOption) data.foreach { input => parser.parse(input) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1401 / 1461 0.1 10947.4 1.0X ``` It seems parsing time is improved by roughly ~20% #### End-to-End test ```scala test("Benchmark for JSON reader") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val df = spark.sqlContext.read.json(spark.sparkContext.parallelize(List.fill(N)(row))) withTempPath { path => df.write.format("json").save(path.getCanonicalPath) val benchmark = new Benchmark("JSON reader", N) benchmark.addCase("reading JSON file", 10) { _ => spark.read.format("json").load(path.getCanonicalPath).collect() } benchmark.run() } } ``` **Before** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6485 / 6924 0.0 50665.0 1.0X ``` **After** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6350 / 6529 0.0 49609.3 1.0X ``` ## How was this patch tested? Existing test cases should cover this. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14102 from HyukjinKwon/SPARK-16434.
* [SPARK-17018][SQL] literals.sql for testing literal parsingpetermaxlee2016-08-111-4/+10
| | | | | | | | | | | | ## What changes were proposed in this pull request? This patch adds literals.sql for testing literal parsing end-to-end in SQL. ## How was this patch tested? The patch itself is only about adding test cases. Author: petermaxlee <petermaxlee@gmail.com> Closes #14598 from petermaxlee/SPARK-17018-2.
* [SPARK-17015][SQL] group-by/order-by ordinal and arithmetic testspetermaxlee2016-08-111-220/+0
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch adds three test files: 1. arithmetic.sql.out 2. order-by-ordinal.sql 3. group-by-ordinal.sql This includes https://github.com/apache/spark/pull/14594. ## How was this patch tested? This is a test case change. Author: petermaxlee <petermaxlee@gmail.com> Closes #14595 from petermaxlee/SPARK-17015.
* [SPARK-17011][SQL] Support testing exceptions in SQLQueryTestSuitepetermaxlee2016-08-102-56/+35
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch adds exception testing to SQLQueryTestSuite. When there is an exception in query execution, the query result contains the the exception class along with the exception message. As part of this, I moved some additional test cases for limit from SQLQuerySuite over to SQLQueryTestSuite. ## How was this patch tested? This is a test harness change. Author: petermaxlee <petermaxlee@gmail.com> Closes #14592 from petermaxlee/SPARK-17011.