# SpaceWalk.scala: unmodified version from "M233:
Getting Started with Spark and MongoDB"
donghua@ubuntu:~$ cat SpaceWalk.scala
import com.mongodb.spark._
import com.mongodb.spark.config._
import
com.mongodb.spark.rdd.MongoRDD
import org.bson.Document
// RDD over the default collection taken from spark.mongodb.input.uri.
val rdd = sc.loadFromMongoDB()

// Start from the context's ReadConfig, then point it at the "eva" collection.
val readConf = ReadConfig(sc)
val readConfig = ReadConfig(Map("collection" -> "eva"), Some(readConf))

// RDD that reads explicitly through the overridden configuration.
val newRDD = sc.loadFromMongoDB(readConfig = readConfig)
/** Expand one EVA document into (crew-member name, minutes) pairs.
 *
 *  The "Duration" field is an "h:mm" string; a null or empty value
 *  counts as 0 minutes. Crew members are extracted from the "Crew"
 *  field as pairs of whitespace-separated words.
 *
 *  @param document one EVA record from the "eva" collection
 *  @return one (name, minutes) tuple per crew member; empty if no crew
 */
def breakoutCrew(document: Document): List[(String, Int)] = {
  // "h:mm" -> total minutes; missing/empty duration -> 0.
  val duration = document.get("Duration").asInstanceOf[String]
  val minutes =
    if (duration == null || duration.isEmpty) 0
    else {
      val parts = duration.split(":")
      parts(0).toInt * 60 + parts(1).toInt
    }

  import scala.util.matching.Regex
  // Each crew member appears as two whitespace-separated words.
  val pattern = new Regex("(\\w+\\s\\w+)")
  val crew = document.get("Crew").asInstanceOf[String]
  // Guard against a missing "Crew" field (the original would NPE here).
  Option(crew).toList.flatMap { c =>
    pattern.findAllIn(c).toList.map(name => (name, minutes))
  }
}
// Total EVA minutes per astronaut, summed across all spacewalk documents.
val logs = rdd.flatMap(breakoutCrew).reduceByKey(_ + _)
/** Convert an (astronaut name, total minutes) pair into a BSON document.
 *
 *  @param tuple (name, minutes) as produced by the reduceByKey above
 *  @return a Document with "name" and "minutes" fields
 */
def mapToDocument(tuple: (String, Int)): Document = {
  val doc = new Document()
  doc.put("name", tuple._1)
  doc.put("minutes", tuple._2)
  doc // the last expression is the result; no `return` needed
}
// Start from the context's WriteConfig, then target the "astronautTotals"
// collection in the "nasa" database with majority write concern.
val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(
  Map(
    "collection"     -> "astronautTotals",
    "writeConcern.w" -> "majority",
    "db"             -> "nasa"),
  Some(writeConf))

// Persist the per-astronaut totals back to MongoDB.
logs.map(mapToDocument).saveToMongoDB(writeConfig)
import
org.apache.spark.sql.SQLContext
import com.mongodb.spark.sql._
// Load the first dataframe: the "EVAs" collection (default input collection).
val evadf = sqlContext.read.mongo()
evadf.printSchema()
// Register it so it can be queried with SQL below.
evadf.registerTempTable("evas")
// Load the 2nd dataframe: the "astronautTotals" collection written above,
// mapped onto a typed case class.
case class astronautTotal(name: String, minutes: Integer)
val astronautDF =
  sqlContext.read.option("collection", "astronautTotals").mongo[astronautTotal]()
astronautDF.printSchema()
astronautDF.registerTempTable("astronautTotals")
// Show the per-astronaut totals.
sqlContext.sql(
  "SELECT astronautTotals.name, astronautTotals.minutes FROM astronautTotals").show()

// Join totals back to the raw EVA records on the astronaut's name.
// LIKE is used because Crew may contain more than just the name.
sqlContext.sql(
  "SELECT astronautTotals.name, astronautTotals.minutes, evas.Vehicle, evas.Duration FROM " +
  "astronautTotals JOIN evas ON astronautTotals.name LIKE evas.Crew").show()
|
donghua@ubuntu:~$ ./spark-1.6.1/bin/spark-shell --packages
org.mongodb.spark:mongo-spark-connector_2.10:0.1 --conf
"spark.mongodb.input.uri=mongodb://127.0.0.1/nasa.eva" --conf
"spark.mongodb.output.uri=mongodb://127.0.0.1/nasa.astronautTotals"
Ivy Default Cache set to:
/home/donghua/.ivy2/cache
The jars for the packages stored
in: /home/donghua/.ivy2/jars
:: loading settings :: url =
jar:file:/home/donghua/spark-1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.10
added as a dependency
:: resolving dependencies ::
org.apache.spark#spark-submit-parent;1.0
confs: [default]
found
org.mongodb.spark#mongo-spark-connector_2.10;0.1 in central
found
org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve
321ms :: artifacts dl 21ms
:: modules in use:
org.mongodb#mongo-java-driver;3.2.2
from central in [default]
org.mongodb.spark#mongo-spark-connector_2.10;0.1 from central in
[default]
---------------------------------------------------------------------
| | modules ||
artifacts |
| conf | number| search|dwnlded|evicted||
number|dwnlded|
---------------------------------------------------------------------
|
default | 2
| 0 |
0 | 0
|| 2 |
0 |
---------------------------------------------------------------------
:: retrieving ::
org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already
retrieved (0kB/16ms)
log4j:WARN No appenders could be
found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the
log4j system properly.
log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile:
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use
sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__
___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
version 1.6.1
/_/
Using Scala version 2.10.5 (Java
HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them
evaluated.
Type :help for more information.
16/11/07 22:12:07 WARN Utils: Your
hostname, ubuntu resolves to a loopback address: 127.0.1.1; using
192.168.6.152 instead (on interface ens33)
16/11/07 22:12:07 WARN Utils: Set
SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
16/11/07 22:12:12 WARN Connection:
BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:13 WARN Connection:
BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:20 WARN
ObjectStore: Version information not found in metastore. hive.metastore.schema.verification
is not enabled so recording the schema version 1.2.0
16/11/07 22:12:20 WARN
ObjectStore: Failed to get database default, returning NoSuchObjectException
16/11/07 22:12:25 WARN Connection:
BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:26 WARN Connection:
BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as
sqlContext.
scala> :load
/home/donghua/SpaceWalk.scala
Loading
/home/donghua/SpaceWalk.scala...
import com.mongodb.spark._
import com.mongodb.spark.config._
import
com.mongodb.spark.rdd.MongoRDD
import org.bson.Document
rdd:
com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[0] at RDD at
MongoRDD.scala:160
readConf:
com.mongodb.spark.config.ReadConfig.Self =
ReadConfig(nasa,eva,Some(mongodb://127.0.0.1/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))
readConfig: com.mongodb.spark.config.ReadConfig
=
ReadConfig(nasa,eva,Some(mongodb://127.0.0.1/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))
newRDD:
com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[1] at RDD at
MongoRDD.scala:160
breakoutCrew: (document:
org.bson.Document)List[(String, Int)]
logs:
org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at
mapToDocument: (tuple: (String,
Int))org.bson.Document
writeConf:
com.mongodb.spark.config.WriteConfig.Self =
WriteConfig(nasa,astronautTotals,15,Some(mongodb://127.0.0.1/nasa.astronautTotals),WriteConcernConfig(None,None,None,None))
writeConfig:
com.mongodb.spark.config.WriteConfig =
WriteConfig(nasa,astronautTotals,15,Some(mongodb://127.0.0.1/nasa.astronautTotals),WriteConcernConfig(None,Some(majority),None,None))
import
org.apache.spark.sql.SQLContext
import com.mongodb.spark.sql._
evadf:
org.apache.spark.sql.DataFrame = [Country: string, Crew: string, Date:
string, Duration: string, EVA #: conflict, Purpose: string, Vehicle: string,
_id: string]
root
|-- Country: string (nullable = true)
|-- Crew: string (nullable = true)
|-- Date: string (nullable = true)
|-- Duration: string (nullable = true)
|-- EVA #: conflict (nullable = true)
|-- Purpose: string (nullable = true)
|-- Vehicle: string (nullable = true)
|-- _id: string (nullable = true)
defined class astronautTotal
astronautDF:
org.apache.spark.sql.DataFrame = [name: string, minutes: int]
root
|-- name: string (nullable = true)
|-- minutes: integer (nullable = true)
+--------------------+-------+
| name|minutes|
+--------------------+-------+
| Ken Bowersox| 797|
| Don Peterson| 257|
|Vladimir Dzhanibekov| 512|
| Mike Collins| 89|
| Claude Nicollier| 490|
|Alexandr Ivanchenkov| 125|
| Woody Spring| 746|
| Ron Garan| 1623|
| Vladimir Lyakhov| 427|
|
Patrick Forrester| 705|
| Harrison Schmidt| 1333|
|
Gennady Strekalov| 1322|
| Yuri Lonchakov| 627|
| Bill Pogue| 814|
| Valentin Lebedev| 153|
| Hans Schlegel| 405|
| Gennady Padalka| 1601|
| Louc Chretien| 357|
|
Mikhail Kornienko| 402|
| Dave Leestma| 209|
+--------------------+-------+
only showing top 20 rows
+----------------+-------+-----------+--------+
| name|minutes| Vehicle|Duration|
+----------------+-------+-----------+--------+
| Mike Collins| 89|
Gemini X| 0:50|
| Mike Collins| 89|
Gemini X| 0:39|
|
Richard Gordon| 174| Gemini XI| 0:44|
|
Richard Gordon| 174| Gemini XI| 2:10|
|
Alexei Leonov| 12| Voskhod 2| 0:12|
|Russ Schweickart| 51|
Apollo 9| 0:51|
| Ed White| 36|
Gemini IV| 0:36|
|
Eugene Cernan| 1460|Gemini
IX-A| 2:07|
| Buzz Aldrin| 487| Gemini XII| 2:29|
| Buzz Aldrin| 487| Gemini XII| 2:06|
| Buzz Aldrin| 487| Gemini XII| 0:55|
| David Scott| 1200|Gemini VIII| 0:00|
| David Scott| 1200|
Apollo 9| 0:47|
| David Scott| 1200|
Apollo 15| 0:33|
| Al Worden| 39|
Apollo 15| 0:39|
|
Tom Mattingly| 83| Apollo 16| 1:23|
| Ron Evans| 67|
Apollo 17| 1:07|
+----------------+-------+-----------+--------+
|
No comments:
Post a Comment