This article walks through an example of SQL Hive Streaming, one of the new features in Flink 1.11. It should serve as a useful reference for anyone interested in integrating Flink SQL with Hive.
Add the Required Dependencies
The test cluster runs Hive 1.1.0, Hadoop 2.6.0, and Kafka 1.0.1. Also, don't forget to locate hdfs-site.xml and hive-site.xml and add them to the project.

```xml
<properties>
  <scala.bin.version>2.11</scala.bin.version>
  <flink.version>1.11.0</flink.version>
  <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
  <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>${flink-shaded-hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
  </dependency>
</dependencies>
```
Create the Execution Environment
In the Table/SQL API of Flink 1.11, the FileSystem Connector is implemented on top of an enhanced version of StreamingFileSink, called StreamingFileWriter in the source code. As we know, files written by StreamingFileSink only transition from the Pending state to the Finished state when a checkpoint succeeds, at which point they can be safely read downstream. Therefore, we must enable checkpointing and set a reasonable interval.
```scala
import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

// Checkpointing is mandatory for the streaming file sink to finalize files.
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
```
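Checkpointing can also be configured on the DataStream environment itself, which the table job then runs on. A minimal sketch of my own (not from the original article; it should be broadly equivalent to the table-config route above):

```scala
// Assumption: enabling exactly-once checkpointing every 20 seconds directly on
// the stream environment applies to the whole job, including the SQL pipeline.
streamEnv.enableCheckpointing(20 * 1000L, CheckpointingMode.EXACTLY_ONCE)
```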
Register the HiveCatalog
```scala
import org.apache.flink.table.catalog.hive.HiveCatalog

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)
```
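As a quick sanity check (my addition, not from the original article), you can list the databases visible through the newly registered catalog; the databases already present in the Hive Metastore should show up:

```scala
// Sanity check: the catalog should expose the Hive Metastore's databases,
// e.g. "default". TableResult.print() is available since Flink 1.11.
tableEnv.executeSql("SHOW DATABASES").print()
```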
Create the Kafka Streaming Table
The Kafka topic stores tracking-event logs in JSON format, and the DDL uses computed columns to generate the event time and watermark. The SQL Kafka Connector parameters in 1.11 are somewhat simplified compared to 1.10. Since the HiveCatalog has already been registered, the metadata of the Kafka streaming table can be observed in Hive (note that the table has no actual columns there; the schema lives entirely in the table parameters).

```scala
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")

tableEnv.executeSql(
  """
    |CREATE TABLE stream_tmp.analytics_access_log_kafka (
    |  ts BIGINT,
    |  userId BIGINT,
    |  eventType STRING,
    |  fromType STRING,
    |  columnType STRING,
    |  siteId BIGINT,
    |  grouponId BIGINT,
    |  partnerId BIGINT,
    |  merchandiseId BIGINT,
    |  procTime AS PROCTIME(),
    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'ods_analytics_access_log',
    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092',
    |  'properties.group.id' = 'flink_hive_integration_exp_1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
  """.stripMargin)
```
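Before wiring this table to Hive, it can help to smoke-test the source. The sketch below is my addition (not from the original article); it assumes the tableEnv built earlier and a topic with live traffic:

```scala
// Smoke test: peek at the parsed JSON events. For a streaming query,
// TableResult.print() blocks and keeps emitting rows as they arrive.
tableEnv.executeSql(
  "SELECT userId, eventType, merchandiseId, eventTime FROM stream_tmp.analytics_access_log_kafka")
  .print()
```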
```
hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment

# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
    flink.connector                     kafka
    flink.format                        json
    flink.json.fail-on-missing-field    false
    flink.json.ignore-parse-errors      true
    flink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092
    flink.properties.group.id           flink_hive_integration_exp_1
    flink.scan.startup.mode             latest-offset
    flink.schema.0.data-type            BIGINT
    flink.schema.0.name                 ts
    flink.schema.1.data-type            BIGINT
    flink.schema.1.name                 userId
    flink.schema.10.data-type           TIMESTAMP(3)
    flink.schema.10.expr                TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    flink.schema.10.name                eventTime
    flink.schema.2.data-type            VARCHAR(2147483647)
    flink.schema.2.name                 eventType
    # ... (omitted) ...
    flink.schema.9.data-type            TIMESTAMP(3) NOT NULL
    flink.schema.9.expr                 PROCTIME()
    flink.schema.9.name                 procTime
    flink.schema.watermark.0.rowtime            eventTime
    flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
    flink.schema.watermark.0.strategy.expr      `eventTime` - INTERVAL '15' SECOND
    flink.topic                         ods_analytics_access_log
    is_generic                          true
    transient_lastDdlTime               1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)
```
Create the Hive Table
Flink SQL provides DDL compatible with the HiveQL style; just switch to SqlDialect.HIVE (DML compatibility is still under development). To make results easy to observe, the table below uses three-level day/hour/minute partitioning; in practice such fine granularity is usually unnecessary (10-minute or even 1-hour partitions may be more appropriate).

```scala
import org.apache.flink.table.api.SqlDialect

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")

tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  from_type STRING,
    |  column_type STRING,
    |  site_id BIGINT,
    |  groupon_id BIGINT,
    |  partner_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING,
    |  ts_minute STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 min',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    |)
  """.stripMargin)
```

The Hive table's TBLPROPERTIES reuse the parameters of the SQL FileSystem Connector and are closely related to partition commit. Briefly, the four parameters above mean:

- sink.partition-commit.trigger: when to trigger a partition commit. 'partition-time' commits a partition once the watermark passes the time extracted from the partition values (plus the delay below); the alternative, 'process-time', uses processing time instead.
- sink.partition-commit.delay: how long after the partition time has passed before committing. With '1 min' here, the partition (ts_date='2020-07-15', ts_hour='23', ts_minute='23') is committed once the watermark passes 2020-07-15 23:24:00.
- sink.partition-commit.policy.kind: what committing actually does. 'metastore' adds the partition to the Hive Metastore, and 'success-file' writes a _SUCCESS marker file into the partition directory; the two can be combined, as here.
- partition.time-extractor.timestamp-pattern: how to assemble a timestamp from the partition columns, used by the 'partition-time' trigger.
For details on these options, see the FileSystem Connector documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink
Streaming Write to Hive
Note how the event time from the stream table is converted into Hive partition values.

```scala
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts, userId, eventType, fromType, columnType, siteId, grouponId, partnerId, merchandiseId,
    |  DATE_FORMAT(eventTime, 'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime, 'HH'),
    |  DATE_FORMAT(eventTime, 'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
  """.stripMargin)
```

Now let's observe the results of the streaming sink.
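One quick way to watch the sink at work (my addition, not from the original article) is to list the table's partitions as they get committed to the metastore:

```
hive> SHOW PARTITIONS hive_tmp.analytics_access_log_hive;
```

The earliest and latest event timestamps inside a committed minute-level partition should match its boundaries: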
```
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00     2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)
```
Streaming Read from Hive
To use a Hive table as a streaming source, Dynamic Table Options must be enabled, and the parameters for streaming from Hive are passed via table hints. Below is a simple example that computes merchandise page views (PV) from Hive.
```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.types.Row

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

val result = tableEnv.sqlQuery(
  """
    |SELECT merchandise_id, count(1) AS pv
    |FROM hive_tmp.analytics_access_log_hive
    |/*+ OPTIONS(
    |  'streaming-source.enable' = 'true',
    |  'streaming-source.monitor-interval' = '1 min',
    |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
    |) */
    |WHERE event_type = 'shtOpenGoodsDetail'
    |AND ts_date >= '2020-07-15'
    |GROUP BY merchandise_id
    |ORDER BY pv DESC LIMIT 10
  """.stripMargin)

result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()
```
The three table hint parameters mean the following:

- streaming-source.enable: set to 'true' to read the Hive table as a streaming source instead of a one-shot batch scan.
- streaming-source.monitor-interval: how often to monitor the table for newly committed partitions or files.
- streaming-source.consume-start-offset: where consumption starts; for a partitioned table this is a partition time, so only partitions at or after 2020-07-15 23:30:00 are read here.

See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-reading for details. Finally, since the SQL statement contains ORDER BY and LIMIT logic, the result must be converted into a retract stream via toRetractStream() before printing; each emitted element is a (Boolean, Row) pair, where false marks the retraction of a previously emitted row.
Thanks for reading. Hopefully this walkthrough of the SQL Hive Streaming feature in Flink 1.11 proves helpful in your own work.