LSF 状态异常主机告警
2024-01-03 02:37:49
LSF 集群中的状态异常主机通常需要尽快处理,因为LSF不再向状态异常主机派发作业,会导致LSF集群可用资源减少,甚至影响用户任务运行。
状态异常通常包括:
- unreach : SBD 服务进程终止,通常用户任务的运行不受影响,但LSF无法获取作业的状态和资源使用情况;
- unavail: LIM 服务进程终止,导致LSF无法获取到主机的资源信息。原因有可能是进程终止,或者硬件故障导致主机宕机,或者网络故障导致与管理节点的通信中断。
可以通过命令行或 LSF API 获取主机状态,然后根据告警规则向管理员报告异常状态的主机。为避免频繁发送告警,可以将每次检测到的状态异常的主机列表缓存下来,然后将新检测到的状态异常的主机列表和缓存的进行比较,如果有变化就立即发送通知,如果没有变化,则根据设定的间隔时间发送通知,比如没有变化,当天最多通知两次,然后一天通知一次。
通知示例如下:
检测流程如下:
示例代码如下:
#!/opt/miniconda3/bin/python
##########################################
## Author: shuguangbo
##########################################
import os
import re
import datetime
import shutil
import sys
import stat
import subprocess
import traceback
import time
import yaml
import logging
import logging.config
import re
import urllib3
import requests
import json
import string
import tempfile
import errno
import glob
import pandas as pd
def isWritable(path):
try:
tFile = tempfile.TemporaryFile(dir=path)
tFile.close()
except OSError as e:
if e.errno == errno.EACCES:
return False
e.filename = path
raise
return True
# 辅助函数,运行命令行并返回结果
def executer(cmd, timeout=600):
rc, ret, stdout, stderr = None, None, None, None
try:
if isinstance(cmd, list):
cmd = ' '.join(cmd)
ret = subprocess.run(cmd, shell=True, capture_output=True, timeout=timeout, preexec_fn=os.setsid)
except Exception as e:
logging.error('CMD: [{}] failed. Error: {}, Stack: {}'.format(cmd, str(e), traceback.format_exc()))
stdout = str(e.stdout.strip(), encoding='utf-8', errors='ignore') if 'stdout' in dir(e) else ''
stderr = str(e)
else:
if ret:
rc = ret.returncode
stdout = str(ret.stdout.strip(), encoding='utf-8', errors='ignore') if 'stdout' in dir(ret) else ''
stderr = str(ret.stderr.strip(), encoding='utf-8', errors='ignore') if 'stderr' in dir(ret) else ''
return [rc, stdout, stderr]
def parseConfig(cfname=None):
appConf = None
LSF = True
try:
if cfname is None or not os.path.exists(cfname) :
cfname = os.path.join(os.path.dirname(__file__), os.path.basename(__file__).split('.')[0] + ".yml")
with open(cfname, 'r') as fd :
appConf = yaml.load(fd, Loader=yaml.FullLoader)
except Exception as e:
logging.error("Read configuration failed! Error:{}".format(str(e)))
sys.exit(1)
reqParams = {'NOTICE':['NOTIFY_ADMINS', 'TOKEN', 'LARKURL', 'TMPDIR'],
'LOGGERCONFIG':['formatters', 'handlers', 'root']}
missingParams = {key: set(reqParams[key]) - set(appConf[key].keys()) for key in reqParams.keys() if len(set(reqParams[key]) - set(appConf[key].keys()))}
if len(missingParams):
logging.error('Missing parameters: {}'.format(missingParams))
sys.exit(1)
if 'LSF_ENVDIR' not in os.environ or 'LSF_ENVDIR' not in appConf['LSF']:
sys.exit('Missing LSF_ENVDIR, exit...')
if 'NOTIFY_INTERVAL' not in appConf['NOTICE']:
appConf['NOTICE']['NOTIFY_INTERVAL'] = 24
if 'MAX_NOTIFY_TIMES' not in appConf['NOTICE']:
appConf['NOTICE']['MAX_NOTIFY_TIMES'] = 2
if not os.path.exists(appConf['NOTICE']['TMPDIR']) or not isWritable(appConf['NOTICE']['TMPDIR']):
logging.error('Can\'t write to temp dir {}, please check'.format(appConf['NOTICE']['TMPDIR']))
sys.exit(3)
return appConf
def setHostStatus(host):
if pd.isna(host['status']):
if host['server'].upper() == 'YES':
return 'failed'
else:
return 'ok'
return host['status']
class lsfFailedHostChecker():
def __init__(self, config):
self._config = config
self._baseHosts = []
self._batchHosts = []
self._failedHosts = []
self._numFailedHosts = 0
self._notifyTimes = 0
self._savedData = None
self._sameFailedHosts = True
if lsf.lsb_init('lsf') > 0:
msg = 'Failed to connect LSF, please check LSF health.'
logging.error(msg)
sys.exit(msg)
def run(self):
self._getBaseHosts()
self._getBatchHosts()
self._checkFailedHosts()
self._notify()
# Shall run on LSF node
def _checkFailedHosts(self):
failedHosts = []
baseHosts = pd.DataFrame(self._baseHosts)
batchHosts = pd.DataFrame(self._batchHosts)
lsfHosts = pd.merge(baseHosts, batchHosts, how='left', on=['host'])
lsfHosts['status'] = lsfHosts.apply(lambda host: setHostStatus(host), axis=1)
for index, host in lsfHosts.iterrows():
if host['status'].lower() == 'failed':
failedHosts.append(host['host'])
self._failedHosts = sorted(list(set(failedHosts)))
self._numFailedHosts = len(self._failedHosts)
def _getBaseHosts(self):
hostData = lsf.get_host_info()
for host in hostData:
hostName = host.hostName
isServer = 'no' if host.isServer == '\x00' else 'yes'
self._baseHosts.append({'host':hostName, 'server':isServer})
def _getBatchHosts(self):
numHosts = lsf.new_intp()
lsf.intp_assign(numHosts, 0)
hostData = lsf.lsb_hostinfo_ex(None, numHosts, "", 0)
allHostData = lsf.hostInfoEntArray_frompointer(hostData)
for i in range(0, lsf.intp_value(numHosts)):
hostName = allHostData[i].host
failedStatus = allHostData[i].hStatus & (0x20|0x40|0x100)
hostStatus = 'failed' if failedStatus else 'ok'
self._batchHosts.append({'host':hostName, 'status':hostStatus})
def _getSavedData(self):
savedFailureHosts = None
fname = f"{self._config['NOTICE']['TMPDIR']}/.lsf_failedhosts.yml"
if os.path.exists(fname):
with open(fname, 'r') as fd :
savedFailureHosts = yaml.load(fd, Loader=yaml.FullLoader)
self._savedData = savedFailureHosts
def _saveData(self):
data = {'notifyTimes': self._notifyTimes,
'failedHosts': self._failedHosts,
'lastNotifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
fname = f"{self._config['NOTICE']['TMPDIR']}/.lsf_failedhosts.yml"
with open(fname, 'w') as fd:
fd.writelines(json.dumps(data))
def _willNotify(self):
self._getSavedData()
if self._savedData:
self._notifyTimes = self._savedData['notifyTimes']
self._sameFailedHosts = True if len(set(self._failedHosts) ^ set(self._savedData['failedHosts'])) == 0 else False
if not self._sameFailedHosts and self._numFailedHosts:
self._notifyTimes = 0
if self._numFailedHosts == 0 :
fname = f"{self._config['NOTICE']['TMPDIR']}/.lsf_failedhosts.yml"
if os.path.exists(fname):
os.unlink(fname)
return False
if self._numFailedHosts and not self._sameFailedHosts and self._notifyTimes > 0 and self._notifyTimes < self._config['NOTICE']['MAX_NOTIFY_TIMES']:
lastNotifyTime = datetime.datetime.strptime(self._savedData['lastNotifyTime'], '%Y-%m-%d %H:%M:%S')
now = datetime.datetime.now()
delta = (now - lastNotifyTime).total_seconds()
if delta > int(self._config['NOTICE']['NOTIFY_INTERVAL'])*3600:
return True
else:
return False
if not self._sameFailedHosts or self._notifyTimes < self._config['NOTICE']['MAX_NOTIFY_TIMES']:
return True
return False
def _notify(self):
msg = ''
failedHostStr = '\n'.join(self._failedHosts)
logging.info(f"主机状态异常总数 {self._numFailedHosts} {failedHostStr}")
receiver = ' '.join(self._config['NOTICE']['NOTIFY_ADMINS'])
notify = self._willNotify()
if self._numFailedHosts and len(receiver) and notify:
msg=f"主机状态异常总数 {self._numFailedHosts} ,清单如下,请及时处理。\n"
msg += f"{failedHostStr}"
self._sendLark(msg, receiver)
self._notifyTimes += 1
self._saveData()
if self._notifyTimes > self._config['NOTICE']['MAX_NOTIFY_TIMES']:
logging.info("已超过通知发送次数,不再发送通知。")
def _sendLark(self, message, receivers):
try:
session = requests.Session()
larkURL = self._config['NOTICE']['LARKURL']
header = {'Content-Type':'application/json', 'Authorization':self._config['NOTICE']['TOKEN']}
data = {}
data['trigger_key'] = 'LSF主机状态异常提醒'
data['instance'] = {'content':message}
data['notice'] = receivers if type(receivers) == list else receivers.split()
result = session.post(larkURL, headers=header, data=json.dumps(data), verify=False)
if result.status_code == 200:
logging.info('Send message succeeded.')
else:
logging.error(f"Send message failed. Error: {result.text}")
except Exception as e:
logging.error(f"Send message failed. Error: {str(e)}, Stack: {traceback.format_exc()}")
if __name__ == "__main__":
config = parseConfig()
if 'openlava' in os.environ['LSF_ENVDIR']:
LSF = False
from pythonOpenlava import lsf
else:
from pythonlsf import lsf
logging.config.dictConfig(config['LOGGERCONFIG'])
logger = logging.getLogger(os.path.basename(__file__))
lsfChecker = lsfFailedHostChecker(config)
lsfChecker.run()
可以通过 crontab 或者 Jenkins 定时执行此脚本,比如每分钟执行一次,以便及时得到通知。
文章来源:https://blog.csdn.net/weixin_71448448/article/details/135276492
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!