Sqoop简介

  • Sqoop是apache旗下的一款 ”==Hadoop和关系数据库之间传输数据==”的工具
    • ==导入数据== import
      • 将MySQL,Oracle导入数据到Hadoop的HDFS、HIVE、HBASE等数据存储系统
    • ==导出数据== export
      • 从Hadoop的文件系统中导出数据到关系数据库

sqoop

  • Sqoop的工作机制

    • 将导入和导出的命令翻译成mapreduce程序实现
    • 在翻译出的mapreduce中主要是对inputformat和outputformat进行定制
  • Sqoop基本架构

    • sqoop在发展中的过程中演进出来了两种不同的架构.架构演变史

sqoop1

版本号为1.4.x0
  • ==sqoop2的架构图==

sqoop2

版本号为1.99x为sqoop2 
在架构上:sqoop2引入了sqoop server,对connector实现了集中的管理 
访问方式:REST API、 JAVA API、 WEB UI以及CLI控制台方式进行访问 

sqoop1 VS sqoop2

Sqoop安装部署

点击查看

数据导入

  • 导入单个表从RDBMS到HDFS。表中的每一行被视为HDFS的记录。所有记录都存储为文本文件的文本数据(或者Avro、sequence文件等二进制数据)
# 命令行查看帮助文档
sqoop list-databases --help

# 列出node03上mysql数据库中所有的数据库名称
sqoop list-databases --connect jdbc:mysql://node03:3306/ --username root --password 123456

# 查看某一个数据库下面的所有数据表
sqoop list-tables --connect jdbc:mysql://node03:3306/hive --username root --password 123456

# 导入数据库表数据到HDFS
sqoop import \
  --connect jdbc:mysql://node02:3306/userdb \
  --username root   \
  --password 123456 \
  --table emp \
  --m 1

# 导出数据到指定目录,指定分隔符
sqoop import  \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--delete-target-dir \
--table emp  \
--target-dir /sqoop/emp1 \
--fields-terminated-by '#' \
--m 1


#参数解释
--connect   指定mysql链接地址
--username  连接mysql的用户名
--password  连接mysql的密码
--table     指定要导入的mysql表名称
--m:        表示这个MR程序需要多少个MapTask去运行,默认为4
--target-dir    来指定导出目的地,
--delete-target-dir 来判断导出目录是否存在,如果存在就删掉
--fields-terminated-by '#' 指定数据的分隔符
# 默认路径是/user/hadoop下
--  导入关系表到Hive中
create database sqooptohive;

create external table sqooptohive.emp_hive(id int,name string,deg string,salary double ,dept string) row format delimited fields terminated by '\001';
# 导入数据到hive表
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--table emp \
--fields-terminated-by '\001' \
--hive-import \
--hive-table sqooptohive.emp_hive \
--hive-overwrite \
--delete-target-dir \
--m 1

# 导入数据库表数据到hive中(并自动创建hive表)
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--table emp \
--hive-database sqooptohive \
--hive-table emp1 \
--hive-import \
--m 1 

# 导入表数据子集
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/emp_where \
--delete-target-dir \
--where "dept = 'TP'" \
--m 1 

# sql语句查找导入hive
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/emp_sql \
--delete-target-dir \
--query 'select * from emp where salary >30000 and $CONDITIONS' \
--m 1

##  $CONTITONS是linux系统的变量,如果你想通过并行的方式导入结果,每个map task需要执行sql查询后脚语句的副本,结果会根据sqoop推测的边界条件分区。query必须包含$CONDITIONS。这样每个sqoop程序都会被替换为一个独立的条件。同时你必须指定 --split-by '字段',后期是按照字段进行数据划分,最后可以达到多个MapTask并行运行。

sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/emp_sql_2 \
--delete-target-dir \
--query 'select * from emp where salary >30000 and $CONDITIONS' \
--split-by 'id' \
--m 2


sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/emp_sql_2 \
--delete-target-dir \
--query 'select * from emp where id >1 and $CONDITIONS' \
--split-by 'salary' \
--m 2

sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/emp_sql_2 \
--delete-target-dir \
--query 'select * from emp where id >1 and $CONDITIONS' \
--split-by 'id' \
--m 7
## --split-by '字段': 后期按照字段进行数据划分实现并行运行多个MapTask。

增量导入

  • 在实际工作当中,数据的导入很多时候都是==只需要导入增量数据即可==,并不需要将表中的数据全部导入到hive或者hdfs当中去,肯定会出现重复的数据的状况,所以我们一般都是选用一些字段进行增量的导入,为了支持增量的导入,sqoop也给我们考虑到了这种情况并且支持增量的导入数据

  • 增量导入是仅导入新添加的表中的行的技术。

  • 它需要添加 ==‘incremental’, ‘check-column’, 和 ‘last-value’==选项来执行增量导入。

    --incremental <mode>
    --check-column <column name>
    --last value <last check column value>
    
  • ==第一种增量导入实现==

    • ==基于递增列的增量数据导入(Append方式)==
    • 导入emp表当中id大于1202的所有数据
      • 注意:==这里不能加上 –delete-target-dir 参数,添加就报错==

    ~~~shell
    sqoop import \
    –connect jdbc:mysql://node03:3306/userdb \
    –username root \
    –password 123456 \
    –table emp \
    –incremental append \
    –check-column id \
    –last-value 1202 \
    –target-dir /sqoop/increment \
    –m 1

##参数解释
–incremental 这里使用基于递增列的增量数据导入
–check-column 递增列字段
–last-value 指定上一次导入中检查列指定字段最大值
–target-dir 数据导入的目录



* ==第二种增量导入实现==

  * ==基于时间列的增量数据导入(LastModified方式)==

    * 此方式要求原有表中有time字段,它能指定一个时间戳
      * user表结构和数据

    ![table_user](https://kflys.gitee.io/upic/2020/04/02/uPic/sqoop%E6%95%B0%E6%8D%AE%E8%BF%81%E7%A7%BB%E5%B7%A5%E5%85%B7/assets//table_user.png)

  ~~~shell
  sqoop import \
  --connect jdbc:mysql://node03:3306/userdb \
  --username root \
  --password 123456  \
  --table user \
  --incremental lastmodified  \
  --check-column createTime  \
  --last-value '2019-10-01 10:30:00'  \
  --target-dir /sqoop/increment2 \
  --m 1

  ##参数解释
  --incremental   这里使用基于时间列的增量导入
  --check-column  时间字段
  --last-value    指定上一次导入中检查列指定字段最大值
  --target-dir    数据导入的目录
                  如果该目录存在(可能已经有数据)
                  再使用的时候需要添加 --merge-key or --append
          --merge-key 指定合并key(对于有修改的)
          --append    直接追加修改的数据

mysql导入hbase

  • 实现把一张mysql表数据导入到hbase中
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456  \
--table emp \
--hbase-table  mysqluser \
--column-family  f1 \
--hbase-create-table \
--hbase-row-key id  \
--m 1 


#参数说明
--hbase-table              指定hbase表名
--column-family         指定表的列族
--hbase-create-table     表不存在就创建
--hbase-row-key         指定hbase表的id
--m                      指定使用的MapTask个数
list
scan 'mysqluser'
disable 'mysqluser'
drop 'mysqluser'


# mysql导入hbase 不同的列族
sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456  \
--columns id,salary,dept \
--table emp \
--hbase-table  mysqluser2 \
--column-family  f1 \
--hbase-create-table \
--hbase-row-key id  \
--m 1 


sqoop import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456  \
--columns id,name,deg \
--table emp \
--hbase-table  mysqluser2 \
--column-family  f2 \
--hbase-create-table \
--hbase-row-key id  \
--m 1 

数据导出

  • 将数据从HDFS把文件导出到RDBMS数据库
    • 导出前,目标表必须存在于目标数据库中。
      • 默认操作是从将文件中的数据使用INSERT语句插入到表中
      • 更新模式下,是生成UPDATE语句更新表数据

hdfs->mysql

  • 1、数据是在HDFS当中的如下目录/user/hive/warehouse/hive_source,数据内容如下
1 zhangsan 20 hubei
2 lisi 30 hunan
3 wangwu 40 beijing
4 xiaoming 50 shanghai
CREATE TABLE  userdb.fromhdfs (
   id INT DEFAULT NULL,
   name VARCHAR(100) DEFAULT NULL,
   age int DEFAULT NULL,
   address VARCHAR(100) DEFAULT NULL
) ENGINE=INNODB DEFAULT CHARSET=utf8;
sqoop export \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--table fromhdfs \
--export-dir /user/hive/warehouse/hive_source \
--input-fields-terminated-by " " 

##参数解释
--table                       指定导出的mysql表名
--export-dir                   指定hdfs数据文件目录
--input-fields-terminated-by  指定文件数据字段的分隔符

Sqoop job

  • 将事先定义好的数据导入导出任务按照指定流程运行
sqoop job (generic-args) (job-args)
   [-- [subtool-name] (subtool-args)]
  • 创建job
    • ==–create==
      • 创建一个名为myjob,实现从mysql表数据导入到hdfs上的作业
        • 在创建job时,==命令”– import” 中间有个空格==
sqoop job --help

##创建一个sqoop作业
sqoop job \
--create myjob1 \
-- import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/myjob \
--delete-target-dir \
--m 1

##创建一个sqoop增量导入的作业
sqoop  job  \
--create incrementJob1 \
-- import \
--connect jdbc:mysql://node03:3306/userdb \
--username root \
--password 123456  \
--table user \
--target-dir /sqoop/incrementJob \
--incremental append  \
--check-column createTime  \
--last-value '2019-11-19 16:40:21'  \
--m 1

# incremental.last.value
  • 查看job
sqoop job --list

最后显示:
Available jobs:
  myjob
# 展示job
sqoop job --show myjob1
  • 执行job
  sqoop job --exec myjob1
  • 解决sqoop需要输入密码的问题
  • sqoop-site.xml

    <property>
      <name>sqoop.metastore.client.record.password</name>
      <value>true</value>
      <description>If true, allow saved passwords in the metastore.
      </description>
    </property>
    
  • 删除job
sqoop job --delete myjob