aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/test/java/org/apache
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish ↵Sean Owen2017-04-101-2/+4
| | | | | | | | | | | | | | | | | | locale bug" causes Spark problems ## What changes were proposed in this pull request? Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #17527 from srowen/SPARK-20156.
* [SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithStateTathagata Das2017-03-221-2/+6
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Since the state is tied a "group" in the "mapGroupsWithState" operations, its better to call the state "GroupState" instead of a key. This would make it more general if you extends this operation to RelationGroupedDataset and python APIs. ## How was this patch tested? Existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17385 from tdas/SPARK-20057.
* [SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithStateTathagata Das2017-03-191-0/+29
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.). Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this. ``` def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = { ... state.setTimeoutDuration(10000) ... } dataset // type is Dataset[T] .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T] .mapGroupsWithState[S, U]( func = stateFunction, timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U] ``` Note the following design aspects. - The timeout type is provided as a param in mapGroupsWithState as a parameter global to all the keys. This is so that the planner knows this at planning time, and accordingly optimize the execution based on whether to saves extra info in state or not (e.g. timeout durations or timestamps). - The exact timeout duration is provided inside the function call so that it can be customized on a per key basis. - When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true. - The timeout is reset for key every time the function is called on the key, that is, when the key has new data, or the key has timed out. So the user has to set the timeout duration everytime the function is called, otherwise there will not be any timeout set. Guarantees provided on timeout of key, when timeout duration is D ms: - Timeout will never be called before real clock time has advanced by D ms - Timeout will be called eventually when there is a trigger with any data in it (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur. For example, if there is no data in the stream (for any key) for a while, then the timeout will not be hit. Implementation details: - Added new param to `mapGroupsWithState` for timeout - Added new method to `StateStore` to filter data based on timeout timestamp - Changed the internal map type of `HDFSBackedStateStore` from Java's `HashMap` to `ConcurrentHashMap` as the latter allows weakly-consistent fail-safe iterators on the map data. See comments in code for more details. - Refactored logic of `MapGroupsWithStateExec` to - Save timeout info to state store for each key that has data. - Then, filter states that should be timed out based on the current batch processing timestamp. - Moved KeyedState for `o.a.s.sql` to `o.a.s.sql.streaming`. I remember that this was a feedback in the MapGroupsWithState PR that I had forgotten to address. ## How was this patch tested? New unit tests in - MapGroupsWithStateSuite for timeouts. - StateStoreSuite for new APIs in StateStore. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17179 from tdas/mapgroupwithstate-timeout.
* [SPARK-18420][BUILD] Fix the errors caused by lint check in JavaXianyang Liu2016-11-161-1/+0
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Small fix, fix the errors caused by lint check in Java - Clear unused objects and `UnusedImports`. - Add comments around the method `finalize` of `NioBufferedFileInputStream`to turn off checkstyle. - Cut the line which is longer than 100 characters into two lines. ## How was this patch tested? Travis CI. ``` $ build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install $ dev/lint-java ``` Before: ``` Checkstyle checks failed at following occurrences: [ERROR] src/main/java/org/apache/spark/network/util/TransportConf.java:[21,8] (imports) UnusedImports: Unused import - org.apache.commons.crypto.cipher.CryptoCipherFactory. [ERROR] src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java:[516,5] (modifier) RedundantModifier: Redundant 'public' modifier. [ERROR] src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java:[133] (coding) NoFinalizer: Avoid using finalizer method. [ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java:[71] (sizes) LineLength: Line is longer than 100 characters (found 113). [ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java:[112] (sizes) LineLength: Line is longer than 100 characters (found 110). [ERROR] src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java:[31,17] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions. [ERROR]src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java:[64] (sizes) LineLength: Line is longer than 100 characters (found 103). [ERROR] src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java:[22,8] (imports) UnusedImports: Unused import - org.apache.spark.ml.linalg.Vectors. [ERROR] src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java:[51] (regexp) RegexpSingleline: No trailing whitespace allowed. ``` After: ``` $ build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install $ dev/lint-java Using `mvn` from path: /home/travis/build/ConeyLiu/spark/build/apache-maven-3.3.9/bin/mvn Checkstyle checks passed. ``` Author: Xianyang Liu <xyliu0530@icloud.com> Closes #15865 from ConeyLiu/master.
* [SPARK-17495][SQL] Add Hash capability semantically equivalent to Hive'sTejas Patil2016-10-041-0/+128
| | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-17495 Spark internally uses Murmur3Hash for partitioning. This is different from the one used by Hive. For queries which use bucketing this leads to different results if one tries the same query on both engines. For us, we want users to have backward compatibility to that one can switch parts of applications across the engines without observing regressions. This PR includes `HiveHash`, `HiveHashFunction`, `HiveHasher` which mimics Hive's hashing at https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java#L638 I am intentionally not introducing any usages of this hash function in rest of the code to keep this PR small. My eventual goal is to have Hive bucketing support in Spark. Once this PR gets in, I will make hash function pluggable in relevant areas (eg. `HashPartitioning`'s `partitionIdExpression` has Murmur3 hardcoded : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L265) ## How was this patch tested? Added `HiveHashSuite` Author: Tejas Patil <tejasp@fb.com> Closes #15047 from tejasapatil/SPARK-17495_hive_hash.
* [SPARK-17524][TESTS] Use specified spark.buffer.pageSizeAdam Roberts2016-09-151-2/+4
| | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR has the appendRowUntilExceedingPageSize test in RowBasedKeyValueBatchSuite use whatever spark.buffer.pageSize value a user has specified to prevent a test failure for anyone testing Apache Spark on a box with a reduced page size. The test is currently hardcoded to use the default page size which is 64 MB so this minor PR is a test improvement ## How was this patch tested? Existing unit tests with 1 MB page size and with 64 MB (the default) page size Author: Adam Roberts <aroberts@uk.ibm.com> Closes #15079 from a-roberts/patch-5.
* [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGeneratorQifan Pu2016-07-261-0/+425
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR is the first step for the following feature: For hash aggregation in Spark SQL, we use a fast aggregation hashmap to act as a "cache" in order to boost aggregation performance. Previously, the hashmap is backed by a `ColumnarBatch`. This has performance issues when we have wide schema for the aggregation table (large number of key fields or value fields). In this JIRA, we support another implementation of fast hashmap, which is backed by a `RowBasedKeyValueBatch`. We then automatically pick between the two implementations based on certain knobs. In this first-step PR, implementations for `RowBasedKeyValueBatch` and `RowBasedHashMapGenerator` are added. ## How was this patch tested? Unit tests: `RowBasedKeyValueBatchSuite` Author: Qifan Pu <qifan.pu@gmail.com> Closes #14349 from ooq/SPARK-16524.
* [SPARK-15686][SQL] Move user-facing streaming classes into sql.streamingReynold Xin2016-06-011-1/+1
| | | | | | | | | | | | ## What changes were proposed in this pull request? This patch moves all user-facing structured streaming classes into sql.streaming. As part of this, I also added some since version annotation to methods and classes that don't have them. ## How was this patch tested? Updated tests to reflect the moves. Author: Reynold Xin <rxin@databricks.com> Closes #13429 from rxin/SPARK-15686.
* [SPARK-15517][SQL][STREAMING] Add support for complete output mode in ↵Tathagata Das2016-05-311-0/+31
| | | | | | | | | | | | | | | | | | | | | | | | | | | Structure Streaming ## What changes were proposed in this pull request? Currently structured streaming only supports append output mode. This PR adds the following. - Added support for Complete output mode in the internal state store, analyzer and planner. - Added public API in Scala and Python for users to specify output mode - Added checks for unsupported combinations of output mode and DF operations - Plans with no aggregation should support only Append mode - Plans with aggregation should support only Update and Complete modes - Default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**) - Added support for Complete output mode in Memory Sink. So Memory Sink internally supports append and complete, update. But from public API only Complete and Append output modes are supported. ## How was this patch tested? Unit tests in various test suites - StreamingAggregationSuite: tests for complete mode - MemorySinkSuite: tests for checking behavior in Append and Complete modes. - UnsupportedOperationSuite: tests for checking unsupported combinations of DF ops and output modes - DataFrameReaderWriterSuite: tests for checking that output mode cannot be called on static DFs - Python doc test and existing unit tests modified to call write.outputMode. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13286 from tdas/complete-mode.
* [SPARK-13325][SQL] Create a 64-bit hashcode expressionHerman van Hovell2016-03-231-0/+166
This PR introduces a 64-bit hashcode expression. Such an expression is especially usefull for HyperLogLog++ and other probabilistic datastructures. I have implemented xxHash64 which is a 64-bit hashing algorithm created by Yann Colet and Mathias Westerdahl. This is a high speed (C implementation runs at memory bandwidth) and high quality hashcode. It exploits both Instruction Level Parralellism (for speed) and the multiplication and rotation techniques (for quality) like MurMurHash does. The initial results are promising. I have added a CG'ed test to the `HashBenchmark`, and this results in the following results (running from SBT): Running benchmark: Hash For simple Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For simple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1011 / 1016 132.8 7.5 1.0X codegen version 1864 / 1869 72.0 13.9 0.5X codegen version 64-bit 1614 / 1644 83.2 12.0 0.6X Running benchmark: Hash For normal Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 2467 / 2475 0.9 1176.1 1.0X codegen version 2008 / 2115 1.0 957.5 1.2X codegen version 64-bit 728 / 758 2.9 347.0 3.4X Running benchmark: Hash For array Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1544 / 1707 0.1 11779.6 1.0X codegen version 2728 / 2745 0.0 20815.5 0.6X codegen version 64-bit 2508 / 2549 0.1 19132.8 0.6X Running benchmark: Hash For map Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1819 / 1826 0.0 444014.3 1.0X codegen version 183 / 194 0.0 44642.9 9.9X codegen version 64-bit 173 / 174 0.0 42120.9 10.5X This shows that algorithm is consistently faster than MurMurHash32 in all cases and up to 3x (!) in the normal case. I have also added this to HyperLogLog++ and it cuts the processing time of the following code in half: val df = sqlContext.range(1<<25).agg(approxCountDistinct("id")) df.explain() val t = System.nanoTime() df.show() val ns = System.nanoTime() - t // Before ns: Long = 5821524302 // After ns: Long = 2836418963 cc cloud-fan (you have been working on hashcodes) / rxin Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #11209 from hvanhovell/xxHash.