博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于OGG Datahub插件将Oracle数据同步上云
阅读量:5732 次
发布时间:2019-06-18

本文共 12467 字,大约阅读时间需要 41 分钟。

一、背景介绍

随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,

OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库(DB2, MySQL等)的同步。下面是Oracle官方提供的一个OGG的整体架构图,从图中可以看出OGG的部署分为源端和目标端两部分组成,主要有Manager,Extract,Pump,Collector,Replicat这么一些组件。

clipboard.png

  • Manager:在源端和目标端都会有且只有一个Manager进程存在,负责管理其他进程的启停和监控等;
  • Extract:负责从源端数据库表或者事务日志中捕获数据,有初始加载和增量同步两种模式可以配置,初始加载模式是直接将源表数据同步到目标端,而增量同步就是分析源端数据库的日志,将变动的记录传到目标端,本文介绍的是增量同步的模式;
  • Pump:Extract从源端抽取的数据会先写到本地磁盘的Trail文件,Pump进程会负责将Trail文件的数据投递到目标端;
  • Collector:目标端负责接收来自源端的数据,生成Trail文件
  • Replicat:负责读取目标端的Trail文件,转化为相应的DDL和DML语句作用到目标数据库,实现数据同步。

本文介绍的Oracle数据同步是通过OGG的Datahub插件实现的,该Datahub插件在架构图中处于Replicat的位置,会分析Trail文件,将数据的变化记录写入Datahub中,可以使用流计算对datahub中的数据进行实时分析,也可以将数据归档到MaxCompute中进行离线处理。

二、安装步骤

0. 环境要求

  • 源端已安装好Oracle
  • 源端已安装好OGG(建议版本Oracle GoldenGate V12.1.2.1)
  • 目标端已安装好OGG Adapters(建议版本Oracle GoldenGate Application Adapters
    12.1.2.1)
  • java 7

(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本,Oracle所使用的版本为ORA11g)

1. 源端OGG安装

下载OGG安装包解压后有如下目录:

drwxr-xr-x installdrwxrwxr-x response-rwxr-xr-x runInstallerdrwxr-xr-x stage

目前oracle一般采取response安装的方式,在response/oggcore.rsp中配置安装依赖,具体如下:

oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2# 需要目前与oracle版本对应INSTALL_OPTION=ORA11g# goldegate主目录SOFTWARE_LOCATION=/home/oracle/u01/ggate# 初始不启动managerSTART_MANAGER=false# manger端口MANAGER_PORT=7839# 对应oracle的主目录DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1# 暂可不配置INVENTORY_LOCATION=# 分组(目前暂时将oracle和ogg用同一个账号ogg_test,实际可以给ogg单独账号)UNIX_GROUP_NAME=oinstall

执行命令:

runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp

本示例中,安装后OGG的目录在/home/oracle/u01/ggate,安装日志在/home/oracle/u01/ggate/cfgtoollogs/oui目录下,当silentInstall{时间}.log文件里出现如下提示,表明安装成功:

The installation of Oracle GoldenGate Core was successful.

执行/home/oracle/u01/ggate/ggsci命令,并在提示符下键入命令:CREATE SUBDIRS,从而生成ogg需要的各种目录(dir打头的那些)。

至此,源端OGG安装完成。

2. 源端Oracle配置

以dba分身进入sqlplus:sqlplus / as sysdba

# 创建独立的表空间create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;# 创建ogg_test用户,密码也为ogg_testcreate user ogg_test identified by ogg_test default tablespace ATMV;# 给ogg_test赋予充分的权限grant connect,resource,dba to ogg_test;# 检查附加日志情况Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;# 增加数据库附加日志及回退alter database add supplemental log data;alter database add supplemental log data (primary key, unique,foreign key) columns;# rollbackalter database drop supplemental log data (primary key, unique,foreign key) columns;alter database drop supplemental log data;# 全字段模式,注意:在该模式下的delete操作也只有主键值ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;# 开启数据库强制日志模式alter database force logging;# 执行marker_setup.sql 脚本@marker_setup.sql# 执行@ddl_setup.sql@ddl_setup.sql# 执行role_setup.sql@role_setup.sql# 给ogg用户赋权grant GGS_GGSUSER_ROLE to ogg_test;# 执行@ddl_enable.sql,开启DDL trigger@ddl_enable.sql# 执行优化脚本@ddl_pin ogg_test# 安装sequence support@sequence.sql#alter table sys.seq$ add supplemental log data (primary key) columns;

3. OGG源端mgr配置

以下是通过ggsci对ogg进行配置

配置mgr

edit params mgr

PORT 7839DYNAMICPORTLIST  7840-7849USERID ogg_test, PASSWORD ogg_testPURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7LAGREPORTHOURS 1LAGINFOMINUTES 30LAGCRITICALMINUTES 45PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

启动mgr(运行日志在ggate/dirrpt中)

start mgr

查看mgr状态

info mgr

查看mgr配置

view params mgr

4. OGG源端extract配置

以下是通过ggsci对ogg进行配置

配置extract(名字可任取,extract是组名)

edit params extract

EXTRACT extractSETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")DBOPTIONS   ALLOWUNUSEDCOLUMNUSERID ogg_test, PASSWORD ogg_testREPORTCOUNT EVERY 1 MINUTES, RATENUMFILES 5000DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100DISCARDROLLOVER AT 2:00WARNLONGTRANS 2h, CHECKINTERVAL 3mEXTTRAIL ./dirdat/st, MEGABYTES 200DYNAMICRESOLUTIONTRANLOGOPTIONS CONVERTUCS2CLOBSTRANLOGOPTIONS RAWDEVICEOFFSET 0DDL &INCLUDE MAPPED OBJTYPE 'table' &INCLUDE MAPPED OBJTYPE 'index' &INCLUDE MAPPED OBJTYPE 'SEQUENCE' &EXCLUDE OPTYPE COMMENTDDLOPTIONS  NOCROSSRENAME  REPORTTABLE     OGG_TEST.*;SEQUENCE  OGG_TEST.*;GETUPDATEBEFORES

增加extract进程(ext后的名字要跟上面extract对应,本例中extract是组名)

add ext extract,tranlog, begin now

删除某废弃进程DP_TEST

delete ext DP_TEST

添加抽取进程,每个队列文件大小为200m

add exttrail ./dirdat/st,ext extract, megabytes 200

启动抽取进程(运行日志在ggate/dirrpt中)

start extract extract
至此,extract配置完成,数据库的一条变更可以在ggate/dirdat目录下的文件中看到

5. 生成def文件

源端ggsci起来后执行如下命令,生成defgen文件,并且拷贝到目标端dirdef下
edit params defgen

DEFSFILE ./dirdef/ogg_test.defUSERID ogg_test, PASSWORD ogg_testtable OGG_TEST.*;在shell中执行如下命令,生成ogg_test.def./defgen paramfile ./dirprm/defgen.prm

6. 目标端OGG安装和配置

解压adapter包
将源端中dirdef/ogg_test.def文件拷贝到adapter的dirdef下

执行ggsci起来后执行如下命令,创建必须目录

create subdirs

编辑mgr配置

edit params mgr

PORT 7839DYNAMICPORTLIST  7840-7849PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7LAGREPORTHOURS 1LAGINFOMINUTES 30LAGCRITICALMINUTES 45PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

启动mgr

start mgr

7. 源端ogg pump配置

启动ggsci后执行如下操作:

编辑pump配置

edit params pump

EXTRACT pumpRMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESSPASSTHRUNUMFILES 5000RMTTRAIL ./dirdat/stDYNAMICRESOLUTIONTABLE      OGG_TEST.*;SEQUENCE   OGG_TEST.*;

添加投递进程,从某一个队列开始投

add ext pump,exttrailsource ./dirdat/st

备注:投递进程,每个队文件大小为200m

add rmttrail ./dirdat/st,ext pump,megabytes 200

启动pump

start pump
启动后,结合上面adapter的配置,可以在目标端的dirdat目录下看到过来的trailfile

8. Datahub插件安装和配置

依赖环境:jdk1.7。
配置好JAVA_HOME, LD_LIBRARY_PATH,可以将环境变量配置到~/.bash_profile中,例如

export JAVA_HOME=/xxx/xxx/jrexxexport LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

修改环境变量后,请重启adapter的mgr进程

下载datahub-ogg-plugin.tar.gz并解压:

修改conf路径下的javaue.properties文件,将{YOUR_HOME}替换为解压后的路径

gg.handlerlist=ggdatahubgg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandlergg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xmlgoldengate.userexit.nochkpt=falsegoldengate.userexit.timestamp=utcgg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/*gg.log.level=debugjvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar

修改conf路径下的log4j.properties文件,将{YOUR_HOME}替换为解压后的路径

修改conf路径下的configure.xml文件,修改方式见文件中的注释

100
ogg_test
YOUR_DATAHUB_ENDPOINT
YOUR_DATAHUB_PROJECT
YOUR_DATAHUB_ACCESS_ID
YOUR_DATAHUB_ACCESS_KEY
optype
readtime
record_id
1000
yyyy-MM-dd HH:mm:ss
true
datahub_ogg_plugin.dirty
200
0
4000
datahub_ogg_plugin.chk
t_person
t_person
1

在ggsci下启动datahub writer

edit params dhwriter

extract dhwritergetEnv (JAVA_HOME)getEnv (LD_LIBRARY_PATH)getEnv (PATH)CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"sourcedefs ./dirdef/ogg_test.deftable OGG_TEST.*;

添加dhwriter

add extract dhwriter, exttrailsource ./dirdat/st

启动dhwriter

start dhwriter

三、使用场景

这里会用一个简单的示例来说明数据的使用方法,例如我们在Oracle数据库有一张商品订单表orders(oid int, pid int, num int),该表有三列,分别为订单ID, 商品ID和商品数量。

将这个表通过OGG Datahub进行增量数据同步之前,我们需要先将源表已有的数据通过DataX同步到MaxCompute中。增量同步的关键步骤如下:
(1)在Datahub上创建相应的Topic,Topic的schema为(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub的插件按照上述的安装流程部署配置,其中列的Mapping配置如下:

optype
readtime

其中optype和readtime字段是记录数据库的数据变更类型和时间,optype有"I", "D", "U"三种取值,分别对应为“增”,“删”,“改”三种数据变更操作。

(3)OGG Datahub插件部署好成功运行后,插件会源源不断的将源表的数据变更记录输送至datahub中,例如我们在源订单表中新增一条记录(1,2,1),datahub里收到的记录如下:

+--------+------------+------------+------------+------------+------------+------------+------------+------------+| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |+-------+------------+------------+------------+------------+------------+------------+------------+------------+| 14810373343020000 |     I          | 2016-12-06 15:15:28.000141 | NULL       | 1          | NULL       | 2          | NULL       | 1   |

修改这条数据,比如把num改为20,datahub则会收到的一条变更数据记录,如下:

+-------+------------+------------+------------+------------+------------+------------+------------+------------+| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |+--------+------------+------------+------------+------------+------------+------------+------------+------------+| 14810373343080000 |     U          | 2016-12-06 15:15:58.000253 | 1          | 1          | 2          | 2          | 1          | 20         |

实时计算

在前一天的离线计算的基础数据上,我们可以写一个StreamCompute流计算的分析程序,很容易地对数据进行实时汇总,例如实时统计当前总的订单数,每种商品的销售量等。处理思路就是对于每一条到来的变更数据,可以拿到变化的数值,实时更新统计变量即可。

离线处理

为了便于后续的离线分析,我们也可以将Datahub里的数据归档到MaxCompute中,在MaxCompute中创建相应Schema的表:

create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);

在Datahub上创建MaxCompute的数据归档,上述流入Datahub里的数据将自动同步到MaxCompute当中。建议将同步到MaxCompute中的数据按照时间段进行划分,比如每一天的增量数据都对应一个独立分区。这样当天的数据同步完成后,我们可以处理对应的分区,拿到当天所有的数据变更,而与和前一天的全量数据进行合并后,即可得到当天的全量数据。为了简单起见,先不考虑分区表的情况,以2016-12-06这天的增量数据为例,假设前一天的全量数据在表orders_base里面,datahub同步过来的增量数据在orders_log表中,将orders_base与orders_log做合并操作,可以得到2016-12-06这天的最终全量数据写入表orders_result中。这个过程可以在MaxCompute上用如下这样一条SQL完成。

INSERT OVERWRITE TABLE orders_resultSELECT t.oid, t.pid, t.numFROM(     SELECT oid, pid, num, '0' x_record_id, 1 AS x_optype     FROM     orders_base      UNION ALL     SELECT decode(optype,'D',oid_before,oid_after) AS oid              , decode(optype,'D', pid_before,pid_after) AS pid              , num_after AS num              , record_id x_record_id              , decode(optype, 'D', 0, 1) AS x_optype     FROM     orders_log ) tJOIN (     SELECT     oid     , pid     , max(record_id) x_max_modified     FROM     (     SELECT     oid     , pid     , '0' record_id     FROM     orders_base UNION ALL SELECT                      decode(optype,'D',oid_before,oid_after) AS oid                      , decode(optype,'D', pid_before,pid_after) AS pid                      , record_id                      FROM                      orders_log ) g     GROUP BY oid , pid ) sONt.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;

四、常见问题

Q:目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.

A:目标端机器hostname在/etc/hosts里面重新设置localhost对应的ip

Q:找不到jvm相关的so包

A:将jvm的so路径添加到LD_LIBRARY_PATH后,重启mgr

例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

Q:有了DDL语句,比如增加一列,源端ogg没有问题,但是adapter端的ffwriter和jmswriter进程退出,且报错: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由于表结构改变,需要重做def文件,将重做的def文件放入dirdef后重启即可

本文作者:冶善

本文为云栖社区原创内容,未经允许不得转载。

你可能感兴趣的文章
SQL SERVER 2012 只能识别20个CPU的问题
查看>>
【单调队列】【P1776】宝物筛选
查看>>
使用shell脚本生成数据库markdown文档
查看>>
centos和pycharm中取绝对路径的差别
查看>>
ext2磁盘布局
查看>>
MySql数据库2【常用命令行】
查看>>
动态规划---->货郎担问题
查看>>
添加虚拟子网
查看>>
Ubuntu 12.04 root用户登录设置
查看>>
存储过程点滴
查看>>
Maven编译跳过test的设置
查看>>
SQLyog图形化l数据库的操作和学习
查看>>
raspbian 怎么才能有声音?
查看>>
[LeetCode]22.Generate Parentheses
查看>>
《数据结构》—— 线性表(上)
查看>>
WEB前端 CSS选择器
查看>>
计算A/B Test需要的样本量
查看>>
二叉树前序中序后序遍历的非递归方法
查看>>
nginx+tomcat实现负载均衡
查看>>
mysql 行转列列转行
查看>>