Reference URL: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
[donghua@vmxdb01 ~]$ tar zxvf Downloads/spark-2.0.1-bin-hadoop2.7.tgz
[donghua@vmxdb01 ~]$ tar zxvf Downloads/scala-2.11.8.tgz
[donghua@vmxdb01 ~]$ export SPARK_HOME=/home/donghua/spark-2.0.1-bin-hadoop2.7
[donghua@vmxdb01 ~]$ export SCALA_HOME=/home/donghua/scala-2.11.8
[donghua@vmxdb01 ~]$ export PATH=$SCALA_HOME/bin:$PATH
[donghua@vmxdb01 ~]$ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.9 | CQL spec 3.4.2 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE mykeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> CREATE TABLE mykeyspace.kv(key text PRIMARY KEY, value int);
cqlsh> INSERT INTO mykeyspace.kv(key, value) VALUES ('key1', 1);
cqlsh> INSERT INTO mykeyspace.kv(key, value) VALUES ('key2', 2);
cqlsh> select * from mykeyspace.kv;
 key  | value
------+-------
 key1 |     1
 key2 |     2
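Once spark-shell is up (next step), the same schema can also be created from Scala through the connector's CassandraConnector helper, which manages the driver session for you. A minimal sketch, assuming the shell was started with the connector on the classpath as shown below:

import com.datastax.spark.connector.cql.CassandraConnector

// Run DDL through a connector-managed session; sc.getConf already carries
// spark.cassandra.connection.host from the shell startup flags.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS mykeyspace WITH replication = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS mykeyspace.kv(key text PRIMARY KEY, value int)")
}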
[donghua@vmxdb01 ~]$ $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 --conf spark.cassandra.connection.host=127.0.0.1
Ivy Default Cache set to: /home/donghua/.ivy2/cache
The jars for the packages stored in: /home/donghua/.ivy2/jars
:: loading settings :: url = jar:file:/home/donghua/spark-2.0.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found datastax#spark-cassandra-connector;2.0.0-M2-s_2.11 in spark-packages
found commons-beanutils#commons-beanutils;1.8.0 in central
found org.joda#joda-convert;1.2 in central
found joda-time#joda-time;2.3 in central
found io.netty#netty-all;4.0.33.Final in central
found com.twitter#jsr166e;1.1.0 in central
found org.scala-lang#scala-reflect;2.11.8 in central
downloading http://dl.bintray.com/spark-packages/maven/datastax/spark-cassandra-connector/2.0.0-M2-s_2.11/spark-cassandra-connector-2.0.0-M2-s_2.11.jar ...
[SUCCESSFUL ] datastax#spark-cassandra-connector;2.0.0-M2-s_2.11!spark-cassandra-connector.jar (2275ms)
downloading https://repo1.maven.org/maven2/commons-beanutils/commons-beanutils/1.8.0/commons-beanutils-1.8.0.jar ...
[SUCCESSFUL ] commons-beanutils#commons-beanutils;1.8.0!commons-beanutils.jar (599ms)
downloading https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/joda-convert-1.2.jar ...
[SUCCESSFUL ] org.joda#joda-convert;1.2!joda-convert.jar (204ms)
downloading https://repo1.maven.org/maven2/joda-time/joda-time/2.3/joda-time-2.3.jar ...
[SUCCESSFUL ] joda-time#joda-time;2.3!joda-time.jar (466ms)
downloading https://repo1.maven.org/maven2/io/netty/netty-all/4.0.33.Final/netty-all-4.0.33.Final.jar ...
[SUCCESSFUL ] io.netty#netty-all;4.0.33.Final!netty-all.jar (597ms)
downloading https://repo1.maven.org/maven2/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar ...
[SUCCESSFUL ] com.twitter#jsr166e;1.1.0!jsr166e.jar (211ms)
downloading https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.11.8/scala-reflect-2.11.8.jar ...
[SUCCESSFUL ] org.scala-lang#scala-reflect;2.11.8!scala-reflect.jar (1151ms)
:: resolution report :: resolve 14983ms :: artifacts dl 5509ms
:: modules in use:
com.twitter#jsr166e;1.1.0 from central in [default]
commons-beanutils#commons-beanutils;1.8.0 from central in [default]
datastax#spark-cassandra-connector;2.0.0-M2-s_2.11 from spark-packages in [default]
io.netty#netty-all;4.0.33.Final from central in [default]
joda-time#joda-time;2.3 from central in [default]
org.joda#joda-convert;1.2 from central in [default]
org.scala-lang#scala-reflect;2.11.8 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   7   |   7   |   7   |   0   ||   7   |   7   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
7 artifacts copied, 0 already retrieved (13315kB/56ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/10/11 18:45:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/11 18:45:39 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.6.151:4040
Spark context available as 'sc' (master = local[*], app id = local-1476225939323).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.
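The --conf flag passed to spark-shell above seeds spark.cassandra.connection.host into the shell's built-in SparkContext. In a standalone application the same property goes on the SparkConf directly; a minimal sketch following the connector's quick-start guide (the master URL and app name here are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Point the connector at the local Cassandra node before creating the context.
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[*]", "cassandra-quickstart", conf)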
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
scala> val rdd = sc.cassandraTable("mykeyspace", "kv")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18
scala> println(rdd.count)
2
scala> println(rdd.first)
CassandraRow{key: key1, value: 1}
scala> println(rdd.map(_.getInt("value")).sum)
3.0
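Besides generic CassandraRow objects, the connector can materialize rows directly as Scala types. A small sketch of the tuple variant (the table's column order, key then value, must line up with the tuple components):

// Read each row as a (String, Int) pair instead of a CassandraRow.
val typedRdd = sc.cassandraTable[(String, Int)]("mykeyspace", "kv")
println(typedRdd.map(_._2).sum)  // same total as the getInt version above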
scala> val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>
scala> collection.saveToCassandra("mykeyspace", "kv", SomeColumns("key", "value"))
scala> println(rdd.count)
4
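saveToCassandra can also map columns from case class fields by name, making the explicit SomeColumns list unnecessary. A small sketch (KV and the key5/key6 rows are illustrative, not from the session above):

// Field names key/value match the table's columns, so no column list is needed.
case class KV(key: String, value: Int)
sc.parallelize(Seq(KV("key5", 5), KV("key6", 6))).saveToCassandra("mykeyspace", "kv")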
scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").collect()
res13: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{key: key1, value: 1}, CassandraRow{key: key4, value: 4}, CassandraRow{key: key3, value: 3}, CassandraRow{key: key2, value: 2})
scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").collect().foreach(println)
CassandraRow{key: key1, value: 1}
CassandraRow{key: key4, value: 4}
CassandraRow{key: key3, value: 3}
CassandraRow{key: key2, value: 2}
scala> sc.cassandraTable("mykeyspace", "kv").select("key", "value").where("value>?",2).collect().foreach(println)
CassandraRow{key: key4, value: 4}
CassandraRow{key: key3, value: 3}
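With connector 2.0 the same table is also reachable through the Dataset/DataFrame API, which can push simple filters down to Cassandra. A sketch, assuming the same spark-shell session (spark is the SparkSession created at startup):

// Load mykeyspace.kv as a DataFrame via the Cassandra data source.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "mykeyspace", "table" -> "kv"))
  .load()
df.filter(df("value") > 2).show()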
scala> :quit
cqlsh> select * from mykeyspace.kv;
 key  | value
------+-------
 key1 |     1
 key4 |     4
 key3 |     3
 key2 |     2
The error below is caused by a Scala version mismatch between Spark and the Spark Cassandra Connector (for example, datastax:spark-cassandra-connector:2.0.0-M2-s_2.10 is built against Scala 2.10, while this Spark 2.0.1 build runs on Scala 2.11).
scala> println(rdd.count)
[Stage 0:> (0 + 0) / 6]16/10/11 10:48:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
at com.datastax.spark.connector.util.CountingIterator.<init>
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:336)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[Stage 0:> (0 + 2) / 6]16/10/11 10:48:03 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
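A quick way to confirm the mismatch from inside spark-shell is to print the Scala version the running Spark build uses, then pick the connector artifact whose suffix (_s_2.10 vs _s_2.11) matches it:

// Prints e.g. "version 2.11.8" for the Spark 2.0.1 build used here.
println(scala.util.Properties.versionString)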