Enhancement in SharePlex 8.6.6 for Replication from Oracle to Kafka

If you are currently using ActiveMQ to consume data that SharePlex produces from Oracle, you may want to look into the performance gains Kafka offers. Common use cases for Kafka include website activity tracking, metrics, log aggregation, stream processing, event sourcing, and serving as a commit log.

Several enhancements for replication from Oracle to Kafka are included in the new SharePlex 8.6.6 release.

Support for multiple topics

  • A single open post process can now post to multiple topics.
  • The topic is controlled via the “topic” target configuration setting.
  • In the topic setting, ‘%o’ is replaced by the table owner name and ‘%t’ by the table name; for example, ‘%o_%t’ produces names like ‘OWNER_TABLENAME’.
  • Customers may configure their Kafka server to auto-create topics.
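To make the substitution rule concrete, here is a small sketch of how a ‘%o_%t’ topic template resolves against a table’s owner and name. The `resolve_topic` helper is hypothetical, written only to illustrate the documented placeholder behavior; it is not SharePlex code.

```python
def resolve_topic(template, owner, table):
    """Expand SharePlex-style topic placeholders: %o -> table owner, %t -> table name."""
    return template.replace("%o", owner).replace("%t", table)

# A template of '%o_%t' yields OWNER_TABLENAME-style topic names:
print(resolve_topic("%o_%t", "JESSICA", "TEST3"))  # JESSICA_TEST3
```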

Note: To enable topic auto-creation in Kafka, add “auto.create.topics.enable=true” to the Kafka config/server.properties file and restart the Kafka broker.
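If you manage several brokers, that one-line edit can be scripted. The sketch below is a minimal, hypothetical helper (not part of SharePlex or Kafka) that appends a setting to a properties file only if the key is not already present:

```python
from pathlib import Path

def ensure_property(props_path, key, value):
    """Append key=value to a .properties file unless the key is already set."""
    path = Path(props_path)
    lines = path.read_text().splitlines() if path.exists() else []
    for line in lines:
        stripped = line.strip()
        # skip blank lines and comments; match on the key left of '='
        if stripped and not stripped.startswith("#") and stripped.split("=", 1)[0].strip() == key:
            return False  # already configured; leave the existing value alone
    with path.open("a") as f:
        f.write(f"{key}={value}\n")
    return True

# ensure_property("config/server.properties", "auto.create.topics.enable", "true")
# ...then restart the Kafka broker for the change to take effect.
```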

On the SharePlex source, add a new table test3 to replication, routing it to a new post queue p3 as shown below, and activate the configuration:

sp_ctrl (alvsupl30:9001)> view config kafka2

datasource:o.ORA11GR2

splex9001.demo_src !kafka alvsupl31:p1

splex9001.demo_dest !kafka alvsupl31:p2

jessica.test !kafka alvsupl31:p1

jessica.test2 !kafka alvsupl31:p2

jessica.test3 !kafka alvsupl31:p3

sp_ctrl (alvsupl30:9001)> activate config kafka2

SharePlex supports multiple Kafka cluster brokers. In my test environment, I have two servers, each running a Kafka broker, with ZooKeeper running on both nodes. On the target, set the Kafka broker IP addresses and port numbers at the global level with the ‘target x.kafka set’ command, set the Kafka topic to %o_%t for the p3 post queue, and restart the post process:

sp_ctrl (alvsupl31:9001)> target x.kafka set kafka broker=10.1.23.160:9095,10.1.23.159:9092

sp_ctrl (alvsupl31:9001)> target x.kafka queue p3 set kafka topic=%o_%t

sp_ctrl (alvsupl31:9001)> target x.kafka show

  Queue: p3

      parameters defining data formatting:

         default:

            decimal = .

            enotation = 14

            record = xml

      parameters for kafka target:

         topic = %o_%t

         default:

            api.version.request = false

            broker = 10.1.23.160:9095,10.1.23.159:9092

            broker.version.fallback = 0.9.0

            compression.codec = none

            partition = 0

            request.required.acks = -1

            restart_timeout = 30

            threshold_size = 10000

 

sp_ctrl (alvsupl31:9001)> stop post

sp_ctrl (alvsupl31:9001)> start post

 

Checking the available topics, you will see only the existing ones (for example, I previously created topics test3 and test2 for the p1 and p2 post queues):

[ora11gr2@alvsupl31]/u01/kafka/kafka_2.10-0.10.0.0> bin/kafka-topics.sh --list --zookeeper localhost:2181

test2

test3

 

Now truncate and insert data into the test3 table on the source; this replicates through the p3 post queue.

SQL> truncate table test3;

Table truncated.

SQL> insert into test3 values (1); 

1 row created.

SQL> commit;

Commit complete.

Check the target: a new topic called JESSICA_TEST3 has been created.

[ora11gr2@alvsupl31]/u01/kafka/kafka_2.10-0.10.0.0>  bin/kafka-topics.sh --list --zookeeper localhost:2181

JESSICA_TEST3

test2

test3

Check the data SharePlex sent to the consumer; it shows the schema record, the truncate, and the insert:

[ora11gr2@alvsupl31]/u01/kafka/kafka_2.10-0.10.0.0> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic JESSICA_TEST3 --from-beginning

<?xml version="1.0" encoding="UTF-8"?>

<?opentarget version="1.1"?>

<opentarget>

<txn id="69305284" />

<tbl name="JESSICA.TEST3" utcOffset="-8:00">

<cmd ops="schema">

<schema>

<col name="ID" xmlType="decimal" key="true" nullable="true" length="22" />

</schema>

</cmd>

</tbl>

</opentarget>

 

<?xml version="1.0" encoding="UTF-8"?>

<?opentarget version="1.1"?>

<opentarget>

<txn id="69305284" />

<tbl name="JESSICA.TEST3">

<cmd ops="trunc" />

</tbl>

</opentarget>

 

<?xml version="1.0" encoding="UTF-8"?>

<?opentarget version="1.1"?>

<opentarget>

<txn id="69305358" msgIdx="1" />

<tbl name="JESSICA.TEST3">

<cmd ops="ins">

<row id="AAAWqTAAEAAAANTAAA">

<col name="ID">1</col>

</row>

</cmd>

</tbl>

</opentarget>

When you add a new table routed to the p3 post queue, a new topic named <OWNER>_<TABLENAME> is created automatically the first time data is posted for that table.
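Consumers receiving these opentarget XML records can unpack them with a standard XML parser. A minimal sketch, using the insert record shown above as input (note the record is parsed as bytes because it carries an encoding declaration):

```python
import xml.etree.ElementTree as ET

record = """<?xml version="1.0" encoding="UTF-8"?>
<?opentarget version="1.1"?>
<opentarget>
<txn id="69305358" msgIdx="1" />
<tbl name="JESSICA.TEST3">
<cmd ops="ins">
<row id="AAAWqTAAEAAAANTAAA">
<col name="ID">1</col>
</row>
</cmd>
</tbl>
</opentarget>"""

root = ET.fromstring(record.encode("utf-8"))
table = root.find("tbl").get("name")   # qualified table name
op = root.find("tbl/cmd").get("ops")   # operation type: schema, trunc, ins, ...
cols = {c.get("name"): c.text for c in root.findall("tbl/cmd/row/col")}
print(table, op, cols)  # JESSICA.TEST3 ins {'ID': '1'}
```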

 

Support for multiple partitions

The partition is controlled via the “partition” target configuration setting.

Options are:

  • A fixed number – data is sent to a single partition; the default is 0.
  • The keyword ‘rotate’ – each operation is sent to the next partition.
  • The keyword ‘rotate trans’ – each transaction is sent to the next partition.

For example:

sp_ctrl> target x.kafka queue p1 set kafka partition=rotate trans

sp_ctrl> target x.kafka queue p2 set kafka partition=rotate

sp_ctrl> stop post

sp_ctrl> start post

sp_ctrl> target x.kafka show

   Queue: p1

      parameters defining data formatting:

         default:

            decimal = .

            enotation = 14

            record = xml

      parameters for kafka target:

         partition = rotate trans

         topic = test3

         default:

            api.version.request = false

            broker = 10.1.23.160:9095,10.1.23.159:9092

            broker.version.fallback = 0.9.0

            compression.codec = none

            request.required.acks = -1

            restart_timeout = 30

            threshold_size = 10000

 

   Queue: p2

      parameters defining data formatting:

         default:

            decimal = .

            enotation = 14

            record = xml

      parameters for kafka target:

         partition = rotate

         topic = test2

         default:

            api.version.request = false

            broker = 10.1.23.160:9095,10.1.23.159:9092

            broker.version.fallback = 0.9.0

            compression.codec = none

            request.required.acks = -1

            restart_timeout = 30

            threshold_size = 10000

 

For example, with rotate and a topic that has 3 partitions:

Source:

begin
  for i in 1..100 loop
    insert into test2 values (i, 'test');
  end loop;
end;
/

Target:

Rows 3, 6, 9, ... go to the first partition, rows 1, 4, 7, ... to the second, and rows 2, 5, 8, ... to the third.

With rotate trans, all messages within the same transaction are posted to the same partition.

Rotate or rotate trans may be suitable if all DML operations are inserts, or if you do not care about the order of the DML. If updates are involved, or you need transactions to be posted in order, you may not want to use rotate.
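The rotation behavior described above can be modeled with a short sketch. This is a hedged re-implementation of the documented behavior, not SharePlex code: it assumes simple round-robin advancement over a fixed partition count (the actual starting partition may differ), with ‘rotate’ advancing per operation and ‘rotate trans’ per transaction:

```python
def assign_partitions(txns, num_partitions, mode):
    """Assign a partition to each operation.
    txns: list of transactions, each a list of operations.
    mode: 'rotate' (advance per operation) or 'rotate trans' (advance per transaction).
    """
    out, p = [], 0
    for txn in txns:
        for op in txn:
            out.append((op, p))
            if mode == "rotate":
                p = (p + 1) % num_partitions  # next operation lands on the next partition
        if mode == "rotate trans":
            p = (p + 1) % num_partitions      # next transaction lands on the next partition
    return out

txns = [["ins-1", "ins-2"], ["ins-3"], ["ins-4", "ins-5"]]
print(assign_partitions(txns, 3, "rotate"))        # operations spread across partitions
print(assign_partitions(txns, 3, "rotate trans"))  # each transaction stays on one partition
```

With ‘rotate trans’, ins-1 and ins-2 share a partition and ins-4 and ins-5 share another, which preserves per-transaction grouping at the cost of less even spreading.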

 

Improved cluster support

  • New ‘request.required.acks’ parameter setting. The default value is -1, which means all in-sync replicas must acknowledge; this helps avoid data loss.
  • Open post behavior is also improved when a broker or the cluster fails. If you kill one of the Kafka brokers, you may see a message like the one below, but post continues with the remaining working Kafka broker to post the data.

xpst     2017-01-31 19:03:45.837866 30441 2512082688 logger_callback rk=0x136f000 level=3 fac=ERROR buf=alvsupl31.prod.quest.corp:9095/3: Connect to ipv4#10.1.23.160:9095 failed: Connection refused

  • If all the brokers are down, open post exits and retries until a broker is available.

 

Note: For more details, refer to the new features section of the SharePlex 8.6.6 release notes.

https://support.quest.com/shareplex/8.6.6/technical-documents

Additional knowledge base articles on how to configure SharePlex replication to Kafka are available on support.quest.com:

KB212609 - https://support.quest.com/shareplex/kb/212609

Video KB224909 - https://support.quest.com/shareplex/kb/224909

For more information on Kafka and cluster broker setup, refer to https://kafka.apache.org
