| author | Davies Liu <davies@databricks.com> | 2015-10-12 21:12:59 -0700 |
|---|---|---|
| committer | Cheng Lian <lian@databricks.com> | 2015-10-12 21:12:59 -0700 |
| commit | c4da5345a0ef643a7518756caaa18ff3f3ea9acc (patch) |
| tree | 330ed74a4ebe7e98b8983df84d0d91f556b7199e /unsafe/src |
| parent | f97e9323b526b3d0b0fee0ca03f4276f37bb5750 (diff) |
[SPARK-10990] [SPARK-11018] [SQL] improve unrolling of complex types
This PR improves the unrolling and reading of complex types in the columnar cache:
1) Use UnsafeProjection to serialize complex types, so they are not serialized three times (two of those for actualSize).
2) Copy the bytes from UnsafeRow/UnsafeArrayData to the ByteBuffer directly, avoiding an intermediate byte[].
3) Use the underlying array of the ByteBuffer to create UTF8String/UnsafeRow/UnsafeArrayData without copying.
Combined, these optimizations reduce the unrolling time from 25s to 21s (16% less) and the scanning time from 3.5s to 2.5s (28% less).
```
import time

df = sqlContext.read.parquet(path)
t = time.time()
df.cache()
df.count()
print 'unrolling', time.time() - t
for i in range(10):
    t = time.time()
    print df.select("*")._jdf.queryExecution().toRdd().count()
    print time.time() - t
```
The schema is:
```
root
 |-- a: struct (nullable = true)
 |    |-- b: long (nullable = true)
 |    |-- c: string (nullable = true)
 |-- d: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- e: map (nullable = true)
 |    |-- key: long
 |    |-- value: string (valueContainsNull = true)
```
The columnar cache now depends on UnsafeProjection supporting all data types (including UDTs), so this PR also fixes that.
Author: Davies Liu <davies@databricks.com>
Closes #9016 from davies/complex2.
Diffstat (limited to 'unsafe/src')
| -rw-r--r-- | unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java | 10 |
|---|---|---|

1 file changed, 10 insertions, 0 deletions
```diff
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 216aeea60d..b7aecb5102 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -19,6 +19,7 @@ package org.apache.spark.unsafe.types;
 
 import javax.annotation.Nonnull;
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.Map;
@@ -137,6 +138,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable
     Platform.copyMemory(base, offset, target, targetOffset, numBytes);
   }
 
+  public void writeTo(ByteBuffer buffer) {
+    assert(buffer.hasArray());
+    byte[] target = buffer.array();
+    int offset = buffer.arrayOffset();
+    int pos = buffer.position();
+    writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
+    buffer.position(pos + numBytes);
+  }
+
   /**
    * Returns the number of bytes for a code point with the first byte as `b`
    * @param b The first byte of a code point
```
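To illustrate the new `writeTo(ByteBuffer)` path, here is a minimal, self-contained sketch (not Spark's actual class — `WriteToSketch` is a hypothetical stand-in) of the same idea: copy a string's bytes straight into the backing array of a heap `ByteBuffer` and advance the position, avoiding an intermediate `byte[]` allocation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for UTF8String, sketching the writeTo(ByteBuffer) idea.
public class WriteToSketch {
    private final byte[] bytes; // stand-in for the string's backing bytes

    WriteToSketch(String s) {
        this.bytes = s.getBytes(StandardCharsets.UTF_8);
    }

    void writeTo(ByteBuffer buffer) {
        assert buffer.hasArray();          // only heap buffers expose a backing array
        byte[] target = buffer.array();
        int offset = buffer.arrayOffset(); // start of this buffer within the array
        int pos = buffer.position();
        // Analogous to Platform.copyMemory(...) in the real patch: one direct copy,
        // no temporary byte[] in between.
        System.arraycopy(bytes, 0, target, offset + pos, bytes.length);
        buffer.position(pos + bytes.length);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16); // heap buffer: hasArray() == true
        new WriteToSketch("hello").writeTo(buf);
        new WriteToSketch(" spark").writeTo(buf);
        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "hello spark"
    }
}
```

Note the `hasArray()` guard: a direct (off-heap) `ByteBuffer` has no backing array, so this fast path only applies to heap buffers, which is what the columnar cache uses here.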