Monday, November 7, 2016

run scala files inside spark-shell



# SpaceWalk.scala: unmodified version from "M233: Getting Started with Spark and MongoDB"

donghua@ubuntu:~$ cat SpaceWalk.scala
import com.mongodb.spark._
import com.mongodb.spark.config._
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document

val rdd = sc.loadFromMongoDB()

val readConf = ReadConfig( sc )
val readConfig = ReadConfig( Map("collection" -> "eva" ), Some(readConf))
val newRDD = sc.loadFromMongoDB( readConfig = readConfig )

def  breakoutCrew (  document: Document  ): List[(String,Int)]  = {

  var minutes = 0;
  val timeString = document.get( "Duration").asInstanceOf[String]
  if( timeString != null && !timeString.isEmpty ) {
    val time =  document.get( "Duration").asInstanceOf[String].split( ":" )
    minutes = time(0).toInt * 60 + time(1).toInt
  }

  import scala.util.matching.Regex
  val pattern = new Regex("(\\w+\\s\\w+)")
  val names =  pattern findAllIn document.get( "Crew" ).asInstanceOf[String]
  var tuples : List[(String,Int)] = List()
  for ( name <- :="" minutes="" name="" names="" span="" tuples="tuples">

  return tuples
}

val logs = rdd.flatMap( breakoutCrew ).reduceByKey( (m1: Int, m2: Int) => ( m1 + m2 ) )

def mapToDocument( tuple: (String, Int )  ): Document = {
  val doc = new Document();
  doc.put( "name", tuple._1 )
  doc.put( "minutes", tuple._2 )

  return doc
}

val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(Map("collection" -> "astronautTotals", "writeConcern.w" -> "majority", "db" -> "nasa"), Some(writeConf))

logs.map( mapToDocument ).saveToMongoDB( writeConfig )

import org.apache.spark.sql.SQLContext
import com.mongodb.spark.sql._

// load the first dataframe "EVAs"
val evadf = sqlContext.read.mongo()
evadf.printSchema()
evadf.registerTempTable("evas")

// load the 2nd dataframe "astronautTotals"
case class astronautTotal ( name: String, minutes: Integer )
val astronautDF = sqlContext.read.option("collection", "astronautTotals").mongo[astronautTotal]()
astronautDF.printSchema()
astronautDF.registerTempTable("astronautTotals")

sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes FROM astronautTotals"  ).show()


sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes, evas.Vehicle, evas.Duration FROM " +
  "astronautTotals JOIN evas ON astronautTotals.name LIKE evas.Crew"  ).show()



donghua@ubuntu:~$ ./spark-1.6.1/bin/spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.10:0.1 --conf "spark.mongodb.input.uri=mongodb://127.0.01/nasa.eva" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/nasa.astronautTotals"
Ivy Default Cache set to: /home/donghua/.ivy2/cache
The jars for the packages stored in: /home/donghua/.ivy2/jars
:: loading settings :: url = jar:file:/home/donghua/spark-1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.mongodb.spark#mongo-spark-connector_2.10;0.1 in central
        found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 321ms :: artifacts dl 21ms
        :: modules in use:
        org.mongodb#mongo-java-driver;3.2.2 from central in [default]
        org.mongodb.spark#mongo-spark-connector_2.10;0.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/16ms)
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
16/11/07 22:12:07 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.6.152 instead (on interface ens33)
16/11/07 22:12:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
16/11/07 22:12:12 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/11/07 22:12:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/11/07 22:12:25 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/11/07 22:12:26 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala> :load /home/donghua/SpaceWalk.scala
Loading /home/donghua/SpaceWalk.scala...
import com.mongodb.spark._
import com.mongodb.spark.config._
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document
rdd: com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[0] at RDD at MongoRDD.scala:160
readConf: com.mongodb.spark.config.ReadConfig.Self = ReadConfig(nasa,eva,Some(mongodb://127.0.01/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))
readConfig: com.mongodb.spark.config.ReadConfig = ReadConfig(nasa,eva,Some(mongodb://127.0.01/nasa.eva),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))
newRDD: com.mongodb.spark.rdd.MongoRDD[org.bson.Document] = MongoRDD[1] at RDD at MongoRDD.scala:160
breakoutCrew: (document: org.bson.Document)List[(String, Int)]
logs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at :39
mapToDocument: (tuple: (String, Int))org.bson.Document
writeConf: com.mongodb.spark.config.WriteConfig.Self = WriteConfig(nasa,astronautTotals,15,Some(mongodb://127.0.0.1/nasa.astronautTotals),WriteConcernConfig(None,None,None,None))
writeConfig: com.mongodb.spark.config.WriteConfig = WriteConfig(nasa,astronautTotals,15,Some(mongodb://127.0.0.1/nasa.astronautTotals),WriteConcernConfig(None,Some(majority),None,None))
import org.apache.spark.sql.SQLContext
import com.mongodb.spark.sql._
evadf: org.apache.spark.sql.DataFrame = [Country: string, Crew: string, Date: string, Duration: string, EVA #: conflict, Purpose: string, Vehicle: string, _id: string]
root
 |-- Country: string (nullable = true)
 |-- Crew: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EVA #: conflict (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Vehicle: string (nullable = true)
 |-- _id: string (nullable = true)

defined class astronautTotal
astronautDF: org.apache.spark.sql.DataFrame = [name: string, minutes: int]
root
 |-- name: string (nullable = true)
 |-- minutes: integer (nullable = true)

+--------------------+-------+
|                name|minutes|
+--------------------+-------+
|        Ken Bowersox|    797|
|        Don Peterson|    257|
|Vladimir Dzhanibekov|    512|
|        Mike Collins|     89|
|    Claude Nicollier|    490|
|Alexandr Ivanchenkov|    125|
|        Woody Spring|    746|
|           Ron Garan|   1623|
|    Vladimir Lyakhov|    427|
|   Patrick Forrester|    705|
|    Harrison Schmidt|   1333|
|   Gennady Strekalov|   1322|
|      Yuri Lonchakov|    627|
|          Bill Pogue|    814|
|    Valentin Lebedev|    153|
|       Hans Schlegel|    405|
|     Gennady Padalka|   1601|
|       Louc Chretien|    357|
|   Mikhail Kornienko|    402|
|        Dave Leestma|    209|
+--------------------+-------+
only showing top 20 rows

+----------------+-------+-----------+--------+
|            name|minutes|    Vehicle|Duration|
+----------------+-------+-----------+--------+
|    Mike Collins|     89|   Gemini X|    0:50|
|    Mike Collins|     89|   Gemini X|    0:39|
|  Richard Gordon|    174|  Gemini XI|    0:44|
|  Richard Gordon|    174|  Gemini XI|    2:10|
|   Alexei Leonov|     12|  Voskhod 2|    0:12|
|Russ Schweickart|     51|   Apollo 9|    0:51|
|        Ed White|     36|  Gemini IV|    0:36|
|   Eugene Cernan|   1460|Gemini IX-A|    2:07|
|     Buzz Aldrin|    487| Gemini XII|    2:29|
|     Buzz Aldrin|    487| Gemini XII|    2:06|
|     Buzz Aldrin|    487| Gemini XII|    0:55|
|     David Scott|   1200|Gemini VIII|    0:00|
|     David Scott|   1200|   Apollo 9|    0:47|
|     David Scott|   1200|  Apollo 15|    0:33|
|       Al Worden|     39|  Apollo 15|    0:39|
|   Tom Mattingly|     83|  Apollo 16|    1:23|
|       Ron Evans|     67|  Apollo 17|    1:07|
+----------------+-------+-----------+--------+


No comments:

Post a Comment