author     Reynold Xin <rxin@databricks.com>          2015-03-02 22:14:08 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-03-02 22:14:08 -0800
commit     54d19689ff8d786acde5b8ada6741854ffadadea (patch)
tree       b0a2a68c3e8ea6a7f0209fa6a12d02d1c38b3c71 /sql/core
parent     12599942e69e4d73040f3a8611661a0862514ffc (diff)
[SPARK-5310][SQL] Fixes to Docs and Datasources API
- Various fixes to docs
- Make data source traits actually interfaces

Based on #4862 but with fixed conflicts.

Author: Reynold Xin <rxin@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #4868 from marmbrus/pr/4862 and squashes the following commits:

fe091ea [Michael Armbrust] Merge remote-tracking branch 'origin/master' into pr/4862
0208497 [Reynold Xin] Test fixes.
34e0a28 [Reynold Xin] [SPARK-5310][SQL] Various fixes to Spark SQL docs.
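The substantive API change is that the data source scan traits in org.apache.spark.sql.sources (TableScan, PrunedScan, PrunedFilteredScan, CatalystScan, InsertableRelation) no longer extend BaseRelation, so a relation must now mix BaseRelation in explicitly, as the updated test suites in this diff do. A minimal sketch of a table-scan relation written against the new shape of the API (the class name, column name, and range logic are illustrative, not taken from this patch):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, TableScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Hypothetical relation: before this patch extending TableScan alone was enough;
    // afterwards BaseRelation must be mixed in explicitly.
    case class RangeRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
      extends BaseRelation with TableScan {

      override def schema: StructType =
        StructType(StructField("i", IntegerType, nullable = false) :: Nil)

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.parallelize(from to to).map(Row(_))
    }

Existing sources only need the extra BaseRelation mixin; the buildScan contract itself is unchanged.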
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                 | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala                    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala         |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala         |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala        |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala        | 43
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala      |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala   |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala    | 11
10 files changed, 72 insertions(+), 41 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f3aac0826a..46f50708a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -64,7 +64,7 @@ private[sql] object DataFrame {
* val people = sqlContext.parquetFile("...")
*
* // Create a DataFrame from data sources
- * val df =
+ * val df = sqlContext.load("...", "json")
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
@@ -80,9 +80,10 @@ private[sql] object DataFrame {
* {{{
* // The following creates a new column that increases everybody's age by 10.
* people("age") + 10 // in Scala
+ * people.col("age").plus(10); // in Java
* }}}
*
- * A more concrete example:
+ * A more concrete example in Scala:
* {{{
* // To create DataFrame using SQLContext
* val people = sqlContext.parquetFile("...")
@@ -94,6 +95,18 @@ private[sql] object DataFrame {
* .agg(avg(people("salary")), max(people("age")))
* }}}
*
+ * and in Java:
+ * {{{
+ * // To create DataFrame using SQLContext
+ * DataFrame people = sqlContext.parquetFile("...");
+ * DataFrame department = sqlContext.parquetFile("...");
+ *
+ * people.filter("age".gt(30))
+ * .join(department, people.col("deptId").equalTo(department("id")))
+ * .groupBy(department.col("name"), "gender")
+ * .agg(avg(people.col("salary")), max(people.col("age")));
+ * }}}
+ *
* @groupname basic Basic DataFrame functions
* @groupname dfops Language Integrated Queries
* @groupname rdd RDD Operations
@@ -102,7 +115,7 @@ private[sql] object DataFrame {
*/
// TODO: Improve documentation.
@Experimental
-class DataFrame protected[sql](
+class DataFrame private[sql](
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
extends RDDApi[Row] with Serializable {
@@ -295,12 +308,14 @@ class DataFrame protected[sql](
* 1984 04 0.450090 0.483521
* }}}
* @param numRows Number of rows to show
- * @group basic
+ *
+ * @group action
*/
def show(numRows: Int): Unit = println(showString(numRows))
/**
* Displays the top 20 rows of [[DataFrame]] in a tabular form.
+ * @group action
*/
def show(): Unit = show(20)
@@ -738,16 +753,19 @@ class DataFrame protected[sql](
/**
* Returns the first `n` rows.
+ * @group action
*/
def head(n: Int): Array[Row] = limit(n).collect()
/**
* Returns the first row.
+ * @group action
*/
def head(): Row = head(1).head
/**
* Returns the first row. Alias for head().
+ * @group action
*/
override def first(): Row = head()
@@ -834,6 +852,11 @@ class DataFrame protected[sql](
/**
* @group basic
*/
+ override def cache(): this.type = persist()
+
+ /**
+ * @group basic
+ */
override def persist(newLevel: StorageLevel): this.type = {
sqlContext.cacheManager.cacheQuery(this, None, newLevel)
this
@@ -847,6 +870,11 @@ class DataFrame protected[sql](
this
}
+ /**
+ * @group basic
+ */
+ override def unpersist(): this.type = unpersist(blocking = false)
+
/////////////////////////////////////////////////////////////////////////////
// I/O
/////////////////////////////////////////////////////////////////////////////
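For context on the documentation hunks above, a short usage sketch of the API they describe, assuming an existing SQLContext named sqlContext and a placeholder JSON path (this snippet is not part of the patch):

    val df = sqlContext.load("/path/to/people.json", "json")  // data sources API shown in the scaladoc above
    df.cache()                                                 // cache() is now a concrete override on DataFrame
    df.filter(df("age") > 21)
      .groupBy("age")
      .count()
      .show()                                                  // show() is grouped as an action in the new docs
    df.unpersist()                                             // the new unpersist() override; defaults to blocking = false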
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index df866fd1ad..ba4373f012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -29,13 +29,13 @@ import org.apache.spark.storage.StorageLevel
*/
private[sql] trait RDDApi[T] {
- def cache(): this.type = persist()
+ def cache(): this.type
def persist(): this.type
def persist(newLevel: StorageLevel): this.type
- def unpersist(): this.type = unpersist(blocking = false)
+ def unpersist(): this.type
def unpersist(blocking: Boolean): this.type
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index beb76f2c55..1778d39c42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -119,7 +119,8 @@ private[sql] case class JDBCRelation(
url: String,
table: String,
parts: Array[Partition])(@transient val sqlContext: SQLContext)
- extends PrunedFilteredScan {
+ extends BaseRelation
+ with PrunedFilteredScan {
override val schema = JDBCRDD.resolveTable(url, table)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index f9d0ba2241..b645199ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -90,7 +90,10 @@ private[sql] case class JSONRelation(
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
- extends TableScan with InsertableRelation {
+ extends BaseRelation
+ with TableScan
+ with InsertableRelation {
+
// TODO: Support partitioned JSON relation.
private def baseRDD = sqlContext.sparkContext.textFile(path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 8d95858493..234e6bb844 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -159,7 +159,8 @@ private[sql] case class ParquetRelation2(
maybeSchema: Option[StructType] = None,
maybePartitionSpec: Option[PartitionSpec] = None)(
@transient val sqlContext: SQLContext)
- extends CatalystScan
+ extends BaseRelation
+ with CatalystScan
with InsertableRelation
with SparkHadoopMapReduceUtil
with Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 0c4b706eee..a046a48c17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.sources
import org.apache.spark.annotation.{Experimental, DeveloperApi}
@@ -90,12 +91,6 @@ trait CreatableRelationProvider {
* existing data is expected to be overwritten by the contents of the DataFrame.
* ErrorIfExists mode means that when saving a DataFrame to a data source,
* if data already exists, an exception is expected to be thrown.
- *
- * @param sqlContext
- * @param mode
- * @param parameters
- * @param data
- * @return
*/
def createRelation(
sqlContext: SQLContext,
@@ -138,7 +133,7 @@ abstract class BaseRelation {
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*/
@DeveloperApi
-trait TableScan extends BaseRelation {
+trait TableScan {
def buildScan(): RDD[Row]
}
@@ -148,7 +143,7 @@ trait TableScan extends BaseRelation {
* containing all of its tuples as Row objects.
*/
@DeveloperApi
-trait PrunedScan extends BaseRelation {
+trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
@@ -162,25 +157,11 @@ trait PrunedScan extends BaseRelation {
* as filtering partitions based on a bloom filter.
*/
@DeveloperApi
-trait PrunedFilteredScan extends BaseRelation {
+trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
/**
- * ::Experimental::
- * An interface for experimenting with a more direct connection to the query planner. Compared to
- * [[PrunedFilteredScan]], this operator receives the raw expressions from the
- * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
- * interface is not designed to be binary compatible across releases and thus should only be used
- * for experimentation.
- */
-@Experimental
-trait CatalystScan extends BaseRelation {
- def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
-}
-
-@DeveloperApi
-/**
* ::DeveloperApi::
* A BaseRelation that can be used to insert data into it through the insert method.
* If overwrite in insert method is true, the old data in the relation should be overwritten with
@@ -196,6 +177,20 @@ trait CatalystScan extends BaseRelation {
* If a data source needs to check the actual nullability of a field, it needs to do it in the
* insert method.
*/
-trait InsertableRelation extends BaseRelation {
+@DeveloperApi
+trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
+
+/**
+ * ::Experimental::
+ * An interface for experimenting with a more direct connection to the query planner. Compared to
+ * [[PrunedFilteredScan]], this operator receives the raw expressions from the
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this
+ * interface is NOT designed to be binary compatible across releases and thus should only be used
+ * for experimentation.
+ */
+@Experimental
+trait CatalystScan {
+ def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
+}
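To show what the reshuffled interfaces mean for an implementor, here is a sketch of a column-pruning, filter-pushing relation under the new layout, mirroring the pattern the test suites below adopt; the names and the filter handling are illustrative assumptions, not code from this patch:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Hypothetical two-column relation illustrating the new "interface" style:
    // BaseRelation supplies schema and sqlContext, PrunedFilteredScan the scan contract.
    case class NumericRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
      extends BaseRelation
      with PrunedFilteredScan {

      override def schema: StructType =
        StructType(
          StructField("a", IntegerType, nullable = false) ::
          StructField("b", IntegerType, nullable = false) :: Nil)

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // Honor one pushed-down filter shape as an example; Spark re-evaluates any filters we ignore.
        val lower = filters.collectFirst { case GreaterThan("a", v: Int) => v + 1 }.getOrElse(from)
        sqlContext.sparkContext.parallelize(lower to to).map { i =>
          Row.fromSeq(requiredColumns.map {
            case "a" => i
            case "b" => i * 2
          })
        }
      }
    }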
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 0ec756bfeb..54af50c6e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -29,7 +29,7 @@ class DDLScanSource extends RelationProvider {
}
case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
- extends TableScan {
+ extends BaseRelation with TableScan {
override def schema =
StructType(Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 41cd35683c..ffeccf0b69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -32,7 +32,8 @@ class FilteredScanSource extends RelationProvider {
}
case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
- extends PrunedFilteredScan {
+ extends BaseRelation
+ with PrunedFilteredScan {
override def schema =
StructType(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a33cf1172c..08fb5380dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -31,7 +31,8 @@ class PrunedScanSource extends RelationProvider {
}
case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
- extends PrunedScan {
+ extends BaseRelation
+ with PrunedScan {
override def schema =
StructType(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 0a4d4b6342..7928600ac2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -33,7 +33,7 @@ class SimpleScanSource extends RelationProvider {
}
case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
- extends TableScan {
+ extends BaseRelation with TableScan {
override def schema =
StructType(StructField("i", IntegerType, nullable = false) :: Nil)
@@ -51,10 +51,11 @@ class AllDataTypesScanSource extends SchemaRelationProvider {
}
case class AllDataTypesScan(
- from: Int,
- to: Int,
- userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
- extends TableScan {
+ from: Int,
+ to: Int,
+ userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
+ extends BaseRelation
+ with TableScan {
override def schema = userSpecifiedSchema