Spark SQL

  • Spark SQL is Apache Spark’s module for working with structured data.
  • SparkSQL是apache Spark用来处理结构化数据的一个模块

的四大特性

  • ==1、易整合(Integrated)==

    • results = spark.sql("SELECT * FROM people")
      
  • ==2、统一的数据源访问(Uniform Data Access)==

    • spark.read.json("s3n://...")
      spark.read.text("s3n://...")
      spark.read.parquet("s3n://...")
      
  • ==3、兼容hive(Hive Integration)==

  • ==4、支持标准的数据库连接(Standard Connectivity)==

    • spark.read.jdbc(---)
      

DataFrame概述

  • 在Spark中,DataFrame是一种==以RDD为基础的分布式数据集==,类似于==传统数据库的二维表格==
  • DataFrame带有==Schema元信息==,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
  • DataFrame可以从很多数据源构建
    • 比如:已经存在的RDD、结构化文件、外部数据库、Hive表。
  • RDD可以把它内部元素看成是一个java对象
  • DataFrame可以把内部是一个Row对象,它表示一行一行的数据

DataFrame和RDD的优缺点

  • ==1、RDD==

    • ==优点==

      • 1、编译时类型安全
        • 开发会进行类型检查,在编译的时候及时发现错误
      • 2、具有面向对象编程的风格
    • ==缺点==

      • 1、构建大量的java对象占用了大量heap堆空间,导致频繁的GC

        由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC),程序在进行垃圾回收的过程中,所有的任务都是暂停。影响程序执行的效率
        
      • 2、数据的序列化和反序列性能开销很大

          在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输,然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
        
  • ==2、DataFrame==

    • ==DataFrame引入了schema元信息和off-heap(堆外)==
    • ==优点==
      • 1、DataFrame引入off-heap,大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存,这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高,它是解决了RDD构建大量的java对象占用了大量heap堆空间,导致频繁的GC这个缺点。
      • 2、DataFrame引入了schema元信息—就是数据结构的描述信息,后期spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉。这样一来数据网络传输的数据量是有所减少,数据的序列化和反序列性能开销就不是很大了。它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
      • ==缺点==
        • DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
          • 1、编译时类型不安全
            • 编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
          • 2、不在具有面向对象编程的风格
// 1. 读取文本文件
val personDF=spark.read.text("/person.txt")
val peopleDF=spark.read.json("/people.json")
val usersDF=spark.read.parquet("/users.parquet")

// 2. 加载数据
val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
    //定义一个样例类
    case class Person(id:String,name:String,age:Int)
    //把rdd与样例类进行关联
    val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
    //把rdd转换成DataFrame
    val personDF=personRDD.toDF
// 3.语法风格
// 3.1 DSL
        personDF.select("name").show
        personDF.select($"name").show
        personDF.select(col("name").show
        //实现age+1
         personDF.select($"name",$"age",$"age"+1)).show   
        //实现age大于30过滤
         personDF.filter($"age" > 30).show
         //按照age分组统计次数
         personDF.groupBy("age").count.show 
        //按照age分组统计次数降序
         personDF.groupBy("age").count().sort($"count".desc)show  
// 3.2 sql
        //DataFrame注册成表
        personDF.createTempView("person")

        //使用SparkSession调用sql方法统计查询
        spark.sql("select * from person").show
        spark.sql("select name from person").show
        spark.sql("select name,age from person").show

DataSet概述

  • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。
  • DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
1. 假设RDD中的两行数据长这样
        1,张三,23
        2,李四,35
2.那么DataFrame中的数据长这样
                ID:String    Name:String    Age:int
                1                张三                23
                2                李四                35
3.Dataset中的数据长这样 
            value:String
        1,张三,23
        2,李四,35
  或者
  value:People(age:bigint,id:bigint,name:string)
        People(id=1,name="张三",age=23)
        People(id=2,name="李四",age=23)
DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。
(1)DataSet可以在编译时检查类型
(2)并且是面向对象的编程接口

DataFrame DataSet转换 构建dataset

// 把一个DataFrame转换成DataSet
val dataSet=dataFrame.as[强类型]
//  2、把一个DataSet转换成DataFrame
val dataFrame=dataSet.toDF

// 补充说明,可以从dataFrame和dataSet获取得到rdd
val rdd1=dataFrame.rdd
val rdd2=dataSet.rdd

// 1、 通过sparkSession调用createDataset方法
  val ds=spark.createDataset(1 to 10) //scala集合
  val ds=spark.createDataset(sc.textFile("/person.txt"))  //rdd

// 2、使用scala集合和rdd调用toDS方法
  sc.textFile("/person.txt").toDS
  List(1,2,3,4,5).toDS

// 3、把一个DataFrame转换成DataSet
  val dataSet=dataFrame.as[强类型]

// 4、通过一个DataSet转换生成一个新的DataSet
   List(1,2,3,4,5).toDS.map(x=>x*10)

// 5、将rdd与Row对象进行关联
    val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))
    //指定dataFrame的schema信息   
    //这里指定的字段个数和类型必须要跟Row对象保持一致
    val schema=StructType(
        StructField("id",StringType)::
        StructField("name",StringType)::
        StructField("age",IntegerType)::Nil
    )
    val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)

示例代码

// EG
 // 1、构建SparkSession对象,开启hive支持
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .enableHiveSupport() //开启对hive的支持
      .getOrCreate()

// 2. 读取mysql数据
        val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
        val url="jdbc:mysql://node03:3306/spark"
        val tableName="user"
        val properties = new Properties()
      properties.setProperty("user","root")
      properties.setProperty("password","123456")
   val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)

// 3. 保存数据到mysql表中
     //mode:指定数据的插入模式
        //overwrite: 表示覆盖,如果表不存在,事先帮我们创建
        //append   :表示追加, 如果表不存在,事先帮我们创建
        //ignore   :表示忽略,如果表事先存在,就不进行任何操作
        //error    :如果表事先存在就报错(默认选项)
    result.write.mode("append").jdbc(url,"kaikeba",properties)

自定义函数

//小写转大写
sparkSession.udf.register("low2Up",new UDF1[String,String]() {
  override def call(t1: String): String = {
    t1.toUpperCase
  }
},StringType)
//大写转小写
sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)
// 把数据文件中的单词统一转换成大小写
sparkSession.sql("select  value from t_udf").show()
sparkSession.sql("select  low2Up(value) from t_udf").show()
sparkSession.sql("select  up2low(value) from t_udf").show()