aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/scala/spark/examples/CassandraTest.scala48
1 files changed, 43 insertions, 5 deletions
diff --git a/examples/src/main/scala/spark/examples/CassandraTest.scala b/examples/src/main/scala/spark/examples/CassandraTest.scala
index 6b9fd502e2..2cc62b9fe9 100644
--- a/examples/src/main/scala/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/spark/examples/CassandraTest.scala
@@ -1,17 +1,16 @@
package spark.examples
import org.apache.hadoop.mapreduce.Job
-import org.apache.cassandra.hadoop.{ConfigHelper, ColumnFamilyInputFormat}
-import org.apache.cassandra.thrift.{IndexExpression, SliceRange, SlicePredicate}
+import org.apache.cassandra.hadoop.{ColumnFamilyOutputFormat, ConfigHelper, ColumnFamilyInputFormat}
+import org.apache.cassandra.thrift._
import spark.{RDD, SparkContext}
-import SparkContext._
+import spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/*
@@ -44,8 +43,15 @@ object CassandraTest {
ConfigHelper.setInputRpcPort(job.getConfiguration(), args(2))
+ ConfigHelper.setOutputInitialAddress(job.getConfiguration(), args(1))
+
+ ConfigHelper.setOutputRpcPort(job.getConfiguration(), args(2))
+
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
+
+
val predicate = new SlicePredicate()
val sliceRange = new SliceRange()
sliceRange.setStart(Array.empty[Byte])
@@ -55,6 +61,8 @@ object CassandraTest {
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+ ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+
//Make a new Hadoop RDD
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[ColumnFamilyInputFormat],
@@ -74,6 +82,33 @@ object CassandraTest {
counts.collect().foreach {
case (word, count) => println(word + ":" + count)
}
+
+ counts.map {
+ case (word, count) => {
+ val colWord = new org.apache.cassandra.thrift.Column()
+ colWord.setName(ByteBufferUtil.bytes("word"))
+ colWord.setValue(ByteBufferUtil.bytes(word))
+ colWord.setTimestamp(System.currentTimeMillis)
+
+ val colCount = new org.apache.cassandra.thrift.Column()
+ colCount.setName(ByteBufferUtil.bytes("wcount"))
+ colCount.setValue(ByteBufferUtil.bytes(count.toLong))
+ colCount.setTimestamp(System.currentTimeMillis)
+
+
+ val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
+
+ val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+ mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(0).column_or_supercolumn.setColumn(colWord)
+
+ mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+ mutations.get(1).column_or_supercolumn.setColumn(colCount)
+ (outputkey, mutations)
+ }
+ }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
+ classOf[ColumnFamilyOutputFormat], job.getConfiguration)
+
}
}
@@ -81,6 +116,9 @@ object CassandraTest {
create keyspace casDemo;
use casDemo;
+create column family WordCount with comparator = UTF8Type;
+update column family WordCount with column_metadata = [{column_name: word, validation_class: UTF8Type}, {column_name: wcount, validation_class: LongType}];
+
create column family Words with comparator = UTF8Type;
update column family Words with column_metadata = [{column_name: book, validation_class: UTF8Type}, {column_name: para, validation_class: UTF8Type}];