path: root/sql/core
author     Wenchen Fan <wenchen@databricks.com>  2016-01-25 16:23:59 -0800
committer  Davies Liu <davies.liu@gmail.com>     2016-01-25 16:23:59 -0800
commit     be375fcbd200fb0e210b8edcfceb5a1bcdbba94b (patch)
tree       060e087c33e27b44b30fb97f9861c97d8a5a06af /sql/core
parent     6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da (diff)
[SPARK-12879] [SQL] improve the unsafe row writing framework
As we begin to use the unsafe row writing framework (`BufferHolder` and `UnsafeRowWriter`) in more and more places (`UnsafeProjection`, `UnsafeRowParquetRecordReader`, `GenerateColumnAccessor`, etc.), we should document it better and make it easier to use. This PR abstracts the technique used in `UnsafeRowParquetRecordReader`: avoid unnecessary operations as much as possible. For example, instead of re-pointing the row to the buffer after every write, we only need to update the row size; if all fields are of primitive type, we can even skip updating the row size. This makes the technique easy to apply in more places.

A local benchmark shows `UnsafeProjection` is up to 1.7x faster after this PR:

**old version**
```
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
unsafe projection:          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-------------------------------------------------------------------------------
single long                      2616.04          102.61         1.00 X
single nullable long             3032.54           88.52         0.86 X
primitive types                  9121.05           29.43         0.29 X
nullable primitive types        12410.60           21.63         0.21 X
```

**new version**
```
Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz
unsafe projection:          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-------------------------------------------------------------------------------
single long                      1533.34          175.07         1.00 X
single nullable long             2306.73          116.37         0.66 X
primitive types                  8403.93           31.94         0.18 X
nullable primitive types        12448.39           21.56         0.12 X
```

For a single non-nullable long (the best case), we get about a 1.7x speedup. Even when the field is nullable, we still get a 1.3x speedup. The other cases see less of a boost, since the saved operations are only a small fraction of the whole process. The benchmark code is included in this PR.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #10809 from cloud-fan/unsafe-projection.
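To make the reuse pattern concrete before the diffs, here is a minimal sketch of the per-row write loop under the new API. The constructors and methods (`new BufferHolder(row)`, `new UnsafeRowWriter(holder, numFields)`, `reset()`, `zeroOutNullBytes()`, `setTotalSize()`) are the ones this patch introduces below; the two-field schema, the `write` overloads for a long and a string, and the `writeRow` helper are illustrative assumptions, not code from the patch:

```
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.unsafe.types.UTF8String

object WriteLoopSketch {
  val numFields = 2

  // Set up once and reuse across rows: the holder is bound to the row at
  // construction time, so the row never has to be re-pointed to the buffer
  // after each write.
  val unsafeRow = new UnsafeRow(numFields)
  val bufferHolder = new BufferHolder(unsafeRow)
  val rowWriter = new UnsafeRowWriter(bufferHolder, numFields)

  def writeRow(id: Long, name: UTF8String): UnsafeRow = {
    bufferHolder.reset()          // rewind the variable-length region
    rowWriter.zeroOutNullBytes()  // clear null bits left over from the previous row
    rowWriter.write(0, id)
    rowWriter.write(1, name)
    // Only the size needs updating; for an all-primitive schema even this
    // call can be skipped, as noted above.
    unsafeRow.setTotalSize(bufferHolder.totalSize())
    unsafeRow
  }
}
```

The key change is that row, holder, and writer are wired together once, leaving the per-row loop as just reset, write, and a size update.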
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java | 17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 7
3 files changed, 10 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 80805f15a8..17adfec321 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -74,11 +74,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
   private boolean containsVarLenFields;

   /**
-   * The number of bytes in the fixed length portion of the row.
-   */
-  private int fixedSizeBytes;
-
-  /**
    * For each request column, the reader to read this column.
    * columnsReaders[i] populated the UnsafeRow's attribute at i.
    */
@@ -266,19 +261,13 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     /**
     * Initialize rows and rowWriters. These objects are reused across all rows in the relation.
     */
-    int rowByteSize = UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
-    rowByteSize += 8 * requestedSchema.getFieldCount();
-    fixedSizeBytes = rowByteSize;
-    rowByteSize += numVarLenFields * DEFAULT_VAR_LEN_SIZE;
     containsVarLenFields = numVarLenFields > 0;
     rowWriters = new UnsafeRowWriter[rows.length];

     for (int i = 0; i < rows.length; ++i) {
       rows[i] = new UnsafeRow(requestedSchema.getFieldCount());
-      rowWriters[i] = new UnsafeRowWriter();
-      BufferHolder holder = new BufferHolder(rowByteSize);
-      rowWriters[i].initialize(rows[i], holder, requestedSchema.getFieldCount());
-      rows[i].pointTo(holder.buffer, Platform.BYTE_ARRAY_OFFSET, holder.buffer.length);
+      BufferHolder holder = new BufferHolder(rows[i], numVarLenFields * DEFAULT_VAR_LEN_SIZE);
+      rowWriters[i] = new UnsafeRowWriter(holder, requestedSchema.getFieldCount());
     }
   }
@@ -295,7 +284,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     if (containsVarLenFields) {
       for (int i = 0; i < rowWriters.length; ++i) {
-        rowWriters[i].holder().resetTo(fixedSizeBytes);
+        rowWriters[i].holder().reset();
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 72eb1f6cf0..738b9a35d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -132,8 +132,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       private ByteOrder nativeOrder = null;
       private byte[][] buffers = null;
       private UnsafeRow unsafeRow = new UnsafeRow($numFields);
-      private BufferHolder bufferHolder = new BufferHolder();
-      private UnsafeRowWriter rowWriter = new UnsafeRowWriter();
+      private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
+      private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, $numFields);
       private MutableUnsafeRow mutableRow = null;
       private int currentRow = 0;
@@ -181,9 +181,9 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
      public InternalRow next() {
        currentRow += 1;
        bufferHolder.reset();
-        rowWriter.initialize(bufferHolder, $numFields);
+        rowWriter.zeroOutNullBytes();
        ${extractors.mkString("\n")}
-        unsafeRow.pointTo(bufferHolder.buffer, bufferHolder.totalSize());
+        unsafeRow.setTotalSize(bufferHolder.totalSize());
        return unsafeRow;
      }
    }"""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index bd2d17c018..430257f60d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -98,16 +98,15 @@ private[sql] class TextRelation(
     sqlContext.sparkContext.hadoopRDD(
       conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
       .mapPartitions { iter =>
-        val bufferHolder = new BufferHolder
-        val unsafeRowWriter = new UnsafeRowWriter
         val unsafeRow = new UnsafeRow(1)
+        val bufferHolder = new BufferHolder(unsafeRow)
+        val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)

         iter.map { case (_, line) =>
           // Writes to an UnsafeRow directly
           bufferHolder.reset()
-          unsafeRowWriter.initialize(bufferHolder, 1)
           unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-          unsafeRow.pointTo(bufferHolder.buffer, bufferHolder.totalSize())
+          unsafeRow.setTotalSize(bufferHolder.totalSize())
           unsafeRow
         }
       }