-rw-r--r--  python/pyspark/sql/readwriter.py                                                                        |  44
-rw-r--r--  python/test_support/sql/orc_partitioned/._SUCCESS.crc                                                   | bin 0 -> 8 bytes
-rwxr-xr-x  python/test_support/sql/orc_partitioned/_SUCCESS                                                        |   0
-rw-r--r--  python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc | bin 0 -> 12 bytes
-rwxr-xr-x  python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc      | bin 0 -> 168 bytes
-rw-r--r--  python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc | bin 0 -> 12 bytes
-rwxr-xr-x  python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc      | bin 0 -> 168 bytes
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                                     |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                                     |  12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala                   |   3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala                 |  14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala                               |  12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala                                    |   8
13 files changed, 79 insertions, 23 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 882a03090e..dea8bad79e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -146,14 +146,28 @@ class DataFrameReader(object):
return self._df(self._jreader.table(tableName))
@since(1.4)
- def parquet(self, *path):
+ def parquet(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
- return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
+ return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, paths)))
+
+ @since(1.5)
+ def orc(self, path):
+ """
+ Loads an ORC file, returning the result as a :class:`DataFrame`.
+
+ .. note:: Currently ORC support is only available together with
+ :class:`HiveContext`.
+
+ >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+ >>> df.dtypes
+ [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
+ """
+ return self._df(self._jreader.orc(path))
@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
@@ -378,6 +392,29 @@ class DataFrameWriter(object):
self.partitionBy(partitionBy)
self._jwrite.parquet(path)
+ def orc(self, path, mode=None, partitionBy=None):
+ """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
+
+ .. note:: Currently ORC support is only available together with
+ :class:`HiveContext`.
+
+ :param path: the path in any Hadoop supported file system
+ :param mode: specifies the behavior of the save operation when data already exists.
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
+ :param partitionBy: names of partitioning columns
+
+ >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+ >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
+ """
+ self.mode(mode)
+ if partitionBy is not None:
+ self.partitionBy(partitionBy)
+ self._jwrite.orc(path)
+
@since(1.4)
def jdbc(self, url, table, mode=None, properties={}):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.
@@ -408,7 +445,7 @@ def _test():
import os
import tempfile
from pyspark.context import SparkContext
- from pyspark.sql import Row, SQLContext
+ from pyspark.sql import Row, SQLContext, HiveContext
import pyspark.sql.readwriter
os.chdir(os.environ["SPARK_HOME"])
@@ -420,6 +457,7 @@ def _test():
globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
+ globs['hiveContext'] = HiveContext(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
(failure_count, test_count) = doctest.testmod(
diff --git a/python/test_support/sql/orc_partitioned/._SUCCESS.crc b/python/test_support/sql/orc_partitioned/._SUCCESS.crc
new file mode 100644
index 0000000000..3b7b044936
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/._SUCCESS.crc
Binary files differ
diff --git a/python/test_support/sql/orc_partitioned/_SUCCESS b/python/test_support/sql/orc_partitioned/_SUCCESS
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/_SUCCESS
diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc b/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
new file mode 100644
index 0000000000..834cf0b7f2
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
Binary files differ
diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc b/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
new file mode 100755
index 0000000000..4943801873
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
Binary files differ
diff --git a/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc b/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
new file mode 100644
index 0000000000..693dceeee3
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
Binary files differ
diff --git a/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc b/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
new file mode 100755
index 0000000000..4cbb95ae02
--- /dev/null
+++ b/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
Binary files differ
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0e37ad3e12..f1c1ddf898 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -265,6 +265,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
/**
+ * Loads an ORC file and returns the result as a [[DataFrame]].
+ *
+ * @param path input path
+ * @since 1.5.0
+ * @note Currently, this method can only be used together with `HiveContext`.
+ */
+ def orc(path: String): DataFrame = format("orc").load(path)
+
+ /**
* Returns the specified table as a [[DataFrame]].
*
* @since 1.4.0
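A minimal usage sketch of the new reader method, assuming a `HiveContext` available as `hiveContext` (ORC currently requires Hive support) and using the partitioned fixture added by this patch; the schema comment reflects that fixture:

// Sketch only: hiveContext is assumed to be an existing HiveContext; ORC is not
// available through a plain SQLContext in this version.
val df = hiveContext.read
  .orc("python/test_support/sql/orc_partitioned")  // shorthand for format("orc").load(...)
df.printSchema()          // a: bigint, b: int, c: int -- b and c are discovered partition columns
df.filter("b = 0").show()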
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5548b26cb8..3e7b9cd797 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -280,6 +280,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
def parquet(path: String): Unit = format("parquet").save(path)
+ /**
+ * Saves the content of the [[DataFrame]] in ORC format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("orc").save(path)
+ * }}}
+ *
+ * @since 1.5.0
+ * @note Currently, this method can only be used together with `HiveContext`.
+ */
+ def orc(path: String): Unit = format("orc").save(path)
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
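A companion sketch for the writer, showing the save mode and partition columns that the PySpark wrapper above passes through as `mode` and `partitionBy`; the output directory is hypothetical and `hiveContext` is again assumed:

// Sketch only: hypothetical output path; assumes a HiveContext named hiveContext.
import org.apache.spark.sql.SaveMode

val df = hiveContext.read.orc("python/test_support/sql/orc_partitioned")
df.write
  .mode(SaveMode.Overwrite)   // same effect as mode("overwrite") in the Python API
  .partitionBy("b", "c")      // reproduces the b=.../c=... directory layout of the fixture
  .orc("/tmp/orc_roundtrip")  // equivalent to format("orc").save("/tmp/orc_roundtrip")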
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 080af5bb23..af3f468aaa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -41,8 +41,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
.write
- .format("orc")
- .save(partitionDir.toString)
+ .orc(partitionDir.toString)
}
val dataSchemaWithPartition =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
index 3c2efe329b..d463e8fd62 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -49,13 +49,13 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
- data.toDF().write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+ data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
}
def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
- df.write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+ df.write.mode("overwrite").orc(path.getCanonicalPath)
}
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
@@ -90,7 +90,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+ read.orc(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
@@ -137,7 +137,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
- read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+ read.orc(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
@@ -187,9 +187,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}
read
- .format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
- .load(base.getCanonicalPath)
+ .orc(base.getCanonicalPath)
.registerTempTable("t")
withTempTable("t") {
@@ -230,9 +229,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}
read
- .format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
- .load(base.getCanonicalPath)
+ .orc(base.getCanonicalPath)
.registerTempTable("t")
withTempTable("t") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index ca131faaee..744d462938 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -63,14 +63,14 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
- sqlContext.read.format("orc").load(file),
+ sqlContext.read.orc(file),
data.toDF().collect())
}
}
test("Read/write binary data") {
withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
- val bytes = read.format("orc").load(file).head().getAs[Array[Byte]](0)
+ val bytes = read.orc(file).head().getAs[Array[Byte]](0)
assert(new String(bytes, "utf8") === "test")
}
}
@@ -88,7 +88,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
- read.format("orc").load(file),
+ read.orc(file),
data.toDF().collect())
}
}
@@ -158,7 +158,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
- read.format("orc").load(file),
+ read.orc(file),
Row(Seq.fill(5)(null): _*))
}
}
@@ -310,7 +310,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
""".stripMargin)
val errorMessage = intercept[AnalysisException] {
- sqlContext.read.format("orc").load(path)
+ sqlContext.read.orc(path)
}.getMessage
assert(errorMessage.contains("Failed to discover schema from ORC files"))
@@ -323,7 +323,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
|SELECT key, value FROM single
""".stripMargin)
- val df = sqlContext.read.format("orc").load(path)
+ val df = sqlContext.read.orc(path)
assert(df.schema === singleRowDF.schema.asNullable)
checkAnswer(df, singleRowDF)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
index 5daf691aa8..9d76d6503a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
@@ -39,7 +39,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
(data: Seq[T])
(f: String => Unit): Unit = {
withTempPath { file =>
- sparkContext.parallelize(data).toDF().write.format("orc").save(file.getCanonicalPath)
+ sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
f(file.getCanonicalPath)
}
}
@@ -51,7 +51,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
- withOrcFile(data)(path => f(sqlContext.read.format("orc").load(path)))
+ withOrcFile(data)(path => f(sqlContext.read.orc(path)))
}
/**
@@ -70,11 +70,11 @@ private[sql] trait OrcTest extends SQLTestUtils {
protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
- data.toDF().write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+ data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
- df.write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+ df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
}