yarn on flink 监控 flink任务监控

2024-01-08 14:32:41

Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。

Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本?

获取所有application

curl -s http://XXX:8088/ws/v1/cluster/apps

获取 state值为 RUNNING 的application任务

curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING?

获取这个任务单个信息?

curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state

jq,是linux一个很方便的json处理工具

通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。

安装起来也非常的方便,直接使用yum即可安装。

yum install jq

编写shell脚本

由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导

shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。

?

#!/bin/bash

Joblist=`cat /opt/shell/logs/flink_job.log`    #获取记录job的log文件
let i=0  #获取任务数
let log_count=0  #获取日志中的任务数
start_count=RUNNING  #判断任务是否存在异常

############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
if [ -z "$Joblist" ]
then
	while :
	do
		job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		else
			echo ${job_id[$i]}
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
			let i++
		fi
	done
fi


############## 2 读取文件中JOB任务 ##################

let i=0
while read line
do
	JOB[$i]=$line
	let i++
done</opt/shell/logs/flink_job.log

log_count=$i #获取日志中的任务数


########### 3  判断任务状态,是否为RUNNIG,不是则邮件告警   ###############
for ((j=0;j<i;j++))
do
	JOB_ID=${JOB[$j]//\"}
	JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.state`
	JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.name`
	START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq  .app.startedTime` / 1000]

#	echo "JOB_NAME: "$JOB_NAME
#	echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
#	echo "JOB_status: " ${JOB_status//\"}

#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
#echo "=============================================="

	if [ ${JOB_status//\"} != "RUNNING" ];then
		SUBJECT="【异常告警】Flink任务异常"
		TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"`  \n\n目前状态: $JOB_status"
		echo -e $TEXT | mail -s $SUBJECT     邮箱地址
		start_count=erron
	fi
done


########### 4  出现任务异常,重新读取job 任务记录到日志文件   ###############

let i=0
if [ $start_count == "erron" ];then


echo '重新写入日志文件'
	while :
	do
		job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		elif  [ $i == 0 ]; then
			echo ${job_id[$i]}>/opt/shell/logs/flink_job.log

		else
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
		fi
		let i++
	done
	start_count=RUNNING
fi

########### 5  判断线上任务数是否一致,是否有新任务增加   ###############



let i=0
while :
do
	job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

	if [ ${job_id[$i]} = "null" ];then
		break
	else

		let i++
	fi
done
let count=$i #线上任务数
echo "==========================线上最新RUNNING状态任务数: "$count
echo "==========================日志RUNNING状态任务数: "$log_count



if [ ! $count -eq $log_count ]; then
	echo "现有RUNNING状态任务数不相等于已记录的任务数"
	echo  ${job_id[0]} >/opt/shell/logs/flink_job.log
	for ((i=1;i<count;i++))
	do
		echo "重新写入JOB: "${job_id[$i]}
		echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log

	done

fi

echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
echo  ================================================================================================
echo  =====================================本次crontab监控结束========================================
echo  ================================================================================================

文章来源:https://blog.csdn.net/m0_57320261/article/details/135455006
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。