Syncing Data to Kudu with Oracle GoldenGate

Environment Configuration

Source machine:

OS: CentOS 6.8 x86_64

Oracle: 11.2.1.0.0

GoldenGate: Oracle GoldenGate V11.2.1.0.3 for Oracle 11g on Linux x86-64

Hostname: db1
Public IP: 172.16.20.14

Internet IP: 10.1.6.3

Target machine:

OS: CentOS 6.8 x86_64

GoldenGate: Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64

Hostname: app01
Public IP: 172.16.20.29

Internet IP: 10.1.6.11

Configure passwordless SSH between the source and target machines

  1. On the source machine, create the ogg user: useradd ogg

  2. On the target machine, create the ggs user: useradd ggs

  3. Run ssh-keygen -t rsa on each machine to generate a key pair, then copy the contents of each machine's ~/.ssh/id_rsa.pub into the other machine's ~/.ssh/authorized_keys (a sketch using ssh-copy-id follows the hosts entries below)

  4. Edit /etc/hosts with vi so both hostnames resolve:

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    
    10.1.6.3 db1
    10.1.6.11 app01
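
A minimal sketch of the key exchange using ssh-copy-id (equivalent to appending id_rsa.pub by hand); shown for the db1 → app01 direction, repeat the other way round for app01 → db1:

    # on db1 as the ogg user
    ssh-keygen -t rsa                  # accept the defaults
    ssh-copy-id ggs@app01              # appends the public key to app01:~/.ssh/authorized_keys
    ssh ggs@app01 hostname             # should print "app01" without a password prompt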
    

Installing GoldenGate on the source machine

  1. Download Oracle GoldenGate V11.2.1.0.3 (V34339-01.zip) from edelivery.oracle.com

  2. Create the ogg user and unzip V34339-01.zip into the ogg user's home directory

  3. Extract fbo_ggs_Linux_x64_ora11g_64bit.tar

  4. Add $ORACLE_HOME and the related variables to the environment by editing ~/.bash_profile:

    export PATH
    export ORACLE_BASE=/u01/oracle/app/oracle
    export ORACLE_HOME=/u01/oracle/app/oracle/product/11.2.0/db_1
    export LD_LIBRARY_PATH=$ORACLE_HOME/lib
    export NLS_LANG=AMERICAN_AMERICA.UTF8
    export PATH=$PATH:$ORACLE_HOME/bin
    export LD_LIBRARY_PATH=$ORACLE_CLNT/lib:$ORACLE_HOME/lib:$LD_LIBRARY_PATH
    export NLS_CALENDAR=GREGORIAN
    export NLS_DATE_FORMAT="YYYY-MM-DD HH24:MI:SS"
    export NLS_DATE_LANGUAGE="ENGLISH"
    export ORACLE_SID=czcbs
    alias sqlplus='rlwrap sqlplus'
    
  5. Run source ~/.bash_profile to apply the changes

Create the GoldenGate tablespace and user in the source Oracle database (skip if they already exist)

sqlplus / as sysdba

SQL> create tablespace cbs_ogg datafile '/data2/cbs_ogg.dbf' size 20m AUTOEXTEND ON;
Tablespace created.

SQL> create user cbs_ogg identified by cbs_ogg default tablespace cbs_ogg;
User created.

SQL> grant execute on utl_file to cbs_ogg;
Grant succeeded.

SQL> GRANT GGS_GGSUSER_ROLE, RESOURCE, DBA, CONNECT to cbs_ogg;
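
Optionally, confirm the grants with a quick query (a sanity check, not part of the original steps):

SQL> SELECT granted_role FROM dba_role_privs WHERE grantee = 'CBS_OGG';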

Enable transaction logging in Oracle

Switch to the oracle user: sudo su - oracle

sqlplus / as sysdba
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;
SHUTDOWN IMMEDIATE
STARTUP MOUNT
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION=TRUE SCOPE=BOTH; (an error is reported here — this parameter only exists from Oracle 11.2.0.4 onward, so older releases reject it; noted for the record)
EXIT
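
A quick optional check that archivelog mode, forced logging, and minimal supplemental logging are now in effect (these are standard V$DATABASE columns):

sqlplus / as sysdba
SELECT log_mode, force_logging, supplemental_log_data_min FROM v$database;
-- expect ARCHIVELOG / YES / YES
EXIT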

Installing Oracle GoldenGate for Big Data on the target machine

  1. Download the Oracle GoldenGate for Big Data installer 12.3.1.1_ggs_Adapters_Linux_x64.zip from edelivery.oracle.com
  2. Create the ggs user and unzip 12.3.1.1_ggs_Adapters_Linux_x64.zip into that user's home directory.
  3. Extract ggs_Adapters_Linux_x64.tar

Installing CDH5 on the target machine

  • For CDH5 installation, see the CDH5 installation guide

  • Start Confluent (ZooKeeper, Kafka brokers, Schema Registry):

    zookeeper-server (start first, on every node):
    /usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
    broker (on every node):
    /usr/bin/kafka-server-start /etc/kafka/server.properties &
    schema-registry (on the app01 node only):
    /usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
    

GoldenGate configuration on the source machine

Create the required subdirectories

Run ./ggsci in the current user's home directory

GGSCI (db1) 1> create subdirs
...
...

GLOBALS

GGSCI (db1) 2> EDIT PARAMS ./GLOBALS
Add the following content to ./GLOBALS:
ggschema cbs_hj
You can check it afterwards with view params ./GLOBALS

Add supplemental log groups for all tables under the Oracle CBS schema

GGSCI (db1) 3> dblogin userid cbs_hj, password cbs_hj;
GGSCI (db1) 4> add trandata cbs_hj.*
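
Optionally, confirm that supplemental logging was added for a given table by running INFO TRANDATA in the same GGSCI session, e.g.:

info trandata cbs_hj.party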

Edit the Manager parameters

GGSCI (db1) 5> EDIT PARAMS MGR

PORT 7016
ACCESSRULE, PROG *, IPADDR *, ALLOW

Start the Manager

GGSCI (db1) 6> start mgr

Manager started.

Check the Manager status

GGSCI (db1) 7> info mgr

Manager is running (IP port db1.7016).

GoldenGate configuration on the target machine

Create the required subdirectories

As the current user, run: ./ggsci

GGSCI (app01) 1> create subdirs

Edit the Manager parameters

GGSCI (app01) 2> EDIT PARAMS MGR
PORT 7016
ACCESSRULE, PROG *, IPADDR *, ALLOW

Start the Manager

GGSCI (app01) 3> start mgr

Manager started.

Check the Manager status

GGSCI (app01) 4> info mgr

Manager is running (IP port app01.7016, Process ID 16101).

Initial data load

Configure the EXTRACT on the source machine

Add an EXTRACT for the initial load

./ggsci

GGSCI (db1) 1> add extract einicbs, sourceistable

Edit the EXTRACT parameters

GGSCI (db1) 2> edit params einicbs
EXTRACT EINICBS
userid cbs_hj, password cbs_hj
RMTHOST 10.1.6.11, MGRPORT 7016
RMTFILE ./dirdat/cb, MEGABYTES 200

table cbs_hj.party;
table cbs_hj.party_role;
... (list here every table that needs to be loaded; the two tables above are examples).

Generate the CBS definitions file

GGSCI (db1) 3> EDIT PARAMS cbsdef
defsfile ./dirdef/cbs.def
userid cbs_hj, password cbs_hj
table cbs_hj.*;

Run the following in /home/ogg:

./defgen paramfile dirprm/cbsdef.prm

Copy dirdef/cbs.def to /home/ggs/dirdef/cbs.def on the target machine

scp /home/ogg/dirdef/cbs.def ggs@app01:/home/ggs/dirdef/cbs.def

Change the ownership of cbs.def on the target machine

sudo chown -R ggs:ggs /home/ggs/dirdef/cbs.def

Environment variables on the target machine

Add the Java environment variables

export JAVA_HOME=/usr/java/jdk1.8.0_152    # adjust to the actual JDK version installed
export JAVA_LIBDIR=/usr/share/java
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server/
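
A quick optional check that the JDK is usable and that libjvm.so sits where LD_LIBRARY_PATH points, since GoldenGate for Big Data loads it at runtime (the jre/lib/amd64/server path above is specific to a 64-bit JDK 8):

source ~/.bash_profile
java -version
ls $LD_LIBRARY_PATH/libjvm.so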

REPLICAT configuration on the target machine

Add the REPLICAT on the target machine

./ggsci

GGSCI (app01) 1> add replicat rcbsini,exttrail ./dirdat/cb

Edit the REPLICAT parameters

GGSCI (app01) 2> EDIT PARAMS RCBSINI
SPECIALRUN
END RUNTIME

EXTFILE ./dirdat/cb
TARGETDB LIBFILE libggjava.so SET property=dirprm/conf.props
SOURCEDEFS ./dirdef/cbs.def

MAP *.*, TARGET *.*;

Edit /home/ggs/dirprm/avro.properties with the following content:

vi /home/ggs/dirprm/avro.properties

bootstrap.servers=localhost:9092

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
schema.registry.url=http://localhost:8081

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

internal.value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
#internal.value.converter.schema.registry.url=http://localhost:8081
#internal.key.converter.schema.registry.url=http://localhost:8081
replicator.topic.pool=cbs:5

Edit /home/ggs/dirprm/conf.props with the following content:

vi /home/ggs/dirprm/conf.props

gg.handlerlist=kafkaconnect

#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=avro.properties
gg.handler.kafkaconnect.mode=tx
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}

#The formatter properties
#the op_type, current_ts, op_ts, table and pos fields are enabled by default
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
#true treats all output fields as strings; false maps each field type from the source trail
#to the best-fitting Kafka Connect data type
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
#whether to output the current timestamp in ISO 8601 format
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec


#Set the classpath here
#gg.classpath=dirprm/:/home/ggs11/ggjava/resources/lib/*
gg.classpath=/usr/share/java/kafka-serde-tools/*:/usr/share/java/kafka/*:/usr/share/java/confluent-common/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

Start the EXTRACT

  1. On the source machine, run

    ./ggsci

GGSCI (db1) 1> start einicbs

Start the REPLICAT

  1. In /home/ggs, run:
[ggs@app01 ~]$ ./replicat paramfile dirprm/rcbsini.prm
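
If the load fails, running it again with a report file makes troubleshooting easier (the report path below is just an example):

[ggs@app01 ~]$ ./replicat paramfile dirprm/rcbsini.prm reportfile dirrpt/rcbsini.rpt
[ggs@app01 ~]$ less dirrpt/rcbsini.rpt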

Verify the results

Check in Kafka whether the corresponding topics were created

kafka-topics --list --zookeeper localhost:2181

Check the Kafka topic offsets to confirm that the data has been loaded

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic PARTY --time -1
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic PARTY --time -2
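
To inspect the decoded records themselves rather than just the offsets, Confluent's Avro console consumer can be used; on older Confluent releases the option may be --zookeeper localhost:2181 instead of --bootstrap-server:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic PARTY \
  --from-beginning --property schema.registry.url=http://localhost:8081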

Ongoing transaction synchronization

First move the ogg user into Oracle's oinstall group so the extract process can read the online redo and archive logs

usermod -G oinstall ogg
usermod -g oinstall ogg

Log out and back in for the group change to take effect
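
Verify with id after logging back in:

id ogg    # the output should now list oinstall among the groups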

EXTRACT configuration on the source machine

./ggsci

GGSCI (db1) 1> EDIT PARAMS EXTCBS
EXTRACT extcbs
USERID cbs_hj, PASSWORD cbs_hj
EXTTRAIL ./dirdat/rt
table cbs_hj.party;
table cbs_hj.party_role;

GGSCI (db1) 2> add extract extcbs, tranlog,begin now
EXTRACT added.

GGSCI (db1) 3> add exttrail ./dirdat/rt, extract extcbs
EXTTRAIL added.

GGSCI (db1) 4> start extcbs

Sending START request to MANAGER ...
EXTRACT EXTCBS starting

GGSCI (db1) 5> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
EXTRACT     RUNNING     EXTCBS      00:00:31      00:00:01


Data Pump configuration on the source machine

GGSCI (db1) 6> EDIT PARAMS CBSDP
EXTRACT CBSDP
USERID cbs_hj, PASSWORD cbs_hj
RMTHOST 10.1.6.11, MGRPORT 7016
RMTTRAIL ./dirdat/rt
table cbs_hj.party;
table cbs_hj.party_role;

GGSCI (db1) 7> ADD EXTRACT CBSDP, EXTTRAILSOURCE ./dirdat/rt, begin now
GGSCI (db1) 8> ADD RMTTRAIL ./dirdat/rt, EXTRACT CBSDP, MEGABYTES 5
GGSCI (db1) 9> start cbsdp
GGSCI (db1) 10> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
EXTRACT     RUNNING     CBSDP       00:00:00      00:00:46
EXTRACT     RUNNING     EXTCBS      00:00:00      00:00:08

Note: if a process fails to start, inspect ggserr.log (e.g. vi ggserr.log)

REPLICAT configuration on the target machine

./ggsci

GGSCI (app01) 1> EDIT PARAMS rcbs
REPLICAT rcbs
TARGETDB LIBFILE libggjava.so SET property=dirprm/conf.props
SOURCEDEFS ./dirdef/cbs.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP *.*, TARGET *.*;

GGSCI (app01) 2> add replicat rcbs,exttrail ./dirdat/rt

GGSCI (app01) 3> start rcbs
GGSCI (app01) 4> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
REPLICAT    RUNNING     RCBS        00:00:00      00:00:00

Note: if the configuration files change, restart mgr, rcbs, and the other affected processes

Importing the synchronized data into Kudu

  • In Impala, create the matching table structures; the two tables party and party_role are used as examples here (run on the master01 node):

    • impala-shell
  • Create the database

    • [master01:21000] > CREATE DATABASE IF NOT EXISTS CBS_HJ;
  • Create the tables

    create table if not exists cbs_hj.party(
    ID INT,
    PARTY_TYPE_CODE STRING,
    COMMENTS STRING,
    PRIMARY KEY(ID)
    )PARTITION BY HASH (ID) PARTITIONS 16 STORED AS KUDU;
    
    create table if not exists cbs_hj.party_role(
    ID INT,
    ROLE_TYPE_CODE STRING,
    PARTY_ID INT,
    PARTY_ROLE_NO STRING,
    FROM_DATE STRING,
    THRU_DATE STRING,
    PRIMARY KEY(ID)
    )PARTITION BY HASH (ID) PARTITIONS 16 STORED AS KUDU;
    
    
  • Pulling the data from Kafka into Kudu is not covered in detail here; a rough sketch follows below
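
As an illustration only (not part of the original procedure), the sketch below consumes the Avro-encoded change records with confluent-kafka's AvroConsumer and upserts the after-image into the cbs_hj.party Kudu table created above using kudu-python; the consumer group, Kudu master host (master01), and topic name are assumptions to adapt:

    # pip install "confluent-kafka[avro]" kudu-python   (assumed client libraries)
    from confluent_kafka.avro import AvroConsumer
    import kudu

    consumer = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'kudu-loader',                       # hypothetical consumer group
        'schema.registry.url': 'http://localhost:8081',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['PARTY'])                        # topic created by the Kafka Connect handler

    client = kudu.connect(host='master01', port=7051)    # assumed Kudu master
    table = client.table('impala::cbs_hj.party')         # Impala-managed Kudu table name
    session = client.new_session()

    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        record = msg.value()                             # Avro payload decoded to a dict
        op = record.get('op_type')
        if op in ('I', 'U'):
            row = record['after']                        # op format nests the row image under 'after'
            session.apply(table.new_upsert({'ID': int(row['ID']),
                                            'PARTY_TYPE_CODE': row.get('PARTY_TYPE_CODE'),
                                            'COMMENTS': row.get('COMMENTS')}))
        elif op == 'D':
            key = record.get('before') or record.get('after') or {}
            session.apply(table.new_delete({'ID': int(key['ID'])}))
        session.flush()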

Appendix

Configuration file parameter reference:

conf.props

  • gg.handlerlist: names the list of handlers to use

  • gg.handler.name.type: selects the handler type; set it to kafkaconnect to use the Kafka Connect handler

  • gg.handler.name.kafkaProducerConfigFile: path to the Kafka producer configuration file used by the handler

  • gg.handler.name.mode: either op or tx; the two modes group operations differently and differ in throughput

  • gg.handler.name.topicMappingTemplate: a template string that determines the target topic name at runtime; for example, ${fullyQualifiedTableName} resolves to the fully qualified table name with a period separator (e.g. CBS_HJ.PARTY).

    ${tableName} resolves to the short (unqualified) table name.

    For more detail, see: https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GADBD-GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606

  • gg.handler.name.keyMappingTemplate: a template string resolved at runtime into the Kafka message key; for the template keywords see: https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GADBD-GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606

  • gg.handler.name.messageFormatting: either row or op. With op, the table columns are kept separate from metadata fields such as op_type, and the row image is nested under the after field, for example:

    u'after': {u'PARTY_TYPE_CODE': None, u'ID': 17045.0, u'COMMENTS': u'test11'}, 
    u'op_type': u'U', 
    u'pos': u'00000000010000001477', 
    u'op_ts': u'2018-01-20 07:00:50.260766', 
    u'current_ts': u'2018-01-20 15:00:56.566000', 
    

    With row, everything is flattened into a single dict, for example:

    u'FROM_DATE': u'2015-08-14 00:00:00', u'PARTY_ROLE_NO': u'047353', 
    u'op_type': u'I', u'pos': u'-0000000000000000001', 
    u'THRU_DATE': None, u'op_ts': u'2018-01-19 07:19:29.192319', 
    u'PARTY_ID': 7.0, u'current_ts': u'2018-01-20 12:06:53.253002', u'table': u'CBS_HJ.PARTY_ROLE', u'ID': 7.0, u'ROLE_TYPE_CODE': u'BRANCH'
    
  • gg.handler.name.insertOpKey: the op_type value used for inserts; defaults to 'I'

  • gg.handler.name.updateOpKey: the op_type value used for updates; defaults to 'U'

  • gg.handler.name.deleteOpKey: the op_type value used for deletes; defaults to 'D'

  • gg.handler.name.truncateOpKey: the op_type value used for truncates; defaults to 'T'

  • gg.handler.name.treatAllColumnsAsStrings: true treats every column as a string; the default is false, in which case the best-fitting type mapping is used

  • gg.handler.name.iso8601Format: defaults to false; if true, dates are output in ISO 8601 format

  • gg.handler.name.pkUpdateHandling: only applies when gg.handler.name.messageFormatting=row

  • gg.classpath: for Apache Kafka, an example path is:

    gg.classpath=dirprm:{kafka_install_dir}/libs/*
    

    For Confluent Kafka, an example path is as follows:

    gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*

avro.properties

bootstrap.servers=localhost:9092

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
schema.registry.url=http://localhost:8081

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

Data is transmitted in Avro format, with schemas registered in the Schema Registry.