aboutsummaryrefslogtreecommitdiff
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-20011][ML][DOCS] Clarify documentation for ALS 'rank' parameterchristopher snow2017-03-213-11/+11
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? API documentation and collaborative filtering documentation page changes to clarify inconsistent description of ALS rank parameter. - [DOCS] was previously: "rank is the number of latent factors in the model." - [API] was previously: "rank - number of features to use" This change describes rank in both places consistently as: - "Number of features to use (also referred to as the number of latent factors)" Author: Chris Snow <chris.snowuk.ibm.com> Author: christopher snow <chsnow123@gmail.com> Closes #17345 from snowch/SPARK-20011.
* [SPARK-20024][SQL][TEST-MAVEN] SessionCatalog reset need to set the current ↵Xiao Li2017-03-202-2/+1
| | | | | | | | | | | | | | | | database of ExternalCatalog ### What changes were proposed in this pull request? SessionCatalog API setCurrentDatabase does not set the current database of the underlying ExternalCatalog. Thus, weird errors could come in the test suites after we call reset. We need to fix it. So far, have not found the direct impact in the other code paths because we expect all the SessionCatalog APIs should always use the current database value we managed, unless some of code paths skip it. Thus, we fix it in the test-only function reset(). ### How was this patch tested? Multiple test case failures are observed in mvn and add a test case in SessionCatalogSuite. Author: Xiao Li <gatorsmile@gmail.com> Closes #17354 from gatorsmile/useDB.
* [SPARK-19949][SQL] unify bad record handling in CSV and JSONWenchen Fan2017-03-2014-285/+222
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently JSON and CSV have exactly the same logic about handling bad records, this PR tries to abstract it and put it in a upper level to reduce code duplication. The overall idea is, we make the JSON and CSV parser to throw a BadRecordException, then the upper level, FailureSafeParser, handles bad records according to the parse mode. Behavior changes: 1. with PERMISSIVE mode, if the number of tokens doesn't match the schema, previously CSV parser will treat it as a legal record and parse as many tokens as possible. After this PR, we treat it as an illegal record, and put the raw record string in a special column, but we still parse as many tokens as possible. 2. all logging is removed as they are not very useful in practice. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Author: hyukjinkwon <gurwls223@gmail.com> Author: Wenchen Fan <cloud0fan@gmail.com> Closes #17315 from cloud-fan/bad-record2.
* [SPARK-19912][SQL] String literals should be escaped for Hive metastore ↵Dongjoon Hyun2017-03-213-2/+35
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | partition pruning ## What changes were proposed in this pull request? Since current `HiveShim`'s `convertFilters` does not escape the string literals. There exists the following correctness issues. This PR aims to return the correct result and also shows the more clear exception message. **BEFORE** ```scala scala> Seq((1, "p1", "q1"), (2, "p1\" and q=\"q1", "q2")).toDF("a", "p", "q").write.partitionBy("p", "q").saveAsTable("t1") scala> spark.table("t1").filter($"p" === "p1\" and q=\"q1").select($"a").show +---+ | a| +---+ +---+ scala> spark.table("t1").filter($"p" === "'\"").select($"a").show java.lang.RuntimeException: Caught Hive MetaException attempting to get partition metadata by filter from ... ``` **AFTER** ```scala scala> spark.table("t1").filter($"p" === "p1\" and q=\"q1").select($"a").show +---+ | a| +---+ | 2| +---+ scala> spark.table("t1").filter($"p" === "'\"").select($"a").show java.lang.UnsupportedOperationException: Partition filter cannot have both `"` and `'` characters ``` ## How was this patch tested? Pass the Jenkins test with new test cases. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17266 from dongjoon-hyun/SPARK-19912.
* [SPARK-17204][CORE] Fix replicated off heap storageMichael Allman2017-03-215-25/+105
| | | | | | | | | | | | | | | | (Jira: https://issues.apache.org/jira/browse/SPARK-17204) ## What changes were proposed in this pull request? There are a couple of bugs in the `BlockManager` with respect to support for replicated off-heap storage. First, the locally-stored off-heap byte buffer is disposed of when it is replicated. It should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level memory mode is off-heap. This PR addresses both of these problems. ## How was this patch tested? `BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs in this PR exist. Author: Michael Allman <michael@videoamp.com> Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.
* [SPARK-19980][SQL] Add NULL checks in Bean serializerTakeshi Yamamuro2017-03-212-2/+28
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A Bean serializer in `ExpressionEncoder` could change values when Beans having NULL. A concrete example is as follows; ``` scala> :paste class Outer extends Serializable { private var cls: Inner = _ def setCls(c: Inner): Unit = cls = c def getCls(): Inner = cls } class Inner extends Serializable { private var str: String = _ def setStr(s: String): Unit = str = str def getStr(): String = str } scala> Seq("""{"cls":null}""", """{"cls": {"str":null}}""").toDF().write.text("data") scala> val encoder = Encoders.bean(classOf[Outer]) scala> val schema = encoder.schema scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder) scala> df.show +------+ | cls| +------+ |[null]| | null| +------+ scala> df.map(x => x)(encoder).show() +------+ | cls| +------+ |[null]| |[null]| // <-- Value changed +------+ ``` This is because the Bean serializer does not have the NULL-check expressions that the serializer of Scala's product types has. Actually, this value change does not happen in Scala's product types; ``` scala> :paste case class Outer(cls: Inner) case class Inner(str: String) scala> val encoder = Encoders.product[Outer] scala> val schema = encoder.schema scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder) scala> df.show +------+ | cls| +------+ |[null]| | null| +------+ scala> df.map(x => x)(encoder).show() +------+ | cls| +------+ |[null]| | null| +------+ ``` This pr added the NULL-check expressions in Bean serializer along with the serializer of Scala's product types. ## How was this patch tested? Added tests in `JavaDatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17347 from maropu/SPARK-19980.
* [SPARK-20010][SQL] Sort information is lost after sort merge joinwangzhenhua2017-03-219-18/+81
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? After sort merge join for inner join, now we only keep left key ordering. However, after inner join, right key has the same value and order as left key. So if we need another smj on right key, we will unnecessarily add a sort which causes additional cost. As a more complicated example, A join B on A.key = B.key join C on B.key = C.key join D on A.key = D.key. We will unnecessarily add a sort on B.key when join {A, B} and C, and add a sort on A.key when join {A, B, C} and D. To fix this, we need to propagate all sorted information (equivalent expressions) from bottom up through `outputOrdering` and `SortOrder`. ## How was this patch tested? Test cases are added. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17339 from wzhfy/sortEnhance.
* [SPARK-19573][SQL] Make NaN/null handling consistent in approxQuantileZheng RuiFeng2017-03-206-54/+95
| | | | | | | | | | | | ## What changes were proposed in this pull request? update `StatFunctions.multipleApproxQuantiles` to handle NaN/null ## How was this patch tested? existing tests and added tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #16971 from zhengruifeng/quantiles_nan.
* [SPARK-19906][SS][DOCS] Documentation describing how to write queries to KafkaTyson Condie2017-03-201-57/+264
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add documentation that describes how to write streaming and batch queries to Kafka. zsxwing tdas Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <tcondie@gmail.com> Closes #17246 from tcondie/kafka-write-docs.
* [SPARK-19899][ML] Replace featuresCol with itemsCol in ml.fpm.FPGrowthzero3232017-03-202-18/+31
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Replaces `featuresCol` `Param` with `itemsCol`. See [SPARK-19899](https://issues.apache.org/jira/browse/SPARK-19899). ## How was this patch tested? Manual tests. Existing unit tests. Author: zero323 <zero323@users.noreply.github.com> Closes #17321 from zero323/SPARK-19899.
* [SPARK-19970][SQL] Table owner should be USER instead of PRINCIPAL in ↵Dongjoon Hyun2017-03-201-1/+1
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | kerberized clusters ## What changes were proposed in this pull request? In the kerberized hadoop cluster, when Spark creates tables, the owner of tables are filled with PRINCIPAL strings instead of USER names. This is inconsistent with Hive and causes problems when using [ROLE](https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization) in Hive. We had better to fix this. **BEFORE** ```scala scala> sql("create table t(a int)").show scala> sql("desc formatted t").show(false) ... |Owner: |sparkEXAMPLE.COM | | ``` **AFTER** ```scala scala> sql("create table t(a int)").show scala> sql("desc formatted t").show(false) ... |Owner: |spark | | ``` ## How was this patch tested? Manually do `create table` and `desc formatted` because this happens in Kerberized clusters. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17311 from dongjoon-hyun/SPARK-19970.
* [SPARK-19990][SQL][TEST-MAVEN] create a temp file for file in test.jar's ↵windpiger2017-03-202-14/+36
| | | | | | | | | | | | | | | | | | | | | | resource when run mvn test accross different modules ## What changes were proposed in this pull request? After we have merged the `HiveDDLSuite` and `DDLSuite` in [SPARK-19235](https://issues.apache.org/jira/browse/SPARK-19235), we have two subclasses of `DDLSuite`, that is `HiveCatalogedDDLSuite` and `InMemoryCatalogDDLSuite`. While `DDLSuite` is in `sql/core module`, and `HiveCatalogedDDLSuite` is in `sql/hive module`, if we mvn test `HiveCatalogedDDLSuite`, it will run the test in its parent class `DDLSuite`, this will cause some test case failed which will get and use the test file path in `sql/core module` 's `resource`. Because the test file path getted will start with 'jar:' like "jar:file:/home/jenkins/workspace/spark-master-test-maven-hadoop-2.6/sql/core/target/spark-sql_2.11-2.2.0-SNAPSHOT-tests.jar!/test-data/cars.csv", which will failed when new Path() in datasource.scala This PR fix this by copy file from resource to a temp dir. ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #17338 from windpiger/fixtestfailemvn.
* [SPARK-17791][SQL] Join reordering using star schema detectionIoana Delaney2017-03-2010-36/+978
| | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Star schema consists of one or more fact tables referencing a number of dimension tables. In general, queries against star schema are expected to run fast because of the established RI constraints among the tables. This design proposes a join reordering based on natural, generally accepted heuristics for star schema queries: - Finds the star join with the largest fact table and places it on the driving arm of the left-deep join. This plan avoids large tables on the inner, and thus favors hash joins. - Applies the most selective dimensions early in the plan to reduce the amount of data flow. The design document was included in SPARK-17791. Link to the google doc: [StarSchemaDetection](https://docs.google.com/document/d/1UAfwbm_A6wo7goHlVZfYK99pqDMEZUumi7pubJXETEA/edit?usp=sharing) ## How was this patch tested? A new test suite StarJoinSuite.scala was implemented. Author: Ioana Delaney <ioanamdelaney@gmail.com> Closes #15363 from ioana-delaney/starJoinReord2.
* [SPARK-20020][SPARKR][FOLLOWUP] DataFrame checkpoint API fix version tagFelix Cheung2017-03-191-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? doc only change ## How was this patch tested? manual Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17356 from felixcheung/rdfcheckpoint2.
* [SPARK-19994][SQL] Wrong outputOrdering for right/full outer smjwangzhenhua2017-03-202-99/+146
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? For right outer join, values of the left key will be filled with nulls if it can't match the value of the right key, so `nullOrdering` of the left key can't be guaranteed. We should output right key order instead of left key order. For full outer join, neither left key nor right key guarantees `nullOrdering`. We should not output any ordering. In tests, besides adding three test cases for left/right/full outer sort merge join, this patch also reorganizes code in `PlannerSuite` by putting together tests for `Sort`, and also extracts common logic in Sort tests into a method. ## How was this patch tested? Corresponding test cases are added. Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17331 from wzhfy/wrongOrdering.
* [SPARK-20020][SPARKR] DataFrame checkpoint APIFelix Cheung2017-03-197-5/+70
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add checkpoint, setCheckpointDir API to R ## How was this patch tested? unit tests, manual tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #17351 from felixcheung/rdfcheckpoint.
* [SPARK-19849][SQL] Support ArrayType in to_json to produce JSON arrayhyukjinkwon2017-03-1911-132/+236
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to support an array of struct type in `to_json` as below: ```scala import org.apache.spark.sql.functions._ val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a") df.select(to_json($"a").as("json")).show() ``` ``` +----------+ | json| +----------+ |[{"_1":1}]| +----------+ ``` Currently, it throws an exception as below (a newline manually inserted for readability): ``` org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type mismatch: structtojson requires that the expression is a struct expression.;; ``` This allows the roundtrip with `from_json` as below: ```scala import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array")) df.show() // Read back. df.select(to_json($"array").as("json")).show() ``` ``` +----------+ | array| +----------+ |[[1], [2]]| +----------+ +-----------------+ | json| +-----------------+ |[{"a":1},{"a":2}]| +-----------------+ ``` Also, this PR proposes to rename from `StructToJson` to `StructsToJson ` and `JsonToStruct` to `JsonToStructs`. ## How was this patch tested? Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, doctest for Python and test in `test_sparkSQL.R` for R. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17192 from HyukjinKwon/SPARK-19849.
* [SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithStateTathagata Das2017-03-1922-429/+1353
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.). Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this. ``` def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = { ... state.setTimeoutDuration(10000) ... } dataset // type is Dataset[T] .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T] .mapGroupsWithState[S, U]( func = stateFunction, timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U] ``` Note the following design aspects. - The timeout type is provided as a param in mapGroupsWithState as a parameter global to all the keys. This is so that the planner knows this at planning time, and accordingly optimize the execution based on whether to saves extra info in state or not (e.g. timeout durations or timestamps). - The exact timeout duration is provided inside the function call so that it can be customized on a per key basis. - When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true. - The timeout is reset for key every time the function is called on the key, that is, when the key has new data, or the key has timed out. So the user has to set the timeout duration everytime the function is called, otherwise there will not be any timeout set. Guarantees provided on timeout of key, when timeout duration is D ms: - Timeout will never be called before real clock time has advanced by D ms - Timeout will be called eventually when there is a trigger with any data in it (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur. For example, if there is no data in the stream (for any key) for a while, then the timeout will not be hit. Implementation details: - Added new param to `mapGroupsWithState` for timeout - Added new method to `StateStore` to filter data based on timeout timestamp - Changed the internal map type of `HDFSBackedStateStore` from Java's `HashMap` to `ConcurrentHashMap` as the latter allows weakly-consistent fail-safe iterators on the map data. See comments in code for more details. - Refactored logic of `MapGroupsWithStateExec` to - Save timeout info to state store for each key that has data. - Then, filter states that should be timed out based on the current batch processing timestamp. - Moved KeyedState for `o.a.s.sql` to `o.a.s.sql.streaming`. I remember that this was a feedback in the MapGroupsWithState PR that I had forgotten to address. ## How was this patch tested? New unit tests in - MapGroupsWithStateSuite for timeouts. - StateStoreSuite for new APIs in StateStore. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17179 from tdas/mapgroupwithstate-timeout.
* [SPARK-19990][TEST] Use the database after Hive's current Database is droppedXiao Li2017-03-191-7/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? This PR is to fix the following test failure in maven and the PR https://github.com/apache/spark/pull/15363. > org.apache.spark.sql.hive.orc.OrcSourceSuite SPARK-19459/SPARK-18220: read char/varchar column written by Hive The[ test history](https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.hive.orc.OrcSourceSuite&test_name=SPARK-19459%2FSPARK-18220%3A+read+char%2Fvarchar+column+written+by+Hive) shows all the maven builds failed this test case with the same error message. ``` FAILED: SemanticException [Error 10072]: Database does not exist: db2 org.apache.spark.sql.execution.QueryExecutionException: FAILED: SemanticException [Error 10072]: Database does not exist: db2 at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:637) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:621) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:288) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:229) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:228) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:271) at org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:621) at org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:611) at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply$mcV$sp(OrcSourceSuite.scala:160) at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply(OrcSourceSuite.scala:155) at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply(OrcSourceSuite.scala:155) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) ``` ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17344 from gatorsmile/testtest.
* [SPARK-18817][SPARKR][SQL] change derby log output to temp dirFelix Cheung2017-03-194-1/+63
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Passes R `tempdir()` (this is the R session temp dir, shared with other temp files/dirs) to JVM, set System.Property for derby home dir to move derby.log ## How was this patch tested? Manually, unit tests With this, these are relocated to under /tmp ``` # ls /tmp/RtmpG2M0cB/ derby.log ``` And they are removed automatically when the R session is ended. Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16330 from felixcheung/rderby.
* [MINOR][R] Reorder `Collate` fields in DESCRIPTION filehyukjinkwon2017-03-191-1/+1
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? It seems cran check scripts corrects `R/pkg/DESCRIPTION` and follows the order in `Collate` fields. This PR proposes to fix this so that running this script does not show up a diff in this file. ## How was this patch tested? Manually via `./R/check-cran.sh`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17349 from HyukjinKwon/minor-cran.
* [SPARK-19654][SPARKR][SS] Structured Streaming API for RFelix Cheung2017-03-188-5/+573
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add "experimental" API for SS in R ## How was this patch tested? manual, unit tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16982 from felixcheung/rss.
* [SPARK-16599][CORE] java.util.NoSuchElementException: None.get at at ↵Sean Owen2017-03-181-2/+2
| | | | | | | | | | | | | | | | | org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask ## What changes were proposed in this pull request? Avoid None.get exception in (rare?) case that no readLocks exist Note that while this would resolve the immediate cause of the exception, it's not clear it is the root problem. ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #17290 from srowen/SPARK-16599.
* [SPARK-19896][SQL] Throw an exception if case classes have circular ↵Takeshi Yamamuro2017-03-182-7/+37
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | references in toDS ## What changes were proposed in this pull request? If case classes have circular references below, it throws StackOverflowError; ``` scala> :pasge case class classA(i: Int, cls: classB) case class classB(cls: classA) scala> Seq(classA(0, null)).toDS() java.lang.StackOverflowError at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1494) at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(JavaMirrors.scala:66) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.gilSynchronizedIfNotThreadsafe(JavaMirrors.scala:66) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127) at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.info(JavaMirrors.scala:66) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45) ``` This pr added code to throw UnsupportedOperationException in that case as follows; ``` scala> :paste case class A(cls: B) case class B(cls: A) scala> Seq(A(null)).toDS() java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class B at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:627) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:644) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:632) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ``` ## How was this patch tested? Added tests in `DatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17318 from maropu/SPARK-19896.
* [SPARK-19915][SQL] Exclude cartesian product candidates to reduce the search ↵wangzhenhua2017-03-183-100/+143
| | | | | | | | | | | | | | | | | | | | space ## What changes were proposed in this pull request? We have some concerns about removing size in the cost model [in the previous pr](https://github.com/apache/spark/pull/17240). It's a tradeoff between code structure and algorithm completeness. I tend to keep the size and thus create this new pr without changing cost model. What this pr does: 1. We only consider consecutive inner joinable items, thus excluding cartesian products in reordering procedure. This significantly reduces the search space and memory overhead of memo. Otherwise every combination of items will exist in the memo. 2. This pr also includes a bug fix: if a leaf item is a project(_, child), current solution will miss the project. ## How was this patch tested? Added test cases. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17286 from wzhfy/joinReorder3.
* [SQL][MINOR] Fix scaladoc for UDFRegistrationJacek Laskowski2017-03-171-1/+5
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix scaladoc for UDFRegistration ## How was this patch tested? local build Author: Jacek Laskowski <jacek@japila.pl> Closes #17337 from jaceklaskowski/udfregistration-scaladoc.
* [SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in ↵Kunal Khamar2017-03-1730-47/+207
| | | | | | | | | | | | | | | | | | | | | | | | next batch. ## What changes were proposed in this pull request? If the user changes the shuffle partition number between batches, Streaming aggregation will fail. Here are some possible cases: - Change "spark.sql.shuffle.partitions" - Use "repartition" and change the partition number in codes - RangePartitioner doesn't generate deterministic partitions. Right now it's safe as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in future. ## How was this patch tested? - Unit tests - Manual tests - forward compatibility tested by using the new `OffsetSeqMetadata` json with Spark v2.1.0 Author: Kunal Khamar <kkhamar@outlook.com> Closes #17216 from kunalkhamar/num-partitions.
* [SPARK-19967][SQL] Add from_json in FunctionRegistryTakeshi Yamamuro2017-03-175-4/+189
| | | | | | | | | | | | ## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `from_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite` and `SQLQueryTestSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17320 from maropu/SPARK-19967.
* [SPARK-18847][GRAPHX] PageRank gives incorrect results for graphs with sinksAndrew Ray2017-03-172-59/+144
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Graphs with sinks (vertices with no outgoing edges) don't have the expected rank sum of n (or 1 for personalized). We fix this by normalizing to the expected sum at the end of each implementation. Additionally this fixes the dynamic version of personal pagerank which gave incorrect answers that were not detected by existing unit tests. ## How was this patch tested? Revamped existing and additional unit tests with reference values (and reproduction code) from igraph and NetworkX. Note that for comparison on personal pagerank we use the arpack algorithm in igraph as prpack (the current default) redistributes rank to all vertices uniformly instead of just to the personalization source. We could take the alternate convention (redistribute rank to all vertices uniformly) but that would involve more extensive changes to the algorithms (the dynamic version would no longer be able to use Pregel). Author: Andrew Ray <ray.andrew@gmail.com> Closes #16483 from aray/pagerank-sink2.
* [SPARK-19986][TESTS] Make pyspark.streaming.tests.CheckpointTests more stableShixiong Zhu2017-03-171-5/+6
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Sometimes, CheckpointTests will hang on a busy machine because the streaming jobs are too slow and cannot catch up. I observed the scheduled delay was keeping increasing for dozens of seconds locally. This PR increases the batch interval from 0.5 seconds to 2 seconds to generate less Spark jobs. It should make `pyspark.streaming.tests.CheckpointTests` more stable. I also replaced `sleep` with `awaitTerminationOrTimeout` so that if the streaming job fails, it will also fail the test. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #17323 from zsxwing/SPARK-19986.
* [SPARK-13369] Add config for number of consecutive fetch failuresSital Kedia2017-03-174-27/+27
| | | | | | | | | | The previously hardcoded max 4 retries per stage is not suitable for all cluster configurations. Since spark retries a stage at the sign of the first fetch failure, you can easily end up with many stage retries to discover all the failures. In particular, two scenarios this value should change are (1) if there are more than 4 executors per node; in that case, it may take 4 retries to discover the problem with each executor on the node and (2) during cluster maintenance on large clusters, where multiple machines are serviced at once, but you also cannot afford total cluster downtime. By making this value configurable, cluster managers can tune this value to something more appropriate to their cluster configuration. Unit tests Author: Sital Kedia <skedia@fb.com> Closes #17307 from sitalkedia/SPARK-13369.
* [SPARK-19882][SQL] Pivot with null as a distinct pivot value throws NPEAndrew Ray2017-03-173-10/+24
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Allows null values of the pivot column to be included in the pivot values list without throwing NPE Note this PR was made as an alternative to #17224 but preserves the two phase aggregate operation that is needed for good performance. ## How was this patch tested? Additional unit test Author: Andrew Ray <ray.andrew@gmail.com> Closes #17226 from aray/pivot-null.
* [SPARK-19987][SQL] Pass all filters into FileIndexReynold Xin2017-03-167-27/+35
| | | | | | | | | | | | ## What changes were proposed in this pull request? This is a tiny teeny refactoring to pass data filters also to the FileIndex, so FileIndex can have a more global view on predicates. ## How was this patch tested? Change should be covered by existing test cases. Author: Reynold Xin <rxin@databricks.com> Closes #17322 from rxin/SPARK-19987.
* [SPARK-19635][ML] DataFrame-based API for chi square testJoseph K. Bradley2017-03-164-6/+192
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Wrapper taking and return a DataFrame ## How was this patch tested? Copied unit tests from RDD-based API Author: Joseph K. Bradley <joseph@databricks.com> Closes #17110 from jkbradley/df-hypotests.
* [SPARK-19721][SS] Good error message for version mismatch in log filesLiwei Lin2017-03-1611-35/+143
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## Problem There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message. ## What changes were proposed in this pull request? This patch made two major changes: 1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking): ``` HDFSMetadataLog - CompactibleFileStreamLog ------------> fixed with this patch - FileStreamSourceLog ---------------> inherited the fix of `CompactibleFileStreamLog` - FileStreamSinkLog -----------------> inherited the fix of `CompactibleFileStreamLog` - OffsetSeqLog ------------------------> fixed with this patch - anonymous subclass in KafkaSource ---> fixed with this patch ``` 2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"` - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"` ## Exception message with this patch ``` java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade. at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) ``` ## How was this patch tested? unit tests Author: Liwei Lin <lwlin7@gmail.com> Closes #17070 from lw-lin/better-msg.
* [SPARK-19945][SQL] add test suite for SessionCatalog with HiveExternalCatalogwindpiger2017-03-163-900/+1049
| | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Currently `SessionCatalogSuite` is only for `InMemoryCatalog`, there is no suite for `HiveExternalCatalog`. And there are some ddl function is not proper to test in `ExternalCatalogSuite`, because some logic are not full implement in `ExternalCatalog`, these ddl functions are full implement in `SessionCatalog`(e.g. merge the same logic from `ExternalCatalog` up to `SessionCatalog` ). It is better to test it in `SessionCatalogSuite` for this situation. So we should add a test suite for `SessionCatalog` with `HiveExternalCatalog` The main change is that in `SessionCatalogSuite` add two functions: `withBasicCatalog` and `withEmptyCatalog` And replace the code like `val catalog = new SessionCatalog(newBasicCatalog)` with above two functions ## How was this patch tested? add `HiveExternalSessionCatalogSuite` Author: windpiger <songjun@outlook.com> Closes #17287 from windpiger/sessioncatalogsuit.
* [SPARK-19946][TESTING] DebugFilesystem.assertNoOpenStreams should report the ↵Bogdan Raducanu2017-03-162-2/+21
| | | | | | | | | | | | | | | open streams to help debugging ## What changes were proposed in this pull request? DebugFilesystem.assertNoOpenStreams throws an exception with a cause exception that actually shows the code line which leaked the stream. ## How was this patch tested? New test in SparkContextSuite to check there is a cause exception. Author: Bogdan Raducanu <bogdan@databricks.com> Closes #17292 from bogdanrdc/SPARK-19946.
* [SPARK-13568][ML] Create feature transformer to impute missing valuesYuhao Yang2017-03-162-0/+444
| | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-13568 It is quite common to encounter missing values in data sets. It would be useful to implement a Transformer that can impute missing data points, similar to e.g. Imputer in scikit-learn. Initially, options for imputation could include mean, median and most frequent, but we could add various other approaches, where possible existing DataFrame code can be used (e.g. for approximate quantiles etc). Currently this PR supports imputation for Double and Vector (null and NaN in Vector). ## How was this patch tested? new unit tests and manual test Author: Yuhao Yang <hhbyyh@gmail.com> Author: Yuhao Yang <yuhao.yang@intel.com> Author: Yuhao <yuhao.yang@intel.com> Closes #11601 from hhbyyh/imputer.
* [SPARK-19830][SQL] Add parseTableSchema API to ParserInterfaceXiao Li2017-03-163-1/+104
| | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? Specifying the table schema in DDL formats is needed for different scenarios. For example, - [specifying the schema in SQL function `from_json` using DDL formats](https://issues.apache.org/jira/browse/SPARK-19637), which is suggested by marmbrus , - [specifying the customized JDBC data types](https://github.com/apache/spark/pull/16209). These two PRs need users to use the JSON format to specify the table schema. This is not user friendly. This PR is to provide a `parseTableSchema` API in `ParserInterface`. ### How was this patch tested? Added a test suite `TableSchemaParserSuite` Author: Xiao Li <gatorsmile@gmail.com> Closes #17171 from gatorsmile/parseDDLStmt.
* [SPARK-19751][SQL] Throw an exception if bean class has one's own class in ↵Takeshi Yamamuro2017-03-163-6/+132
| | | | | | | | | | | | | | | | | | | | | fields ## What changes were proposed in this pull request? The current master throws `StackOverflowError` in `createDataFrame`/`createDataset` if bean has one's own class in fields; ``` public class SelfClassInFieldBean implements Serializable { private SelfClassInFieldBean child; ... } ``` This pr added code to throw `UnsupportedOperationException` in that case as soon as possible. ## How was this patch tested? Added tests in `JavaDataFrameSuite` and `JavaDatasetSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17188 from maropu/SPARK-19751.
* [SPARK-19961][SQL][MINOR] unify a erro msg when drop databse for ↵windpiger2017-03-162-7/+3
| | | | | | | | | | | | | | HiveExternalCatalog and InMemoryCatalog ## What changes were proposed in this pull request? unify a exception erro msg for dropdatabase when the database still have some tables for HiveExternalCatalog and InMemoryCatalog ## How was this patch tested? N/A Author: windpiger <songjun@outlook.com> Closes #17305 from windpiger/unifyErromsg.
* [SPARK-19948] Document that saveAsTable uses catalog as source of truth for ↵Juliusz Sompolski2017-03-161-0/+5
| | | | | | | | | | | | table existence. It is quirky behaviour that saveAsTable to e.g. a JDBC source with SaveMode other than Overwrite will nevertheless overwrite the table in the external source, if that table was not a catalog table. Author: Juliusz Sompolski <julek@databricks.com> Closes #17289 from juliuszsompolski/saveAsTableDoc.
* [SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning ↵Liang-Chi Hsieh2017-03-162-3/+44
| | | | | | | | | | | | | | | | | | | | and ordering when aliasing output attributes ## What changes were proposed in this pull request? Now `InMemoryTableScanExec` simply takes the `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`. However, `InMemoryTableScanExec` can alias the output attributes. In this case, its `outputPartitioning` and `outputOrdering` are not correct and its parent operators can't correctly determine its data distribution. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
* [SPARK-18066][CORE][TESTS] Add Pool usage policies test coverage for FIFO & ↵erenavsarogullari2017-03-152-8/+96
| | | | | | | | | | | | | | | | | | | | | FAIR Schedulers ## What changes were proposed in this pull request? The following FIFO & FAIR Schedulers Pool usage cases need to have unit test coverage : - FIFO Scheduler just uses **root pool** so even if `spark.scheduler.pool` property is set, related pool is not created and `TaskSetManagers` are added to **root pool**. - FAIR Scheduler uses `default pool` when `spark.scheduler.pool` property is not set. This can be happened when - `Properties` object is **null**, - `Properties` object is **empty**(`new Properties()`), - **default pool** is set(`spark.scheduler.pool=default`). - FAIR Scheduler creates a **new pool** with **default values** when `spark.scheduler.pool` property points a **non-existent** pool. This can be happened when **scheduler allocation file** is not set or it does not contain related pool. ## How was this patch tested? New Unit tests are added. Author: erenavsarogullari <erenavsarogullari@gmail.com> Closes #15604 from erenavsarogullari/SPARK-18066.
* [MINOR][CORE] Fix a info message of `prunePartitions`Dongjoon Hyun2017-03-151-1/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `PrunedInMemoryFileIndex.prunePartitions` shows `pruned NaN% partitions` for the following case. ```scala scala> Seq.empty[(String, String)].toDF("a", "p").write.partitionBy("p").saveAsTable("t1") scala> sc.setLogLevel("INFO") scala> spark.table("t1").filter($"p" === "1").select($"a").show ... 17/03/13 00:33:04 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned NaN% partitions. ``` After this PR, the message looks like this. ```scala 17/03/15 10:39:48 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned 0 partitions. ``` ## How was this patch tested? Pass the Jenkins with the existing tests. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17273 from dongjoon-hyun/SPARK-EMPTY-PARTITION.
* [SPARK-19960][CORE] Move `SparkHadoopWriter` to `internal/io/`jiangxingbo2017-03-155-64/+99
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR introduces the following changes: 1. Move `SparkHadoopWriter` to `core/internal/io/`, so that it's in the same directory with `SparkHadoopMapReduceWriter`; 2. Move `SparkHadoopWriterUtils` to a separated file. After this PR is merged, we may consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`, and make the new commit protocol support the old `mapred` package's committer; ## How was this patch tested? Tested by existing test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17304 from jiangxb1987/writer.
* [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change ↵Tejas Patil2017-03-1511-292/+1187
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | CartesianProductExec, SortMergeJoin, WindowExec to use it ## What issue does this PR address ? Jira: https://issues.apache.org/jira/browse/SPARK-13450 In `SortMergeJoinExec`, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). Heap dump from a failed job confirms this : https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png . While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily. ## Change proposed in this pull request - Introduces `ExternalAppendOnlyUnsafeRowArray` - It holds `UnsafeRow`s in-memory upto a certain threshold. - After the threshold is hit, it switches to `UnsafeExternalSorter` which enables spilling of the rows to disk. It does NOT sort the data. - Allows iterating the array multiple times. However, any alteration to the array (using `add` or `clear`) will invalidate the existing iterator(s) - `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array - Changed `SortMergeJoinExec` to use the new array implementation - NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery and I will rather put up a separate PR for that once this gets in. - Changed `CartesianProductExec` to use the new array implementation #### Note for reviewers The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split. ## How was this patch tested ? #### Unit testing - Added unit tests `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns - Added unit test for `SortMergeExec` - with and without spill for inner join, left outer join, right outer join to confirm that the spill threshold config behaves as expected and output is as expected. - This PR touches the scanning logic in `SortMergeExec` for _all_ joins (except FULL OUTER JOIN). However, I expect existing test cases to cover that there is no regression in correctness. - Added unit test for `WindowExec` to check behavior of spilling and correctness of results. #### Stress testing - Confirmed that OOM is gone by running against a production job which used to OOM - Since I cannot share details about prod workload externally, created synthetic data to mimic the issue. Ran before and after the fix to demonstrate the issue and query success with this PR Generating the synthetic data ``` ./bin/spark-shell --driver-memory=6G import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table") val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2") df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table") ``` Ran this against trunk VS local build with this PR. OOM repros with trunk and with the fix this query runs fine. ``` ./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof" import org.apache.spark.sql._ val hc = SparkSession.builder.master("local").getOrCreate() hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1") hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000") hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect hc.sql(""" CREATE TABLE spark_13450_result AS SELECT a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2, b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2 FROM spark_13450_one_row_table a JOIN spark_13450_large_table b ON a.i=b.i AND a.j=b.j """) ``` ## Performance comparison ### Macro-benchmark I ran a SMB join query over two real world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew so no spill happened. I saw improvement in CPU time by 2-4% over version without this PR. This did not add up as I was expected some regression. I think allocating array of capacity of 128 at the start (instead of starting with default size 16) is the sole reason for the perf. gain : https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43 . I could remove that and rerun, but effectively the change will be deployed in this form and I wanted to see the effect of it over large workload. ### Micro-benchmark Two types of benchmarking can be found in `ExternalAppendOnlyUnsafeRowArrayBenchmark`: [A] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `ArrayBuffer` when all rows fit in-memory and there is no spill ``` Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 7821 / 7941 33.5 29.8 1.0X ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 19200 / 19206 25.6 39.1 1.0X ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ ArrayBuffer 5949 / 6028 17.2 58.1 1.0X ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X ``` [B] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `UnsafeExternalSorter` when there is spilling of data ``` Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X ``` Author: Tejas Patil <tejasp@fb.com> Closes #16909 from tejasapatil/SPARK-13450_smb_buffer_oom.
* [SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for ↵hyukjinkwon2017-03-152-1/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | coalesce/repartition ## What changes were proposed in this pull request? This PR proposes to use the correct deserializer, `BatchedSerializer` for RDD construction for coalesce/repartition when the shuffle is enabled. Currently, it is passing `UTF8Deserializer` as is not `BatchedSerializer` from the copied one. with the file, `text.txt` below: ``` a b d e f g h i j k l ``` - Before ```python >>> sc.textFile('text.txt').repartition(1).collect() ``` ``` UTF8Deserializer(True) Traceback (most recent call last): File "<stdin>", line 1, in <module> File ".../spark/python/pyspark/rdd.py", line 811, in collect return list(_load_from_socket(port, self._jrdd_deserializer)) File ".../spark/python/pyspark/serializers.py", line 549, in load_stream yield self.loads(stream) File ".../spark/python/pyspark/serializers.py", line 544, in loads return s.decode("utf-8") if self.use_unicode else s File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte ``` - After ```python >>> sc.textFile('text.txt').repartition(1).collect() ``` ``` [u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u''] ``` ## How was this patch tested? Unit test in `python/pyspark/tests.py`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17282 from HyukjinKwon/SPARK-19872.
* [SPARK-19889][SQL] Make TaskContext callbacks thread safeHerman van Hovell2017-03-153-34/+93
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? It is sometimes useful to use multiple threads in a task to parallelize tasks. These threads might register some completion/failure listeners to clean up when the task completes or fails. We currently cannot register such a callback and be sure that it will get called, because the context might be in the process of invoking its callbacks, when the the callback gets registered. This PR improves this by making sure that you cannot add a completion/failure listener from a different thread when the context is being marked as completed/failed in another thread. This is done by synchronizing these methods on the task context itself. Failure listeners were called only once. Completion listeners now follow the same pattern; this lifts the idempotency requirement for completion listeners and makes it easier to implement them. In some cases we can (accidentally) add a completion/failure listener after the fact, these listeners will be called immediately in order make sure we can safely clean-up after a task. As a result of this change we could make the `failure` and `completed` flags non-volatile. The `isCompleted()` method now uses synchronization to ensure that updates are visible across threads. ## How was this patch tested? Adding tests to `TaskContestSuite` to test adding listeners to a completed/failed context. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17244 from hvanhovell/SPARK-19889.
* [SPARK-19877][SQL] Restrict the nested level of a viewjiangxingbo2017-03-144-5/+51
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? We should restrict the nested level of a view, to avoid stack overflow exception during the view resolution. ## How was this patch tested? Add new test case in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17241 from jiangxb1987/view-depth.