// Method 2: map a JSON file directly onto a case class to get a Dataset
val ds3: Dataset[Student] = spark.read.json("./data/json").as[Student]
// Method 3: load an external text file and map it into a typed Dataset
val ds4: Dataset[String] = spark.read.textFile("./data/people.txt")
val ds5: Dataset[Person] = ds4.map(line => {
  val arr: Array[String] = line.split(",")
  Person(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
})
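Both methods assume matching case classes and an encoder in scope. A minimal sketch, with field names assumed from the parsing above (define the case classes at the top level, outside main, so encoders can be derived):

import spark.implicits._  // encoders for .as[Student] and the map to Person

case class Student(name: String, age: Long)                        // assumed JSON schema
case class Person(id: Int, name: String, age: Int, score: Double)  // inferred from the split above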
Implementing WordCount
val linesDs: Dataset[String] = spark.read.textFile("./data/words")  // textFile already returns Dataset[String]
val words: Dataset[String] = linesDs.flatMap(line => line.split(" "))
// Count each word and sort from largest to smallest
import org.apache.spark.sql.functions.count
val groupDs: RelationalGroupedDataset = words.groupBy($"value".as("word"))
val aggDs: DataFrame = groupDs.agg(count("*").as("totalCount"))
val result: Dataset[Row] = aggDs.sort($"totalCount".desc)
result.show(100, truncate = false)
// Count each word with a Spark SQL statement, sorted in descending order
val frame: DataFrame = words.withColumnRenamed("value", "word")
frame.createOrReplaceTempView("myWords")
spark.sql("select word, count(word) as totalCount from myWords group by word order by totalCount desc").show()
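The same count can also be expressed with the typed Dataset API; a minimal sketch using groupByKey (assumes import spark.implicits._ is in scope):

val typedCounts = words.groupByKey(identity).count()  // Dataset[(String, Long)]
typedCounts.toDF("word", "totalCount").orderBy($"totalCount".desc).show()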
val df1: DataFrame = spark.sql("select * from students where age > 18 and name = 'zhangsan5'")
// A global temp view must be referenced as global_temp.<table>, which makes it accessible across sessions
// The where clause supports pattern matching, e.g. name like 'wang%'
// One SparkContext can hold multiple sessions, created via session.newSession()
spark.newSession().sql("select * from Students where name like 'wang%'").show()
// Accessing a view created in another session throws an error!!!
spark.newSession().sql("select * from students").show()
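For cross-session access to actually work, the view must be registered as a global temp view first. A minimal sketch, assuming df holds the students DataFrame:

df.createGlobalTempView("students")  // registered under the global_temp database
spark.newSession().sql("select * from global_temp.students").show()  // visible from any session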
Databases
MySQL
// Method 1
import java.util.Properties
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "11111111")
// Uses the mysql-connector-java driver on the classpath by default
val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "person", properties)
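An equivalent read can be written with the option-based API (standard Spark; the elided methods 2 and 3 likely looked similar):

val person2: DataFrame = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")
  .option("dbtable", "person")
  .option("user", "root")
  .option("password", "11111111")
  .load()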
// Method 4
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "11111111")
// Push a join down to MySQL as a subquery; the alias T is required
spark.read.jdbc("jdbc:mysql://localhost:3306/spark",
  "(select person.id, person.name, person.age, score.score from person, score where person.id = score.id) T",
  properties)
spark.sql("show databases") spark.sql("use spark") spark.sql("show tables") spark.sql("drop table if exists student_infos") spark.sql("create table if not exists student_infos(name string, age int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath './data/student_infos' into table student_infos") spark.sql("drop table if exists student_scores") spark.sql("create table if not exists student_scores ( name string, score int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath './data/student_scores' into table student_scores") val result: DataFrame = spark.sql("select si.name, si.age, ss.score from student_infos si, student_scores ss where si.name = ss.name") spark.sql("drop table if exists good_student_infos")
// Create a Hive table with nested data types (struct, array, map)
spark.sql("drop table if exists t")
spark.sql("create table if not exists t (id struct<id1:int,id2:int,id3:int>, name array<string>, xx map<int,string>) " +
  "row format delimited " +
  "fields terminated by ' ' " +
  "collection items terminated by ',' " +
  "map keys terminated by ':' " +
  "lines terminated by '\n'")
spark.sql("load data local inpath './data/student_t' overwrite into table t") val result2: DataFrame = spark.sql("select * from t") result2.show(truncate = false)
Using DataFrames
val df: DataFrame = spark.read.json("./data/json")
// select methods
val df1: DataFrame = df.select("name")
val df2: DataFrame = df.select("name", "age")
val df3: DataFrame = df.select($"name".as("studentName"), $"age")
val df4: DataFrame = df.select($"name", ($"age" + 1).as("addAge"))
val df5: DataFrame = df.select(df.col("name"), (df.col("age") + 1).as("addAge"))
// filter methods
val df1: Dataset[Row] = df.filter("age > 18")
val df2: Dataset[Row] = df.filter($"age" > 18)
val df3: Dataset[Row] = df.filter(df.col("age") > 18)
val df4: Dataset[Row] = df.filter(df.col("name").equalTo("zhangsan"))
val df5: Dataset[Row] = df.filter("name = 'zhangsan4' or name = 'zhangsan5'")
// sort methods, plus renaming columns with as/alias
val df1: Dataset[Row] = df.sort($"age".asc, $"name".desc)
val df2: Dataset[Row] = df.sort(df.col("age").asc, df.col("name").desc)
val df3: DataFrame = df.select(df.col("name").as("studentName"), df.col("age").alias("studentAge"))
// groupBy method
val df1: DataFrame = df.groupBy("age").count()
Window functions on Hive

object OverFunctionOnHive {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("OverFunctionOnHive")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")

    spark.sql("use spark")
    spark.sql("drop table if exists sales")
    spark.sql("create table if not exists sales (date string, category string, price int) " +
      "row format delimited fields terminated by '\t'")
    spark.sql("load data local inpath './data/sales' into table sales")
    /**
     * row_number ranks within each group, starting from 1:
     * 5 A 200 -- 1
     * 3 A 100 -- 2
     * 4 A 80  -- 3
     * 7 A 60  -- 4
     *
     * 1 B 100 -- 1
     * 8 B 90  -- 2
     * 6 B 80  -- 3
     * 1 B 70  -- 4
     */
    val result: DataFrame = spark.sql(
      "select date, category, price from (" +
        "select date, category, price, " +
        "row_number() over (partition by category order by price desc) rank " +
        "from sales) t " +
        "where rank <= 3")
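    // The next query reads a table named saleResult, but the step that writes `result`
    // into it is not shown; a minimal sketch of that save (assumed, not in the original):
    import org.apache.spark.sql.SaveMode
    result.write.mode(SaveMode.Overwrite).saveAsTable("saleResult")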
    // Read the result previously saved to Hive
    val salesDF: DataFrame = spark.sql("select * from saleResult")
    salesDF.show(100)
  }
}
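For reference, the same top-3-per-category query can be written with the DataFrame window API instead of SQL; a minimal sketch using the standard Window functions (assumes import spark.implicits._ is in scope):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.partitionBy("category").orderBy($"price".desc)
spark.table("sales")
  .withColumn("rank", row_number().over(w))
  .where($"rank" <= 3)
  .select("date", "category", "price")
  .show()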
Window functions on MySQL
object OverFunctionOnMySQL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("OverFunctionOnMySQL")
      .master("local")
      .getOrCreate()

    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "11111111")
    // 'rank' is quoted because rank is a reserved word in MySQL 8; window functions require MySQL 8.0+
    val sql: String = "select * from (select date, category, price, " +
      "row_number() over (partition by category order by price desc) 'rank' " +
      "from sales) t where t.rank <= 3"
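    // Assumed continuation (not shown in the original): push the window query down to
    // MySQL as a subquery; the outer alias is required
    val top3: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", s"($sql) tmp", properties)
    top3.show()
  }
}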