aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-03-29 22:02:53 -0700
committerReynold Xin <rxin@apache.org>2014-03-29 22:02:53 -0700
commit2861b07bb030f72769f5b757b4a7d4a635807140 (patch)
tree650e55484a09f4954dc8605459071dfcd711f7fb /sql
parent92b83959cacbc902ff0b50110261f097bf2df247 (diff)
downloadspark-2861b07bb030f72769f5b757b4a7d4a635807140.tar.gz
spark-2861b07bb030f72769f5b757b4a7d4a635807140.tar.bz2
spark-2861b07bb030f72769f5b757b4a7d4a635807140.zip
[SQL] SPARK-1354 Fix self-joins of parquet relations
@AndreSchumacher, please take a look. https://spark-project.atlassian.net/browse/SPARK-1354 Author: Michael Armbrust <michael@databricks.com> Closes #269 from marmbrus/parquetJoin and squashes the following commits: 4081e77 [Michael Armbrust] Create new instances of Parquet relation when multiple copies are in a single plan.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala15
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala19
2 files changed, 32 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2b825f84ee..67a34e1f21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -36,7 +36,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
import parquet.schema.{Type => ParquetType}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.types._
@@ -54,7 +54,8 @@ import org.apache.spark.sql.catalyst.types._
* @param tableName The name of the relation that can be used in queries.
* @param path The path to the Parquet file.
*/
-case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
+case class ParquetRelation(tableName: String, path: String)
+ extends BaseRelation with MultiInstanceRelation {
/** Schema derived from ParquetFile **/
def parquetSchema: MessageType =
@@ -74,6 +75,16 @@ case class ParquetRelation(tableName: String, path: String) extends BaseRelation
// Parquet files have no concepts of keys, therefore no Partitioner
// Note: we could allow Block level access; needs to be thought through
override def isPartitioned = false
+
+ override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
+
+ // Equals must also take into account the output attributes so that we can distinguish between
+ // different instances of the same relation.
+ override def equals(other: Any) = other match {
+ case p: ParquetRelation =>
+ p.tableName == tableName && p.path == path && p.output == output
+ case _ => false
+ }
}
object ParquetRelation {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 71caa709af..ea1733b361 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,6 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.test.TestSQLContext
+// Implicits
+import org.apache.spark.sql.test.TestSQLContext._
+
class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
override def beforeAll() {
ParquetTestData.writeFile()
@@ -39,6 +42,22 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
ParquetTestData.testFile.delete()
}
+ test("self-join parquet files") {
+ val x = ParquetTestData.testData.subquery('x)
+ val y = ParquetTestData.testData.subquery('y)
+ val query = x.join(y).where("x.myint".attr === "y.myint".attr)
+
+ // Check to make sure that the attributes from either side of the join have unique expression
+ // ids.
+ query.queryExecution.analyzed.output.filter(_.name == "myint") match {
+ case Seq(i1, i2) if(i1.exprId == i2.exprId) =>
+ fail(s"Duplicate expression IDs found in query plan: $query")
+ case Seq(_, _) => // All good
+ }
+
+ // TODO: We can't run this query as it NPEs
+ }
+
test("Import of simple Parquet file") {
val result = getRDD(ParquetTestData.testData).collect()
assert(result.size === 15)