path: root/sql/catalyst
Commit log (each entry shows the commit message, author, date, and files/lines changed):
* [SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management (Davies Liu, 2015-10-29, 1 file, -6/+1)

  This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is no longer needed, so it has been removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from this (by triggering others' spilling). The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.

  The following script fails with OOM before this PR and finishes in 150 seconds with a 2G heap (it also works on the 1.5 branch, with a similar duration).

  ```python
  sqlContext.setConf("spark.sql.shuffle.partitions", "1")
  df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
  df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
  j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
  j.explain()
  print j.count()
  ```

  On thread-safety:

  1) Without calling spill(), the operators are only used by a single thread, so there are no safety problems.
  2) spill() can be triggered in two cases: by the operator itself, or by other operators. We can check trigger == this in spill(); in that case it is still the same thread, so there are no safety problems.
  3) If spill() is triggered by other operators (right now the cache will not trigger spill()), we only spill the data to disk during the scanning stage (after building is finished), so the in-memory sorter and memory pages are read-only; we only need to synchronize the iterator and swap it.
  4) During scanning, the iterator only uses one record in one page. We can't free that page, because downstream code is currently using it (via UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() is not triggered during scanning.
  5) To avoid deadlock, we do not call acquireMemory during spill (so we reuse the pointer array in InMemorySorter).

  Author: Davies Liu <davies@databricks.com>
  Closes #9241 from davies/force_spill.
* [SPARK-10641][SQL] Add Skewness and Kurtosis Support (sethah, 2015-10-29, 6 files, -0/+469)

  Implements skewness and kurtosis support based on the following algorithm:
  https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics

  Author: sethah <seth.hendrickson16@gmail.com>
  Closes #9003 from sethah/SPARK-10641.
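  For reference, a minimal sketch of the one-pass moment-update algorithm that page describes (the names and this standalone class are mine, not the Spark implementation):

  ```scala
  // Streaming update of count, mean and the 2nd-4th central moments; skewness
  // and kurtosis are derived from the moments at the end (population statistics).
  case class Moments(n: Long = 0, mean: Double = 0, m2: Double = 0, m3: Double = 0, m4: Double = 0) {
    def add(x: Double): Moments = {
      val n1 = n                       // old count
      val nNew = n + 1
      val delta = x - mean
      val deltaN = delta / nNew
      val deltaN2 = deltaN * deltaN
      val term1 = delta * deltaN * n1
      Moments(
        n = nNew,
        mean = mean + deltaN,
        m2 = m2 + term1,
        m3 = m3 + term1 * deltaN * (nNew - 2) - 3 * deltaN * m2,
        m4 = m4 + term1 * deltaN2 * (nNew * nNew - 3 * nNew + 3) + 6 * deltaN2 * m2 - 4 * deltaN * m3)
    }
    def skewness: Double = math.sqrt(n.toDouble) * m3 / math.pow(m2, 1.5)
    def kurtosis: Double = n * m4 / (m2 * m2) - 3   // excess kurtosis
  }

  // Usage: fold the values through the updater.
  val stats = Seq(1.0, 2.0, 2.0, 3.0, 9.0).foldLeft(Moments())(_ add _)
  println((stats.skewness, stats.kurtosis))
  ```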
* [SPARK-11379][SQL] ExpressionEncoder can't handle top level primitive type correctly (Wenchen Fan, 2015-10-29, 2 files, -1/+2)

  For an inner primitive type (e.g. inside `Product`), we use `schemaFor` to get the catalyst type for it, see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L403. However, for a top level primitive type, we use `dataTypeFor`, which is wrong.

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9337 from cloud-fan/encoder.
* [SPARK-11351] [SQL] support hive interval literal (Wenchen Fan, 2015-10-28, 2 files, -20/+103)

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9304 from cloud-fan/interval.
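  The commit message gives no examples; for illustration only, Hive-style interval literals look roughly like the following (which exact forms this parser change accepts is not stated here, so treat these as assumptions):

  ```scala
  // Hive-style interval literal shapes (illustrative, not taken from the PR):
  sqlContext.sql("SELECT INTERVAL '1' DAY + INTERVAL '2' HOUR")
  sqlContext.sql("SELECT INTERVAL '1-2' YEAR TO MONTH")
  sqlContext.sql("SELECT INTERVAL '10 9:8:7.123' DAY TO SECOND")
  ```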
* [SPARK-11377] [SQL] withNewChildren should not convert StructType to Seq (Michael Armbrust, 2015-10-28, 1 file, -1/+3)

  This is minor, but I ran into it while writing Datasets; while it wasn't needed for the final solution, it was super confusing, so we should fix it. Basically we recurse into `Seq`s to see if they have children. This breaks because we don't preserve the original subclass of `Seq` (and `StructType <:< Seq[StructField]`). Since a struct can never contain children, let's just not recurse into it.

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9334 from marmbrus/structMakeCopy.
* [SPARK-11313][SQL] implement cogroup on DataSets (support 2 datasets) (Wenchen Fan, 2015-10-28, 2 files, -0/+40)

  A simpler version of https://github.com/apache/spark/pull/9279, supporting only 2 datasets.

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9324 from cloud-fan/cogroup2.
* [SPARK-11347] [SQL] Support for joinWith in Datasets (Michael Armbrust, 2015-10-27, 11 files, -516/+304)

  This PR adds a new operation `joinWith` to a `Dataset`, which returns a `Tuple` for each pair where a given `condition` evaluates to true.

  ```scala
  case class ClassData(a: String, b: Int)

  val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS()
  val ds2 = Seq(("a", 1), ("b", 2)).toDS()

  > ds1.joinWith(ds2, $"_1" === $"a").collect()
  res0: Array((ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2)))
  ```

  This operation is similar to the relational `join` function, with one important difference in the result schema. Since `joinWith` preserves objects present on either side of the join, the result schema is similarly nested into a tuple under the column names `_1` and `_2`. This type of join can be useful both for preserving type-safety with the original object types as well as for working with relational data where either side of the join has column names in common.

  ## Required Changes to Encoders

  In the process of working on this patch, several deficiencies in the way that we were handling encoders were discovered. Specifically, it turned out to be very difficult to `rebind` the non-expression based encoders to extract the nested objects from the results of joins (and also typed selects that return tuples). As a result the following changes were made.

  - `ClassEncoder` has been renamed to `ExpressionEncoder` and has been improved to also handle primitive types. Additionally, it is now possible to take arbitrary expression encoders and rewrite them into a single encoder that returns a tuple.
  - All internal operations on `Dataset`s now require an `ExpressionEncoder`. If the user tries to pass a non-`ExpressionEncoder` in, an error will be thrown. We can relax this requirement in the future by constructing a wrapper class that uses expressions to project the row to the expected schema, shielding the user's code from the required remapping. This will give us a nice balance where we don't force user encoders to understand attribute references and binding, but still allow our native encoder to leverage runtime code generation to construct specific encoders for a given schema that avoid an extra remapping step.
  - Additionally, the semantics for different types of objects are now better defined. As stated in the `ExpressionEncoder` scaladoc:
    - Classes will have their sub fields extracted by name using `UnresolvedAttribute` expressions and `UnresolvedExtractValue` expressions.
    - Tuples will have their subfields extracted by position using `BoundReference` expressions.
    - Primitives will have their values extracted from the first ordinal with a schema that defaults to the name `value`.
  - Finally, the binding lifecycle for `Encoders` has now been unified across the codebase. Encoders are now `resolved` to the appropriate schema in the constructor of `Dataset`. This process replaces unresolved expressions with concrete `AttributeReference` expressions. Binding then happens on demand, when an encoder is going to be used to construct an object. This closely mirrors the lifecycle for standard expressions when executing normal SQL or `DataFrame` queries.

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9300 from marmbrus/datasets-tuples.
* [SPARK-11303][SQL] filter should not be pushed down into sample (Yanbo Liang, 2015-10-27, 1 file, -4/+0)

  When sampling and then filtering a DataFrame, the SQL optimizer will push the filter down into the sample and produce a wrong result. This is because the sampler is computed based on the original scope rather than the scope after filtering.

  Author: Yanbo Liang <ybliang8@gmail.com>
  Closes #9294 from yanboliang/spark-11303.
* [SPARK-11277][SQL] sort_array throws exception scala.MatchError (Jia Li, 2015-10-27, 2 files, -1/+12)

  I'm new to Spark. I was trying out the sort_array function and hit this exception. I looked into the Spark source code and found the root cause: sort_array does not check for an array of NULLs. It's not meaningful to sort an array of entirely NULLs anyway.

  I'm adding a check on the input array type to SortArray. If the array consists entirely of NULLs, there is no need to sort it. I have also added a test case for this. Please help to review my fix. Thanks!

  Author: Jia Li <jiali@us.ibm.com>
  Closes #9247 from jliwork/SPARK-11277.
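  The commit does not include the failing query; a query of roughly this shape (my guess at the repro: an array made up entirely of NULL literals) is the kind of input the added check guards against:

  ```scala
  // Illustrative only; the actual repro is not given in the commit message.
  sqlContext.sql("SELECT sort_array(array(null, null))").collect()
  ```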
* [SPARK-10984] Simplify *MemoryManager class structure (Josh Rosen, 2015-10-25, 1 file, -1/+0)

  This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:

  - MemoryManager
  - StaticMemoryManager
  - ExecutorMemoryManager
  - TaskMemoryManager
  - ShuffleMemoryManager

  This is fairly confusing. To simplify things, this patch consolidates several of these classes:

  - ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
  - TaskMemoryManager is moved into Spark Core.

  **Key changes and tasks**:

  - [x] Merge ExecutorMemoryManager into MemoryManager.
  - [x] Move pooling logic into Allocator.
  - [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
  - [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code uses only this and not both this and ShuffleMemoryManager.
  - [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
  - [x] Merge ShuffleMemoryManager into MemoryManager.
  - [x] Move code
  - [x] ~~Simplify 1/n calculation.~~ **Will defer to a followup, since this needs more work.**
  - [x] Port ShuffleMemoryManagerSuite tests.
  - [x] Move classes from `unsafe` package to `memory` package.
  - [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
  - [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
    - [x] AbstractBytesToBytesMapSuite
    - [x] UnsafeExternalSorterSuite
    - [x] UnsafeFixedWidthAggregationMapSuite
    - [x] UnsafeKVExternalSorterSuite

  **Compatibility notes**:

  - This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class now cannot be used outside of a task.

  Author: Josh Rosen <joshrosen@databricks.com>
  Closes #9127 from JoshRosen/SPARK-10984.
* [SPARK-6428][SQL] Removed unnecessary typecasts in MutableInt, MutableDouble etc. (Alexander Slesarenko, 2015-10-25, 1 file, -9/+9)

  marmbrus rxin I believe these typecasts are not required in the presence of explicit return types.

  Author: Alexander Slesarenko <avslesarenko@gmail.com>
  Closes #9262 from aslesarenko/remove-typecasts.
* [SPARK-11243][SQL] zero out padding bytes in UnsafeRow (Davies Liu, 2015-10-23, 2 files, -5/+35)

  For a nested StructType, the underlying buffer could have been used for something else before, so we should zero out the padding bytes for those primitive types that take less than 8 bytes.

  cc cloud-fan

  Author: Davies Liu <davies@databricks.com>
  Closes #9217 from davies/zero_out.
* [SPARK-11273][SQL] Move ArrayData/MapData/DataTypeParser to catalyst.util package (Reynold Xin, 2015-10-23, 33 files, -34/+49)

  Author: Reynold Xin <rxin@databricks.com>
  Closes #9239 from rxin/types-private.
* [SPARK-11116][SQL] First Draft of Dataset API (Michael Armbrust, 2015-10-22, 13 files, -22/+495)

  *This PR adds a new experimental API to Spark, tentatively named Datasets.*

  A `Dataset` is a strongly-typed collection of objects that can be transformed in parallel using functional or relational operations. Example usage is as follows:

  ### Functional
  ```scala
  > val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
  > ds.filter(_ % 1 == 0).collect()
  res1: Array[Int] = Array(1, 2, 3)
  ```

  ### Relational
  ```scala
  scala> ds.toDF().show()
  +-----+
  |value|
  +-----+
  |    1|
  |    2|
  |    3|
  +-----+

  > ds.select(expr("value + 1").as[Int]).collect()
  res11: Array[Int] = Array(2, 3, 4)
  ```

  ## Comparison to RDDs
  A `Dataset` differs from an `RDD` in the following ways:
  - The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be used to serialize the object into a binary format. Encoders are also capable of mapping the schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime reflection based serialization.
  - Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored in the encoded form. This representation allows for additional logical operations and enables many operations (sorting, shuffling, etc.) to be performed without deserializing to an object.

  A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.

  ## Comparison to DataFrames
  A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into a specific Dataset by calling `df.as[ElementType]`. Similarly you can transform a strongly-typed `Dataset` into a generic DataFrame by calling `ds.toDF()`.

  ## Implementation Status and TODOs
  This is a rough cut at the least controversial parts of the API. The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API. The following is being deferred to future PRs:
  - Joins and Aggregations (prototype here: https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
  - Support for Java

  Additionally, the responsibility for binding an encoder to a given schema is currently done in a fairly ad-hoc fashion. This is an internal detail, and what we are doing today works for the cases we care about. However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).

  ## COMPATIBILITY NOTE
  Long term we plan to make `DataFrame` extend `Dataset[Row]`. However, making this change to the class hierarchy would break the function signatures for the existing function operations (map, flatMap, etc). As such, this class should be considered a preview of the final API. Changes will be made to the interface after Spark 1.6.

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9190 from marmbrus/dataset-infra.
* [SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row (Wenchen Fan, 2015-10-22, 4 files, -16/+17)

  Addresses comments in https://github.com/apache/spark/pull/9184.

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9212 from cloud-fan/encoder.
* [SPARK-11243][SQL] output UnsafeRow from columnar cache (Davies Liu, 2015-10-21, 4 files, -108/+160)

  This PR changes InMemoryTableScan to output UnsafeRow, and optimizes unrolling and scanning by copying the bytes for var-length types between UnsafeRow and ByteBuffer directly, without creating the wrapper objects. When scanning the decimals in the TPC-DS store_sales table, it's 80% faster (copying them as long without creating Decimal objects).

  Author: Davies Liu <davies@databricks.com>
  Closes #9203 from davies/unsafe_cache.
* [SPARK-8654][SQL] Analysis exception when using NULL IN (...) : invalid cast (Dilip Biswal, 2015-10-21, 3 files, -3/+32)

  In the analysis phase, while processing the rules for the IN predicate, we compare the in-list types to the LHS expression type and generate a cast operation if necessary. In the case of NULL [NOT] IN expr1, we end up generating a cast between the in-list types and NULL, like cast(1 as NULL), which is not a valid cast. The fix is to find a common type between the LHS and RHS expressions and cast all the expressions to that common type.

  Author: Dilip Biswal <dbiswal@us.ibm.com>

  This patch had conflicts when merged, resolved by
  Committer: Michael Armbrust <michael@databricks.com>

  Closes #9036 from dilipbiswal/spark_8654_new.
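  For illustration, the problematic pattern is an IN list compared against a NULL literal; queries of roughly this shape (the table name `src` is hypothetical, not from the commit) previously made the analyzer emit an invalid cast to NULL:

  ```scala
  // Illustrative queries; before the fix the generated cast (e.g. cast(1 as NULL)) failed analysis.
  sqlContext.sql("SELECT * FROM src WHERE NULL IN (1, 2, 3)")
  sqlContext.sql("SELECT * FROM src WHERE NULL NOT IN (1, 2, 3)")
  ```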
* [SPARK-11233][SQL] register cosh in function registry (Shagun Sodhani, 2015-10-21, 1 file, -0/+1)

  Author: Shagun Sodhani <sshagunsodhani@gmail.com>
  Closes #9199 from shagunsodhani/proposed-fix-#11233.
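  For context (my reading of the title, not stated in the commit message): the `cosh` expression existed but was not registered in the function registry, so it could not be called from SQL text. After registration, something like the following should resolve:

  ```scala
  // Illustrative only: cosh(0.0) should evaluate to 1.0 once the function is registered.
  sqlContext.sql("SELECT cosh(0.0)").show()
  ```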
* [SPARK-9740][SPARK-9592][SPARK-9210][SQL] Change the default behavior of First/Last to RESPECT NULLS. (Yin Huai, 2015-10-21, 5 files, -41/+172)

  I am changing the default behavior of `First`/`Last` to respect null values (the SQL standard default behavior).

  https://issues.apache.org/jira/browse/SPARK-9740

  Author: Yin Huai <yhuai@databricks.com>
  Closes #8113 from yhuai/firstLast.
* [SPARK-11197][SQL] run SQL on files directly (Davies Liu, 2015-10-21, 3 files, -3/+12)

  This PR introduces a new feature to run SQL directly on files without creating a table, for example:

  ```
  select id from json.`path/to/json/files` as j
  ```

  Author: Davies Liu <davies@databricks.com>
  Closes #9173 from davies/source.
* [SPARK-10743][SQL] keep the name of expression if possible when do cast (Wenchen Fan, 2015-10-21, 1 file, -17/+16)

  Author: Wenchen Fan <cloud0fan@163.com>
  Closes #8859 from cloud-fan/cast.
* [SPARK-10534] [SQL] ORDER BY clause allows only columns that are present in the select projection list (Dilip Biswal, 2015-10-21, 2 files, -1/+11)

  Find out the missing attributes by recursively looking at the sort order expressions; the rest of the code takes care of projecting them out.

  Added description from cloud-fan:

  I wanna explain a bit more about this bug. When we resolve sort ordering, we will use a special method which only resolves UnresolvedAttributes and UnresolvedExtractValue. However, for something like Floor('a), even if 'a is resolved, the floor expression may still be unresolved because of a data type mismatch (for example, 'a is string type and Floor needs double type). Thus it can't pass this filter, and we can't push down this missing attribute 'a.

  Author: Dilip Biswal <dbiswal@us.ibm.com>
  Closes #9123 from dilipbiswal/SPARK-10534.
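  As an illustration of the pattern this fixes (table and column names are mine, not from the commit): a sort expression that wraps a column which is absent from the projection list.

  ```scala
  // Illustrative: 'b' appears only inside the ORDER BY expression, not in the select list.
  sqlContext.sql("SELECT a FROM t ORDER BY floor(b)")
  ```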
* [SPARK-11216] [SQL] add encoder/decoder for external row (Wenchen Fan, 2015-10-21, 9 files, -54/+459)

  Implement encode/decode for external row based on `ClassEncoder`.

  TODO:
  * code cleanup
  * ~~fix corner cases~~
  * refactor the encoder interface
  * improve test for product codegen, to cover more corner cases.

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9184 from cloud-fan/encoder.
* [SPARK-11179] [SQL] Push filters through aggregate (nitin goyal, 2015-10-21, 2 files, -0/+69)

  Push conjunctive predicates through Aggregate operators when their references are a subset of the groupingExpressions.

  Query plan before optimisation:

  ```
  Filter ((c#138L = 2) && (a#0 = 3))
   Aggregate [a#0], [a#0,count(b#1) AS c#138L]
    Project [a#0,b#1]
     LocalRelation [a#0,b#1,c#2]
  ```

  Query plan after optimisation:

  ```
  Filter (c#138L = 2)
   Aggregate [a#0], [a#0,count(b#1) AS c#138L]
    Filter (a#0 = 3)
     Project [a#0,b#1]
      LocalRelation [a#0,b#1,c#2]
  ```

  Author: nitin goyal <nitin.goyal@guavus.com>
  Author: nitin.goyal <nitin.goyal@guavus.com>
  Closes #9167 from nitin2goyal/master.
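  A DataFrame query of roughly this shape produces plans like the ones above (my reconstruction; the column names follow the plan, everything else is assumed):

  ```scala
  import org.apache.spark.sql.functions.count
  import sqlContext.implicits._   // assumes a spark-shell style sqlContext

  // One predicate on the grouping column (pushable) and one on the aggregate result (not pushable).
  val df = Seq((3, 1, 0), (3, 2, 0), (4, 5, 0)).toDF("a", "b", "c")
  df.groupBy("a").agg(count("b").as("c")).filter("c = 2 and a = 3").explain(true)
  ```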
* [SPARK-11149] [SQL] Improve cache performance for primitive types (Davies Liu, 2015-10-20, 3 files, -44/+35)

  This PR improves the performance by:

  1) Generating an Iterator that takes Iterator[CachedBatch] as input and calls the accessors (unrolling the loop over columns), avoiding the expensive Iterator.flatMap.
  2) Using Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble; the latter actually reads byte by byte.
  3) Removing the unnecessary copy() in Coalesce(), which is not related to the memory cache but was found during benchmarking.

  The following benchmark shows that we can speed up the columnar cache of int by 2x.

  ```
  path = '/opt/tpcds/store_sales/'
  int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk','ss_customer_sk']
  df = sqlContext.read.parquet(path).select(int_cols).cache()
  df.count()

  t = time.time()
  print df.select("*")._jdf.queryExecution().toRdd().count()
  print time.time() - t
  ```

  Author: Davies Liu <davies@databricks.com>
  Closes #9145 from davies/byte_buffer.
* [SPARK-11111] [SQL] fast null-safe join (Davies Liu, 2015-10-20, 3 files, -15/+77)

  Currently, we use CartesianProduct for a join with a null-safe-equal condition:

  ```
  scala> sqlContext.sql("select * from t a join t b on (a.i <=> b.i)").explain
  == Physical Plan ==
  TungstenProject [i#2,j#3,i#7,j#8]
   Filter (i#2 <=> i#7)
    CartesianProduct
     LocalTableScan [i#2,j#3], [[1,1]]
     LocalTableScan [i#7,j#8], [[1,1]]
  ```

  Actually, we can turn it into an equal-join condition as `coalesce(i, default) = coalesce(b.i, default)`, so that a partitioned join algorithm can be used. After this PR, the plan becomes:

  ```
  >>> sqlContext.sql("select * from a join b ON a.id <=> b.id").explain()
  TungstenProject [id#0L,id#1L]
   Filter (id#0L <=> id#1L)
    SortMergeJoin [coalesce(id#0L,0)], [coalesce(id#1L,0)]
     TungstenSort [coalesce(id#0L,0) ASC], false, 0
      TungstenExchange hashpartitioning(coalesce(id#0L,0),200)
       ConvertToUnsafe
        Scan PhysicalRDD[id#0L]
     TungstenSort [coalesce(id#1L,0) ASC], false, 0
      TungstenExchange hashpartitioning(coalesce(id#1L,0),200)
       ConvertToUnsafe
        Scan PhysicalRDD[id#1L]
  ```

  Author: Davies Liu <davies@databricks.com>
  Closes #9120 from davies/null_safe.
* [SPARK-6740] [SQL] correctly parse NOT operator with comparison operations (Wenchen Fan, 2015-10-20, 3 files, -8/+21)

  We can't parse the `NOT` operator with comparison operations like `SELECT NOT TRUE > TRUE`; this PR fixes it. Takes over https://github.com/apache/spark/pull/6326.

  Author: Wenchen Fan <cloud0fan@outlook.com>
  Closes #8617 from cloud-fan/not.
* [SPARK-10463] [SQL] remove PromotePrecision during optimization (Daoyuan Wang, 2015-10-20, 1 file, -3/+4)

  PromotePrecision is not necessary after HiveTypeCoercion is done.

  Jira: https://issues.apache.org/jira/browse/SPARK-10463

  Author: Daoyuan Wang <daoyuan.wang@intel.com>
  Closes #8621 from adrian-wang/promoterm.
* [SPARK-11119] [SQL] cleanup for unsafe array and map (Wenchen Fan, 2015-10-19, 8 files, -177/+157)

  The purpose of this PR is to keep the unsafe format details only inside the unsafe classes themselves, so that when we use them (like using unsafe array in unsafe map, or unsafe array and map in the columnar cache), we don't need to understand the format before using them.

  Change list:
  * The unsafe array's 4-byte numElements header is now required (it was optional) and becomes part of the unsafe array format.
  * As a result of the previous change, the `sizeInBytes` of an unsafe array now counts the 4-byte header.
  * The unsafe map's format used to be `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]`, which is a little hacky as it makes the unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
  * As a result of the previous change, the `sizeInBytes` of an unsafe map now counts both the map's header and the arrays' headers.

  Author: Wenchen Fan <wenchen@databricks.com>
  Closes #9131 from cloud-fan/unsafe.
* [SPARK-11124] JsonParser/Generator should be closed for resource recycle (navis.ryu, 2015-10-16, 1 file, -30/+26)

  Some JSON parsers are not closed; the parser in JacksonParser#parseJson, for example.

  Author: navis.ryu <navis@apache.org>
  Closes #9130 from navis/SPARK-11124.
* [SPARK-11076] [SQL] Add decimal support for floor and ceil (Cheng Hao, 2015-10-14, 4 files, -13/+91)

  Actually none of the `UnaryMathExpression`s support Decimal; I will create follow-ups for supporting it there. This is the first PR, which should be a good place to review the approach I am taking.

  Author: Cheng Hao <hao.cheng@intel.com>
  Closes #9086 from chenghao-intel/ceiling.
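  For illustration (assumed usage, not from the commit): with decimal support, floor and ceil can be applied to DECIMAL values directly.

  ```scala
  // Illustrative query exercising floor/ceil on decimal inputs.
  sqlContext.sql("SELECT floor(CAST(1.5 AS DECIMAL(10, 2))), ceil(CAST(-1.5 AS DECIMAL(10, 2)))").show()
  ```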
* [SPARK-11017] [SQL] Support ImperativeAggregates in TungstenAggregate (Josh Rosen, 2015-10-14, 2 files, -13/+37)

  This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their `initialize`, `update`, `merge`, and `eval` methods.

  The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.

  Author: Josh Rosen <joshrosen@databricks.com>
  Closes #9038 from JoshRosen/support-interpreted-in-tungsten-agg-final.
* [SPARK-11113] [SQL] Remove DeveloperApi annotation from private classes. (Reynold Xin, 2015-10-14, 1 file, -3/+0)

  o.a.s.sql.catalyst and o.a.s.sql.execution are supposed to be private.

  Author: Reynold Xin <rxin@databricks.com>
  Closes #9121 from rxin/SPARK-11113.
* [SPARK-10104] [SQL] Consolidate different forms of table identifiers (Wenchen Fan, 2015-10-14, 9 files, -141/+100)

  Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form, and TableIdentifier is the best one because it provides methods to get the table name, get the database name, return an unquoted string, and return a quoted string.

  Author: Wenchen Fan <wenchen@databricks.com>
  Author: Wenchen Fan <cloud0fan@163.com>
  Closes #8453 from cloud-fan/table-name.
* [SPARK-11032] [SQL] correctly handle having (Wenchen Fan, 2015-10-13, 1 file, -1/+1)

  We should not stop resolving HAVING when the having condition is resolved, or something like `count(1)` will crash.

  Author: Wenchen Fan <cloud0fan@163.com>
  Closes #9105 from cloud-fan/having.
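  An assumed repro shape (not given in the commit): a HAVING clause whose condition contains an aggregate such as `count(1)`.

  ```scala
  // Illustrative only; the table and column names are hypothetical.
  sqlContext.sql("SELECT key FROM src GROUP BY key HAVING count(1) > 1")
  ```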
* [SPARK-11090] [SQL] Constructor for Product types from InternalRow (Michael Armbrust, 2015-10-13, 9 files, -159/+723)

  This is a first draft of the ability to construct expressions that will take a catalyst internal row and construct a Product (case class or tuple) that has fields with the correct names.

  Supported:
  - Nested classes
  - Maps
  - Efficient handling of arrays of primitive types

  Not yet supported:
  - Case classes that require custom collection types (i.e. List instead of Seq).

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9100 from marmbrus/productContructor.
* [SPARK-11080] [SQL] Incorporate per-JVM id into ExprId to prevent unsafe cross-JVM comparisions (Josh Rosen, 2015-10-13, 1 file, -3/+12)

  In the current implementation of named expressions' `ExprIds`, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be _globally_ unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.

  There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends `ExprId` to incorporate a UUID to identify the JVM that created the id, which prevents collisions.

  Author: Josh Rosen <joshrosen@databricks.com>
  Closes #9093 from JoshRosen/SPARK-11080.
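  A minimal sketch of the idea (my own toy version, not the actual Spark `ExprId` code): pair the per-JVM counter with a UUID that is fixed for the lifetime of the JVM, so ids minted in different JVMs can never compare equal.

  ```scala
  import java.util.UUID
  import java.util.concurrent.atomic.AtomicLong

  object JvmScopedIds {                            // hypothetical helper
    private val jvmId: UUID = UUID.randomUUID()    // unique per JVM instance
    private val counter = new AtomicLong(0L)

    // An id only equals another id if both the counter value and the JVM UUID match.
    def next(): (Long, UUID) = (counter.getAndIncrement(), jvmId)
  }
  ```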
* [SPARK-10990] [SPARK-11018] [SQL] improve unrolling of complex types (Davies Liu, 2015-10-12, 5 files, -9/+50)

  This PR improves the unrolling and reading of complex types in the columnar cache:

  1) Use UnsafeProjection to do the serialization of complex types, so they will not be serialized three times (two of those for actualSize).
  2) Copy the bytes from UnsafeRow/UnsafeArrayData to ByteBuffer directly, avoiding the intermediate byte[].
  3) Use the underlying array in ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copying.

  Combining these optimizations, we can reduce the unrolling time from 25s to 21s (20% less), and reduce the scanning time from 3.5s to 2.5s (28% less).

  ```
  df = sqlContext.read.parquet(path)
  t = time.time()
  df.cache()
  df.count()
  print 'unrolling', time.time() - t

  for i in range(10):
      t = time.time()
      print df.select("*")._jdf.queryExecution().toRdd().count()
      print time.time() - t
  ```

  The schema is:
  ```
  root
   |-- a: struct (nullable = true)
   |    |-- b: long (nullable = true)
   |    |-- c: string (nullable = true)
   |-- d: array (nullable = true)
   |    |-- element: long (containsNull = true)
   |-- e: map (nullable = true)
   |    |-- key: long
   |    |-- value: string (valueContainsNull = true)
  ```

  The columnar cache now depends on UnsafeProjection supporting all the data types (including UDT); this PR also fixes that.

  Author: Davies Liu <davies@databricks.com>
  Closes #9016 from davies/complex2.
* [SPARK-10960] [SQL] SQL with windowing function should be able to refer column in inner select (Liang-Chi Hsieh, 2015-10-12, 1 file, -0/+4)

  JIRA: https://issues.apache.org/jira/browse/SPARK-10960

  When accessing a column of an inner select from a select with a window function, an `AnalysisException` will be thrown. For example, a query like this:

  ```
  select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
  from (select month, area, product, 1 as tmp1 from windowData) tmp
  ```

  Currently, the rule `ExtractWindowExpressions` in `Analyzer` only extracts regular expressions from `WindowFunction`, `WindowSpecDefinition` and `AggregateExpression`. We need to also extract other attributes, such as the one in the `Alias` shown in the above query.

  Author: Liang-Chi Hsieh <viirya@appier.com>
  Closes #9011 from viirya/fix-window-inner-column.
* [SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL (Davies Liu, 2015-10-08, 1 file, -7/+21)

  This PR improves session management by replacing the thread-local based approach with one SQLContext per session, and introduces separate temporary tables and UDFs/UDAFs for each session.

  A new session of SQLContext can be created by:

  1) creating a new SQLContext
  2) calling newSession() on an existing SQLContext

  For HiveContext, in order to reduce the cost of each session, the classloader and Hive client are shared across multiple sessions (created by newSession). CacheManager is also shared by multiple sessions, so caching a table multiple times in different sessions will not cause multiple copies of the in-memory cache. Added jars are still shared by all sessions, because SparkContext does not support sessions.

  cc marmbrus yhuai rxin

  Author: Davies Liu <davies@databricks.com>
  Closes #8909 from davies/sessions.
* [SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size. (Reynold Xin, 2015-10-08, 1 file, -3/+44)

  UnsafeRow contains 3 pieces of information when pointing to some data in memory (an object, a base offset, and a length). When the row is serialized with Java/Kryo serialization, the object layout in memory can change if the two machines have different pointer widths (Oops in the JVM).

  To reproduce, launch Spark using

  ```
  MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"
  ```

  and then run the following:

  ```
  scala> sql("select 1 xx").collect()
  ```

  Author: Reynold Xin <rxin@databricks.com>
  Closes #9030 from rxin/SPARK-10914.
* [SPARK-8848] [SQL] Refactors Parquet write path to follow parquet-format (Cheng Lian, 2015-10-08, 1 file, -1/+3)

  This PR refactors the Parquet write path to follow the parquet-format spec. It's a successor of PR #7679, but with fewer non-essential changes. Major changes include:

  1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`
     - Writes Parquet data using the standard layout defined in parquet-format. Specifically, we are now writing ...
       - ... arrays and maps in the standard 3-level structure with proper annotations and field names
       - ... decimals as `INT32` and `INT64` whenever possible, and taking `FIXED_LEN_BYTE_ARRAY` as the final fallback
     - Supports legacy mode, which is compatible with Spark 1.4 and prior versions. The legacy mode is off by default and can be turned on by flipping the SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.
     - Eliminates per-value data type dispatching costs via prebuilt composed writer functions
  2. Cleans up the last pieces of the old Parquet support code

  As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity. But I'd like to do this in a follow-up PR to minimize code review noise in this one.

  Author: Cheng Lian <lian@databricks.com>
  Closes #8988 from liancheng/spark-8848/standard-parquet-write-path.
* [SPARK-10993] [SQL] Inital code generated encoder for product types (Michael Armbrust, 2015-10-08, 8 files, -2/+910)

  This PR is a first cut at code generating an encoder that takes a Scala `Product` type and converts it directly into the tungsten binary format. This is done through the addition of a new set of expressions that can be used to invoke methods on raw JVM objects, extracting fields and converting the result into the required format. These can then be used directly in an `UnsafeProjection`, allowing us to leverage the existing encoding logic.

  According to some simple benchmarks, this can significantly speed up conversion (~4x). However, replacing CatalystConverters is deferred to a later PR to keep this PR at a reasonable size.

  ```scala
  case class SomeInts(a: Int, b: Int, c: Int, d: Int, e: Int)

  val data = SomeInts(1, 2, 3, 4, 5)
  val encoder = ProductEncoder[SomeInts]
  val converter = CatalystTypeConverters.createToCatalystConverter(ScalaReflection.schemaFor[SomeInts].dataType)

  (1 to 5).foreach { iter =>
    benchmark(s"converter $iter") {
      var i = 100000000
      while (i > 0) {
        val res = converter(data).asInstanceOf[InternalRow]
        assert(res.getInt(0) == 1)
        assert(res.getInt(1) == 2)
        i -= 1
      }
    }

    benchmark(s"encoder $iter") {
      var i = 100000000
      while (i > 0) {
        val res = encoder.toRow(data)
        assert(res.getInt(0) == 1)
        assert(res.getInt(1) == 2)
        i -= 1
      }
    }
  }
  ```

  Results:
  ```
  [info] converter 1: 7170ms
  [info] encoder 1: 1888ms
  [info] converter 2: 6763ms
  [info] encoder 2: 1824ms
  [info] converter 3: 6912ms
  [info] encoder 3: 1802ms
  [info] converter 4: 7131ms
  [info] encoder 4: 1798ms
  [info] converter 5: 7350ms
  [info] encoder 5: 1912ms
  ```

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9019 from marmbrus/productEncoder.
* Revert [SPARK-8654] [SQL] Fix Analysis exception when using NULL IN (Michael Armbrust, 2015-10-08, 2 files, -29/+2)

  This reverts commit dcbd58a929be0058b1cfa59b14898c4c428a7680 from #8983.

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9034 from marmbrus/revert8654.
* [SPARK-10887] [SQL] Build HashedRelation outside of HashJoinNode. (Yin Huai, 2015-10-08, 2 files, -2/+4)

  This PR refactors `HashJoinNode` to take an existing `HashedRelation`, so we can reuse this node for both `ShuffledHashJoin` and `BroadcastHashJoin`.

  https://issues.apache.org/jira/browse/SPARK-10887

  Author: Yin Huai <yhuai@databricks.com>
  Closes #8953 from yhuai/SPARK-10887.
* [SPARK-8654] [SQL] Fix Analysis exception when using NULL IN (...) (Dilip Biswal, 2015-10-08, 2 files, -2/+29)

  In the analysis phase, while processing the rules for the IN predicate, we compare the in-list types to the LHS expression type and generate a cast operation if necessary. In the case of NULL [NOT] IN expr1, we end up generating a cast between the in-list types and NULL, like cast(1 as NULL), which is not a valid cast. The fix is to not generate such a cast if the LHS type is a NullType; instead we translate the expression to Literal(Null).

  Author: Dilip Biswal <dbiswal@us.ibm.com>
  Closes #8983 from dilipbiswal/spark_8654.
* [SPARK-10998] [SQL] Show non-children in default Expression.toString (Michael Armbrust, 2015-10-08, 1 file, -1/+7)

  It's pretty hard to debug problems with expressions when you can't see all the arguments.

  Before: `invoke()`
  After: `invoke(inputObject#1, intField, IntegerType)`

  Author: Michael Armbrust <michael@databricks.com>
  Closes #9022 from marmbrus/expressionToString.
* [SPARK-10917] [SQL] improve performance of complex type in columnar cache (Davies Liu, 2015-10-07, 2 files, -1/+5)

  This PR improves the performance of complex types in the columnar cache by using UnsafeProjection instead of KryoSerializer. A simple benchmark shows that this PR can improve the performance of scanning a cached table with complex columns by 15x (compared to Spark 1.5).

  Here is the code used to benchmark:

  ```
  df = sc.range(1<<23).map(lambda i: Row(a=Row(b=i, c=str(i)), d=range(10), e=dict(zip(range(10), [str(i) for i in range(10)])))).toDF()
  df.write.parquet("table")
  ```
  ```
  df = sqlContext.read.parquet("table")
  df.cache()
  df.count()

  t = time.time()
  print df.select("*")._jdf.queryExecution().toRdd().count()
  print time.time() - t
  ```

  Author: Davies Liu <davies@databricks.com>
  Closes #8971 from davies/complex.
* [SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator (Josh Rosen, 2015-10-07, 1 file, -0/+16)

  This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.

  Author: Josh Rosen <joshrosen@databricks.com>
  Author: Liang-Chi Hsieh <viirya@appier.com>
  Closes #8083 from JoshRosen/SPARK-9702.
* [SPARK-10980] [SQL] fix bug in create Decimal (Davies Liu, 2015-10-07, 2 files, -1/+2)

  The created decimal is wrong when using `Decimal(unscaled, precision, scale)` with unscaled > 1e18, precision > 18, and scale > 0. This bug has existed since the beginning.

  Author: Davies Liu <davies@databricks.com>
  Closes #9014 from davies/fix_decimal.
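  For illustration (the commit gives no concrete repro, so the values below are assumed): the affected case is an unscaled Long above 1e18 together with precision > 18 and scale > 0.

  ```scala
  import org.apache.spark.sql.types.Decimal

  // 19-digit unscaled value with precision 20 and scale 2; should represent 20000000000000000.03.
  val d = Decimal(2000000000000000003L, 20, 2)
  println(d)
  ```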