aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-19993][SQL] Caching logical plans containing subquery expressions ↵Dilip Biswal2017-04-121-3/+4
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | does not work. ## What changes were proposed in this pull request? The sameResult() method does not work when the logical plan contains subquery expressions. **Before the fix** ```SQL scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)") ds: org.apache.spark.sql.DataFrame = [c1: int] scala> ds.cache res13: ds.type = [c1: int] scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true) == Analyzed Logical Plan == c1: int Project [c1#86] +- Filter c1#86 IN (list#78 [c1#86]) : +- Project [c1#87] : +- Filter (outer(c1#86) = c1#87) : +- SubqueryAlias s2 : +- Relation[c1#87] parquet +- SubqueryAlias s1 +- Relation[c1#86] parquet == Optimized Logical Plan == Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87)) :- Relation[c1#86] parquet +- Relation[c1#87] parquet ``` **Plan after fix** ```SQL == Analyzed Logical Plan == c1: int Project [c1#22] +- Filter c1#22 IN (list#14 [c1#22]) : +- Project [c1#23] : +- Filter (outer(c1#22) = c1#23) : +- SubqueryAlias s2 : +- Relation[c1#23] parquet +- SubqueryAlias s1 +- Relation[c1#22] parquet == Optimized Logical Plan == InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight :- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295)))) +- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> ``` ## How was this patch tested? New tests are added to CachedTableSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17330 from dilipbiswal/subquery_cache_final.
* [SPARK-20291][SQL] NaNvl(FloatType, NullType) should not be cast to ↵DB Tsai2017-04-121-2/+1
| | | | | | | | | | | | | | | | | | | | | | NaNvl(DoubleType, DoubleType) ## What changes were proposed in this pull request? `NaNvl(float value, null)` will be converted into `NaNvl(float value, Cast(null, DoubleType))` and finally `NaNvl(Cast(float value, DoubleType), Cast(null, DoubleType))`. This will cause mismatching in the output type when the input type is float. By adding extra rule in TypeCoercion can resolve this issue. ## How was this patch tested? unite tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: DB Tsai <dbt@netflix.com> Closes #17606 from dbtsai/fixNaNvl.
* [SPARK-20283][SQL] Add preOptimizationBatchesReynold Xin2017-04-101-1/+7
| | | | | | | | | | | | ## What changes were proposed in this pull request? We currently have postHocOptimizationBatches, but not preOptimizationBatches. This patch adds preOptimizationBatches so the optimizer debugging extensions are symmetric. ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17595 from rxin/SPARK-20283.
* [SPARK-20282][SS][TESTS] Write the commit log first to fix a race contion in ↵Shixiong Zhu2017-04-101-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | tests ## What changes were proposed in this pull request? This PR fixes the following failure: ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: Assert on query failed: == Progress == AssertOnQuery(<condition>, ) StopStream AddData to MemoryStream[value#30891]: 1,2 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock35cdc93a,Map()) CheckAnswer: [6],[3] StopStream => AssertOnQuery(<condition>, ) AssertOnQuery(<condition>, ) StartStream(OneTimeTrigger,org.apache.spark.util.SystemClockcdb247d,Map()) CheckAnswer: [6],[3] StopStream AddData to MemoryStream[value#30891]: 3 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock55394e4d,Map()) CheckLastBatch: [2] StopStream AddData to MemoryStream[value#30891]: 0 StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock749aa997,Map()) ExpectFailure[org.apache.spark.SparkException, isFatalError: false] AssertOnQuery(<condition>, ) AssertOnQuery(<condition>, incorrect start offset or end offset on exception) == Stream == Output Mode: Append Stream state: not started Thread state: dead == Sink == 0: [6] [3] == Plan == at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) at org.scalatest.Assertions$class.fail(Assertions.scala:1328) at org.scalatest.FunSuite.fail(FunSuite.scala:1555) at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347) at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318) at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483) at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357) at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356) at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161) at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161) at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200) at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:381) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41) at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241) at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502) at sbt.ForkMain$Run$2.call(ForkMain.java:296) at sbt.ForkMain$Run$2.call(ForkMain.java:286) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` The failure is because `CheckAnswer` will run once `committedOffsets` is updated. Then writing the commit log may be interrupted by the following `StopStream`. This PR just change the order to write the commit log first. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #17594 from zsxwing/SPARK-20282.
* [SPARK-20280][CORE] FileStatusCache Weigher integer overflowBogdan Raducanu2017-04-101-13/+34
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Weigher.weigh needs to return Int but it is possible for an Array[FileStatus] to have size > Int.maxValue. To avoid this, the size is scaled down by a factor of 32. The maximumWeight of the cache is also scaled down by the same factor. ## How was this patch tested? New test in FileIndexSuite Author: Bogdan Raducanu <bogdan@databricks.com> Closes #17591 from bogdanrdc/SPARK-20280.
* [SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish ↵Sean Owen2017-04-1023-58/+86
| | | | | | | | | | | | | | | | | | locale bug" causes Spark problems ## What changes were proposed in this pull request? Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #17527 from srowen/SPARK-20156.
* [SPARK-20229][SQL] add semanticHash to QueryPlanWenchen Fan2017-04-108-69/+30
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Like `Expression`, `QueryPlan` should also have a `semanticHash` method, then we can put plans to a hash map and look it up fast. This PR refactors `QueryPlan` to follow `Expression` and put all the normalization logic in `QueryPlan.canonicalized`, so that it's very natural to implement `semanticHash`. follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating all cached plans. ## How was this patch tested? existing tests. Note that we don't need to test the `semanticHash` method, once the existing tests prove `sameResult` is correct, we are good. Author: Wenchen Fan <wenchen@databricks.com> Closes #17541 from cloud-fan/plan-semantic.
* [SPARK-20270][SQL] na.fill should not change the values in long or integer ↵DB Tsai2017-04-101-2/+3
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | when the default value is in double ## What changes were proposed in this pull request? This bug was partially addressed in SPARK-18555 https://github.com/apache/spark/pull/15994, but the root cause isn't completely solved. This bug is pretty critical since it changes the member id in Long in our application if the member id can not be represented by Double losslessly when the member id is very big. Here is an example how this happens, with ``` Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null), (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2), ``` the logical plan will be ``` == Analyzed Logical Plan == a: bigint, b: double Project [cast(coalesce(cast(a#232L as double), cast(0.2 as double)) as bigint) AS a#240L, cast(coalesce(nanvl(b#233, cast(null as double)), 0.2) as double) AS b#241] +- Project [_1#229L AS a#232L, _2#230 AS b#233] +- LocalRelation [_1#229L, _2#230] ``` Note that even the value is not null, Spark will cast the Long into Double first. Then if it's not null, Spark will cast it back to Long which results in losing precision. The behavior should be that the original value should not be changed if it's not null, but Spark will change the value which is wrong. With the PR, the logical plan will be ``` == Analyzed Logical Plan == a: bigint, b: double Project [coalesce(a#232L, cast(0.2 as bigint)) AS a#240L, coalesce(nanvl(b#233, cast(null as double)), cast(0.2 as double)) AS b#241] +- Project [_1#229L AS a#232L, _2#230 AS b#233] +- LocalRelation [_1#229L, _2#230] ``` which behaves correctly without changing the original Long values and also avoids extra cost of unnecessary casting. ## How was this patch tested? unit test added. +cc srowen rxin cloud-fan gatorsmile Thanks. Author: DB Tsai <dbt@netflix.com> Closes #17577 from dbtsai/fixnafill.
* [SPARK-20255] Move listLeafFiles() to InMemoryFileIndexAdrian Ionescu2017-04-072-222/+227
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request Trying to get a grip on the `FileIndex` hierarchy, I was confused by the following inconsistency: On the one hand, `PartitioningAwareFileIndex` defines `leafFiles` and `leafDirToChildrenFiles` as abstract, but on the other it fully implements `listLeafFiles` which does all the listing of files. However, the latter is only used by `InMemoryFileIndex`. I'm hereby proposing to move this method (and all its dependencies) to the implementation class that actually uses it, and thus unclutter the `PartitioningAwareFileIndex` interface. ## How was this patch tested? `./build/sbt sql/test` Author: Adrian Ionescu <adrian@databricks.com> Closes #17570 from adrian-ionescu/list-leaf-files.
* [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directlyWenchen Fan2017-04-073-36/+22
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently `LogicalRelation` has a `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17552 from cloud-fan/minor.
* [SPARK-20196][PYTHON][SQL] update doc for catalog functions for all ↵Felix Cheung2017-04-062-16/+23
| | | | | | | | | | | | | | | | languages, add pyspark refreshByPath API ## What changes were proposed in this pull request? Update doc to remove external for createTable, add refreshByPath in python ## How was this patch tested? manual Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17512 from felixcheung/catalogdoc.
* [SPARK-20224][SS] Updated docs for streaming dropDuplicates and ↵Tathagata Das2017-04-052-3/+14
| | | | | | | | | | | | | | | | | | mapGroupsWithState ## What changes were proposed in this pull request? - Fixed bug in Java API not passing timeout conf to scala API - Updated markdown docs - Updated scala docs - Added scala and Java example ## How was this patch tested? Manually ran examples. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17539 from tdas/SPARK-20224.
* [SPARK-20209][SS] Execute next trigger immediately if previous batch took ↵Tathagata Das2017-04-041-8/+9
| | | | | | | | | | | | | | | | | longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17525 from tdas/SPARK-20209.
* Small doc fix for ReuseSubquery.Reynold Xin2017-04-041-2/+2
|
* [SPARK-19716][SQL] support by-name resolution for struct type elements in arrayWenchen Fan2017-04-041-1/+1
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Previously when we construct deserializer expression for array type, we will first cast the corresponding field to expected array type and then apply `MapObjects`. However, by doing that, we lose the opportunity to do by-name resolution for struct type inside array type. In this PR, I introduce a `UnresolvedMapObjects` to hold the lambda function and the input array expression. Then during analysis, after the input array expression is resolved, we get the actual array element type and apply by-name resolution. Then we don't need to add `Cast` for array type when constructing the deserializer expression, as the element type is determined later at analyzer. ## How was this patch tested? new regression test Author: Wenchen Fan <wenchen@databricks.com> Closes #17398 from cloud-fan/dataset.
* [SPARK-20204][SQL] remove SimpleCatalystConf and CatalystConf type aliasWenchen Fan2017-04-045-14/+14
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/17285 . ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #17521 from cloud-fan/conf.
* [SPARK-20198][SQL] Remove the inconsistency in table/function name ↵Xiao Li2017-04-043-55/+134
| | | | | | | | | | | | | | | | | | | | | conventions in SparkSession.Catalog APIs ### What changes were proposed in this pull request? Observed by felixcheung , in `SparkSession`.`Catalog` APIs, we have different conventions/rules for table/function identifiers/names. Most APIs accept the qualified name (i.e., `databaseName`.`tableName` or `databaseName`.`functionName`). However, the following five APIs do not accept it. - def listColumns(tableName: String): Dataset[Column] - def getTable(tableName: String): Table - def getFunction(functionName: String): Function - def tableExists(tableName: String): Boolean - def functionExists(functionName: String): Boolean To make them consistent with the other Catalog APIs, this PR does the changes, updates the function/API comments and adds the `params` to clarify the inputs we allow. ### How was this patch tested? Added the test cases . Author: Xiao Li <gatorsmile@gmail.com> Closes #17518 from gatorsmile/tableIdentifier.
* [SPARK-20067][SQL] Unify and Clean Up Desc Commands Using Catalog InterfaceXiao Li2017-04-032-99/+28
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR is to unify and clean up the outputs of `DESC EXTENDED/FORMATTED` and `SHOW TABLE EXTENDED` by moving the logics into the Catalog interface. The output formats are improved. We also add the missing attributes. It impacts the DDL commands like `SHOW TABLE EXTENDED`, `DESC EXTENDED` and `DESC FORMATTED`. In addition, by following what we did in Dataset API `printSchema`, we can use `treeString` to show the schema in the more readable way. Below is the current way: ``` Schema: STRUCT<`a`: STRING (nullable = true), `b`: INT (nullable = true), `c`: STRING (nullable = true), `d`: STRING (nullable = true)> ``` After the change, it should look like ``` Schema: root |-- a: string (nullable = true) |-- b: integer (nullable = true) |-- c: string (nullable = true) |-- d: string (nullable = true) ``` ### How was this patch tested? `describe.sql` and `show-tables.sql` Author: Xiao Li <gatorsmile@gmail.com> Closes #17394 from gatorsmile/descFollowUp.
* [SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLISDilip Biswal2017-04-046-12/+79
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? **Description** from JIRA The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS. But unfortunately parquet-mr hasn't supported it yet. For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a 0 microsecond part to read values. For the write path, currently we are writing timestamps as INT96, similar to Impala and Hive. One alternative is that, we can have a separate SQL option to let users be able to write Spark SQL timestamp values as TIMESTAMP_MILLIS. Of course, in this way the microsecond part will be truncated. ## How was this patch tested? Added new tests in ParquetQuerySuite and ParquetIOSuite Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #15332 from dilipbiswal/parquet-time-millis.
* [SPARK-20194] Add support for partition pruning to in-memory catalogAdrian Ionescu2017-04-031-4/+1
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch implements `listPartitionsByFilter()` for `InMemoryCatalog` and thus resolves an outstanding TODO causing the `PruneFileSourcePartitions` optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default). The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's `getPartitionsByFilter()` out from `HiveExternalCatalog` into `ExternalCatalogUtils` and calls this new function from `InMemoryCatalog` on the whole list of partitions. Now that this method is implemented we can always pass the `CatalogTable` to the `DataSource` in `FindDataSourceTable`, so that the latter is resolved to a relation with a `CatalogFileIndex`, which is what the `PruneFileSourcePartitions` rule matches for. ## How was this patch tested? Ran existing tests and added new test for `listPartitionsByFilter` in `ExternalCatalogSuite`, which is subclassed by both `InMemoryCatalogSuite` and `HiveExternalCatalogSuite`. Author: Adrian Ionescu <adrian@databricks.com> Closes #17510 from adrian-ionescu/InMemoryCatalog.
* [SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces ↵hyukjinkwon2017-04-031-30/+47
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | incorrect schema for non-array/object JSONs ## What changes were proposed in this pull request? Currently, when we infer the types for vaild JSON strings but object or array, we are producing empty schemas regardless of parse modes as below: ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` ```scala scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` This PR proposes to handle parse modes in type inference. After this PR, ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root |-- a: long (nullable = true) ``` ``` scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found. ``` This PR is based on https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae and I and NathanHowell talked about this in https://issues.apache.org/jira/browse/SPARK-19641 ## How was this patch tested? Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17492 from HyukjinKwon/SPARK-19641.
* [SPARK-20166][SQL] Use XXX for ISO 8601 timezone instead of ZZ ↵hyukjinkwon2017-04-034-7/+7
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | (FastDateFormat specific) in CSV/JSON timeformat options ## What changes were proposed in this pull request? This PR proposes to use `XXX` format instead of `ZZ`. `ZZ` seems a `FastDateFormat` specific. `ZZ` supports "ISO 8601 extended format time zones" but it seems `FastDateFormat` specific option. I misunderstood this is compatible format with `SimpleDateFormat` when this change is introduced. Please see [SimpleDateFormat documentation]( https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#iso8601timezone) and [FastDateFormat documentation](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/time/FastDateFormat.html). It seems we better replace `ZZ` to `XXX` because they look using the same strategy - [FastDateParser.java#L930](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L930), [FastDateParser.java#L932-L951 ](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L932-L951) and [FastDateParser.java#L596-L601](https://github.com/apache/commons-lang/blob/8767cd4f1a6af07093c1e6c422dae8e574be7e5e/src/main/java/org/apache/commons/lang3/time/FastDateParser.java#L596-L601). I also checked the codes and manually debugged it for sure. It seems both cases use the same pattern `( Z|(?:[+-]\\d{2}(?::)\\d{2}))`. _Note that this should be rather a fix about documentation and not the behaviour change because `ZZ` seems invalid date format in `SimpleDateFormat` as documented in `DataFrameReader` and etc, and both `ZZ` and `XXX` look identically working with `FastDateFormat`_ Current documentation is as below: ``` * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> ``` ## How was this patch tested? Existing tests should cover this. Also, manually tested as below (BTW, I don't think these are worth being added as tests within Spark): **Parse** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res4: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res10: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000-11:00" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") java.text.ParseException: Unparseable date: "2017-03-21T00:00:00.000Z" at java.text.DateFormat.parse(DateFormat.java:366) ... 48 elided ``` ```scala scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00") res7: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000Z") res1: java.util.Date = Tue Mar 21 09:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000-11:00") res8: java.util.Date = Tue Mar 21 20:00:00 KST 2017 scala> org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ").parse("2017-03-21T00:00:00.000Z") res2: java.util.Date = Tue Mar 21 09:00:00 KST 2017 ``` **Format** ```scala scala> new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").format(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse("2017-03-21T00:00:00.000-11:00")) res6: String = 2017-03-21T20:00:00.000+09:00 ``` ```scala scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSZZ,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res1: String = 2017-03-21T20:00:00.000+09:00 scala> val fd = org.apache.commons.lang3.time.FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") fd: org.apache.commons.lang3.time.FastDateFormat = FastDateFormat[yyyy-MM-dd'T'HH:mm:ss.SSSXXX,ko_KR,Asia/Seoul] scala> fd.format(fd.parse("2017-03-21T00:00:00.000-11:00")) res2: String = 2017-03-21T20:00:00.000+09:00 ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #17489 from HyukjinKwon/SPARK-20166.
* [SPARK-19148][SQL][FOLLOW-UP] do not expose the external table concept in ↵Xiao Li2017-04-011-10/+15
| | | | | | | | | | | | | | Catalog ### What changes were proposed in this pull request? After we renames `Catalog`.`createExternalTable` to `createTable` in the PR: https://github.com/apache/spark/pull/16528, we also need to deprecate the corresponding functions in `SQLContext`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17502 from gatorsmile/deprecateCreateExternalTable.
* [SPARK-20165][SS] Resolve state encoder's deserializer in driver in ↵Tathagata Das2017-03-311-12/+16
| | | | | | | | | | | | | | | | | | | FlatMapGroupsWithStateExec ## What changes were proposed in this pull request? - Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail. - Fixed flaky test related to processing time timeout. The flakiness is caused because the test thread (that adds data to memory source) has a race condition with the streaming query thread. When testing the manual clock, the goal is to add data and increment clock together atomically, such that a trigger sees new data AND updated clock simultaneously (both or none). This fix adds additional synchronization in when adding data; it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before adding data. - Added`testQuietly` on some tests that generate a lot of error logs. ## How was this patch tested? Multiple runs on existing unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17488 from tdas/SPARK-20165.
* [SPARK-20151][SQL] Account for partition pruning in scan metadataTime metricsReynold Xin2017-03-303-4/+18
| | | | | | | | | | | | ## What changes were proposed in this pull request? After SPARK-20136, we report metadata timing metrics in scan operator. However, that timing metric doesn't include one of the most important part of metadata, which is partition pruning. This patch adds that time measurement to the scan metrics. ## How was this patch tested? N/A - I tried adding a test in SQLMetricsSuite but it was extremely convoluted to the point that I'm not sure if this is worth it. Author: Reynold Xin <rxin@databricks.com> Closes #17476 from rxin/SPARK-20151.
* [DOCS] Docs-only improvementsJacek Laskowski2017-03-308-36/+36
| | | | | | | | | | | | | | | | …adoc ## What changes were proposed in this pull request? Use recommended values for row boundaries in Window's scaladoc, i.e. `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow` (that were introduced in 2.1.0). ## How was this patch tested? Local build Author: Jacek Laskowski <jacek@japila.pl> Closes #17417 from jaceklaskowski/window-expression-scaladoc.
* [SPARK-20148][SQL] Extend the file commit API to allow subscribing to task ↵Eric Liang2017-03-291-6/+16
| | | | | | | | | | | | | | | | | | | | commit messages ## What changes were proposed in this pull request? The internal FileCommitProtocol interface returns all task commit messages in bulk to the implementation when a job finishes. However, it is sometimes useful to access those messages before the job completes, so that the driver gets incremental progress updates before the job finishes. This adds an `onTaskCommit` listener to the internal api. ## How was this patch tested? Unit tests. cc rxin Author: Eric Liang <ekl@databricks.com> Closes #17475 from ericl/file-commit-api-ext.
* [SPARK-20136][SQL] Add num files and metadata operation timing to scan ↵Reynold Xin2017-03-291-2/+16
| | | | | | | | | | | | | | | | | | operator metrics ## What changes were proposed in this pull request? This patch adds explicit metadata operation timing and number of files in data source metrics. Those would be useful to include for performance profiling. Screenshot of a UI with this change (num files and metadata time are new metrics): <img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png"> ## How was this patch tested? N/A Author: Reynold Xin <rxin@databricks.com> Closes #17465 from rxin/SPARK-20136.
* [SPARK-20009][SQL] Support DDL strings for defining schema in ↵Takeshi Yamamuro2017-03-291-3/+12
| | | | | | | | | | | | | | functions.from_json ## What changes were proposed in this pull request? This pr added `StructType.fromDDL` to convert a DDL format string into `StructType` for defining schemas in `functions.from_json`. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17406 from maropu/SPARK-20009.
* [SPARK-20048][SQL] Cloning SessionState does not clone query execution listenersKunal Khamar2017-03-294-40/+54
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Bugfix from [SPARK-19540.](https://github.com/apache/spark/pull/16826) Cloning SessionState does not clone query execution listeners, so cloned session is unable to listen to events on queries. ## How was this patch tested? - Unit test Author: Kunal Khamar <kkhamar@outlook.com> Closes #17379 from kunalkhamar/clone-bugfix.
* [SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver ↵Reynold Xin2017-03-294-14/+29
| | | | | | | | | | | | | | side metric updates ## What changes were proposed in this pull request? It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious. ## How was this patch tested? Updated a test case to use this method. Author: Reynold Xin <rxin@databricks.com> Closes #17464 from rxin/SPARK-20134.
* [SPARK-20126][SQL] Remove HiveSessionStateHerman van Hovell2017-03-283-25/+32
| | | | | | | | | | | | ## What changes were proposed in this pull request? Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17457 from hvanhovell/SPARK-20126.
* [SPARK-20100][SQL] Refactor SessionState initializationHerman van Hovell2017-03-285-153/+352
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is one inside the SessionState class, and a part is done by passing functions. This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a place holder for the various Spark SQL internals. This also lays the ground work for two future improvements: 1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive. 2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi stable API on top of this. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17433 from hvanhovell/SPARK-20100.
* [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logicTathagata Das2017-03-273-12/+19
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid json) to the log. This PR simplifies the logic by disallowing use of `log.add(batchId, metadata)` and instead using `log.add(batchId)`. No question of specifying metadata, so no confusion related to null. ## How was this patch tested? Existing tests pass. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17444 from tdas/SPARK-19876-1.
* [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding ↵Kazuaki Ishizaki2017-03-261-4/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | sqlContext.read.parquet() ## What changes were proposed in this pull request? This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables a for-loop with int local variables 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance of `sqlContext.read.parquet().count` is improved by 1.09x. Benchmark program: ```java val dir = "/dev/shm/parquet" val N = 1000 * 1000 * 40 val iters = 20 val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds) sparkSession.range(n).write.mode("overwrite").parquet(dir) benchmark.addCase("count") { i: Int => var n = 0 var len = 0L while (n < iters) { len += sparkSession.read.parquet(dir).count n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ w/o this PR 1152 / 1211 694.7 1.4 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ with this PR 1053 / 1121 760.0 1.3 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ while (scan_batchIdx < numRows) { /* 057 */ int scan_rowIdx = scan_batchIdx++; /* 058 */ // do aggregate /* 059 */ // common sub-expressions /* 060 */ /* 061 */ // evaluate aggregate function /* 062 */ boolean agg_isNull1 = false; /* 063 */ /* 064 */ long agg_value1 = -1L; /* 065 */ agg_value1 = agg_bufValue + 1L; /* 066 */ // update aggregation buffer /* 067 */ agg_bufIsNull = false; /* 068 */ agg_bufValue = agg_value1; /* 069 */ if (shouldStop()) return; /* 070 */ } /* 071 */ scan_batch = null; /* 072 */ scan_nextBatch(); /* 073 */ } /* 074 */ scan_scanTime.add(scan_scanTime1 / (1000 * 1000)); /* 075 */ scan_scanTime1 = 0; /* 076 */ /* 077 */ } /* 078 */ /* 079 */ private void scan_nextBatch() throws java.io.IOException { /* 080 */ long getBatchStart = System.nanoTime(); /* 081 */ if (scan_input.hasNext()) { /* 082 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 083 */ scan_numOutputRows.add(scan_batch.numRows()); /* 084 */ scan_batchIdx = 0; /* 085 */ /* 086 */ } /* 087 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 088 */ } /* 089 */ /* 090 */ protected void processNext() throws java.io.IOException { /* 091 */ while (!agg_initAgg) { /* 092 */ agg_initAgg = true; /* 093 */ long agg_beforeAgg = System.nanoTime(); /* 094 */ agg_doAggregateWithoutKey(); /* 095 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 096 */ /* 097 */ // output the result /* 098 */ /* 099 */ agg_numOutputRows.add(1); /* 100 */ agg_rowWriter.zeroOutNullBytes(); /* 101 */ /* 102 */ if (agg_bufIsNull) { /* 103 */ agg_rowWriter.setNullAt(0); /* 104 */ } else { /* 105 */ agg_rowWriter.write(0, agg_bufValue); /* 106 */ } /* 107 */ append(agg_result); /* 108 */ } /* 109 */ } /* 110 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ int scan_localEnd = numRows - scan_batchIdx; /* 057 */ for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) { /* 058 */ int scan_rowIdx = scan_batchIdx + scan_localIdx; /* 059 */ // do aggregate /* 060 */ // common sub-expressions /* 061 */ /* 062 */ // evaluate aggregate function /* 063 */ boolean agg_isNull1 = false; /* 064 */ /* 065 */ long agg_value1 = -1L; /* 066 */ agg_value1 = agg_bufValue + 1L; /* 067 */ // update aggregation buffer /* 068 */ agg_bufIsNull = false; /* 069 */ agg_bufValue = agg_value1; /* 070 */ // shouldStop check is eliminated /* 071 */ } /* 072 */ scan_batchIdx = numRows; /* 073 */ scan_batch = null; /* 074 */ scan_nextBatch(); /* 075 */ } /* 079 */ } /* 080 */ /* 081 */ private void scan_nextBatch() throws java.io.IOException { /* 082 */ long getBatchStart = System.nanoTime(); /* 083 */ if (scan_input.hasNext()) { /* 084 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 085 */ scan_numOutputRows.add(scan_batch.numRows()); /* 086 */ scan_batchIdx = 0; /* 087 */ /* 088 */ } /* 089 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 090 */ } /* 091 */ /* 092 */ protected void processNext() throws java.io.IOException { /* 093 */ while (!agg_initAgg) { /* 094 */ agg_initAgg = true; /* 095 */ long agg_beforeAgg = System.nanoTime(); /* 096 */ agg_doAggregateWithoutKey(); /* 097 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 098 */ /* 099 */ // output the result /* 100 */ /* 101 */ agg_numOutputRows.add(1); /* 102 */ agg_rowWriter.zeroOutNullBytes(); /* 103 */ /* 104 */ if (agg_bufIsNull) { /* 105 */ agg_rowWriter.setNullAt(0); /* 106 */ } else { /* 107 */ agg_rowWriter.write(0, agg_bufValue); /* 108 */ } /* 109 */ append(agg_result); /* 110 */ } /* 111 */ } /* 112 */ } ``` ## How was this patch tested? Tested existing test suites Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17378 from kiszk/SPARK-20046.
* [SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql coreWenchen Fan2017-03-254-5/+76
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `FailureSafeParser` is only used in sql core, it doesn't make sense to put it in catalyst module. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #17408 from cloud-fan/minor.
* [DOCS] Clarify round mode for format_number & round functionsRoxanne Moslehi2017-03-251-5/+5
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Updated the description for the `format_number` description to indicate that it uses `HALF_EVEN` rounding. Updated the description for the `round` description to indicate that it uses `HALF_UP` rounding. ## How was this patch tested? Just changing the two function comments so no testing involved. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Roxanne Moslehi <rmoslehi@palantir.com> Author: roxannemoslehi <rmoslehi@berkeley.edu> Closes #17399 from roxannemoslehi/patch-1.
* Disable generate codegen since it fails my workload.Reynold Xin2017-03-241-1/+1
|
* [SPARK-20070][SQL] Redact DataSourceScanExec treeStringHerman van Hovell2017-03-241-16/+25
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The explain output of `DataSourceScanExec` can contain sensitive information (like Amazon keys). Such information should not end up in logs, or be exposed to non privileged users. This PR addresses this by adding a redaction facility for the `DataSourceScanExec.treeString`. A user can enable this by setting a regex in the `spark.redaction.string.regex` configuration. ## How was this patch tested? Added a unit test to check the output of DataSourceScanExec. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17397 from hvanhovell/SPARK-20070.
* [SPARK-19820][CORE] Add interface to kill tasks w/ a reasonEric Liang2017-03-231-3/+1
| | | | | | | | | | | | | | | | | | | | | | | | This commit adds a killTaskAttempt method to SparkContext, to allow users to kill tasks so that they can be re-scheduled elsewhere. This also refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N killed: $reason)` and `TaskKilled: $reason`. Without this change, there is no way to provide the user feedback through the UI. Currently used reasons are "stage cancelled", "another attempt succeeded", and "killed via SparkContext.killTask". The user can also specify a custom reason through `SparkContext.killTask`. cc rxin In the stage overview UI the reasons are summarized: ![1](https://cloud.githubusercontent.com/assets/14922/23929209/a83b2862-08e1-11e7-8b3e-ae1967bbe2e5.png) Within the stage UI you can see individual task kill reasons: ![2](https://cloud.githubusercontent.com/assets/14922/23929200/9a798692-08e1-11e7-8697-72b27ad8a287.png) Existing tests, tried killing some stages in the UI and verified the messages are as expected. Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekl@google.com> Closes #17166 from ericl/kill-reason.
* Fix compilation of the Scala 2.10 master branchBurak Yavuz2017-03-232-11/+11
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fixes break caused by: https://github.com/apache/spark/commit/746a558de2136f91f8fe77c6e51256017aa50913 ## How was this patch tested? Compiled with `build/sbt -Dscala2.10 sql/compile` locally Author: Burak Yavuz <brkyvz@gmail.com> Closes #17403 from brkyvz/onceTrigger2.10.
* [SPARK-10849][SQL] Adds option to the JDBC data source write for user to ↵sureshthalamati2017-03-233-8/+64
| | | | | | | | | | | | | | | | | | | | | | | | specify database column type for the create table ## What changes were proposed in this pull request? Currently JDBC data source creates tables in the target database using the default type mapping, and the JDBC dialect mechanism.  If users want to specify different database data type for only some of columns, there is no option available. In scenarios where default mapping does not work, users are forced to create tables on the target database before writing. This workaround is probably not acceptable from a usability point of view. This PR is to provide a user-defined type mapping for specific columns. The solution is to allow users to specify database column data type for the create table as JDBC datasource option(createTableColumnTypes) on write. Data type information can be specified in the same format as table schema DDL format (e.g: `name CHAR(64), comments VARCHAR(1024)`). All supported target database types can not be specified , the data types has to be valid spark sql data types also. For example user can not specify target database CLOB data type. This will be supported in the follow-up PR. Example: ```Scala df.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc(url, "TEST.DBCOLTYPETEST", properties) ``` ## How was this patch tested? Added new test cases to the JDBCWriteSuite Author: sureshthalamati <suresh.thalamati@gmail.com> Closes #16209 from sureshthalamati/jdbc_custom_dbtype_option_json-spark-10849.
* [SPARK-19876][SS][WIP] OneTime Trigger ExecutorTyson Condie2017-03-237-33/+308
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers. In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature. ## How was this patch tested? A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly. In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests: - The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop). - The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log. - A OneTime trigger execution that results in an exception being thrown. marmbrus tdas zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <tcondie@gmail.com> Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17219 from tcondie/stream-commit.
* [MINOR][BUILD] Fix javadoc8 breakhyukjinkwon2017-03-233-16/+16
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Several javadoc8 breaks have been introduced. This PR proposes fix those instances so that we can build Scala/Java API docs. ``` [error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:6: error: reference not found [error] * <code>flatMapGroupsWithState</code> operations on {link KeyValueGroupedDataset}. [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:10: error: reference not found [error] * Both, <code>mapGroupsWithState</code> and <code>flatMapGroupsWithState</code> in {link KeyValueGroupedDataset} [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:51: error: reference not found [error] * {link GroupStateTimeout.ProcessingTimeTimeout}) or event time (i.e. [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:52: error: reference not found [error] * {link GroupStateTimeout.EventTimeTimeout}). [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/streaming/GroupState.java:158: error: reference not found [error] * Spark SQL types (see {link Encoder} for more details). [error] ^ [error] .../spark/mllib/target/java/org/apache/spark/ml/fpm/FPGrowthParams.java:26: error: bad use of '>' [error] * Number of partitions (>=1) used by parallel FP-growth. By default the param is not set, and [error] ^ [error] .../spark/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java:30: error: reference not found [error] * {link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState( [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:211: error: reference not found [error] * See {link GroupState} for more details. [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:232: error: reference not found [error] * See {link GroupState} for more details. [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:254: error: reference not found [error] * See {link GroupState} for more details. [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyValueGroupedDataset.java:277: error: reference not found [error] * See {link GroupState} for more details. [error] ^ [error] .../spark/core/target/java/org/apache/spark/TaskContextImpl.java:10: error: reference not found [error] * {link TaskMetrics} &amp; {link MetricsSystem} objects are not thread safe. [error] ^ [error] .../spark/core/target/java/org/apache/spark/TaskContextImpl.java:10: error: reference not found [error] * {link TaskMetrics} &amp; {link MetricsSystem} objects are not thread safe. [error] ^ [info] 13 errors ``` ``` jekyll 3.3.1 | Error: Unidoc generation failed ``` ## How was this patch tested? Manually via `jekyll build` Author: hyukjinkwon <gurwls223@gmail.com> Closes #17389 from HyukjinKwon/minor-javadoc8-fix.
* [SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace ↵hyukjinkwon2017-03-234-11/+22
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | options in CSV writing ## What changes were proposed in this pull request? This PR proposes to support _not_ trimming the white spaces when writing out. These are `false` by default in CSV reading path but these are `true` by default in CSV writing in univocity parser. Both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace` options are not being used for writing and therefore, we are always trimming the white spaces. It seems we should provide a way to keep this white spaces easily. WIth the data below: ```scala val df = spark.read.csv(Seq("a , b , c").toDS) df.show() ``` ``` +---+----+---+ |_c0| _c1|_c2| +---+----+---+ | a | b | c| +---+----+---+ ``` **Before** ```scala df.write.csv("/tmp/text.csv") spark.read.text("/tmp/text.csv").show() ``` ``` +-----+ |value| +-----+ |a,b,c| +-----+ ``` It seems this can't be worked around via `quoteAll` too. ```scala df.write.option("quoteAll", true).csv("/tmp/text.csv") spark.read.text("/tmp/text.csv").show() ``` ``` +-----------+ | value| +-----------+ |"a","b","c"| +-----------+ ``` **After** ```scala df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv") spark.read.text("/tmp/text.csv").show() ``` ``` +----------+ | value| +----------+ |a , b , c| +----------+ ``` Note that this case is possible in R ```r > system("cat text.csv") f1,f2,f3 a , b , c > df <- read.csv(file="text.csv") > df f1 f2 f3 1 a b c > write.csv(df, file="text1.csv", quote=F, row.names=F) > system("cat text1.csv") f1,f2,f3 a , b , c ``` ## How was this patch tested? Unit tests in `CSVSuite` and manual tests for Python. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17310 from HyukjinKwon/SPARK-18579.
* [SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithStateTathagata Das2017-03-228-84/+85
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Since the state is tied a "group" in the "mapGroupsWithState" operations, its better to call the state "GroupState" instead of a key. This would make it more general if you extends this operation to RelationGroupedDataset and python APIs. ## How was this patch tested? Existing unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17385 from tdas/SPARK-20057.
* [SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related commentshyukjinkwon2017-03-224-17/+6
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to make `mode` options in both CSV and JSON to use `cass object` and fix some related comments related previous fix. Also, this PR modifies some tests related parse modes. ## How was this patch tested? Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17377 from HyukjinKwon/SPARK-19949.
* [SPARK-20027][DOCS] Compilation fix in java docs.Prashant Sharma2017-03-221-1/+2
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? During build/sbt publish-local, build breaks due to javadocs errors. This patch fixes those errors. ## How was this patch tested? Tested by running the sbt build. Author: Prashant Sharma <prashsh1@in.ibm.com> Closes #17358 from ScrapCodes/docs-fix.
* [SPARK-20023][SQL] Output table comment for DESC FORMATTEDXiao Li2017-03-221-2/+3
| | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? Currently, `DESC FORMATTED` did not output the table comment, unlike what `DESC EXTENDED` does. This PR is to fix it. Also correct the following displayed names in `DESC FORMATTED`, for being consistent with `DESC EXTENDED` - `"Create Time:"` -> `"Created:"` - `"Last Access Time:"` -> `"Last Access:"` ### How was this patch tested? Added test cases in `describe.sql` Author: Xiao Li <gatorsmile@gmail.com> Closes #17381 from gatorsmile/descFormattedTableComment.
* [SPARK-20030][SS] Event-time-based timeout for MapGroupsWithStateTathagata Das2017-03-216-90/+255
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Adding event time based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. The keys times out when the watermark crosses the timeout timestamp. ## How was this patch tested? Unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17361 from tdas/SPARK-20030.