path: root/sql/README.md
author    Davies Liu <davies@databricks.com>  2015-10-29 23:38:06 -0700
committer Davies Liu <davies.liu@gmail.com>  2015-10-29 23:38:06 -0700
commit    56419cf11f769c80f391b45dc41b3c7101cc5ff4 (patch)
tree      c40211de4baa6c9ab9d12160ac3bab977fb17db4 /sql/README.md
parent    d89be0bf81029cd82008a959d191e1c7b6ceaa18 (diff)
download  spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.gz
          spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.bz2
          spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.zip
[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution; a sketch of this pattern is included after the message. The reserved first page is no longer needed, so it has been removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from it (by triggering spilling in other operators). The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.

The following script fails with OOM before this PR, and finishes in 150 seconds with a 2G heap (it also works in the 1.5 branch, with a similar duration):

```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```

For thread-safety, here is what I've got (see the second sketch below):

1) Without calling spill(), the operators are only used by a single thread, so there are no safety problems.

2) spill() can be triggered in two ways: by the operator itself, or by other operators. We can check trigger == this in spill(); if it holds, we are still on the same thread, so there are no safety problems.

3) If spill() is triggered by other operators (right now the cache will not trigger spill()), we only spill data to disk during the scanning stage (after building has finished), so the in-memory sorter and the memory pages are read-only; we only need to synchronize the iterator and redirect it.

4) During scanning, the iterator only uses one record in one page at a time. We can't free that page, because the downstream is currently using it (via UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page used by the current record (the one with the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() is never triggered during scanning.

5) To avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in the InMemorySorter).

Author: Davies Liu <davies@databricks.com>

Closes #9241 from davies/force_spill.
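As context for the mechanism above, here is a minimal sketch of the cooperative-spill pattern in Java. The class names (MemoryConsumer, TaskMemoryManager) follow Spark's, but the bodies and the register/acquire signatures shown here are illustrative assumptions, not the actual implementation:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// A consumer of task memory that can release memory on request.
abstract class MemoryConsumer {
  /** Release up to `size` bytes by spilling to disk; returns bytes freed. */
  public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
}

// Illustrative per-task memory manager that arbitrates between consumers.
class TaskMemoryManager {
  private final List<MemoryConsumer> consumers = new ArrayList<>();
  private long free;

  TaskMemoryManager(long capacity) { this.free = capacity; }

  synchronized void register(MemoryConsumer consumer) { consumers.add(consumer); }

  /**
   * Grant up to `required` bytes to `requester`. If memory is short, ask the
   * other registered consumers to spill first, then the requester itself.
   */
  synchronized long acquire(long required, MemoryConsumer requester) throws IOException {
    for (MemoryConsumer c : consumers) {
      if (free >= required) break;
      if (c != requester) {
        free += c.spill(required - free, requester); // cooperative spill
      }
    }
    if (free < required) {
      free += requester.spill(required - free, requester); // self-spill as last resort
    }
    long granted = Math.min(required, free);
    free -= granted;
    return granted;
  }
}
```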
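And a sketch of the trigger == this check from point 2 together with the read-only scanning rules from points 3 and 4. It builds on the MemoryConsumer sketch above; ExampleSpillableSorter and its stubbed methods are hypothetical, not Spark classes:

```java
import java.io.IOException;

// Hypothetical spillable operator applying the thread-safety rules above.
class ExampleSpillableSorter extends MemoryConsumer {
  private final Object iteratorLock = new Object();
  private volatile boolean scanning = false; // true once building has finished

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    if (trigger == this) {
      // Self-triggered: we are on the owning thread, so everything is safe to spill.
      return spillEverything();
    }
    if (!scanning) {
      // Triggered by another operator while still building: the in-memory state
      // is mutable, so decline to spill rather than race with the owner.
      return 0L;
    }
    synchronized (iteratorLock) {
      // Scanning stage: pages are read-only. Keep the page backing the current
      // record (the downstream may still reference it), spill the rest, and
      // redirect the iterator to the spilled data.
      return spillAllButCurrentPage();
    }
  }

  private long spillEverything() {
    return 0L; // stub: would write all pages to disk and return bytes freed
  }

  private long spillAllButCurrentPage() {
    return 0L; // stub: would spill every page except the one in use
  }
}
```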
Diffstat (limited to 'sql/README.md')
0 files changed, 0 insertions, 0 deletions