今天就跟大家聊聊有关什么是增量索引实现以及投送数据到MQ kafka,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

创新互联是一家专业提供海拉尔企业网站建设,专注与成都做网站、成都网站设计、HTML5建站、小程序制作等业务。10年已为海拉尔众多企业、政府机构等服务。创新互联专业网络公司优惠进行中。
我们将根据binlog 的数据对象,来实现增量数据的处理,我们构建广告的增量数据,其实说白了就是为了在后期能把广告投放到索引服务,实现增量数据到增量索引的生成。
定义一个投递增量数据的接口(接收参数为我们上一节定义的binlog日志的转换对象)
/** * ISender for 投递增量数据 方法定义接口 * * @author Isaac.Zhang | 若初 */ public interface ISender { void sender(MySQLRowData rowData); }
创建增量索引监听器
/** * IncrementListener for 增量数据实现监听 * * @author Isaac.Zhang | 若初 * @since 2019/6/27 */ @Slf4j @Component public class IncrementListener implements Ilistener { private final AggregationListener aggregationListener; @Autowired public IncrementListener(AggregationListener aggregationListener) { this.aggregationListener = aggregationListener; } //根据名称选择要注入的投递方式 @Resource(name = "indexSender") private ISender sender; /** * 标注为 {@link PostConstruct}, * 即表示在服务启动,Bean完成初始化之后,立刻初始化 */ @Override @PostConstruct public void register() { log.info("IncrementListener register db and table info."); Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this)); } @Override public void onEvent(BinlogRowData eventData) { TableTemplate table = eventData.getTableTemplate(); EventType eventType = eventData.getEventType(); //包装成最后需要投递的数据 MysqlRowData rowData = new MysqlRowData(); rowData.setTableName(table.getTableName()); rowData.setLevel(eventData.getTableTemplate().getLevel()); //将EventType转为OperationTypeEnum OperationTypeEnum operationType = OperationTypeEnum.convert(eventType); rowData.setOperationTypeEnum(operationType); //获取模版中该操作对应的字段列表 ListfieldList = table.getOpTypeFieldSetMap().get(operationType); if (null == fieldList) { log.warn("{} not support for {}.", operationType, table.getTableName()); return; } for (Map afterMap : eventData.getAfter()) { Map _afterMap = new HashMap<>(); for (Map.Entry entry : afterMap.entrySet()) { String colName = entry.getKey(); String colValue = entry.getValue(); _afterMap.put(colName, colValue); } rowData.getFieldValueMap().add(_afterMap); } sender.sender(rowData); } }
首先来配置监听binlog的数据库连接信息
adconf: mysql: host: 127.0.0.1 port: 3306 username: root password: 12345678 binlogName: "" position: -1 # 从当前位置开始监听
编写配置类:
/** * BinlogConfig for 定义监听Binlog的配置信息 * * @author Isaac.Zhang | 若初 */ @Component @ConfigurationProperties(prefix = "adconf.mysql") @Data @AllArgsConstructor @NoArgsConstructor public class BinlogConfig { private String host; private Integer port; private String username; private String password; private String binlogName; private Long position; }
在我们实现 监听binlog那节,我们实现了一个自定义client CustomBinlogClient,需要实现binlog的监听,这个监听的客户端就必须是一个独立运行的线程,并且要在程序启动的时候进行监听,我们来实现运行当前client的方式,这里我们会使用到一个新的Runnerorg.springframework.boot.CommandLineRunner,let's code.
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {
@Autowired
private CustomBinlogClient binlogClient;
@Override
public void run(String... args) throws Exception {
log.info("BinlogRunner is running...");
binlogClient.connect();
}
}在binlog监听的过程中,我们看到针对于int, String 这类数据字段,mysql的记录是没有问题的,但是针对于时间类型,它被格式化成了字符串类型:Fri Jun 21 15:07:53 CST 2019。
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}对于这个时间格式,我们需要关注2点信息:
CST,这个时间格式会比我们的时间+ 8h(中国标准时间 China Standard Time UT+8:00)
需要对这个日期进行解释处理
当然,我们也可以通过设置mysql的日期格式来改变该行为,在此,我们通过编码来解析该时间格式:
/**
* Thu Jun 27 08:00:00 CST 2019
*/
public static Date parseBinlogString2Date(String dateString) {
try {
DateFormat dateFormat = new SimpleDateFormat(
"EEE MMM dd HH:mm:ss zzz yyyy",
Locale.US
);
return DateUtils.addHours(dateFormat.parse(dateString), -8);
} catch (ParseException ex) {
log.error("parseString2Date error:{}", dateString);
return null;
}
}因为我们在定义索引的时候,是根据表之间的层级关系(Level)来设定的,根据代码规范,不允许出现Magic Number, 因此我们定义一个数据层级枚举,来表达数据层级。
/** * AdDataLevel for 广告数据层级 * * @author Isaac.Zhang | 若初 */ @Getter public enum AdDataLevel { LEVEL2("2", "level 2"), LEVEL3("3", "level 3"), LEVEL4("4", "level 4"); private String level; private String desc; AdDataLevel(String level, String desc) { this.level = level; this.desc = desc; } }
因为增量数据可以投递到不同的位置以及用途,我们之前实现了一个投递接口com.sxzhongf.ad.sender.ISender,接下来我们实现一个投递类:
@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {
/**
* 根据广告级别,投递Binlog数据
*/
@Override
public void sender(MysqlRowData rowData) {
if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
Level2RowData(rowData);
} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
Level3RowData(rowData);
} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
Level4RowData(rowData);
} else {
log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
}
}
private void Level2RowData(MysqlRowData rowData) {
if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
List planTables = new ArrayList<>();
for (Map fieldValueMap : rowData.getFieldValueMap()) {
AdPlanTable planTable = new AdPlanTable();
//Map的第二种循环方式
fieldValueMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
planTable.setPlanId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
planTable.setUserId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
planTable.setPlanStatus(Integer.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
break;
}
});
planTables.add(planTable);
}
//投递推广计划
planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
List creativeTables = new LinkedList<>();
rowData.getFieldValueMap().forEach(afterMap -> {
AdCreativeTable creativeTable = new AdCreativeTable();
afterMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
creativeTable.setAdId(Long.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
creativeTable.setType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
creativeTable.setMaterialType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
creativeTable.setHeight(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
creativeTable.setWidth(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
creativeTable.setAuditStatus(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
creativeTable.setAdUrl(v);
break;
}
});
creativeTables.add(creativeTable);
});
//投递广告创意
creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
}
}
private void Level3RowData(MysqlRowData rowData) {
...
}
/**
* 处理4级广告
*/
private void Level4RowData(MysqlRowData rowData) {
...
}
} 为了我们的数据投放更加灵活,方便数据统计,分析等系统的需求,我们来实现一个投放到消息中的接口,其他服务可以订阅当前MQ 的TOPIC来实现数据订阅。
配置文件中配置TOPIC adconf: kafka: topic: ad-search-mysql-data -------------------------------------- /** * KafkaSender for 投递Binlog增量数据到kafka消息队列 * * @author Isaac.Zhang | 若初 * @since 2019/7/1 */ @Component(value = "kafkaSender") public class KafkaSender implements ISender { @Value("${adconf.kafka.topic}") private String topic; @Autowired private KafkaTemplate kafkaTemplate; /** * 发送数据到kafka队列 */ @Override public void sender(MysqlRowData rowData) { kafkaTemplate.send( topic, JSON.toJSONString(rowData) ); } /** * 测试消费kafka消息 */ @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search") public void processMysqlRowData(ConsumerRecord record) { Optional kafkaMsg = Optional.ofNullable(record.value()); if (kafkaMsg.isPresent()) { Object message = kafkaMsg.get(); MysqlRowData rowData = JSON.parseObject( message.toString(), MysqlRowData.class ); System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData)); //sender.sender(); } } } ```
看完上述内容,你们对什么是增量索引实现以及投送数据到MQ kafka有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。