OGG Data Synchronization
Syncing data to Kudu via Oracle GoldenGate
Environment
Source machine:
OS: CentOS 6.8 x86_64
Oracle: 11.2.1.0.0
GoldenGate: Oracle GoldenGate V11.2.1.0.3 for Oracle 11g on Linux x86-64
Hostname: db1
Public IP: 172.16.20.14
Internal IP: 10.1.6.3
Target machine:
OS: CentOS 6.8 x86_64
GoldenGate: Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64
Hostname: app01
Public IP: 172.16.20.29
Internal IP: 10.1.6.11
Configure passwordless SSH between the source and target machines
- Create the ogg user on the source machine:
useradd ogg
- Create the ggs user on the target machine:
useradd ggs
- On both machines, run
ssh-keygen -t rsa
to generate a key pair, then append each machine's ~/.ssh/id_rsa.pub to the other machine's ~/.ssh/authorized_keys (a scripted sketch follows this list).
- Edit /etc/hosts (vi /etc/hosts) so that both hostnames resolve:
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.1.6.3 db1
10.1.6.11 app01
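If ssh-copy-id is available, the key exchange can be scripted. A minimal sketch, run as ogg on db1 (mirror the commands as ggs on app01, targeting ogg@db1):
# Generate a key pair if one does not already exist
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
# Append the local public key to the remote authorized_keys
ssh-copy-id ggs@app01
# Verify that passwordless login now works
ssh ggs@app01 hostname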
Installing GoldenGate on the source machine
- Download Oracle GoldenGate V11.2.1.0.3 (V34339-01.zip) from edelivery.oracle.com.
- Create the ogg user and unzip V34339-01.zip into the ogg user's home directory.
- Extract fbo_ggs_Linux_x64_ora11g_64bit.tar.
- Add $ORACLE_HOME to the environment by editing ~/.bash_profile:
export PATH
export ORACLE_BASE=/u01/oracle/app/oracle
export ORACLE_HOME=/u01/oracle/app/oracle/product/11.2.0/db_1
export LD_LIBRARY_PATH=$ORACLE_HOME/lib
export NLS_LANG=AMERICAN_AMERICA.UTF8
export PATH=$PATH:$ORACLE_HOME/bin
export LD_LIBRARY_PATH=$ORACLE_CLNT/lib:$ORACLE_HOME/lib:$LD_LIBRARY_PATH
export NLS_CALENDAR=GREGORIAN
export NLS_DATE_FORMAT="YYYY-MM-DD HH24:MI:SS"
export NLS_DATE_LANGUAGE="ENGLISH"
export ORACLE_SID=czcbs
alias sqlplus='rlwrap sqlplus'
- Run source ~/.bash_profile to apply the changes.
Create a tablespace and user for GoldenGate in the source Oracle database (skip if they already exist)
sqlplus / as sysdba
SQL> create tablespace cbs_ogg datafile '/data2/cbs_ogg.dbf' size 20m AUTOEXTEND ON;
Tablespace created.
SQL> create user cbs_ogg identified by cbs_ogg default tablespace cbs_ogg;
User created.
SQL> grant execute on utl_file to cbs_ogg;
Grant succeeded.
SQL> GRANT GGS_GGSUSER_ROLE, RESOURCE, DBA, CONNECT to cbs_ogg;
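A quick sanity check that the user and grants took effect (note that GGS_GGSUSER_ROLE exists only if the GoldenGate role setup script has been run):
SQL> SELECT granted_role FROM dba_role_privs WHERE grantee = 'CBS_OGG';
SQL> SELECT default_tablespace FROM dba_users WHERE username = 'CBS_OGG';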
Enable transaction logging in Oracle
Switch to the oracle user: sudo su - oracle
sqlplus / as sysdba
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;
SHUTDOWN IMMEDIATE
STARTUP MOUNT
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION=TRUE SCOPE=BOTH; (this statement raised an error here and was recorded for follow-up; the parameter only exists in Oracle 11.2.0.4 and later, so it fails on earlier releases)
EXIT
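To confirm the logging changes took effect, run these checks as sysdba:
SQL> ARCHIVE LOG LIST;
SQL> SELECT log_mode, supplemental_log_data_min, force_logging FROM v$database;
The database should report ARCHIVELOG mode, with SUPPLEMENTAL_LOG_DATA_MIN and FORCE_LOGGING both YES.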
Installing Oracle GoldenGate for Big Data on the target machine
- Download the Oracle GoldenGate for Big Data installer, 12.3.1.1_ggs_Adapters_Linux_x64.zip, from edelivery.oracle.com.
- Create the ggs user and unzip 12.3.1.1_ggs_Adapters_Linux_x64.zip into that user's home directory.
- Extract ggs_Adapters_Linux_x64.tar.
Installing CDH5 on the target machine
- For the CDH5 installation, see the separate CDH5 installation guide.
- Start Confluent:
# Start ZooKeeper first, on every node:
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
# Start the Kafka broker on every node:
/usr/bin/kafka-server-start /etc/kafka/server.properties &
# Start the Schema Registry on the app01 node:
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
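A quick way to confirm the stack is up (assuming the default ports):
# ZooKeeper should answer "imok"
echo ruok | nc localhost 2181
# The Schema Registry should answer with a JSON list of subjects
curl http://localhost:8081/subjects
# The broker should answer with the topic list
kafka-topics --list --zookeeper localhost:2181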
GoldenGate configuration on the source machine
Create the required directories
From the current user's home directory, run ./ggsci
GGSCI (db1) 1> create subdirs
...
...
GLOBALS
GGSCI (db1) 2> EDIT PARAMS ./GLOBALS
Add the following to ./GLOBALS:
ggschema cbs_hj
You can verify it with view params ./GLOBALS.
Add supplemental log groups for all tables under the Oracle CBS schema
GGSCI (db1) 3> dblogin userid cbs_hj, password cbs_hj;
GGSCI (db1) 4> add trandata cbs_hj.*
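Whether supplemental logging is actually enabled for each table can be verified from the same session:
GGSCI (db1)> info trandata cbs_hj.*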
Edit the Manager configuration
GGSCI (db1) 5> EDIT PARAMS MGR
PORT 7016
ACCESSRULE, PROG *, IPADDR *, ALLOW
Start the Manager
GGSCI (db1) 6> start mgr
Manager started.
Check the Manager status
GGSCI (db1) 7> info mgr
Manager is running (IP port db1.7016).
GoldenGate configuration on the target machine
Create the required directories
As the current user, run: ./ggsci
GGSCI (app01) 1> create subdirs
Edit the Manager configuration
GGSCI (app01) 2> EDIT PARAMS MGR
PORT 7016
ACCESSRULE, PROG *, IPADDR *, ALLOW
Start the Manager
GGSCI (app01) 3> start mgr
Manager started.
Check the Manager status
GGSCI (app01) 4> info mgr
Manager is running (IP port app01.7016, Process ID 16101).
Initial data load
Configure the EXTRACT on the source machine
Add the EXTRACT used for the initial data load
./ggsci
GGSCI (db1) 1> add extract einicbs, sourceistable
Edit the EXTRACT parameters
GGSCI (db1) 2> edit params einicbs
EXTRACT EINICBS
userid cbs_hj, password cbs_hj
RMTHOST 10.1.6.11, MGRPORT 7016
RMTFILE ./dirdat/cb, MEGABYTES 200
table cbs_hj.party;
table cbs_hj.party_role;
... (add every table that needs to be loaded here; the two tables above are examples)
Generate the CBS definitions file
GGSCI (db1) 3> EDIT PARAMS cbsdef
defsfile ./dirdef/cbs.def
userid cbs_hj, password cbs_hj
table cbs_hj.*;
Run in /home/ogg:
./defgen paramfile dirprm/cbsdef.prm
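defgen writes the table definitions to dirdef/cbs.def; a quick sanity check before copying the file:
ls -l /home/ogg/dirdef/cbs.def
head /home/ogg/dirdef/cbs.def   # should show the header and the extracted table definitions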
Copy dirdef/cbs.def to /home/ggs/dirdef/cbs.def on the target machine:
scp /home/ogg/dirdef/cbs.def ggs@app01:/home/ggs/dirdef/cbs.def
Change the ownership of cbs.def on the target machine:
sudo chown -R ggs:ggs /home/ggs/dirdef/cbs.def
Environment variables on the target machine
Add the Java environment variables:
export JAVA_HOME=/usr/java/jdk1.8.0_152   # adjust to the installed version
export JAVA_LIBDIR=/usr/share/java
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server/
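GoldenGate for Big Data loads the JVM through libjvm.so, so it is worth confirming that the paths above actually exist (assuming the JDK location used here):
java -version
ls $JAVA_HOME/jre/lib/amd64/server/libjvm.so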
REPLICAT configuration on the target machine
Add the REPLICAT
./ggsci
GGSCI (app01) 1> add replicat rcbsini,exttrail ./dirdat/cb
Edit the REPLICAT parameters
GGSCI (app01) 2> EDIT PARAMS RCBSINI
SPECIALRUN
END RUNTIME
EXTFILE ./dirdat/cb
TARGETDB LIBFILE libggjava.so SET property=dirprm/conf.props
SOURCEDEFS ./dirdef/cbs.def
MAP *.*, TARGET *.*;
Edit /home/ggs/dirprm/avro.properties (vi /home/ggs/dirprm/avro.properties) with the following content:
bootstrap.servers=localhost:9092
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
internal.value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
#internal.value.converter.schema.registry.url=http://localhost:8081
#internal.key.converter.schema.registry.url=http://localhost:8081
replicator.topic.pool=cbs:5
Edit /home/ggs/dirprm/conf.props (vi /home/ggs/dirprm/conf.props) with the following content:
gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=avro.properties
gg.handler.kafkaconnect.mode=tx
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
#The op_type, current_ts, op_ts, table and pos fields are all enabled by default
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
#true treats all output fields as strings; false maps the field types in the source trail file
#to the best matching Kafka Connect data types
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
#Output the current date in ISO 8601 format
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Set the classpath here
#gg.classpath=dirprm/:/home/ggs11/ggjava/resources/lib/*
gg.classpath=/usr/share/java/kafka-serde-tools/*:/usr/share/java/kafka/*:/usr/share/java/confluent-common/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm
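Before starting the replicat, check that the directories referenced by gg.classpath exist and contain the Confluent jars (these are the default locations for the Confluent packages used above):
ls /usr/share/java/kafka-serde-tools /usr/share/java/kafka /usr/share/java/confluent-common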
Start the EXTRACT
- On the source machine, run:
./ggsci
GGSCI (db1) 1> start einicbs
Start the REPLICAT
- In /home/ggs, run:
[ggs@app01 ~]$ ./replicat paramfile dirprm/rcbsini.prm
Check the results
Check in Kafka that the corresponding topics were created:
kafka-topics --list --zookeeper localhost:2181
Check the Kafka topic offsets to verify that the data has been loaded (--time -1 returns the latest offset, --time -2 the earliest):
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic PARTY --time -1
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic PARTY --time -2
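To inspect the actual Avro records rather than just the offsets, the Confluent console consumer can deserialize them against the Schema Registry (a sketch assuming the default ports and a Confluent release that supports --bootstrap-server; --max-messages only keeps the output short):
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic PARTY --from-beginning --max-messages 5 --property schema.registry.url=http://localhost:8081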
Ongoing transaction synchronization
First, move the ogg user into the oracle user's group (oinstall):
usermod -G oinstall ogg
usermod -g oinstall ogg
Log out and log back in for the group change to take effect.
EXTRACT configuration on the source machine
./ggsci
GGSCI (db1) 1> EDIT PARAMS EXTCBS
EXTRACT extcbs
USERID cbs_hj, PASSWORD cbs_hj
EXTTRAIL ./dirdat/rt
table cbs_hj.party;
table cbs_hj.party_role;
GGSCI (db1) 2> add extract extcbs, tranlog,begin now
EXTRACT added.
GGSCI (db1) 3> add exttrail ./dirdat/rt, extract extcbs
EXTTRAIL added.
GGSCI (db1) 4> start extcbs
Sending START request to MANAGER ...
EXTRACT EXTCBS starting
GGSCI (db1) 5> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTCBS 00:00:31 00:00:01
Data Pump configuration on the source machine
GGSCI (db1) 6> EDIT PARAMS CBSDP
EXTRACT CBSDP
USERID cbs_hj, PASSWORD cbs_hj
RMTHOST 10.1.6.11, MGRPORT 7016
RMTTRAIL ./dirdat/rt
table cbs_hj.party;
table cbs_hj.party_role;
GGSCI (db1) 7> ADD EXTRACT CBSDP, EXTTRAILSOURCE ./dirdat/rt, begin now
GGSCI (db1) 8> ADD RMTTRAIL ./dirdat/rt, EXTRACT CBSDP, MEGABYTES 5
GGSCI (db1) 9> start cbsdp
GGSCI (db1) 10> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING CBSDP 00:00:00 00:00:46
EXTRACT RUNNING EXTCBS 00:00:00 00:00:08
Note: if a process fails to start, inspect the log with vi ggserr.log.
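Per-process details are also available from GGSCI (the report names follow the group names used above):
GGSCI (db1)> view report extcbs
GGSCI (db1)> info extcbs, detail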
REPLICAT configuration on the target machine
./ggsci
GGSCI (app01) 1> EDIT PARAMS rcbs
REPLICAT rcbs
TARGETDB LIBFILE libggjava.so SET property=dirprm/conf.props
SOURCEDEFS ./dirdef/cbs.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP *.*, TARGET *.*;
GGSCI (app01) 2> add replicat rcbs,exttrail ./dirdat/rt
GGSCI (app01) 3> start rcbs
GGSCI (app01) 4> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING RCBS 00:00:00 00:00:00
Note: if any configuration file changes, restart the affected processes (mgr, rcbs, etc.).
Loading the synchronized data
- In Impala, create the matching table structures; the tables party and party_role are used as examples here (run on the master01 node):
impala-shell
- Create the database:
[master01:21000] > CREATE DATABASE IF NOT EXISTS CBS_HJ;
- Create the tables (a verification sketch follows this list):
create table if not exists cbs_hj.party(
ID INT,
PARTY_TYPE_CODE STRING,
COMMENTS STRING,
PRIMARY KEY(ID)
) PARTITION BY HASH (ID) PARTITIONS 16 STORED AS KUDU;
create table if not exists cbs_hj.party_role(
ID INT,
ROLE_TYPE_CODE STRING,
PARTY_ID INT,
PARTY_ROLE_NO STRING,
FROM_DATE STRING,
THRU_DATE STRING,
PRIMARY KEY(ID)
) PARTITION BY HASH (ID) PARTITIONS 16 STORED AS KUDU;
- Consuming the data from Kafka and writing it into these tables is not covered here.
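A quick check that the tables were created as Kudu tables:
impala-shell -q "SHOW TABLES IN cbs_hj;"
impala-shell -q "DESCRIBE cbs_hj.party;"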
Appendix
Configuration file parameter reference:
conf.props
- gg.handlerlist: names the list of handlers to use.
- gg.handler.name.type: selects the handler type; for the Kafka Connect handler this must be kafkaconnect.
- gg.handler.name.kafkaProducerConfigFile: path to the Kafka producer configuration file used by the Kafka Connect handler.
- gg.handler.name.mode: either op or tx; the two modes group operations differently and have different throughput characteristics.
- gg.handler.name.topicMappingTemplate: template string resolved at runtime to the topic name. For example, ${fullyQualifiedTableName} resolves to the fully qualified table name with "." as the separator, and ${tableName} resolves to the short table name. For more detail see: https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GADBD-GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606
- gg.handler.name.keyMappingTemplate: template string resolved at runtime to the Kafka message key; for the available template keywords see: https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GADBD-GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606
- gg.handler.name.messageFormatting: either row or op. With op, the table fields are kept separate from metadata fields such as op_type, and the row content is wrapped in an after field, for example:
u'after': {u'PARTY_TYPE_CODE': None, u'ID': 17045.0, u'COMMENTS': u'test11'}, u'op_type': u'U', u'pos': u'00000000010000001477', u'op_ts': u'2018-01-20 07:00:50.260766', u'current_ts': u'2018-01-20 15:00:56.566000',
With row, all fields are flattened into a single record, for example:
u'FROM_DATE': u'2015-08-14 00:00:00', u'PARTY_ROLE_NO': u'047353', u'op_type': u'I', u'pos': u'-0000000000000000001', u'THRU_DATE': None, u'op_ts': u'2018-01-19 07:19:29.192319', u'PARTY_ID': 7.0, u'current_ts': u'2018-01-20 12:06:53.253002', u'table': u'CBS_HJ.PARTY_ROLE', u'ID': 7.0, u'ROLE_TYPE_CODE': u'BRANCH'
- gg.handler.name.insertOpKey: value written to op_type for inserts; default 'I'.
- gg.handler.name.updateOpKey: value written to op_type for updates; default 'U'.
- gg.handler.name.deleteOpKey: value written to op_type for deletes; default 'D'.
- gg.handler.name.truncateOpKey: value written to op_type for truncates; default 'T'.
- gg.handler.name.treatAllColumnsAsStrings: true treats all fields as strings; the default is false, which maps source field types to the best matching Kafka Connect data types.
- gg.handler.name.iso8601Format: default false; if true, dates are output in ISO 8601 format.
- gg.handler.name.pkUpdateHandling: only applies when gg.handler.name.messageFormatting=row.
- gg.classpath: for Apache Kafka, for example:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
For Confluent Kafka, for example:
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
avro.properties
bootstrap.servers=localhost:9092
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
Messages are transmitted in Avro format and registered with the Schema Registry.