author     Liang-Chi Hsieh <viirya@gmail.com>    2016-02-28 21:16:06 -0800
committer  Reynold Xin <rxin@databricks.com>     2016-02-28 21:16:06 -0800
commit     6dfc4a764c8bcfc24d951239835015da3ed7c29e (patch)
tree       79073ccf33dc2e53e552f2d75f23a40716911671 /sql
parent     9e01dcc6446f8648e61062f8afe62589b9d4b5ab (diff)
[SPARK-13537][SQL] Fix readBytes in VectorizedPlainValuesReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13537

## What changes were proposed in this pull request?

In readBytes of VectorizedPlainValuesReader, we use buffer[offset] to access bytes in buffer. This is incorrect because offset has Platform.BYTE_ARRAY_OFFSET added to it at initialization. We should fix it.

## How was this patch tested?

`ParquetHadoopFsRelationSuite` sometimes (depending on the randomly generated data) [fails](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52136/consoleFull) because of this bug. After applying this patch, the test passes. I added a test to `ParquetHadoopFsRelationSuite` with data that fails without this patch.

The error exception:

[info] ParquetHadoopFsRelationSuite:
[info] - test all data types - StringType (440 milliseconds)
[info] - test all data types - BinaryType (434 milliseconds)
[info] - test all data types - BooleanType (406 milliseconds)
20:59:38.618 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 2597.0 (TID 67966)
java.lang.ArrayIndexOutOfBoundsException: 46
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBytes(VectorizedPlainValuesReader.java:88)

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #11418 from viirya/fix-readbytes.
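For context, a minimal standalone sketch of why indexing the array with the unsafe offset fails. OffsetDemo is a hypothetical class written only for illustration; Platform.BYTE_ARRAY_OFFSET and Platform.getByte(Object, long) are the same org.apache.spark.unsafe.Platform helpers the reader uses.

import org.apache.spark.unsafe.Platform;

public class OffsetDemo {
  public static void main(String[] args) {
    // A tiny buffer standing in for the reader's page bytes.
    byte[] buffer = new byte[] {42, 0, 0, 0};

    // The reader tracks its position as an unsafe offset: the JVM's
    // byte-array header size (Platform.BYTE_ARRAY_OFFSET, e.g. 16)
    // plus the logical index into the array.
    long offset = Platform.BYTE_ARRAY_OFFSET;

    // Correct: getByte interprets the offset relative to the object
    // base, so this reads the first element, 42.
    System.out.println(Platform.getByte(buffer, offset));

    // Buggy (pre-patch): using the unsafe offset as a plain array index
    // skips BYTE_ARRAY_OFFSET elements and can run past the end of the
    // array, throwing the ArrayIndexOutOfBoundsException seen above.
    System.out.println(buffer[(int) offset]);  // AIOOBE here
  }
}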
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala                    | 33
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 57cc28e9f4..ee9a7a221b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -85,7 +85,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
     for (int i = 0; i < total; i++) {
       // Bytes are stored as a 4-byte little endian int. Just read the first byte.
       // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-      c.putByte(rowId + i, buffer[offset]);
+      c.putByte(rowId + i, Platform.getByte(buffer, offset));
       offset += 4;
     }
   }
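As the hunk's comment notes, each byte value occupies a 4-byte little-endian int in the PLAIN-encoded page, so the fixed loop reads the low byte and then advances the unsafe offset by 4. A minimal sketch of that access pattern; StrideDemo and its buffer contents are hypothetical, for illustration only.

import org.apache.spark.unsafe.Platform;

public class StrideDemo {
  public static void main(String[] args) {
    // Three byte values (11, 22, 33), each stored as a 4-byte
    // little-endian int, mirroring the PLAIN-encoded page layout.
    byte[] buffer = new byte[] {11, 0, 0, 0, 22, 0, 0, 0, 33, 0, 0, 0};
    long offset = Platform.BYTE_ARRAY_OFFSET;
    for (int i = 0; i < 3; i++) {
      // Little endian: the value's low byte comes first.
      System.out.println(Platform.getByte(buffer, offset));
      offset += 4;  // skip the three high (padding) bytes
    }
  }
}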
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 5ce58e898e..f2501d7ce3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -175,4 +175,37 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-13537: Fix readBytes in VectorizedPlainValuesReader") {
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+
+      val schema = new StructType()
+        .add("index", IntegerType, nullable = false)
+        .add("col", ByteType, nullable = true)
+
+      val data = Seq(Row(1, -33.toByte), Row(2, 0.toByte), Row(3, -55.toByte), Row(4, 56.toByte),
+        Row(5, 127.toByte), Row(6, -44.toByte), Row(7, 23.toByte), Row(8, -95.toByte),
+        Row(9, 127.toByte), Row(10, 13.toByte))
+
+      val rdd = sqlContext.sparkContext.parallelize(data)
+      val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+      df.write
+        .mode("overwrite")
+        .format(dataSourceName)
+        .option("dataSchema", df.schema.json)
+        .save(path)
+
+      val loadedDF = sqlContext
+        .read
+        .format(dataSourceName)
+        .option("dataSchema", df.schema.json)
+        .schema(df.schema)
+        .load(path)
+        .orderBy("index")
+
+      checkAnswer(loadedDF, df)
+    }
+  }
 }