path: root/core
Commit log: each entry below shows the commit message, author, date, number of files changed, and lines removed/added.
* [SPARK-11484][WEBUI] Using proxyBase set by spark AM (Srinivasa Reddy Vundela, 2015-11-05; 1 file, -8/+4)
  Use the proxyBase set by the AM; if it is not found, fall back to the environment variable. This fixes the case where somebody accidentally sets APPLICATION_WEB_PROXY_BASE to the wrong proxy base.
  Author: Srinivasa Reddy Vundela <vsr@cloudera.com>. Closes #9448 from vundela/master.
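  The precedence described above, as a minimal sketch (the helper and its signature are invented for illustration, not code from this change):
  ```scala
  object ProxyBaseResolution {
    // Prefer the proxy base pushed by the AM via spark.ui.proxyBase and only fall
    // back to the APPLICATION_WEB_PROXY_BASE environment variable.
    def resolveProxyBase(conf: Map[String, String]): Option[String] = {
      conf.get("spark.ui.proxyBase")
        .orElse(sys.env.get("APPLICATION_WEB_PROXY_BASE"))
        .filter(_.nonEmpty)
    }
  }
  ```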
* [SPARK-11501][CORE][YARN] Propagate spark.rpc config to executors (Nishkam Ravi, 2015-11-05; 1 file, -0/+1)
  spark.rpc is supposed to be configurable but currently is not: it does not get propagated to executors because RpcEnv.create is called before the driver properties are fetched.
  Author: Nishkam Ravi <nishkamravi@gmail.com>. Closes #9460 from nishkamravi2/master_akka.
* [SPARK-11449][CORE] PortableDataStream should be a factory (Herman van Hovell, 2015-11-05; 1 file, -29/+16)
  `PortableDataStream` maintains some internal state. This makes it tricky to reuse a stream (one needs to call `close` on both the `PortableDataStream` and the `InputStream` it produces). This PR removes all state from `PortableDataStream` and effectively turns it into an `InputStream`/`Array[Byte]` factory. This makes the user responsible for managing the `InputStream` it returns. cc srowen
  Author: Herman van Hovell <hvanhovell@questtec.nl>. Closes #9417 from hvanhovell/SPARK-11449.
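  A rough usage sketch of the caller-managed stream described above (assuming the `binaryFiles`/`open()` API; not code from this PR):
  ```scala
  import org.apache.spark.SparkContext
  import org.apache.spark.input.PortableDataStream

  object BinaryFilesExample {
    // Read the first byte of each file; the caller opens and closes the stream itself.
    def firstBytes(sc: SparkContext, path: String): Array[(String, Int)] = {
      sc.binaryFiles(path).map { case (file, pds: PortableDataStream) =>
        val in = pds.open()   // the factory hands back a fresh stream
        try {
          (file, in.read())
        } finally {
          in.close()          // the user, not the wrapper, manages the stream's lifetime
        }
      }.collect()
    }
  }
  ```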
* [SPARK-11440][CORE][STREAMING][BUILD] Declare rest of @Experimental items non-experimental if they've existed since 1.2.0 (Sean Owen, 2015-11-05; 12 files, -67/+2)
  Remove `Experimental` annotations in core, streaming for items that existed in 1.2.0 or before. The changes are:
  - SparkContext: binary{Files,Records} : 1.2.0; submitJob : 1.0.0
  - JavaSparkContext: binary{Files,Records} : 1.2.0
  - DoubleRDDFunctions, JavaDoubleRDD: {mean,sum}Approx : 1.0.0
  - PairRDDFunctions, JavaPairRDD: sampleByKeyExact : 1.2.0; countByKeyApprox : 1.0.0
  - PairRDDFunctions: countApproxDistinctByKey : 1.1.0
  - RDD: countApprox, countByValueApprox, countApproxDistinct : 1.0.0
  - JavaRDDLike: countApprox : 1.0.0
  - PythonHadoopUtil.Converter : 1.1.0
  - PortableDataStream : 1.2.0 (related to binaryFiles)
  - BoundedDouble : 1.0.0
  - PartialResult : 1.0.0
  - StreamingContext, JavaStreamingContext: binaryRecordsStream : 1.2.0
  - HiveContext: analyze : 1.2.0
  Author: Sean Owen <sowen@cloudera.com>. Closes #9396 from srowen/SPARK-11440.
* [SPARK-11425] [SPARK-11486] Improve hybrid aggregation (Davies Liu, 2015-11-04; 3 files, -30/+70)
  After aggregation the dataset can be smaller than the input, so it's better to do hash-based aggregation over all inputs and then use sort-based aggregation to merge them.
  Author: Davies Liu <davies@databricks.com>. Closes #9383 from davies/fix_switch.
* [SPARK-11307] Reduce memory consumption of OutputCommitCoordinator (Josh Rosen, 2015-11-04; 3 files, -16/+34)
  OutputCommitCoordinator uses a map in a place where an array would suffice, increasing its memory consumption for result stages with millions of tasks. This patch replaces that map with an array. The only tricky part of this is reasoning about the range of possible array indexes in order to make sure that we never index out of bounds.
  Author: Josh Rosen <joshrosen@databricks.com>. Closes #9274 from JoshRosen/SPARK-11307.
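  A hedged sketch of the map-to-array idea (the class and fields below are invented for illustration, not the actual OutputCommitCoordinator internals):
  ```scala
  // Partition ids for a stage are dense in [0, numPartitions), so an array indexed by
  // partition id can stand in for a HashMap from partition id to authorized attempt.
  class AuthorizedCommitters(numPartitions: Int) {
    private val NoCommitter = -1L
    private val committer = Array.fill(numPartitions)(NoCommitter)

    def canCommit(partition: Int, attempt: Long): Boolean = synchronized {
      // The "tricky part": never index outside [0, numPartitions).
      require(partition >= 0 && partition < numPartitions, s"partition $partition out of range")
      if (committer(partition) == NoCommitter) {
        committer(partition) = attempt  // first attempt to ask becomes the committer
        true
      } else {
        committer(partition) == attempt
      }
    }
  }
  ```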
* [SPARK-11493] remove bitset from BytesToBytesMap (Davies Liu, 2015-11-04; 1 file, -43/+15)
  Since we have 4 bytes as the number of records at the beginning of a page, the address can not be zero, so we do not need the bitset. Regarding performance, the bitset could help speed up a false lookup if the slot is empty (because the bitset is smaller than the longArray, the cache hit rate will be higher). In practice, the map is filled to 35%-70% (use 50% as an average), so only half of the false lookups can benefit from it; all others will pay the cost of loading the bitset (and still need to access the longArray anyway). For aggregation, we always need to access the longArray (insert a new key after a false lookup), also confirmed by a benchmark. For broadcast hash join, there could be a regression, but a simple benchmark showed that there may not be (most of the lookups are false):
  ```
  sqlContext.range(1<<20).write.parquet("small")
  df = sqlContext.read.parquet('small')
  for i in range(3):
      t = time.time()
      df2 = sqlContext.range(1<<26).selectExpr("id * 1111111111 % 987654321 as id2")
      df2.join(df, df.id == df2.id2).count()
      print time.time() - t
  ```
  Having bitset (used time in seconds):
  ```
  17.5404241085
  10.2758829594
  10.5786800385
  ```
  After removing bitset (used time in seconds):
  ```
  21.8939979076
  12.4132959843
  9.97224712372
  ```
  cc rxin nongli
  Author: Davies Liu <davies@databricks.com>. Closes #9452 from davies/remove_bitset.
* [SPARK-10949] Update Snappy version to 1.1.2 (Adam Roberts, 2015-11-04; 3 files, -6/+9)
  This is an updated version of #8995 by a-roberts. Original description follows: Snappy now supports concatenation of serialized streams; this patch contains a version number change and the "does not support" test is now a "supports" test. The Snappy 1.1.2 changelog mentions:
  > snappy-java-1.1.2 (22 September 2015)
  > This is a backward compatible release for 1.1.x.
  > Add AIX (32-bit) support.
  > There is no upgrade for the native libraries of the other platforms.
  > A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
  > snappy-java-1.1.2-RC2 (18 May 2015)
  > Fix #107: SnappyOutputStream.close() is not idempotent
  > snappy-java-1.1.2-RC1 (13 May 2015)
  > SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
  > There has been no compressed format change since 1.0.5.x. So you can read the compressed results interchangeably between these versions.
  > Fixes a problem when java.io.tmpdir does not exist.
  Closes #8995.
  Author: Adam Roberts <aroberts@uk.ibm.com>. Author: Josh Rosen <joshrosen@databricks.com>. Closes #9439 from JoshRosen/update-snappy.
* [SPARK-11505][SQL] Break aggregate functions into multiple files (Reynold Xin, 2015-11-04; 1 file, -1/+4)
  functions.scala was getting pretty long. I broke it into multiple files. I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with the rest of the functions.
  Author: Reynold Xin <rxin@databricks.com>. Closes #9471 from rxin/SPARK-11505.
* [SPARK-10622][CORE][YARN] Differentiate dead from "mostly dead" executors. (Marcelo Vanzin, 2015-11-04; 5 files, -16/+107)
  In YARN mode, when preemption is enabled, we may leave executors in a zombie state while we wait to retrieve the reason for which the executor exited. This is so that we don't account for failed tasks that were running on a preempted executor. The issue is that while we wait for this information, the scheduler might decide to schedule tasks on the executor, which will never be able to run them. Other side effects include the block manager still considering the executor available to cache blocks, for example.
  So, when we know that an executor went down but we don't know why, stop everything related to the executor, except its running tasks. Only when we know the reason for the exit (or give up waiting for it) do we update the running tasks. This is achieved by a new `disableExecutor()` method in the `Schedulable` interface. For managers that do not behave like this (i.e. every one but YARN), the existing `executorLost()` method will behave the same way it did before.
  On top of that change, a few minor changes that made debugging easier, and fixed some other minor issues:
  - The cluster-mode AM was printing a misleading log message every time an executor disconnected from the driver (because the akka actor system was shared between driver and AM).
  - Avoid sending unnecessary requests for an executor's exit reason when we already know it was explicitly disabled / killed. This avoids both multiple requests, and unnecessary requests that would just cause warning messages on the AM (in the explicit kill case).
  - Tone down a log message about the executor being lost when it exited normally (e.g. preemption).
  - Wake up the AM monitor thread when requests for executor loss reasons arrive too, so that we can more quickly remove executors from this zombie state.
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #8887 from vanzin/SPARK-10622.
* [SPARK-11442] Reduce numSlices for local metrics test of SparkListenerSuite (tedyu, 2015-11-04; 1 file, -4/+5)
  In the thread http://search-hadoop.com/m/q3RTtcQiFSlTxeP/test+failed+due+to+OOME&subj=test+failed+due+to+OOME, it was discussed that memory consumption for SparkListenerSuite should be brought down. This is an attempt in that direction, reducing numSlices for the local metrics test.
  Author: tedyu <yuzhihong@gmail.com>. Closes #9384 from tedyu/master.
* [SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test. (Marcelo Vanzin, 2015-11-03; 2 files, -39/+34)
  The test functionality should be the same, but without using mockito; logs don't really say anything useful, but I suspect it may be the cause of the flakiness, since updating mocks when multiple threads may be using them doesn't work very well. It also allows some other cleanup (= less test code in FsHistoryProvider).
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #9425 from vanzin/SPARK-11466.
* Fix typo in WebUI (Jacek Laskowski, 2015-11-03; 1 file, -1/+1)
  Author: Jacek Laskowski <jacek.laskowski@deepsense.io>. Closes #9444 from jaceklaskowski/TImely-fix.
* [SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed. (Mark Grover, 2015-11-03; 8 files, -17/+29)
  Author: Mark Grover <grover.markgrover@gmail.com>. Closes #8093 from markgrover/nm2.
* [SPARK-11256] Mark all Stage/ResultStage/ShuffleMapStage internal state as private. (Reynold Xin, 2015-11-03; 4 files, -38/+80)
  Author: Reynold Xin <rxin@databricks.com>. Closes #9219 from rxin/stage-cleanup1.
* [SPARK-11344] Made ApplicationDescription and DriverDescription case classes (Jacek Lewandowski, 2015-11-03; 7 files, -46/+34)
  DriverDescription was refactored to a case class because it included no mutable fields. ApplicationDescription had one mutable field, appUiUrl, which was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed, to redirect requests to the history server. This was wrong because objects which are sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI even if it is already shut down. The UI URL which Master exposes to the user and modifies dynamically is now included in ApplicationInfo - a data object which describes the application state internally in Master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription. ApplicationDescription also included the value user, which is now part of the case class fields.
  Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>. Closes #9299 from jacek-lewandowski/SPARK-11344.
* [SPARK-11236] [TEST-MAVEN] [TEST-HADOOP1.0] [CORE] Update Tachyon dependency 0.7.1 -> 0.8.1 (Calvin Jia, 2015-11-02; 1 file, -5/+1)
  This is a reopening of #9204, which failed hadoop1 sbt tests. With the original PR, a classpath issue would occur due to the MIMA plugin pulling in hadoop-2.2 dependencies regardless of the hadoop version when building the `oldDeps` project. These affect the hadoop1 sbt build because they are placed in `lib_managed` and Tachyon 0.8.0's default hadoop version is 2.2.
  Author: Calvin Jia <jia.calvin@gmail.com>. Closes #9395 from calvinjia/spark-11236.
* [SPARK-10997][CORE] Add "client mode" to netty rpc env. (Marcelo Vanzin, 2015-11-02; 15 files, -186/+259)
  "Client mode" means the RPC env will not listen for incoming connections. This allows certain processes in the Spark stack (such as Executors or the YARN client-mode AM) to act as pure clients when using the netty-based RPC backend, reducing the number of sockets needed by the app and also the number of open ports.
  Client connections are also preferred when endpoints that actually have a listening socket are involved; so, for example, if a Worker connects to a Master and the Master needs to send a message to a Worker endpoint, that client connection will be used, even though the Worker is also listening for incoming connections.
  With this change, the workaround for SPARK-10987 isn't necessary anymore, and is removed. The AM connects to the driver in "client mode", and that connection is used for all driver <-> AM communication, so the AM is properly notified when the connection goes down.
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #9210 from vanzin/SPARK-10997.
* [SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage (Liang-Chi Hsieh, 2015-11-02; 6 files, -28/+82)
  JIRA: https://issues.apache.org/jira/browse/SPARK-11271
  As reported in the JIRA ticket, when there are too many tasks, the memory usage of MapStatus causes problems. Using BitSet instead of RoaringBitmap should be more memory efficient.
  Author: Liang-Chi Hsieh <viirya@appier.com>. Closes #9243 from viirya/mapstatus-bitset.
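  A rough sketch of the trade-off (using `java.util.BitSet` here purely for illustration rather than Spark's own BitSet):
  ```scala
  import java.util.BitSet

  object EmptyBlockTracking {
    // One bit per reduce block: a set bit means "this block is empty", so a fixed-size
    // bitset replaces a compressed RoaringBitmap at a predictable memory cost.
    def emptyBlocks(blockSizes: Array[Long]): BitSet = {
      val bits = new BitSet(blockSizes.length)
      var i = 0
      while (i < blockSizes.length) {
        if (blockSizes(i) == 0L) bits.set(i)
        i += 1
      }
      bits
    }
  }
  ```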
* [SPARK-11073][CORE][YARN] Remove akka dependency in secret key generation. (Marcelo Vanzin, 2015-11-01; 6 files, -81/+105)
  Use standard JDK APIs for that (with a little help from Guava). Most of the changes here are in test code, since there were no tests specific to that part of the code.
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #9257 from vanzin/SPARK-11073.
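  A minimal sketch of secret generation with plain JDK APIs (an assumption about the approach, not the exact code in this change):
  ```scala
  import java.security.SecureRandom

  object SecretGeneration {
    // Generate a random shared secret and hex-encode it for use as a string config value.
    def generateSecret(bits: Int = 256): String = {
      val bytes = new Array[Byte](bits / 8)
      new SecureRandom().nextBytes(bytes)   // cryptographically strong randomness
      bytes.map("%02x".format(_)).mkString
    }
  }
  ```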
* [SPARK-11020][CORE] Wait for HDFS to leave safe mode before initializing HS. (Marcelo Vanzin, 2015-11-01; 2 files, -3/+166)
  Large HDFS clusters may take a while to leave safe mode when starting; this change makes the HS wait for that before doing checks about its configuration. This means the HS won't stop right away if HDFS is in safe mode and the configuration is not correct, but that should be a very uncommon situation.
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #9043 from vanzin/SPARK-11020.
* [SPARK-11338] [WEBUI] Prepend app links on HistoryPage with uiRoot path (Christian Kadner, 2015-11-01; 2 files, -7/+23)
  [SPARK-11338: HistoryPage not multi-tenancy enabled ...](https://issues.apache.org/jira/browse/SPARK-11338)
  - `HistoryPage.scala` ...prepending all page links with the web proxy (`uiRoot`) path
  - `HistoryServerSuite.scala` ...adding a test case to verify all site-relative links are prefixed when the environment variable `APPLICATION_WEB_PROXY_BASE` (or System property `spark.ui.proxyBase`) is set
  Author: Christian Kadner <ckadner@us.ibm.com>. Closes #9291 from ckadner/SPARK-11338 and squashes the following commits:
  01d2f35 [Christian Kadner] [SPARK-11338][WebUI] nit fixes
  d054bd7 [Christian Kadner] [SPARK-11338][WebUI] prependBaseUri in method makePageLink
  8bcb3dc [Christian Kadner] [SPARK-11338][WebUI] Prepend application links on HistoryPage with uiRoot path
* [SPARK-11424] Guard against double-close() of RecordReaders (Josh Rosen, 2015-10-31; 4 files, -52/+66)
  **TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.
  ## Background
  [MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions, but other times it might silently result in invalid / garbled input.
  That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.
  So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper. For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:
  - [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
  - [While handling the close call and trying to close the reader, reader.close() throws an exception](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190).
  - We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
  In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212). I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491). Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record, then it looks like we could hit the bug here in 1.5.
  ## The fix
  This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator.closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception. I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.
  Author: Josh Rosen <joshrosen@databricks.com>. Closes #9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:
  5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
  ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
  087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.
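  The defensive pattern described under "The fix", as a hedged sketch (the class and field names are illustrative, not the actual HadoopRDD/NewHadoopRDD code):
  ```scala
  import java.io.Closeable

  class ReaderHolder(private var reader: Closeable) {
    // Null out the reference even when close() throws, so a later completion
    // callback cannot close the same reader a second time.
    def close(): Unit = {
      if (reader != null) {
        try {
          reader.close()
        } finally {
          reader = null
        }
      }
    }
  }
  ```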
* Revert "[SPARK-11236][CORE] Update Tachyon dependency from 0.7.1 -> 0.8.0." (Yin Huai, 2015-10-30; 1 file, -1/+5)
  This reverts commit 4f5e60c647d7d6827438721b7fabbc3a57b81023.
* [SPARK-11423] remove MapPartitionsWithPreparationRDD (Davies Liu, 2015-10-30; 3 files, -145/+0)
  Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore. This PR basically reverts #8543, #8511, #8038, #8011.
  Author: Davies Liu <davies@databricks.com>. Closes #9381 from davies/remove_prepare2.
* [SPARK-11414][SPARKR] Forgot to update usage of 'spark.sparkr.r.command' in RRDD in the PR for SPARK-10971. (Sun Rui, 2015-10-30; 1 file, -1/+6)
  Author: Sun Rui <rui.sun@intel.com>. Closes #9368 from sun-rui/SPARK-11414.
* [SPARK-10986][MESOS] Set the context class loader in the Mesos executor backend. (Iulian Dragos, 2015-10-30; 1 file, -0/+5)
  See [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986) for details. This fixes the `ClassNotFoundException` for Spark classes in the serializer.
  I am not sure this is the right way to handle the class loader, but I couldn't find any documentation on how the context class loader is used and who relies on it. It seems at least the serializer uses it to instantiate classes during deserialization. I am open to suggestions (I tried this fix on a real Mesos cluster and it *does* fix the issue). tnachen andrewor14
  Author: Iulian Dragos <jaguarul@gmail.com>. Closes #9282 from dragos/issue/mesos-classloader.
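  For reference, setting a thread's context class loader only needs standard JDK calls; a minimal sketch (not the actual backend code):
  ```scala
  object ContextClassLoaderFix {
    // Point the context class loader at the loader that can see Spark's classes,
    // so class resolution during deserialization does not fall back to a loader
    // that cannot find them.
    def installClassLoader(): Unit = {
      Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
    }
  }
  ```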
* [SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management (Davies Liu, 2015-10-29; 22 files, -737/+1245)
  This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is not needed anymore, so it was removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling). The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.
  The following script will fail with OOM before this PR, and finishes in 150 seconds with a 2G heap (it also works in the 1.5 branch, with similar duration).
  ```python
  sqlContext.setConf("spark.sql.shuffle.partitions", "1")
  df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
  df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
  j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
  j.explain()
  print j.count()
  ```
  For thread-safety, here is what I've got:
  1) Without calling spill(), the operators should only be used by a single thread, so no safety problems.
  2) spill() could be triggered in two cases: triggered by itself, or by other operators. We can check trigger == this in spill(), so it's still in the same thread, so no safety problems.
  3) If it's triggered by other operators (right now cache will not trigger spill()), we only spill the data to disk when it's in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only; we only need to synchronize the iterator and change it.
  4) During scanning, the iterator will only use one record in one page; we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not trigger during scanning.
  5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
  Author: Davies Liu <davies@databricks.com>. Closes #9241 from davies/force_spill.
* [SPARK-11236][CORE] Update Tachyon dependency from 0.7.1 -> 0.8.0. (Calvin Jia, 2015-10-29; 1 file, -5/+1)
  Upgrades the tachyon-client version to the latest release. No new dependencies are added and no spark facing APIs are changed. The removal of the `tachyon-underfs-s3` exclusion will enable users to use S3 out of the box and there are no longer any additional external dependencies added by the module.
  Author: Calvin Jia <jia.calvin@gmail.com>. Closes #9204 from calvinjia/spark-11236.
* [SPARK-11178] Improving naming around task failures. (Kay Ousterhout, 2015-10-27; 10 files, -34/+52)
  Commit af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 introduced new functionality so that if an executor dies for a reason that's not caused by one of the tasks running on the executor (e.g., due to pre-emption), Spark doesn't count the failure towards the maximum number of failures for the task. That commit introduced some vague naming that this commit attempts to fix; in particular:
  (1) The variable "isNormalExit", which was used to refer to cases where the executor died for a reason unrelated to the tasks running on the machine, has been renamed (and reversed) to "exitCausedByApp". The problem with the existing name is that it's not clear (at least to me!) what it means for an exit to be "normal"; the new name is intended to make the purpose of this variable more clear.
  (2) The variable "shouldEventuallyFailJob" has been renamed to "countTowardsTaskFailures". This variable is used to determine whether a task's failure should be counted towards the maximum number of failures allowed for a task before the associated Stage is aborted. The problem with the existing name is that it can be confused with implying that the task's failure should immediately cause the stage to fail because it is somehow fatal (this is the case for a fetch failure, for example: if a task fails because of a fetch failure, there's no point in retrying, and the whole stage should be failed).
  Author: Kay Ousterhout <kayousterhout@gmail.com>. Closes #9164 from kayousterhout/SPARK-11178.
* [SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update ReceiverTracker and ReceiverSchedulingPolicy to use it (zsxwing, 2015-10-27; 2 files, -2/+16)
  This PR includes the following changes:
  1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for an RDD.
  2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host.
  The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control on the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic, but it does not allow specifying a particular executor as a preferred location, only the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. The current code only specifies the host as the preference, which may end up launching both receivers on the same executor. We tried to work around it by restarting a receiver when it does not launch in the desired executor and hoping that next time it will be started in the right one. But that causes lots of restarts, and delays in correctly launching the receiver. So this change allows the streaming scheduler to specify the exact executor as the preferred location. Also, this is not exposed to the user; only the streaming scheduler uses this.
  Author: zsxwing <zsxwing@gmail.com>. Closes #9181 from zsxwing/executor-location.
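  A small, purely illustrative helper for the `executor_<host>_<executorID>` string introduced in change 1 above (hypothetical, not the PR's parser):
  ```scala
  object ExecutorLocationFormat {
    // Build and parse preferred-location strings such as "executor_localhost_2".
    def format(host: String, executorId: String): String = s"executor_${host}_$executorId"

    def parse(location: String): Option[(String, String)] =
      location.split("_", 3) match {
        case Array("executor", host, executorId) => Some((host, executorId))
        case _ => None
      }
  }
  ```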
* [SPARK-11306] Fix hang when JVM exits. (Kay Ousterhout, 2015-10-27; 1 file, -1/+1)
  This commit fixes a bug where, in Standalone mode, if a task fails and crashes the JVM, the failure is considered a "normal failure" (meaning it's considered unrelated to the task), so the failure isn't counted against the task's maximum number of failures: https://github.com/apache/spark/commit/af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0#diff-a755f3d892ff2506a7aa7db52022d77cL138. As a result, if a task fails in a way that results in it crashing the JVM, it will continuously be re-launched, resulting in a hang. This commit fixes that problem.
  This bug was introduced by #8007; andrewor14 mccheah vanzin can you take a look at this? This error is hard to trigger because we handle executor losses through 2 code paths (the second is via Akka, where Akka notices that the executor endpoint is disconnected). In my setup, the Akka code path completes first, and doesn't have this bug, so things work fine (see my recent email to the dev list about this). If I manually disable the Akka code path, I can see the hang (and this commit fixes the issue).
  Author: Kay Ousterhout <kayousterhout@gmail.com>. Closes #9273 from kayousterhout/SPARK-11306.
* [SPARK-11276][CORE] SizeEstimator prevents class unloading (Sem Mulder, 2015-10-27; 1 file, -2/+4)
  The SizeEstimator keeps a cache of ClassInfos, but this cache uses Class objects as keys, which results in strong references to the Class objects. If these classes are dynamically created, this prevents the corresponding ClassLoader from being GCed, leading to PermGen exhaustion. We use a Map with weak keys to prevent this issue.
  Author: Sem Mulder <sem.mulder@site2mobile.com>. Closes #9244 from SemMulder/fix-sizeestimator-classunloading.
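  The weak-key idea in a minimal form (using `java.util.WeakHashMap` as a stand-in; the patch itself may use a different map implementation):
  ```scala
  import java.util.{Collections, WeakHashMap}

  object WeakClassCache {
    final case class ClassInfo(shellSize: Long)

    // Keying the cache weakly on Class objects lets a dynamically generated class,
    // and therefore its ClassLoader, be garbage collected once nothing else uses it.
    val classInfos: java.util.Map[Class[_], ClassInfo] =
      Collections.synchronizedMap(new WeakHashMap[Class[_], ClassInfo]())
  }
  ```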
* [SPARK-11209][SPARKR] Add window functions into SparkR [step 1]. (Sun Rui, 2015-10-26; 1 file, -1/+2)
  Author: Sun Rui <rui.sun@intel.com>. Closes #9193 from sun-rui/SPARK-11209.
* [SPARK-5966][WIP] Spark-submit deploy-mode cluster is not compatible with master local> (Kevin Yu, 2015-10-26; 1 file, -0/+2)
  Author: Kevin Yu <qyu@us.ibm.com>. Closes #9220 from kevinyu98/working_on_spark-5966.
* [SPARK-10984] Simplify *MemoryManager class structure (Josh Rosen, 2015-10-25; 43 files, -942/+1081)
  This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
  - MemoryManager
  - StaticMemoryManager
  - ExecutorMemoryManager
  - TaskMemoryManager
  - ShuffleMemoryManager
  This is fairly confusing. To simplify things, this patch consolidates several of these classes:
  - ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
  - TaskMemoryManager is moved into Spark Core.
  **Key changes and tasks**:
  - [x] Merge ExecutorMemoryManager into MemoryManager.
  - [x] Move pooling logic into Allocator.
  - [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
  - [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code uses only this and not both this and ShuffleMemoryManager.
  - [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
  - [x] Merge ShuffleMemoryManager into MemoryManager.
  - [x] Move code
  - [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
  - [x] Port ShuffleMemoryManagerSuite tests.
  - [x] Move classes from `unsafe` package to `memory` package.
  - [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
  - [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
  - [x] AbstractBytesToBytesMapSuite
  - [x] UnsafeExternalSorterSuite
  - [x] UnsafeFixedWidthAggregationMapSuite
  - [x] UnsafeKVExternalSorterSuite
  **Compatibility notes**:
  - This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
  Author: Josh Rosen <joshrosen@databricks.com>. Closes #9127 from JoshRosen/SPARK-10984.
* [SPARK-11287] Fixed class name to properly start TestExecutor from deploy.client.TestClient (Bryan Cutler, 2015-10-25; 1 file, -1/+2)
  Executing deploy.client.TestClient fails due to a bad class name for TestExecutor in ApplicationDescription.
  Author: Bryan Cutler <bjcutler@us.ibm.com>. Closes #9255 from BryanCutler/fix-TestClient-classname-SPARK-11287.
* Fix typos (Jacek Laskowski, 2015-10-25; 4 files, -4/+5)
  Two typos squashed. BTW, let me know how to proceed with other typos if I run across any. I don't feel good about leaving them aside, but also not about sending pull requests with such tiny changes. Guide me.
  Author: Jacek Laskowski <jacek.laskowski@deepsense.io>. Closes #9250 from jaceklaskowski/typos-hunting.
* [SPARK-11125] [SQL] Uninformative exception when running spark-sql without building with -Phive-thriftserver and SPARK_PREPEND_CLASSES is set (Jeff Zhang, 2015-10-23; 1 file, -0/+9)
  This is the exception after this patch. Please help review.
  ```
  java.lang.NoClassDefFoundError: org/apache/hadoop/hive/cli/CliDriver
      at java.lang.ClassLoader.defineClass1(Native Method)
      at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
      at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
      at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
      at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:412)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:270)
      at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:647)
      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver
      at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      ... 21 more
  Failed to load hive class. You need to build Spark with -Phive and -Phive-thriftserver.
  ```
  Author: Jeff Zhang <zjffdu@apache.org>. Closes #9134 from zjffdu/SPARK-11125.
* [SPARK-10971][SPARKR] RRunner should allow setting path to Rscript. (Sun Rui, 2015-10-23; 1 file, -1/+10)
  Add a new spark conf option "spark.sparkr.r.driver.command" to specify the executable for an R script in client modes. The existing spark conf option "spark.sparkr.r.command" is used to specify the executable for an R script in cluster modes, for both driver and workers. See also [launch R worker script](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/RRDD.scala#L395).
  BTW, the [environment variable "SPARKR_DRIVER_R"](https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L275) is used to locate the R shell on the local host.
  For your information, PySpark has two environment variables serving a similar purpose:
  - PYSPARK_PYTHON: Python binary executable to use for PySpark in both driver and workers (default is `python`).
  - PYSPARK_DRIVER_PYTHON: Python binary executable to use for PySpark in driver only (default is PYSPARK_PYTHON).
  PySpark uses the code [here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L41) to determine the python executable for a python script.
  Author: Sun Rui <rui.sun@intel.com>. Closes #9179 from sun-rui/SPARK-10971.
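  An illustrative way to set the two configs named above (the paths are placeholders, not recommendations):
  ```scala
  import org.apache.spark.SparkConf

  object SparkRCommandConf {
    // spark.sparkr.r.driver.command: R executable for the driver in client modes.
    // spark.sparkr.r.command: R executable for driver and workers in cluster modes.
    val conf: SparkConf = new SparkConf()
      .set("spark.sparkr.r.driver.command", "/opt/R/bin/Rscript")
      .set("spark.sparkr.r.command", "Rscript")
  }
  ```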
* Fix a (very tiny) typo (Jacek Laskowski, 2015-10-22; 1 file, -1/+1)
  Author: Jacek Laskowski <jacek.laskowski@deepsense.io>. Closes #9230 from jaceklaskowski/utils-seconds-typo.
* [SPARK-11134][CORE] Increase LauncherBackendSuite timeout. (Marcelo Vanzin, 2015-10-22; 1 file, -2/+2)
  This test can take a little while to finish on slow / loaded machines.
  Author: Marcelo Vanzin <vanzin@cloudera.com>. Closes #9235 from vanzin/SPARK-11134.
* [SPARK-11098][CORE] Add Outbox to cache the sending messages to resolve the message disorder issue (zsxwing, 2015-10-22; 2 files, -57/+310)
  The current NettyRpc has a message order issue because it uses a thread pool to send messages. E.g., running the following two lines in the same thread,
  ```
  ref.send("A")
  ref.send("B")
  ```
  the remote endpoint may see "B" before "A" because "A" and "B" are sent in parallel. To resolve this issue, this PR adds an outbox for each connection; if we are still connecting to the remote node when sending messages, we just cache the outgoing messages in the outbox and send them one by one when the connection is established.
  Author: zsxwing <zsxwing@gmail.com>. Closes #9197 from zsxwing/rpc-outbox.
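  A much-simplified sketch of the per-connection outbox idea (buffer until the link is up, then drain in order; this is not the NettyRpc implementation):
  ```scala
  import scala.collection.mutable

  class Outbox(transportSend: String => Unit) {
    private val pending = mutable.Queue[String]()
    private var connected = false

    // Messages enqueued before the connection is established are buffered and later
    // drained in arrival order, so the remote endpoint sees "A" before "B".
    def send(message: String): Unit = synchronized {
      if (connected) transportSend(message) else pending.enqueue(message)
    }

    def onConnected(): Unit = synchronized {
      connected = true
      while (pending.nonEmpty) transportSend(pending.dequeue())
    }
  }
  ```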
* [SPARK-11251] Fix page size calculation in local mode (Andrew Or, 2015-10-22; 3 files, -15/+40)
  ```
  // My machine only has 8 cores
  $ bin/spark-shell --master local[32]
  scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
  scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

  Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
  ```
  Author: Andrew Or <andrew@databricks.com>. Closes #9209 from andrewor14/fix-local-page-size.
* [SPARK-11163] Remove unnecessary addPendingTask calls. (Kay Ousterhout, 2015-10-22; 1 file, -22/+5)
  This commit removes unnecessary calls to addPendingTask in TaskSetManager.executorLost. These calls are unnecessary: for tasks that are still pending and haven't been launched, they're still in all of the correct pending lists, so calling addPendingTask has no effect. For tasks that are currently running (which may still be in the pending lists, depending on how they were scheduled), we call addPendingTask in handleFailedTask, so the calls at the beginning of executorLost are redundant.
  I think these calls are left over from when we re-computed the locality levels in addPendingTask; now that we call recomputeLocality separately, I don't think these are necessary. Now that those calls are removed, the readding parameter in addPendingTask is no longer necessary, so this commit also removes that parameter. markhamstra can you take a look at this? cc vanzin
  Author: Kay Ousterhout <kayousterhout@gmail.com>. Closes #9154 from kayousterhout/SPARK-11163.
* [SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted (zsxwing, 2015-10-22; 1 file, -5/+5)
  The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which may hang the application. Imagine the following execution order:
  | step | thread 1: TaskRunner.kill | thread 2: TaskRunner.run |
  | ---- | ------------------------- | ------------------------ |
  | 1 | killed = true | |
  | 2 | | if (killed) { |
  | 3 | | throw new TaskKilledException |
  | 4 | | case _: TaskKilledException \| _: InterruptedException if task.killed => |
  | 5 | task.kill(interruptThread): interruptThread is true | |
  | 6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) |
  | 7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend |
  Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException`. This will prevent the executor from updating the task status and hang the application. A failure caused by the above issue: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull
  Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use `LinkedBlockingQueue.offer` to resolve this issue.
  Author: zsxwing <zsxwing@gmail.com>. Closes #9198 from zsxwing/dont-interrupt-send.
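  The relevant `java.util.concurrent` distinction, as a small sketch (the object and method names are illustrative):
  ```scala
  import java.util.concurrent.LinkedBlockingQueue

  object NonInterruptibleEnqueue {
    // The queue is unbounded, so offer() never waits for capacity and, unlike put(),
    // is not declared to throw InterruptedException while the caller is being killed.
    private val receivers = new LinkedBlockingQueue[String]()

    def post(message: String): Unit = {
      if (!receivers.offer(message)) {
        // Only reachable if the queue somehow reaches Integer.MAX_VALUE elements.
        throw new IllegalStateException("receivers queue is full")
      }
    }
  }
  ```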
* [SPARK-10708] Consolidate sort shuffle implementations (Josh Rosen, 2015-10-22; 26 files, -1290/+435)
  There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
  Author: Josh Rosen <joshrosen@databricks.com>. Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.
* [SPARK-11121][CORE] Correct the TaskLocation type (zhichao.li, 2015-10-22; 2 files, -4/+9)
  Correct the logic to return an `HDFSCacheTaskLocation` instance when the input `str` is an in-memory location.
  Author: zhichao.li <zhichao.li@intel.com>. Closes #9096 from zhichao-li/uselessBranch.
* Minor cleanup of ShuffleMapStage.outputLocs code. (Reynold Xin, 2015-10-21; 4 files, -20/+39)
  I was looking at this code and found the documentation to be insufficient. I added more documentation, and refactored some relevant code paths slightly to improve encapsulation. There is more that I want to do, but I want to get these changes in before doing more work.
  My goal is to reduce exposing internal fields directly in ShuffleMapStage to improve encapsulation. After this change, DAGScheduler no longer directly writes outputLocs. There are still 3 places that read outputLocs directly, but we can change those later.
  Author: Reynold Xin <rxin@databricks.com>. Closes #9175 from rxin/stage-cleanup.
* [SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9 (Holden Karau, 2015-10-20; 2 files, -2/+2)
  Upgrade to Py4j 0.9.
  Author: Holden Karau <holden@pigscanfly.ca>. Author: Holden Karau <holden@us.ibm.com>. Closes #8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.