среда, 15 мая 2013 г.

Настройка репликации с помощью Streams


задача:
 реплицировать схему из одной базы в другую, за исключением таблиц


что имеем:


  • пользователь streamadmin создан на обоих базах с помощью oracle cloud control.
  • DB линки также созданы.
  • Настроить репликацию мастером не получилось - разъехались SCN.
  • база SOURCE_DB.DOMAIN.COM
  • база TARGET_DB.DOMAIN.COM
  • схема TEST_STREAM

Реализация


на базе SOURCE  под пользователем STREAMADMIN

--
-- Настраиваем очередь "STREAMADMIN"."STR_CAPTURE_Q"
--
BEGIN
dbms_streams_adm.set_up_queue(
queue_table => '"STREAMADMIN"."STR_CAPTURE_QT"',
storage_clause => NULL,
queue_name => '"STREAMADMIN"."STR_CAPTURE_Q"',
queue_user => '"STREAMADMIN"');
END;
/

--
-- настраиваем правила распространения для схемы TEST_STREAM
--

DECLARE
version_num NUMBER := 0;
release_num NUMBER := 0;
pos NUMBER;
initpos NUMBER;
q2q BOOLEAN;
stmt VARCHAR2(300);
ver VARCHAR2(30);
compat VARCHAR2(30);


BEGIN


dbms_streams_adm.add_schema_propagation_rules(
schema_name => '"TEST_STREAM"',
streams_name => '"STR_PROPOGATE"',
source_queue_name => '"STREAMADMIN"."STR_CAPTURE_Q"',
destination_queue_name => '"STREAMADMIN"."STR_APPLY_1_Q"@TARGET_DB.DOMAIN.COM',
include_dml => TRUE,
include_ddl => TRUE,
include_tagged_lcr => TRUE,
source_database => 'SOURCE_DB.DOMAIN.COM',
inclusion_rule => TRUE,
and_condition => NULL,
queue_to_queue => TRUE);
END;
/
--
--  Выключаем процесс распространения (PROPAGATE)
--
DECLARE
q2q VARCHAR2(10);
destn_q VARCHAR2(65);
BEGIN
dbms_aqadm.disable_propagation_schedule(
queue_name => '"STREAMADMIN"."STR_CAPTURE_Q"',
destination => 'TARGET_DB.DOMAIN.COM',
destination_queue => '"STREAMADMIN"."STR_APPLY_1_Q"');

EXCEPTION WHEN OTHERS THEN
IF sqlcode = -24065 THEN NULL; -- propagation already disabled
ELSE RAISE;
END IF;
END;
/


--
--  Настраиваем процесс захвата (CAPTURE)
--
BEGIN


dbms_streams_adm.add_schema_rules(
schema_name => '"TEST_STREAM"',
streams_type => 'CAPTURE',
streams_name => '"STR_CAPTURE"',
queue_name => '"STREAMADMIN"."STR_CAPTURE_Q"',
include_dml => TRUE,
include_ddl => TRUE,
include_tagged_lcr => TRUE,
source_database => 'SOURCE_DB.DOMAIN.COM',
inclusion_rule => TRUE,
and_condition => ':lcr.get_compatible() <= dbms_streams.compatible_11_2');
END;
/



Получаем текущий SCN SOURCE системы

select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() from dual

Создаем файл параметров для экспорта
DUMPFILE="exp_stream.dmp"
LOGFILE="exp_stream.log"
DIRECTORY=DATA_PUMP_DIR
SCHEMAS=('TEST_STREAM')
CONTENT=ALL
EXCLUDE=STATISTICS
EXCLUDE=TABLE:"IN (‘SOME_TABLES')"
FLASHBACK_SCN=полученный_SCN


Экспортируем схему
expdp userid=\” / as sysdba\” parfile=out_exp_par

Копируем полученный дамп на TARGET и импортируем его

Выполняем следующий скрипт на TARGET
--
-- Set up queue "STREAMADMIN"."STR_APPLY_1_Q"
--
BEGIN
 dbms_streams_adm.set_up_queue(
   queue_table => '"STREAMADMIN"."STR_APPLY_1_QT"',
   storage_clause => NULL,
   queue_name => '"STREAMADMIN"."STR_APPLY_1_Q"',
   queue_user => '"STREAMADMIN"');
END;
/
--
-- добавляем правила приема изменений для схемы TEST_STREAM
--
DECLARE
 compat         VARCHAR2(512);
 initpos        NUMBER;
 pos            NUMBER;
 version_num    NUMBER;
 release_num    NUMBER;
 compat_func    VARCHAR2(65);
 get_compatible VARCHAR2(4000);
BEGIN

 dbms_streams_adm.add_schema_rules(
   schema_name => '"TEST_STREAM"',
   streams_type => 'APPLY',
   streams_name => '"STR_APPLY_1"',
   queue_name => '"STREAMADMIN"."STR_APPLY_1_Q"',
   include_dml => TRUE,
   include_ddl => TRUE,
   include_tagged_lcr => TRUE,
   source_database => 'SOURCE_DB.DOMAIN.COM',
   inclusion_rule => TRUE,
   and_condition => ':lcr.get_compatible() <= dbms_streams.compatible_11_2');
END;
/
--
-- Get tag value to be used for Apply
--
DECLARE
 found   BINARY_INTEGER := 0;
 tag_num NUMBER;
BEGIN
 -- Use the apply object id as the tag
 SELECT o.object_id INTO tag_num
 FROM dba_objects o
 WHERE o.object_name= 'STR_APPLY_1' AND
       o.object_type='APPLY';
 LOOP
   BEGIN
     found := 0;
     SELECT 1 INTO found FROM dba_apply
     WHERE apply_name != 'STR_APPLY_1' AND
           apply_tag = hextoraw(tag_num);
   EXCEPTION WHEN no_data_found THEN
     EXIT;
   END;
   EXIT WHEN (found = 0);
   tag_num := tag_num + 1;
 END LOOP;

 -- alter apply
 dbms_apply_adm.alter_apply(
   apply_name => '"STR_APPLY_1"',
   apply_tag => hextoraw(tag_num));
END;
/
--
-- Включаем процесс приема изменений STR_APPLY_1
--
BEGIN
 dbms_apply_adm.start_apply(
   apply_name => '"STR_APPLY_1"');
EXCEPTION WHEN OTHERS THEN
 IF sqlcode = -26666 THEN NULL;  -- APPLY process already running
 ELSE RAISE;
 END IF;
END;
/

Отключаем Disable из-за ошибок

DBMS_APPLY_ADM.SET_PARAMETER(
   apply_name  => 'apply_simp',
   parameter   => 'disable_on_error',
   value       => 'n');


на SOURCE выполняем следующий скрипт
--
-- Enable propagation schedule for "STREAMADMIN"."FINBET$CAP_Q"
-- to TARGET_DB.DOMAIN.COM
--
DECLARE
 q2q       VARCHAR2(10);
 destn_q   VARCHAR2(65);
BEGIN
 SELECT queue_to_queue INTO q2q
 FROM dba_propagation
 WHERE source_queue_owner = 'STREAMADMIN' AND
       source_queue_name = 'STR_CAPTURE_Q' AND
       destination_queue_owner = 'STREAMADMIN' AND        
       destination_queue_name = 'STR_APPLY_1_Q' AND
       destination_dblink = 'TARGET_DB.DOMAIN.COM';

 IF q2q = 'TRUE' THEN
   destn_q := '"STREAMADMIN"."STR_APPLY_1_Q"';
 ELSE
   destn_q := NULL;
 END IF;

 dbms_aqadm.enable_propagation_schedule(
   queue_name => '"STREAMADMIN"."STR_CAPTURE_Q"',
   destination => 'TARGET_DB.DOMAIN.COM',
   destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
 IF sqlcode = -24064 THEN NULL; -- propagation already enabled
 ELSE RAISE;
 END IF;
END;
/
--
-- Стартуем захват STR_CAPTURE
--
BEGIN
 dbms_capture_adm.start_capture(
   capture_name => '"STR_CAPTURE"');
EXCEPTION WHEN OTHERS THEN
 IF sqlcode = -26666 THEN NULL;  -- CAPTURE process already running
 ELSE RAISE;
 END IF;
END;
/

Убираем из правил захвата ненужные таблицы
BEGIN
 DBMS_STREAMS_ADM.ADD_TABLE_RULES(
   table_name         => '"TEST_STREAM"."SOME_TABLES"',
   streams_type       => 'capture',
   streams_name       => '"STR_CAPTURE"',
   queue_name         => 'STREAMADMIN.STR_CAPTURE_Q',
   include_dml        => true,
   include_ddl        => true,
   include_tagged_lcr => true,
   inclusion_rule     => false);
END;
/

Проверяем работу, следим за логами


Комментариев нет:

Отправить комментарий