1. What is CDC
CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (INSERT, UPDATE, DELETE and similar operations on data or tables), record these changes completely and in the order they occurred, and write them to message middleware so that other services can subscribe to and consume them.
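The capture-and-replay idea can be made concrete with a minimal Python sketch. It is a toy model, not Flink CDC or Debezium code. `ChangeEvent`, `publish` and `consume` are hypothetical names, and a `collections.deque` stands in for the message-middleware topic. Each event carries the full row image after the change, so a subscriber that replays the events in their original order ends up with the same table state as the source.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChangeEvent:
    op: str               # "INSERT", "UPDATE" or "DELETE"
    key: int              # primary key of the changed row
    row: Optional[dict]   # row image after the change (None for DELETE)


def publish(topic: deque, events) -> None:
    """Append captured changes to the topic in the order they occurred."""
    for event in events:
        topic.append(event)


def consume(topic: deque) -> dict:
    """Rebuild the table state by replaying the topic from the beginning."""
    table = {}
    while topic:
        event = topic.popleft()
        if event.op == "DELETE":
            table.pop(event.key, None)
        else:  # INSERT and UPDATE both set the latest row image
            table[event.key] = event.row
    return table


topic = deque()  # hypothetical stand-in for a message-middleware topic
publish(topic, [
    ChangeEvent("INSERT", 1, {"id": 1, "user_id": 1}),
    ChangeEvent("INSERT", 2, {"id": 2, "user_id": 1}),
    ChangeEvent("UPDATE", 2, {"id": 2, "user_id": 100}),
    ChangeEvent("DELETE", 1, None),
])
print(consume(topic))  # {2: {'id': 2, 'user_id': 100}}
```

Ordering matters. If the UPDATE for key 2 were replayed before its INSERT, the consumer would end up holding the stale row. This is why CDC records changes in commit order.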
2. CDC in Flink
Project repository: https://github.com/ververica/flink-cdc-connectors
Project documentation: https://ververica.github.io/flink-cdc-connectors/master/
3. Environment preparation
- mysql
- elasticsearch
- flink on yarn
Note: if Hadoop is not installed, you can skip YARN and use a Flink standalone environment instead.
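The versions used in this example are reflected in the connector jar names below.
Download the following two Flink dependency jars and put them in Flink's lib directory.
Download locations:
1) https://repo.maven.apache.org/maven2/com/alibaba/ververica/
flink-sql-connector-mysql-cdc-1.4.0.jar
The latest version in this repository is 1.4.0. For a newer version, build it yourself or download it from https://mvnrepository.com/.
2) https://repo.maven.apache.org/maven2/org/apache/flink/
flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
Note: this is the Elasticsearch 7 connector. The local environment originally ran ES 8, and the index could not be created there. After reinstalling ES 7, the test succeeded.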
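Both jars follow the standard Maven repository layout (`<repo>/<group path>/<artifactId>/<version>/<artifactId>-<version>.jar`). The full download URLs can therefore be derived from the directories and file names above. The sketch below assumes the group IDs `com.alibaba.ververica` and `org.apache.flink` implied by those directory URLs. `FLINK_LIB` is a hypothetical path; point it at your own Flink lib directory.

```python
import os
import urllib.request

MAVEN_CENTRAL = "https://repo.maven.apache.org/maven2"
FLINK_LIB = "/opt/flink/lib"  # hypothetical: replace with your Flink lib directory

# (groupId, artifactId, version) derived from the URLs and jar names above
CONNECTOR_JARS = [
    ("com.alibaba.ververica", "flink-sql-connector-mysql-cdc", "1.4.0"),
    ("org.apache.flink", "flink-sql-connector-elasticsearch7_2.11", "1.13.5"),
]


def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  repo: str = MAVEN_CENTRAL) -> str:
    """Build a jar URL from Maven coordinates using the standard repository layout."""
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


def download_jars(lib_dir: str = FLINK_LIB) -> list:
    """Download every connector jar into lib_dir and return the local file paths."""
    paths = []
    for coords in CONNECTOR_JARS:
        url = maven_jar_url(*coords)
        path = os.path.join(lib_dir, url.rsplit("/", 1)[1])  # keep the jar file name
        urllib.request.urlretrieve(url, path)
        paths.append(path)
    return paths
```

Call `download_jars()` before starting the cluster, or restart the cluster afterwards. Jars in lib are only picked up when Flink starts.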
4. Start Flink
Start the Flink cluster (the scripts are in Flink's bin directory):
./start-cluster.sh
If it starts successfully, the Flink Web UI is available at http://localhost:8081/.
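The Web UI is backed by Flink's REST API on the same port, so this check can also be scripted. The following is a minimal sketch that assumes the cluster is running locally on port 8081 and reads the REST `/overview` endpoint. `cluster_summary` is a hypothetical helper. Its `fetch` parameter exists only so the function can be exercised without a live cluster.

```python
import json
import urllib.request

FLINK_REST = "http://localhost:8081"  # the Web UI and the REST API share this port


def fetch_json(url: str, timeout: float = 5.0) -> dict:
    """GET a URL and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def cluster_summary(base_url: str = FLINK_REST, fetch=fetch_json) -> str:
    """One-line summary built from the REST /overview endpoint."""
    info = fetch(f"{base_url}/overview")
    return (f"Flink {info['flink-version']}: {info['taskmanagers']} TaskManager(s), "
            f"{info['slots-available']}/{info['slots-total']} slots free, "
            f"{info['jobs-running']} job(s) running")
```

With the cluster up, `print(cluster_summary())` shows how many task slots are free. That number becomes relevant once several sync jobs are submitted.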
Start the Flink SQL client:
./sql-client.sh
Once it has started, the Flink SQL client's interactive shell is shown.
5. Initializing the data sync
1) Original table in the MySQL database
CREATE TABLE `product_view` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) NOT NULL, `product_id` int(11) NOT NULL, `server_id` int(11) NOT NULL, `duration` int(11) NOT NULL, `times` varchar(11) NOT NULL, `time` datetime NOT NULL, PRIMARY KEY (`id`), KEY `time` (`time`), KEY `user_product` (`user_id`,`product_id`) USING BTREE, KEY `times` (`times`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Sample data
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
2) Create the source table in Flink, mapped to the MySQL table
CREATE TABLE product_view_source ( `id` int, `user_id` int, `product_id` int, `server_id` int, `duration` int, `times` string, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '10.34.100.209', 'port' = '3306', 'username' = 'root', 'password' = '123', 'database-name' = 'flinkcdc_test', 'table-name' = 'product_view',
'server-id' = '5401' );
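With this in place, querying this table in the Flink SQL client reads the corresponding MySQL table: first a snapshot of the existing rows, then the changes read from the binlog.
3) Create the sink table in Flink, mapped to Elasticsearch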
CREATE TABLE product_view_sink( `id` int, `user_id` int, `product_id` int, `server_id` int, `duration` int, `times` string, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://10.34.100.156:9200', 'index' = 'product_view_index' );
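With this in place, the product_view_index index is created in ES automatically when data is synced. If you want specific index settings or mappings, create the index manually in advance. Once data is inserted into product_view_sink, it appears in ES.
View the tables created in Flink.
View the data in the Flink tables: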
select * from product_view_source;
select * from product_view_sink;
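The SELECT on product_view_sink fails because the Elasticsearch connector only supports writing, so the sink table cannot be queried directly with SQL.
4) Create the sync job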
insert into product_view_sink select * from product_view_source;
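At this point you can exit the Flink SQL client; the job keeps running on the cluster. In the Flink Web UI you can see the job, and the MySQL table data has already been synced to Elasticsearch. Inserts, deletes and updates in MySQL are all propagated to Elasticsearch.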
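INSERT, UPDATE and DELETE all carry over because product_view_sink declares a PRIMARY KEY. According to the Flink Elasticsearch connector documentation, a sink with a primary key works in upsert mode and derives the document id from the key. The following Python sketch models that behaviour with a plain dict. It is a simplification for illustration, not the connector's code. The row-kind strings follow Flink's changelog notation, and the sample rows mirror the statements used in the real-time sync tests.

```python
from typing import Dict, Iterable, Tuple

# Flink row kinds: +I insert, -U update-before, +U update-after, -D delete
Change = Tuple[str, dict]


def apply_changelog(index: Dict[str, dict], changes: Iterable[Change],
                    key: str = "id") -> Dict[str, dict]:
    """Apply a changelog to a dict that stands in for an Elasticsearch index.

    Documents are stored under the primary-key value as a string, modelling
    an upsert sink that derives the document _id from the PRIMARY KEY.
    """
    for kind, row in changes:
        doc_id = str(row[key])
        if kind in ("+I", "+U"):
            index[doc_id] = dict(row)   # insert or overwrite the whole document
        elif kind == "-D":
            index.pop(doc_id, None)     # delete by _id
        elif kind == "-U":
            continue                    # the before-image is not needed for an upsert
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return index


es_index = {}
apply_changelog(es_index, [
    ("+I", {"id": 10, "user_id": 8}),
    ("-D", {"id": 10, "user_id": 8}),
    ("+I", {"id": 2, "user_id": 1, "product_id": 1}),
    ("-U", {"id": 2, "user_id": 1, "product_id": 1}),
    ("+U", {"id": 2, "user_id": 100, "product_id": 101}),
])
print(es_index)  # {'2': {'id': 2, 'user_id': 100, 'product_id': 101}}
```

Keying by the primary key makes replays idempotent: applying the same +I or +U twice leaves a single document, not a duplicate.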
View the running job.
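The running job can also be listed from a script. The following is a minimal sketch that assumes the REST API is at `http://localhost:8081` and uses its `/jobs/overview` endpoint. `running_jobs` is a hypothetical helper. Every INSERT INTO statement is submitted as its own job, so each sync pipeline should appear as one entry.

```python
import json
import urllib.request

FLINK_REST = "http://localhost:8081"  # same address as the Web UI


def fetch_json(url: str, timeout: float = 5.0) -> dict:
    """GET a URL and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def running_jobs(base_url: str = FLINK_REST, fetch=fetch_json) -> list:
    """(job id, job name) for every job in RUNNING state, from /jobs/overview."""
    jobs = fetch(f"{base_url}/jobs/overview")["jobs"]
    return [(job["jid"], job["name"]) for job in jobs if job["state"] == "RUNNING"]
```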
View the data in ES.
6. Real-time data sync
1) Insert a record
Insert a record into the MySQL database:
INSERT INTO `product_view` VALUES ('10', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
Query ES: one new document has been added.
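Because the sink writes in bulk and ES makes new documents searchable only after a refresh, a scripted check should poll rather than query once. The following is a minimal sketch that assumes the ES host from the sink definition and uses the Elasticsearch `_count` API. `doc_count` and `wait_for_count` are hypothetical helpers, and the injectable `fetch` and `count` parameters are there only for testing without a live cluster.

```python
import json
import time
import urllib.request

ES_URL = "http://10.34.100.156:9200"  # host from the sink table definition
INDEX = "product_view_index"


def fetch_json(url: str, timeout: float = 5.0) -> dict:
    """GET a URL and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def doc_count(base_url: str = ES_URL, index: str = INDEX, fetch=fetch_json) -> int:
    """Number of documents in the index, via the Elasticsearch _count API."""
    return fetch(f"{base_url}/{index}/_count")["count"]


def wait_for_count(expected: int, count=doc_count,
                   timeout_s: float = 10.0, interval_s: float = 0.5) -> bool:
    """Poll until the index holds `expected` documents or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if count() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

With the nine sample rows plus the inserted one, `wait_for_count(10)` should return True after the INSERT, and `wait_for_count(9)` after the DELETE.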
2) Delete a record
Delete a record from the MySQL database:
DELETE FROM `product_view` where id=10;
Query ES: there is one document fewer.
3) Update a record
ES document before the update:
Update a record in MySQL:
UPDATE `product_view` SET user_id=100,product_id=101 WHERE id=2;
After the change, the ES document reflects user_id=100 and product_id=101.
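The update and delete checks can be scripted by fetching a single document by id. The following is a minimal sketch that assumes the ES host from the sink definition. It also assumes that document ids equal the MySQL `id` values, which follows from the primary-key-based document id described in the Flink connector docs. It uses the Elasticsearch `GET /<index>/_doc/<id>` API. `get_doc` and `fields_match` are hypothetical helpers.

```python
import json
import urllib.error
import urllib.request

ES_URL = "http://10.34.100.156:9200"  # host from the sink table definition
INDEX = "product_view_index"


def fetch_json(url: str, timeout: float = 5.0) -> dict:
    """GET a URL and decode the JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def get_doc(doc_id, base_url: str = ES_URL, index: str = INDEX, fetch=fetch_json):
    """Return the _source of one document, or None if it does not exist."""
    try:
        body = fetch(f"{base_url}/{index}/_doc/{doc_id}")
    except urllib.error.HTTPError as err:
        if err.code == 404:  # Elasticsearch answers 404 for a missing document
            return None
        raise
    return body["_source"] if body.get("found") else None


def fields_match(doc_id, expected: dict, **kwargs) -> bool:
    """True if the document exists and has the expected value for every given field."""
    source = get_doc(doc_id, **kwargs)
    return source is not None and all(source.get(k) == v for k, v in expected.items())
```

After the UPDATE, `fields_match(2, {"user_id": 100, "product_id": 101})` should be True. After the DELETE, `get_doc(10)` should return None.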
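7. Problems encountered
1) Insufficient resources
By default, Flink sets taskmanager.numberOfTaskSlots=1 (in conf/flink-conf.yaml). Each TaskManager then provides a single task slot and can run only one parallel task pipeline. A common choice is to set it to the number of CPU cores on the machine.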
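This explains why a second sync job can get stuck. With default slot sharing, a job of parallelism p occupies p slots, so the cluster can run at most (TaskManagers x slots per TaskManager) / p such jobs at once. A standalone cluster started with start-cluster.sh on one machine typically has a single TaskManager, so with the default of one slot only one parallelism-1 job fits. The helper names below are hypothetical; the config key is the one named above.

```python
import os


def recommended_slots() -> int:
    """Rule of thumb from the text: one task slot per CPU core."""
    return os.cpu_count() or 1  # os.cpu_count() may return None


def slots_config_line(slots: int) -> str:
    """The setting as it would appear in conf/flink-conf.yaml."""
    return f"taskmanager.numberOfTaskSlots: {slots}"


def max_concurrent_jobs(taskmanagers: int, slots_per_tm: int, parallelism: int = 1) -> int:
    """Jobs of the given parallelism that fit at once, assuming default slot
    sharing so that each job occupies `parallelism` slots."""
    return (taskmanagers * slots_per_tm) // parallelism


print(max_concurrent_jobs(1, 1))  # 1
print(slots_config_line(recommended_slots()))
```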
2) Duplicate server-id
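Analysis: each INSERT statement is submitted as a separate job, i.e. one sync task.
Conclusion: in practice, different jobs cannot share source tables that have the same server-id, and a single job cannot contain two source tables with the same server-id either.
Scenario: suppose source1 and source2 have the same server-id. If job1 uses source1 or source2, no other job can use source1 or source2 any more.
Analysis: suppose job1 has been submitted and is already syncing. Job2 is then submitted with a source table whose server-id matches one in job1, or it reuses one of job1's source tables. In either case, job2 starts reading from the binlog position that job1 has already reached. This breaks the sync, loses data, and triggers an error.
Best practice: give each source table its own server-id. If the same MySQL table is needed in several jobs, declare it in each job under a different Flink table name with a different server-id.
Example: suppose the order table is needed in 3 jobs. Use name=order1, server-id=5401 in job1; name=order2, server-id=5402 in job2; and name=order3, server-id=5403 in job3. Each of the 3 jobs then maintains its own binlog state.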
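The naming scheme above can be generated and checked before any job is submitted. The following is a minimal Python sketch. `plan_sources`, `check_unique_server_ids` and `source_ddl` are hypothetical helpers, and the rendered DDL only reuses the mysql-cdc options already shown in the product_view_source definition. The MySQL 'table-name' stays the same in every job; only the Flink table name and the server-id differ.

```python
from collections import Counter
from typing import Dict, List, Tuple


def plan_sources(table: str, jobs: int, first_server_id: int = 5401) -> List[Tuple[str, int]]:
    """One Flink table name and server-id per job: order1/5401, order2/5402, ..."""
    return [(f"{table}{i}", first_server_id + i - 1) for i in range(1, jobs + 1)]


def check_unique_server_ids(plan: List[Tuple[str, int]]) -> None:
    """Raise if any server-id is assigned to more than one source table."""
    counts = Counter(server_id for _, server_id in plan)
    dupes = sorted(sid for sid, n in counts.items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate server-id(s): {dupes}")


def source_ddl(name: str, columns: str, options: Dict[str, str], server_id: int) -> str:
    """Render a mysql-cdc source table DDL that carries its own server-id."""
    opts = {"connector": "mysql-cdc", **options, "server-id": str(server_id)}
    with_clause = ",\n  ".join(f"'{k}' = '{v}'" for k, v in opts.items())
    return f"CREATE TABLE {name} (\n  {columns}\n) WITH (\n  {with_clause}\n);"


plan = plan_sources("order", 3)
check_unique_server_ids(plan)
print(plan)  # [('order1', 5401), ('order2', 5402), ('order3', 5403)]
```

Running `check_unique_server_ids` over the sources of all jobs, not just one, covers both rules from the conclusion: no reuse within a job and no reuse across jobs.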