aboutsummaryrefslogtreecommitdiff
path: root/sql
Commit message (Collapse)AuthorAgeFilesLines
* [MINOR] Typo fixesJacek Laskowski2016-04-024-10/+10
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Typo fixes. No functional changes. ## How was this patch tested? Built the sources and ran with samples. Author: Jacek Laskowski <jacek@japila.pl> Closes #11802 from jaceklaskowski/typo-fixes.
* [HOTFIX] Fix compilation break.Reynold Xin2016-04-022-5/+4
|
* [MINOR][SQL] Fix comments styl and correct several styles and nits in CSV ↵hyukjinkwon2016-04-014-49/+48
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | data source ## What changes were proposed in this pull request? While trying to create a PR (which was not an issue at the end), I just corrected some style nits. So, I removed the changes except for some coding style corrections. - According to the [scala-style-guide#documentation-style](https://github.com/databricks/scala-style-guide#documentation-style), Scala style comments are discouraged. >```scala >/** This is a correct one-liner, short description. */ > >/** > * This is correct multi-line JavaDoc comment. And > * this is my second line, and if I keep typing, this would be > * my third line. > */ > >/** In Spark, we don't use the ScalaDoc style so this > * is not correct. > */ >``` - Double newlines between consecutive methods was removed. According to [scala-style-guide#blank-lines-vertical-whitespace](https://github.com/databricks/scala-style-guide#blank-lines-vertical-whitespace), single newline appears when >Between consecutive members (or initializers) of a class: fields, constructors, methods, nested classes, static initializers, instance initializers. - Remove uesless parentheses in tests - Use `mapPartitions` instead of `mapPartitionsWithIndex()`. ## How was this patch tested? Unit tests were used and `dev/run_tests` for style tests. Author: hyukjinkwon <gurwls223@gmail.com> Closes #12109 from HyukjinKwon/SPARK-14271.
* [SPARK-14285][SQL] Implement common type-safe aggregate functionsReynold Xin2016-04-019-111/+342
| | | | | | | | | | | | ## What changes were proposed in this pull request? In the Dataset API, it is fairly difficult for users to perform simple aggregations in a type-safe way at the moment because there are no aggregators that have been implemented. This pull request adds a few common aggregate functions in expressions.scala.typed package, and also creates the expressions.java.typed package without implementation. The java implementation should probably come as a separate pull request. One challenge there is to resolve the type difference between Scala primitive types and Java boxed types. ## How was this patch tested? Added unit tests for them. Author: Reynold Xin <rxin@databricks.com> Closes #12077 from rxin/SPARK-14285.
* [SPARK-14251][SQL] Add SQL command for printing out generated code for debuggingDongjoon Hyun2016-04-017-31/+67
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR implements `EXPLAIN CODEGEN` SQL command which returns generated codes like `debugCodegen`. In `spark-shell`, we don't need to `import debug` module. In `spark-sql`, we can use this SQL command now. **Before** ``` scala> import org.apache.spark.sql.execution.debug._ scala> sql("select 'a' as a group by 1").debugCodegen() Found 2 WholeStageCodegen subtrees. == Subtree 1 / 2 == ... Generated code: ... == Subtree 2 / 2 == ... Generated code: ... ``` **After** ``` scala> sql("explain extended codegen select 'a' as a group by 1").collect().foreach(println) [Found 2 WholeStageCodegen subtrees.] [== Subtree 1 / 2 ==] ... [] [Generated code:] ... [] [== Subtree 2 / 2 ==] ... [] [Generated code:] ... ``` ## How was this patch tested? Pass the Jenkins tests (including new testcases) Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12099 from dongjoon-hyun/SPARK-14251.
* [SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can ↵Kazuaki Ishizaki2016-04-012-5/+51
| | | | | | | | | | | | | | | | exceed JVM size limit for cached DataFrames ## What changes were proposed in this pull request? This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using a approach to make a group for lot of ```ColumnAccessor``` instantiations or method calls (more than 200) into a method ## How was this patch tested? Added a new unit test, which includes large instantiations and method calls, to ```InMemoryColumnarQuerySuite``` Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #12108 from kiszk/SPARK-14138-master.
* [SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor ↵Cheng Lian2016-04-014-9/+67
| | | | | | | | | | | | | | | | side when evaluating window functions ## What changes were proposed in this pull request? `SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute gets different expression IDs when created on both driver side and executor side. This PR adds `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on executor side. ## How was this patch tested? A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters. Author: Cheng Lian <lian@databricks.com> Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
* [SPARK-14255][SQL] Streaming AggregationMichael Armbrust2016-04-0133-305/+827
| | | | | | | | | | | | | | | | | | | | | | This PR adds the ability to perform aggregations inside of a `ContinuousQuery`. In order to implement this feature, the planning of aggregation has augmented with a new `StatefulAggregationStrategy`. Unlike batch aggregation, stateful-aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression: - Partial Aggregation - Shuffle - Partial Merge (now there is at most 1 tuple per group) - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous) - Partial Merge (now there is at most 1 tuple per group) - StateStoreSave (saves the tuple for the next batch) - Complete (output the current result of the aggregation) The following refactoring was also performed to allow us to plug into existing code: - The get/put implementation is taken from #12013 - The logic for breaking down and de-duping the physical execution of aggregation has been move into a new pattern `PhysicalAggregation` - The `AttributeReference` used to identify the result of an `AggregateFunction` as been moved into the `AggregateExpression` container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`. Further clean up (using a different aggregation container for logical/physical plans) is deferred to a followup. - Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case. - The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes. Author: Michael Armbrust <michael@databricks.com> Closes #12048 from marmbrus/statefulAgg.
* [SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpointShixiong Zhu2016-04-012-7/+5
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? RpcEndpoint is not thread safe and allows multiple messages to be processed at the same time. StateStoreCoordinator should use ThreadSafeRpcEndpoint. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12100 from zsxwing/fix-StateStoreCoordinator.
* [SPARK-13674] [SQL] Add wholestage codegen support to SampleLiang-Chi Hsieh2016-04-014-14/+99
| | | | | | | | | | | | | | | | | JIRA: https://issues.apache.org/jira/browse/SPARK-13674 ## What changes were proposed in this pull request? Sample operator doesn't support wholestage codegen now. This pr is to add support to it. ## How was this patch tested? A test is added into `BenchmarkWholeStageCodegen`. Besides, all tests should be passed. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #11517 from viirya/add-wholestage-sample.
* [SPARK-14160] Time Windowing functions for DatasetsBurak Yavuz2016-04-017-0/+735
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR adds the function `window` as a column expression. `window` can be used to bucket rows into time windows given a time column. With this expression, performing time series analysis on batch data, as well as streaming data should become much more simpler. ### Usage Assume the following schema: `sensor_id, measurement, timestamp` To average 5 minute data every 1 minute (window length of 5 minutes, slide duration of 1 minute), we will use: ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:00-09:05:00 09:01:00-09:06:00 09:02:00-09:07:00 ... ``` Intervals will start at every `slideDuration` starting at the unix epoch (1970-01-01 00:00:00 UTC). To start intervals at a different point of time, e.g. 30 seconds after a minute, the `startTime` parameter can be used. ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”, "30 second"), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:30-09:05:30 09:01:30-09:06:30 09:02:30-09:07:30 ... ``` Support for Python will be made in a follow up PR after this. ## How was this patch tested? This patch has some basic unit tests for the `TimeWindow` expression testing that the parameters pass validation, and it also has some unit/integration tests testing the correctness of the windowing and usability in complex operations (multi-column grouping, multi-column projections, joins). Author: Burak Yavuz <brkyvz@gmail.com> Author: Michael Armbrust <michael@databricks.com> Closes #12008 from brkyvz/df-time-window.
* [SPARK-14070][SQL] Use ORC data source for SQL queries on ORC tablesTejas Patil2016-04-016-75/+220
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch enables use of OrcRelation for SQL queries which read data from Hive tables. Changes in this patch: - Added a new rule `OrcConversions` which would alter the plan to use `OrcRelation`. In this diff, the conversion is done only for reads. - Added a new config `spark.sql.hive.convertMetastoreOrc` to control the conversion BEFORE ``` scala> hqlContext.sql("SELECT * FROM orc_table").explain(true) == Parsed Logical Plan == 'Project [unresolvedalias(*, None)] +- 'UnresolvedRelation `orc_table`, None == Analyzed Logical Plan == key: string, value: string Project [key#171,value#172] +- MetastoreRelation default, orc_table, None == Optimized Logical Plan == MetastoreRelation default, orc_table, None == Physical Plan == HiveTableScan [key#171,value#172], MetastoreRelation default, orc_table, None ``` AFTER ``` scala> hqlContext.sql("SELECT * FROM orc_table").explain(true) == Parsed Logical Plan == 'Project [unresolvedalias(*, None)] +- 'UnresolvedRelation `orc_table`, None == Analyzed Logical Plan == key: string, value: string Project [key#76,value#77] +- SubqueryAlias orc_table +- Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string> == Optimized Logical Plan == Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string> == Physical Plan == WholeStageCodegen : +- Scan ORC part: struct<>, data: struct<key:string,value:string>[key#76,value#77] InputPaths: file:/user/hive/warehouse/orc_table ``` ## How was this patch tested? - Added a new unit test. Ran existing unit tests - Ran with production like data ## Performance gains Ran on a production table in Facebook (note that the data was in DWRF file format which is similar to ORC) Best case : when there was no matching rows for the predicate in the query (everything is filtered out) ``` CPU time Wall time Total wall time across all tasks ================================================================ Without the change 541_515 sec 25.0 mins 165.8 hours With change 407 sec 1.5 mins 15 mins ``` Average case: A subset of rows in the data match the query predicate ``` CPU time Wall time Total wall time across all tasks ================================================================ Without the change 624_630 sec 31.0 mins 199.0 h With change 14_769 sec 5.3 mins 7.7 h ``` Author: Tejas Patil <tejasp@fb.com> Closes #11891 from tejasapatil/orc_ppd.
* [SPARK-14191][SQL] Remove invalid Expand operator constraintsLiang-Chi Hsieh2016-04-012-1/+31
| | | | | | | | | | | | | | | | | | | | | | | | | `Expand` operator now uses its child plan's constraints as its valid constraints (i.e., the base of constraints). This is not correct because `Expand` will set its group by attributes to null values. So the nullability of these attributes should be true. E.g., for an `Expand` operator like: val input = LocalRelation('a.int, 'b.int, 'c.int).where('c.attr > 10 && 'a.attr < 5 && 'b.attr > 2) Expand( Seq( Seq('c, Literal.create(null, StringType), 1), Seq('c, 'a, 2)), Seq('c, 'a, 'gid.int), Project(Seq('a, 'c), input)) The `Project` operator has the constraints `IsNotNull('a)`, `IsNotNull('b)` and `IsNotNull('c)`. But the `Expand` should not have `IsNotNull('a)` in its constraints. This PR is the first step for this issue and remove invalid constraints of `Expand` operator. A test is added to `ConstraintPropagationSuite`. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Author: Michael Armbrust <michael@databricks.com> Closes #11995 from viirya/fix-expand-constraints.
* [SPARK-13995][SQL] Extract correct IsNotNull constraints for ExpressionLiang-Chi Hsieh2016-04-017-37/+134
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? JIRA: https://issues.apache.org/jira/browse/SPARK-13995 We infer relative `IsNotNull` constraints from logical plan's expressions in `constructIsNotNullConstraints` now. However, we don't consider the case of (nested) `Cast`. For example: val tr = LocalRelation('a.int, 'b.long) val plan = tr.where('a.attr === 'b.attr).analyze Then, the plan's constraints will have `IsNotNull(Cast(resolveColumn(tr, "a"), LongType))`, instead of `IsNotNull(resolveColumn(tr, "a"))`. This PR fixes it. Besides, as `IsNotNull` constraints are most useful for `Attribute`, we should do recursing through any `Expression` that is null intolerant and construct `IsNotNull` constraints for all `Attribute`s under these Expressions. For example, consider the following constraints: val df = Seq((1,2,3)).toDF("a", "b", "c") df.where("a + b = c").queryExecution.analyzed.constraints The inferred isnotnull constraints should be isnotnull(a), isnotnull(b), isnotnull(c), instead of isnotnull(a + c) and isnotnull(c). ## How was this patch tested? Test is added into `ConstraintPropagationSuite`. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #11809 from viirya/constraint-cast.
* [MINOR] [SQL] Update usage of `debug` by removing `typeCheck` and adding ↵Dongjoon Hyun2016-04-011-2/+2
| | | | | | | | | | | | | | | | | | `debugCodegen` ## What changes were proposed in this pull request? This PR updates the usage comments of `debug` according to the following commits. - [SPARK-9754](https://issues.apache.org/jira/browse/SPARK-9754) removed `typeCheck`. - [SPARK-14227](https://issues.apache.org/jira/browse/SPARK-14227) added `debugCodegen`. ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12094 from dongjoon-hyun/minor_fix_debug_usage.
* [SPARK-14133][SQL] Throws exception for unsupported create/drop/alter index ↵sureshthalamati2016-04-013-6/+31
| | | | | | | | | | | | | | | | | | | | | | , and lock/unlock operations. ## What changes were proposed in this pull request? This PR throws Unsupported Operation exception for create index, drop index, alter index , lock table , lock database, unlock table, and unlock database operations that are not supported in Spark SQL. Currently these operations are executed executed by Hive. Error: spark-sql> drop index my_index on my_table; Error in query: Unsupported operation: drop index(line 1, pos 0) ## How was this patch tested? Added test cases to HiveQuerySuite yhuai hvanhovell andrewor14 Author: sureshthalamati <suresh.thalamati@gmail.com> Closes #12069 from sureshthalamati/unsupported_ddl_spark-14133.
* [SPARK-14184][SQL] Support native execution of SHOW DATABASE command and fix ↵Dilip Biswal2016-04-018-18/+163
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | SHOW TABLE to use table identifier pattern ## What changes were proposed in this pull request? This PR addresses the following 1. Supports native execution of SHOW DATABASES command 2. Fixes SHOW TABLES to apply the identifier_with_wildcards pattern if supplied. SHOW TABLE syntax ``` SHOW TABLES [IN database_name] ['identifier_with_wildcards']; ``` SHOW DATABASES syntax ``` SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards']; ``` ## How was this patch tested? Tests added in SQLQuerySuite (both hive and sql contexts) and DDLCommandSuite Note: Since the table name pattern was not working , tests are added in both SQLQuerySuite to verify the application of the table pattern. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #11991 from dilipbiswal/dkb_show_database.
* [SPARK-14295][SPARK-14274][SQL] Implements buildReader() for LibSVMCheng Lian2016-03-313-1/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR implements `FileFormat.buildReader()` for the LibSVM data source. Besides that, a new interface method `prepareRead()` is added to `FileFormat`: ```scala def prepareRead( sqlContext: SQLContext, options: Map[String, String], files: Seq[FileStatus]): Map[String, String] = options ``` After migrating from `buildInternalScan()` to `buildReader()`, we lost the opportunity to collect necessary global information, since `buildReader()` works in a per-partition manner. For example, LibSVM needs to infer the total number of features if the `numFeatures` data source option is not set. Any necessary collected global information should be returned using the data source options map. By default, this method just returns the original options untouched. An alternative approach is to absorb `inferSchema()` into `prepareRead()`, since schema inference is also some kind of global information gathering. However, this approach wasn't chosen because schema inference is optional, while `prepareRead()` must be called whenever a `HadoopFsRelation` based data source relation is instantiated. One unaddressed problem is that, when `numFeatures` is absent, now the input data will be scanned twice. The `buildInternalScan()` code path doesn't need to do this because it caches the raw parsed RDD in memory before computing the total number of features. However, with `FileScanRDD`, the raw parsed RDD is created in a different way (e.g. partitioning) from the final RDD. ## How was this patch tested? Tested using existing test suites. Author: Cheng Lian <lian@databricks.com> Closes #12088 from liancheng/spark-14295-libsvm-build-reader.
* [SPARK-14267] [SQL] [PYSPARK] execute multiple Python UDFs within single batchDavies Liu2016-03-314-67/+120
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR support multiple Python UDFs within single batch, also improve the performance. ```python >>> from pyspark.sql.types import IntegerType >>> sqlContext.registerFunction("double", lambda x: x * 2, IntegerType()) >>> sqlContext.registerFunction("add", lambda x, y: x + y, IntegerType()) >>> sqlContext.sql("SELECT double(add(1, 2)), add(double(2), 1)").explain(True) == Parsed Logical Plan == 'Project [unresolvedalias('double('add(1, 2)), None),unresolvedalias('add('double(2), 1), None)] +- OneRowRelation$ == Analyzed Logical Plan == double(add(1, 2)): int, add(double(2), 1): int Project [double(add(1, 2))#14,add(double(2), 1)#15] +- Project [double(add(1, 2))#14,add(double(2), 1)#15] +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15] +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18] +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17] +- OneRowRelation$ == Optimized Logical Plan == Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15] +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18] +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17] +- OneRowRelation$ == Physical Plan == WholeStageCodegen : +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15] : +- INPUT +- !BatchPythonEvaluation [add(pythonUDF1#17, 1)], [pythonUDF0#16,pythonUDF1#17,pythonUDF0#18] +- !BatchPythonEvaluation [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17] +- Scan OneRowRelation[] ``` ## How was this patch tested? Added new tests. Using the following script to benchmark 1, 2 and 3 udfs, ``` df = sqlContext.range(1, 1 << 23, 1, 4) double = F.udf(lambda x: x * 2, LongType()) print df.select(double(df.id)).count() print df.select(double(df.id), double(df.id + 1)).count() print df.select(double(df.id), double(df.id + 1), double(df.id + 2)).count() ``` Here is the results: N | Before | After | speed up ---- |------------ | -------------|------ 1 | 22 s | 7 s | 3.1X 2 | 38 s | 13 s | 2.9X 3 | 58 s | 16 s | 3.6X This benchmark ran locally with 4 CPUs. For 3 UDFs, it launched 12 Python before before this patch, 4 process after this patch. After this patch, it will use less memory for multiple UDFs than before (less buffering). Author: Davies Liu <davies@databricks.com> Closes #12057 from davies/multi_udfs.
* [SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the ↵Shixiong Zhu2016-03-316-22/+24
| | | | | | | | | | | | | | | | `java.io.tmpdir` folder ## What changes were proposed in this pull request? If I press `CTRL-C` when running these tests, the temp files will be left in `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and add a name prefix for them. ## How was this patch tested? Existing Jenkins tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #12093 from zsxwing/temp-file.
* [SPARK-14182][SQL] Parse DDL Command: Alter Viewgatorsmile2016-03-314-55/+177
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | This PR is to provide native parsing support for DDL commands: `Alter View`. Since its AST trees are highly similar to `Alter Table`. Thus, both implementation are integrated into the same one. Based on the Hive DDL document: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL and https://cwiki.apache.org/confluence/display/Hive/PartitionedViews **Syntax:** ```SQL ALTER VIEW view_name RENAME TO new_view_name ``` - to change the name of a view to a different name **Syntax:** ```SQL ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment); ``` - to add metadata to a view **Syntax:** ```SQL ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key') ``` - to remove metadata from a view **Syntax:** ```SQL ALTER VIEW view_name ADD [IF NOT EXISTS] PARTITION spec1[, PARTITION spec2, ...] ``` - to add the partitioning metadata for a view. - the syntax of partition spec in `ALTER VIEW` is identical to `ALTER TABLE`, **EXCEPT** that it is **ILLEGAL** to specify a `LOCATION` clause. **Syntax:** ```SQL ALTER VIEW view_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] ``` - to drop the related partition metadata for a view. Added the related test cases to `DDLCommandSuite` Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #11987 from gatorsmile/parseAlterView.
* [SPARK-14278][SQL] Initialize columnar batch with proper memory modeSameer Agarwal2016-03-311-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fixes a minor bug in the record reader constructor that was possibly introduced during refactoring. ## How was this patch tested? N/A Author: Sameer Agarwal <sameer@databricks.com> Closes #12070 from sameeragarwal/vectorized-rr.
* [SPARK-14263][SQL] Benchmark Vectorized HashMap for GroupBy AggregatesSameer Agarwal2016-03-312-10/+142
| | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes a new data-structure based on a vectorized hashmap that can be potentially _codegened_ in `TungstenAggregate` to speed up aggregates with group by. Micro-benchmarks show a 10x improvement over the current `BytesToBytes` aggregation map. ## How was this patch tested? Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- hash 108 / 119 96.9 10.3 1.0X fast hash 63 / 70 166.2 6.0 1.7X arrayEqual 70 / 73 150.8 6.6 1.6X Java HashMap (Long) 141 / 200 74.3 13.5 0.8X Java HashMap (two ints) 145 / 185 72.3 13.8 0.7X Java HashMap (UnsafeRow) 499 / 524 21.0 47.6 0.2X BytesToBytesMap (off Heap) 483 / 548 21.7 46.0 0.2X BytesToBytesMap (on Heap) 485 / 562 21.6 46.2 0.2X Vectorized Hashmap 54 / 60 193.7 5.2 2.0X Author: Sameer Agarwal <sameer@databricks.com> Closes #12055 from sameeragarwal/vectorized-hashmap.
* [SPARK-14211][SQL] Remove ANTLR3 based parserHerman van Hovell2016-03-3138-8216/+393
| | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR removes the ANTLR3 based parser, and moves the new ANTLR4 based parser into the `org.apache.spark.sql.catalyst.parser package`. ### How was this patch tested? Existing unit tests. cc rxin andrewor14 yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12071 from hvanhovell/SPARK-14211.
* [SPARK-14206][SQL] buildReader() implementation for CSVCheng Lian2016-03-309-63/+119
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Major changes: 1. Implement `FileFormat.buildReader()` for the CSV data source. 1. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or user specified schema. This argument is necessary because the CSV data source needs to know all the columns of the underlying files to read the file. ## How was this patch tested? Existing tests should do the work. Author: Cheng Lian <lian@databricks.com> Closes #12002 from liancheng/spark-14206-csv-build-reader.
* [SPARK-14081][SQL] - Preserve DataFrame column types when filling nulls.Travis Crawford2016-03-302-28/+40
| | | | | | | | | | | | ## What changes were proposed in this pull request? This change resolves an issue where `DataFrameNaFunctions.fill` changes a `FloatType` column to a `DoubleType`. We also clarify the contract that replacement values will be cast to the column data type, which may change the replacement value when casting to a lower precision type. ## How was this patch tested? This patch has associated unit tests. Author: Travis Crawford <travis@medium.com> Closes #11967 from traviscrawford/SPARK-14081-dataframena.
* [SPARK-14282][SQL] CodeFormatter should handle oneline comment with /* */ ↵Dongjoon Hyun2016-03-303-2/+17
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | properly ## What changes were proposed in this pull request? This PR improves `CodeFormatter` to fix the following malformed indentations. ```java /* 019 */ public java.lang.Object apply(java.lang.Object _i) { /* 020 */ InternalRow i = (InternalRow) _i; /* 021 */ /* createexternalrow(if (isnull(input[0, double])) null else input[0, double], if (isnull(input[1, int])) null else input[1, int], ... */ /* 022 */ boolean isNull = false; /* 023 */ final Object[] values = new Object[2]; /* 024 */ /* if (isnull(input[0, double])) null else input[0, double] */ /* 025 */ /* isnull(input[0, double]) */ ... /* 053 */ if (!false && false) { /* 054 */ /* null */ /* 055 */ final int value9 = -1; /* 056 */ isNull6 = true; /* 057 */ value6 = value9; /* 058 */ } else { ... /* 077 */ return mutableRow; /* 078 */ } /* 079 */ } /* 080 */ ``` After this PR, the code will be formatted like the following. ```java /* 019 */ public java.lang.Object apply(java.lang.Object _i) { /* 020 */ InternalRow i = (InternalRow) _i; /* 021 */ /* createexternalrow(if (isnull(input[0, double])) null else input[0, double], if (isnull(input[1, int])) null else input[1, int], ... */ /* 022 */ boolean isNull = false; /* 023 */ final Object[] values = new Object[2]; /* 024 */ /* if (isnull(input[0, double])) null else input[0, double] */ /* 025 */ /* isnull(input[0, double]) */ ... /* 053 */ if (!false && false) { /* 054 */ /* null */ /* 055 */ final int value9 = -1; /* 056 */ isNull6 = true; /* 057 */ value6 = value9; /* 058 */ } else { ... /* 077 */ return mutableRow; /* 078 */ } /* 079 */ } /* 080 */ ``` Also, this issue fixes the following too. (Similar with [SPARK-14185](https://issues.apache.org/jira/browse/SPARK-14185)) ```java 16/03/30 12:39:24 DEBUG WholeStageCodegen: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } ``` ```java 16/03/30 12:46:32 DEBUG WholeStageCodegen: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } ``` ## How was this patch tested? Pass the Jenkins tests (including new CodeFormatterSuite testcases.) Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12072 from dongjoon-hyun/SPARK-14282.
* [SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a ↵Takeshi YAMAMURO2016-03-303-2/+59
| | | | | | | | | | | | | | partition ## What changes were proposed in this pull request? This pr is to add a config to control the maximum number of files as even small files have a non-trivial fixed cost. The current packing can put a lot of small files together which cases straggler tasks. ## How was this patch tested? I added tests to check if many files get split into partitions in FileSourceStrategySuite. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #12068 from maropu/SPARK-14259.
* [SPARK-14268][SQL] rename toRowExpressions and fromRowExpression to ↵Wenchen Fan2016-03-309-113/+110
| | | | | | | | | | | | | | | | serializer and deserializer in ExpressionEncoder ## What changes were proposed in this pull request? In `ExpressionEncoder`, we use `constructorFor` to build `fromRowExpression` as the `deserializer` in `ObjectOperator`. It's kind of confusing, we should make the name consistent. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #12058 from cloud-fan/rename.
* [SPARK-14114][SQL] implement buildReader for text data sourceWenchen Fan2016-03-302-2/+29
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR implements buildReader for text data source and enable it in the new data source code path. ## How was this patch tested? Existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #11934 from cloud-fan/text.
* [SPARK-14124][SQL] Implement Database-related DDL Commandsgatorsmile2016-03-299-61/+302
| | | | | | | | | | | | | | | | | | | | | | #### What changes were proposed in this pull request? This PR is to implement the following four Database-related DDL commands: - `CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name` - `DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]` - `DESCRIBE DATABASE [EXTENDED] db_name` - `ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)` Another PR will be submitted to handle the unsupported commands. In the Database-related DDL commands, we will issue an error exception for `ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role`. cc yhuai andrewor14 rxin Could you review the changes? Is it in the right direction? Thanks! #### How was this patch tested? Added a few test cases in `command/DDLSuite.scala` for testing DDL command execution in `SQLContext`. Since `HiveContext` also shares the same implementation, the existing test cases in `\hive` also verifies the correctness of these commands. Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12009 from gatorsmile/dbDDL.
* [SPARK-14225][SQL] Cap the length of toCommentSafeString at 128 charsSameer Agarwal2016-03-293-8/+79
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Builds on https://github.com/apache/spark/pull/12022 and (a) appends "..." to truncated comment strings and (b) fixes indentation in lines after the commented strings if they happen to have a `(`, `{`, `)` or `}` ## How was this patch tested? Manually examined the generated code. Author: Sameer Agarwal <sameer@databricks.com> Closes #12044 from sameeragarwal/comment.
* [SPARK-14215] [SQL] [PYSPARK] Support chained Python UDFsDavies Liu2016-03-292-10/+45
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR brings the support for chained Python UDFs, for example ```sql select udf1(udf2(a)) select udf1(udf2(a) + 3) select udf1(udf2(a) + udf3(b)) ``` Also directly chained unary Python UDFs are put in single batch of Python UDFs, others may require multiple batches. For example, ```python >>> sqlContext.sql("select double(double(1))").explain() == Physical Plan == WholeStageCodegen : +- Project [pythonUDF#10 AS double(double(1))#9] : +- INPUT +- !BatchPythonEvaluation double(double(1)), [pythonUDF#10] +- Scan OneRowRelation[] >>> sqlContext.sql("select double(double(1) + double(2))").explain() == Physical Plan == WholeStageCodegen : +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16] : +- INPUT +- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19] +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18] +- !BatchPythonEvaluation double(1), [pythonUDF#17] +- Scan OneRowRelation[] ``` TODO: will support multiple unrelated Python UDFs in one batch (another PR). ## How was this patch tested? Added new unit tests for chained UDFs. Author: Davies Liu <davies@databricks.com> Closes #12014 from davies/py_udfs.
* [SPARK-14227][SQL] Add method for printing out generated code for debuggingEric Liang2016-03-293-6/+60
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This adds `debugCodegen` to the debug package for query execution. ## How was this patch tested? Unit and manual testing. Output example: ``` scala> import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.debug._ scala> sqlContext.range(100).groupBy("id").count().orderBy("id").debugCodegen() Found 3 WholeStageCodegen subtrees. == Subtree 1 / 3 == WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) /* 007 */ +- Range 0, 1, 1, 100, [id#0L] /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean agg_initAgg; /* 012 */ private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 016 */ private org.apache.spark.sql.execution.metric.LongSQLMetric range_numOutputRows; /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue range_metricValue; /* 018 */ private boolean range_initRange; /* 019 */ private long range_partitionEnd; /* 020 */ private long range_number; /* 021 */ private boolean range_overflow; /* 022 */ private scala.collection.Iterator range_input; /* 023 */ private UnsafeRow range_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 026 */ private UnsafeRow agg_result; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 028 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 029 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner; /* 030 */ private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows; /* 031 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue; /* 032 */ /* 033 */ public GeneratedIterator(Object[] references) { /* 034 */ this.references = references; /* 035 */ } /* 036 */ /* 037 */ public void init(scala.collection.Iterator inputs[]) { /* 038 */ agg_initAgg = false; /* 039 */ this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0]; /* 040 */ agg_hashMap = agg_plan.createHashMap(); /* 041 */ /* 042 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 043 */ range_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) range_numOutputRows.localValue(); /* 044 */ range_initRange = false; /* 045 */ range_partitionEnd = 0L; /* 046 */ range_number = 0L; /* 047 */ range_overflow = false; /* 048 */ range_input = inputs[0]; /* 049 */ range_result = new UnsafeRow(1); /* 050 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 051 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner(); /* 056 */ this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2]; /* 057 */ wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue(); /* 058 */ } /* 059 */ /* 060 */ private void agg_doAggregateWithKeys() throws java.io.IOException { /* 061 */ /*** PRODUCE: Range 0, 1, 1, 100, [id#0L] */ /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ if (range_input.hasNext()) { /* 067 */ initRange(((InternalRow) range_input.next()).getInt(0)); /* 068 */ } else { /* 069 */ return; /* 070 */ } /* 071 */ } /* 072 */ /* 073 */ while (!range_overflow && range_number < range_partitionEnd) { /* 074 */ long range_value = range_number; /* 075 */ range_number += 1L; /* 076 */ if (range_number < range_value ^ 1L < 0) { /* 077 */ range_overflow = true; /* 078 */ } /* 079 */ /* 080 */ /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */ /* 081 */ /* 082 */ // generate grouping key /* 083 */ agg_rowWriter.write(0, range_value); /* 084 */ /* hash(input[0, bigint], 42) */ /* 085 */ int agg_value1 = 42; /* 086 */ /* 087 */ agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(range_value, agg_value1); /* 088 */ UnsafeRow agg_aggBuffer = null; /* 089 */ if (true) { /* 090 */ // try to get the buffer from hash map /* 091 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 092 */ } /* 093 */ if (agg_aggBuffer == null) { /* 094 */ if (agg_sorter == null) { /* 095 */ agg_sorter = agg_hashMap.destructAndCreateExternalSorter(); /* 096 */ } else { /* 097 */ agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter()); /* 098 */ } /* 099 */ /* 100 */ // the hash map had be spilled, it should have enough memory now, /* 101 */ // try to allocate buffer again. /* 102 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 103 */ if (agg_aggBuffer == null) { /* 104 */ // failed to allocate the first page /* 105 */ throw new OutOfMemoryError("No enough memory for aggregation"); /* 106 */ } /* 107 */ } /* 108 */ /* 109 */ // evaluate aggregate function /* 110 */ /* (input[0, bigint] + 1) */ /* 111 */ /* input[0, bigint] */ /* 112 */ long agg_value4 = agg_aggBuffer.getLong(0); /* 113 */ /* 114 */ long agg_value3 = -1L; /* 115 */ agg_value3 = agg_value4 + 1L; /* 116 */ // update aggregate buffer /* 117 */ agg_aggBuffer.setLong(0, agg_value3); /* 118 */ /* 119 */ if (shouldStop()) return; /* 120 */ } /* 121 */ /* 122 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter); /* 123 */ } /* 124 */ /* 125 */ private void initRange(int idx) { /* 126 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 127 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L); /* 128 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(100L); /* 129 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 130 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 131 */ /* 132 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 133 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 134 */ range_number = Long.MAX_VALUE; /* 135 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 136 */ range_number = Long.MIN_VALUE; /* 137 */ } else { /* 138 */ range_number = st.longValue(); /* 139 */ } /* 140 */ /* 141 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 142 */ .multiply(step).add(start); /* 143 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 144 */ range_partitionEnd = Long.MAX_VALUE; /* 145 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 146 */ range_partitionEnd = Long.MIN_VALUE; /* 147 */ } else { /* 148 */ range_partitionEnd = end.longValue(); /* 149 */ } /* 150 */ /* 151 */ range_metricValue.add((range_partitionEnd - range_number) / 1L); /* 152 */ } /* 153 */ /* 154 */ protected void processNext() throws java.io.IOException { /* 155 */ /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */ /* 156 */ /* 157 */ if (!agg_initAgg) { /* 158 */ agg_initAgg = true; /* 159 */ agg_doAggregateWithKeys(); /* 160 */ } /* 161 */ /* 162 */ // output the result /* 163 */ while (agg_mapIter.next()) { /* 164 */ wholestagecodegen_metricValue.add(1); /* 165 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey(); /* 166 */ UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue(); /* 167 */ /* 168 */ UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer1); /* 169 */ /* 170 */ /*** CONSUME: WholeStageCodegen */ /* 171 */ /* 172 */ append(agg_resultRow); /* 173 */ /* 174 */ if (shouldStop()) return; /* 175 */ } /* 176 */ /* 177 */ agg_mapIter.close(); /* 178 */ if (agg_sorter == null) { /* 179 */ agg_hashMap.free(); /* 180 */ } /* 181 */ } /* 182 */ } == Subtree 2 / 3 == WholeStageCodegen : +- Sort [id#0L ASC], true, 0 : +- INPUT +- Exchange rangepartitioning(id#0L ASC, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) : +- INPUT +- Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * Sort [id#0L ASC], true, 0 /* 007 */ +- INPUT /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean sort_needToSort; /* 012 */ private org.apache.spark.sql.execution.Sort sort_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter; /* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics; /* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter; /* 016 */ private scala.collection.Iterator inputadapter_input; /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize; /* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue; /* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize; /* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1; /* 021 */ /* 022 */ public GeneratedIterator(Object[] references) { /* 023 */ this.references = references; /* 024 */ } /* 025 */ /* 026 */ public void init(scala.collection.Iterator inputs[]) { /* 027 */ sort_needToSort = true; /* 028 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0]; /* 029 */ sort_sorter = sort_plan.createSorter(); /* 030 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics(); /* 031 */ /* 032 */ inputadapter_input = inputs[0]; /* 033 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 034 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue(); /* 035 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2]; /* 036 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue(); /* 037 */ } /* 038 */ /* 039 */ private void sort_addToSorter() throws java.io.IOException { /* 040 */ /*** PRODUCE: INPUT */ /* 041 */ /* 042 */ while (inputadapter_input.hasNext()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ /*** CONSUME: Sort [id#0L ASC], true, 0 */ /* 045 */ /* 046 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row); /* 047 */ if (shouldStop()) return; /* 048 */ } /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ /*** PRODUCE: Sort [id#0L ASC], true, 0 */ /* 054 */ if (sort_needToSort) { /* 055 */ sort_addToSorter(); /* 056 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled(); /* 057 */ sort_sortedIter = sort_sorter.sort(); /* 058 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage()); /* 059 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore); /* 060 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage()); /* 061 */ sort_needToSort = false; /* 062 */ } /* 063 */ /* 064 */ while (sort_sortedIter.hasNext()) { /* 065 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next(); /* 066 */ /* 067 */ /*** CONSUME: WholeStageCodegen */ /* 068 */ /* 069 */ append(sort_outputRow); /* 070 */ /* 071 */ if (shouldStop()) return; /* 072 */ } /* 073 */ } /* 074 */ } == Subtree 3 / 3 == WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) : +- INPUT +- Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) /* 007 */ +- INPUT /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean agg_initAgg; /* 012 */ private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 016 */ private scala.collection.Iterator inputadapter_input; /* 017 */ private UnsafeRow agg_result; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 020 */ private UnsafeRow agg_result1; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1; /* 023 */ private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue; /* 025 */ /* 026 */ public GeneratedIterator(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(scala.collection.Iterator inputs[]) { /* 031 */ agg_initAgg = false; /* 032 */ this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0]; /* 033 */ agg_hashMap = agg_plan.createHashMap(); /* 034 */ /* 035 */ inputadapter_input = inputs[0]; /* 036 */ agg_result = new UnsafeRow(1); /* 037 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 038 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 039 */ agg_result1 = new UnsafeRow(2); /* 040 */ this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 0); /* 041 */ this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 2); /* 042 */ this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 043 */ wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue(); /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithKeys() throws java.io.IOException { /* 047 */ /*** PRODUCE: INPUT */ /* 048 */ /* 049 */ while (inputadapter_input.hasNext()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */ /* 052 */ /* input[0, bigint] */ /* 053 */ long inputadapter_value = inputadapter_row.getLong(0); /* 054 */ /* input[1, bigint] */ /* 055 */ long inputadapter_value1 = inputadapter_row.getLong(1); /* 056 */ /* 057 */ // generate grouping key /* 058 */ agg_rowWriter.write(0, inputadapter_value); /* 059 */ /* hash(input[0, bigint], 42) */ /* 060 */ int agg_value1 = 42; /* 061 */ /* 062 */ agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value, agg_value1); /* 063 */ UnsafeRow agg_aggBuffer = null; /* 064 */ if (true) { /* 065 */ // try to get the buffer from hash map /* 066 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 067 */ } /* 068 */ if (agg_aggBuffer == null) { /* 069 */ if (agg_sorter == null) { /* 070 */ agg_sorter = agg_hashMap.destructAndCreateExternalSorter(); /* 071 */ } else { /* 072 */ agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter()); /* 073 */ } /* 074 */ /* 075 */ // the hash map had be spilled, it should have enough memory now, /* 076 */ // try to allocate buffer again. /* 077 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 078 */ if (agg_aggBuffer == null) { /* 079 */ // failed to allocate the first page /* 080 */ throw new OutOfMemoryError("No enough memory for aggregation"); /* 081 */ } /* 082 */ } /* 083 */ /* 084 */ // evaluate aggregate function /* 085 */ /* (input[0, bigint] + input[2, bigint]) */ /* 086 */ /* input[0, bigint] */ /* 087 */ long agg_value4 = agg_aggBuffer.getLong(0); /* 088 */ /* 089 */ long agg_value3 = -1L; /* 090 */ agg_value3 = agg_value4 + inputadapter_value1; /* 091 */ // update aggregate buffer /* 092 */ agg_aggBuffer.setLong(0, agg_value3); /* 093 */ if (shouldStop()) return; /* 094 */ } /* 095 */ /* 096 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter); /* 097 */ } /* 098 */ /* 099 */ protected void processNext() throws java.io.IOException { /* 100 */ /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */ /* 101 */ /* 102 */ if (!agg_initAgg) { /* 103 */ agg_initAgg = true; /* 104 */ agg_doAggregateWithKeys(); /* 105 */ } /* 106 */ /* 107 */ // output the result /* 108 */ while (agg_mapIter.next()) { /* 109 */ wholestagecodegen_metricValue.add(1); /* 110 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey(); /* 111 */ UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue(); /* 112 */ /* 113 */ /* input[0, bigint] */ /* 114 */ long agg_value6 = agg_aggKey.getLong(0); /* 115 */ /* input[0, bigint] */ /* 116 */ long agg_value7 = agg_aggBuffer1.getLong(0); /* 117 */ /* 118 */ /*** CONSUME: WholeStageCodegen */ /* 119 */ /* 120 */ agg_rowWriter1.write(0, agg_value6); /* 121 */ /* 122 */ agg_rowWriter1.write(1, agg_value7); /* 123 */ append(agg_result1); /* 124 */ /* 125 */ if (shouldStop()) return; /* 126 */ } /* 127 */ /* 128 */ agg_mapIter.close(); /* 129 */ if (agg_sorter == null) { /* 130 */ agg_hashMap.free(); /* 131 */ } /* 132 */ } /* 133 */ } ``` rxin Author: Eric Liang <ekl@databricks.com> Closes #12025 from ericl/spark-14227.
* [MINOR][SQL] Fix exception message to print string-array correctly.Dongjoon Hyun2016-03-291-1/+3
| | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR is a simple fix for an exception message to print `string[]` content correctly. ```java String[] colPath = requestedSchema.getPaths().get(i); ... - throw new IOException("Required column is missing in data file. Col: " + colPath); + throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath)); ``` ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12041 from dongjoon-hyun/fix_exception_message_with_string_array.
* [MINOR][SQL] Fix typos by replacing 'much' with 'match'.Dongjoon Hyun2016-03-292-2/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR fixes two trivial typos: 'does not **much**' --> 'does not **match**'. ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12042 from dongjoon-hyun/fix_typo_by_replacing_much_with_match.
* [SPARK-14208][SQL] Renames spark.sql.parquet.fileScanCheng Lian2016-03-292-5/+5
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Renames SQL option `spark.sql.parquet.fileScan` since now all `HadoopFsRelation` based data sources are being migrated to `FileScanRDD` code path. ## How was this patch tested? None. Author: Cheng Lian <lian@databricks.com> Closes #12003 from liancheng/spark-14208-option-renaming.
* [SPARK-14158][SQL] implement buildReader for json data sourceWenchen Fan2016-03-294-4/+90
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR implements buildReader for json data source and enable it in the new data source code path. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #11960 from cloud-fan/json.
* [SPARK-14210] [SQL] Add a metric for time spent in scans.Nong Li2016-03-281-63/+94
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This adds a metric to parquet scans that measures the time in just the scan phase. This is only possible when the scan returns ColumnarBatches, otherwise the overhead is too high. This combined with the pipeline metric lets us easily see what percent of the time was in the scan. Author: Nong Li <nong@databricks.com> Closes #12007 from nongli/spark-14210.
* [SPARK-13981][SQL] Defer evaluating variables within Filter operator.Nong Li2016-03-281-16/+61
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This improves the Filter codegen for NULLs by deferring loading the values for IsNotNull. Instead of generating code like: boolean isNull = ... int value = ... if (isNull) continue; we will generate: boolean isNull = ... if (isNull) continue; int value = ... This is useful since retrieving the values can be non-trivial (they can be dictionary encoded among other things). This currently only works when the attribute comes from the column batch but could be extended to other cases in the future. ## How was this patch tested? On tpcds q55, this fixes the regression from introducing the IsNotNull predicates. ``` TPCDS Snappy: Best/Avg Time(ms) Rate(M/s) Per Row(ns) -------------------------------------------------------------------------------- q55 4564 / 5036 25.2 39.6 q55 4064 / 4340 28.3 35.3 ``` Author: Nong Li <nong@databricks.com> Closes #11792 from nongli/spark-13981.
* [SPARK-14213][SQL] Migrate HiveQl parsing to ANTLR4 parserHerman van Hovell2016-03-284-4/+488
| | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR migrates all HiveQl parsing to the new ANTLR4 parser. This PR is build on top of https://github.com/apache/spark/pull/12011, and we should wait with merging until that one is in (hence the WIP tag). As soon as this PR is merged we can start removing much of the old parser infrastructure. ### How was this patch tested? Exisiting Hive unit tests. cc rxin andrewor14 yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12015 from hvanhovell/SPARK-14213.
* [SPARK-14205][SQL] remove trait QueryableWenchen Fan2016-03-286-161/+95
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? After DataFrame and Dataset are merged, the trait `Queryable` becomes unnecessary as it has only one implementation. We should remove it. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #12001 from cloud-fan/df-ds.
* [SPARK-14119][SPARK-14120][SPARK-14122][SQL] Throw exception on unsupported ↵Andrew Or2016-03-283-32/+69
| | | | | | | | | | | | | | | | | | | | | | | | | | | DDL commands ## What changes were proposed in this pull request? Before: We just pass all role commands to Hive even though it doesn't work. After: We throw an `AnalysisException` that looks like this: ``` scala> sql("CREATE ROLE x") org.apache.spark.sql.AnalysisException: Unsupported Hive operation: CREATE ROLE; at org.apache.spark.sql.hive.HiveQl$$anonfun$parsePlan$1.apply(HiveQl.scala:213) at org.apache.spark.sql.hive.HiveQl$$anonfun$parsePlan$1.apply(HiveQl.scala:208) at org.apache.spark.sql.catalyst.parser.CatalystQl.safeParse(CatalystQl.scala:49) at org.apache.spark.sql.hive.HiveQl.parsePlan(HiveQl.scala:208) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:198) ``` ## How was this patch tested? `HiveQuerySuite` Author: Andrew Or <andrew@databricks.com> Closes #11948 from andrewor14/ddl-role-management.
* [SPARK-14013][SQL] Proper temp function support in catalogAndrew Or2016-03-2812-83/+136
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Session catalog was added in #11750. However, it doesn't really support temporary functions properly; right now we only store the metadata in the form of `CatalogFunction`, but this doesn't make sense for temporary functions because there is no class name. This patch moves the `FunctionRegistry` into the `SessionCatalog`. With this, the user can call `catalog.createTempFunction` and `catalog.lookupFunction` to use the function they registered previously. This is currently still dead code, however. ## How was this patch tested? `SessionCatalogSuite`. Author: Andrew Or <andrew@databricks.com> Closes #11972 from andrewor14/temp-functions.
* [SPARK-14169][CORE] Add UninterruptibleThreadShixiong Zhu2016-03-281-66/+8
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it. ## How was this patch tested? Unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #11971 from zsxwing/uninterrupt.
* [SPARK-14155][SQL] Hide UserDefinedType interface in Spark 2.0Reynold Xin2016-03-281-2/+4
| | | | | | | | | | | | ## What changes were proposed in this pull request? UserDefinedType is a developer API in Spark 1.x. With very high probability we will create a new API for user-defined type that also works well with column batches as well as encoders (datasets). In Spark 2.0, let's make `UserDefinedType` `private[spark]` first. ## How was this patch tested? Existing unit tests. Author: Reynold Xin <rxin@databricks.com> Closes #11955 from rxin/SPARK-14155.
* [SPARK-13923][SPARK-14014][SQL] Session catalog follow-upsAndrew Or2016-03-2824-125/+131
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch addresses the remaining comments left in #11750 and #11918 after they are merged. For a full list of changes in this patch, just trace the commits. ## How was this patch tested? `SessionCatalogSuite` and `CatalogTestCases` Author: Andrew Or <andrew@databricks.com> Closes #12006 from andrewor14/session-catalog-followup.
* [SPARK-14086][SQL] Add DDL commands to ANTLR4 parserHerman van Hovell2016-03-282-6/+619
| | | | | | | | | | | | | | | | | | | | | | | | #### What changes were proposed in this pull request? This PR adds all the current Spark SQL DDL commands to the new ANTLR 4 based SQL parser. I have found a few inconsistencies in the current commands: - Function has an alias field. This is actually the class name of the function. - Partition specifications should contain nulls in some commands, and contain `None`s in others. - `AlterTableSkewedLocation`: Should defines which columns have skewed values, and should allow us to define storage for each skewed combination of values. We currently only allow one value per field. - `AlterTableSetFileFormat`: Should only have one file format, it currently supports both. I have implemented all these comments like they were, and I propose to improve them in follow-up PRs. #### How was this patch tested? The existing DDLCommandSuite. cc rxin andrewor14 yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12011 from hvanhovell/SPARK-14086.
* [SPARK-13713][SQL][TEST-MAVEN] Add Antlr4 maven plugin.Yin Huai2016-03-281-0/+15
| | | | | | | | Seems https://github.com/apache/spark/commit/600c0b69cab4767e8e5a6f4284777d8b9d4bd40e is missing the antlr4 maven plugin. This pr adds it. Author: Yin Huai <yhuai@databricks.com> Closes #12010 from yhuai/mavenAntlr4.
* [SPARK-14052] [SQL] build a BytesToBytesMap directly in HashedRelationDavies Liu2016-03-285-167/+175
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently, for the key that can not fit within a long, we build a hash map for UnsafeHashedRelation, it's converted to BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly to have better memory efficiency. In order to do that, BytesToBytesMap should support multiple (K,V) pair with the same K, Location.putNewKey() is renamed to Location.append(), which could append multiple values for the same key (same Location). `Location.newValue()` is added to find the next value for the same key. ## How was this patch tested? Existing tests. Added benchmark for broadcast hash join with duplicated keys. Author: Davies Liu <davies@databricks.com> Closes #11870 from davies/map2.