using AutoNotificatPhone.Controllers;
using Common;
using Microsoft.Extensions.Configuration;
using NLog;
using Npgsql;
using System.Diagnostics;
namespace AutoNotificatPhone.Models
{
///
/// 定时后台服务:
/// 1) 每分钟固定时刻执行巡检
/// 2) 执行整点/定时通知
/// 3) 执行 Redis 指标告警
/// 4) 执行 PostgreSQL 心跳检查告警
///
public class TimerClass : BackgroundService
{
// NLog 记录器
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
// 告警通知接收手机号
private static readonly string Mobile1 = "13509214696";
private static readonly string Mobile2 = "16620970520";
// 每日定时任务触发小时(北京时间)
private static readonly HashSet DailyTaskHours = [10, 15, 22];
// 每分钟在第 30 秒触发一次巡检
private const int RunSecond = 30;
// 主循环异常后的重试等待时间(秒)
private const int RetryDelaySeconds = 10;
// 默认短信/电话任务过期时间(秒)
private const int SmsDeadlineSeconds = 1800;
private const int CallDeadlineSeconds = 900;
// 扩展短信/电话任务过期时间(秒)
private const int ExtendedSmsDeadlineSeconds = 3600;
private const int ExtendedCallDeadlineSeconds = 1800;
// Kafka 心跳超时阈值(分钟)
private const int KafkaStaleMinutes = 5;
// 数据库连接失败累计到 N 次才触发一次告警,避免告警风暴
private const int KafkaDbAlertTriggerCount = 8;
// 接收包“低值”判定阈值
private const int RecvPackageLowThreshold = 70000;
// 用于防止同一整点任务重复执行
private readonly Dictionary _executedTasks = new();
// 复用 API 控制器发送短信/电话任务
private readonly CallAndMsgController _callAndMsgController = new();
// 从 appsettings 读取 Postgres 配置
private readonly IConfiguration _configuration;
// Kafka 数据库连接失败计数器
private int _kafkaDbConnectionAlertCount;
///
/// 构造函数,注入配置对象。
///
/// 应用配置(用于读取 Postgres 节点)
public TimerClass(IConfiguration configuration)
{
_configuration = configuration;
}
///
/// 后台服务主循环。
///
/// 服务取消令牌
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
// 等待到下一次固定执行时刻(每分钟第 RunSecond 秒)
await DelayUntilNextRunAsync(cancellationToken);
// 检查电话机进程在线状态并写日志
LogPhoneStatus(CheckPhoneIsOnline());
// 执行整点/每日通知任务
RunHourlyNotificationTask();
// 执行各项系统检查任务
CheckCpuThreshold();
CheckRcuOnline();
CheckTotalSendPackage();
CheckTotalRecvPackage();
await CheckKafkaHeartbeatAsync();
}
catch (TaskCanceledException)
{
// 服务停止时会进入这里
Logger.Error("任务被取消");
break;
}
catch (Exception ex)
{
// 主循环兜底异常,稍后重试
Logger.Error($"主循环发生错误: {ex.Message}");
await Task.Delay(TimeSpan.FromSeconds(RetryDelaySeconds), cancellationToken);
}
}
}
///
/// 等待到下一次固定执行时间。
///
/// 取消令牌
private async Task DelayUntilNextRunAsync(CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
var nextRunTime = CalculateNextRunTime(now);
var delayTime = nextRunTime - now;
// 到点前阻塞等待
await Task.Delay(delayTime, cancellationToken);
}
///
/// 计算下一次执行时间点(每分钟第 RunSecond 秒)。
///
/// 当前 UTC 时间
/// 下一次执行时间
private static DateTime CalculateNextRunTime(DateTime now)
{
var nextRunTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, RunSecond);
// 如果当前秒已过触发点,则顺延到下一分钟
return now.Second >= RunSecond ? nextRunTime.AddMinutes(1) : nextRunTime;
}
///
/// 输出电话机在线状态日志。
///
/// 是否在线
private static void LogPhoneStatus(bool isOnline)
{
Logger.Error(isOnline
? "电话机在线,开始判断!+++++str+++++"
: "电话机不在线,下面内容可能不会执行!+++++err+++++");
}
///
/// 通过本机进程名判断电话机程序是否运行。
///
/// 在线返回 true,否则 false
private static bool CheckPhoneIsOnline()
{
try
{
// 约定进程名为 Telephone
return Process.GetProcessesByName("Telephone").Length > 0;
}
catch (Exception ex)
{
Logger.Error($"电话机进程检查失败: {ex.Message}");
return false;
}
}
///
/// 整点通知任务调度(北京时间):
/// - 非整点直接返回
/// - 同一整点只执行一次
/// - 10/15/22 点执行每日任务,其余整点执行整点短信
///
private void RunHourlyNotificationTask()
{
// 当前北京时间
var beijingTime = DateTime.UtcNow.AddHours(8);
// 仅整点触发
if (beijingTime.Minute != 0)
{
return;
}
// 当前整点键,用于去重
var hourlyKey = new DateTime(beijingTime.Year, beijingTime.Month, beijingTime.Day, beijingTime.Hour, 0, 0);
if (_executedTasks.ContainsKey(hourlyKey))
{
// 避免重复执行
return;
}
Logger.Error($"准备执行整点短信任务 - 时间点: {hourlyKey:yyyy-MM-dd HH:mm}");
// 每日固定时点执行“每日任务”,否则执行“整点短信”
if (DailyTaskHours.Contains(beijingTime.Hour))
{
ExecuteDailyTask(beijingTime);
}
else
{
SendHourlySms(beijingTime);
}
// 标记当前整点已执行
_executedTasks[hourlyKey] = true;
// 清理历史日期记录
CleanupOldTasks(beijingTime);
}
///
/// 清理前一天及更早的整点执行记录。
///
/// 当前时间(北京时间)
private void CleanupOldTasks(DateTime currentTime)
{
var keysToRemove = _executedTasks.Keys.Where(key => key.Date < currentTime.Date).ToList();
foreach (var key in keysToRemove)
{
_executedTasks.Remove(key);
}
}
///
/// 发送整点短信。
///
/// 当前北京时间
private void SendHourlySms(DateTime beijingTime)
{
try
{
// 生成展示时间文本
var currentTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var dateTimeStr = $"{beijingTime.Month}月{beijingTime.Day}日{beijingTime.Hour}点";
var smsContent = $"[BLV运维提示] 整点系统状态报告。当前时间:{dateTimeStr}";
// 仅发送短信到 Mobile1
var request = CreateSmsRequest(
type: "2",
deadline: currentTimestamp + SmsDeadlineSeconds,
phone: Mobile1,
caller: "整点报告",
content: smsContent);
// 投递短信任务
_callAndMsgController.SendToPhone(request);
}
catch (Exception ex)
{
Logger.Error($"发送整点短信时出错:{ex.Message}");
}
}
///
/// 执行每日定时通知(短信 + 电话)。
///
/// 当前北京时间
private void ExecuteDailyTask(DateTime beijingTime)
{
try
{
var currentTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var dateTimeStr = $"{beijingTime.Month}月{beijingTime.Day}日{beijingTime.Hour}点";
var smsContent = $"[BLV运维提示] 每日定时通知。当前时间为:{dateTimeStr}";
var callContent = $"BLV运维提示 每日定时通知 当前时间为 {dateTimeStr}";
// 两路短信 + 一路电话
var smsRequest1 = CreateSmsRequest("2", currentTimestamp + SmsDeadlineSeconds, Mobile1, "每日定时通知", smsContent);
var smsRequest2 = CreateSmsRequest("2", currentTimestamp + SmsDeadlineSeconds, Mobile2, "每日定时通知", smsContent);
var callRequest = CreateSmsRequest("1", currentTimestamp + CallDeadlineSeconds, Mobile1, "每日定时通知", callContent);
// 执行发送并检查结果
var smsResult1 = _callAndMsgController.SendToPhone(smsRequest1);
var smsResult2 = _callAndMsgController.SendToPhone(smsRequest2);
var callResult = _callAndMsgController.SendToPhone(callRequest);
if (!smsResult1.isok || !smsResult2.isok || !callResult.isok)
{
Logger.Error($"发送每日定时通知失败:短信1={smsResult1.message} 短信2={smsResult2.message} 电话={callResult.message}");
}
}
catch (Exception ex)
{
Logger.Error($"执行每日任务时出错:{ex.Message}");
}
}
///
/// CPU 阈值检查:
/// 1) 先判断监控程序是否失联
/// 2) 再判断 CPU 指标是否超过阈值
///
private void CheckCpuThreshold()
{
try
{
// 监控程序最后上报时间(来自 Redis)
var detectTimeString = CSRedisCacheHelper.redis1.Get("UDPPackage_DetectTime");
if (!string.IsNullOrEmpty(detectTimeString) &&
DateTime.TryParse(detectTimeString, out var detectTime) &&
(DateTime.UtcNow - detectTime).TotalMinutes > 10)
{
// 超过 10 分钟未更新,触发监控失联告警
ExecuteMonitorUnavailableAlert(detectTime);
return;
}
// 拉取 CPU 指标
var cpuMax = CSRedisCacheHelper.redis1.Get("UDPPackage_CPUMax");
var cpuAvg = CSRedisCacheHelper.redis1.Get("UDPPackage_CPUAvg");
var cpuMaxValues = ParseCsvToIntList(cpuMax);
var cpuAvgValues = ParseCsvToIntList(cpuAvg);
// 规则:平均 CPU >= 80 的点达到 6 个触发告警
if (CheckThreshold(cpuAvgValues, threshold: 80, requiredCount: 6))
{
var cpuMinValues = ParseCsvToIntList(CSRedisCacheHelper.redis1.Get("UDPPackage_CPUMin"));
ExecuteCpuAlert(cpuMaxValues, cpuMinValues, cpuAvgValues);
}
}
catch (Exception ex)
{
Logger.Error($"CPU阈值检查错误: {ex.Message}");
}
}
///
/// 检查 RCU 在线数量。
///
private void CheckRcuOnline()
{
CheckRedisValue(
redisKey: "RCUOnLine",
baselineCount: 8,
thresholdRatio: 0.8,
alertAction: ExecuteRcuOnlineAlert,
logPrefix: "RCU主机的在线数量");
}
///
/// 检查 RCU 总发送包数量。
///
private void CheckTotalSendPackage()
{
CheckRedisValue(
redisKey: "UDPPackage_TotalSendPackage",
baselineCount: 8,
thresholdRatio: 0.6,
alertAction: ExecuteTotalSendPackageAlert,
logPrefix: "RCU主机的通讯数量");
}
///
/// 检查 RCU 总接收包数量。
/// 除了通用阈值逻辑外,还增加“最后 3 个值都低于固定阈值”的快速告警。
///
private void CheckTotalRecvPackage()
{
try
{
// 获取接收包时序数据
var valueString = CSRedisCacheHelper.redis1.Get("UDPPackage_TotalRecvPackage");
if (string.IsNullOrEmpty(valueString))
{
return;
}
var values = ParseCsvToIntList(valueString);
if (values.Count < 10)
{
// 数据点不足,无法按规则判定
return;
}
// 特殊规则:最后 3 个点都很低,立即告警
if (values.Count >= 3 && values[^3] < RecvPackageLowThreshold && values[^2] < RecvPackageLowThreshold && values[^1] < RecvPackageLowThreshold)
{
ExecuteTotalRecvPackageAlert([values[^3], values[^2], values[^1]]);
return;
}
// 回退到通用阈值规则
CheckRedisValue(
redisKey: "UDPPackage_TotalRecvPackage",
baselineCount: 8,
thresholdRatio: 0.75,
alertAction: ExecuteTotalRecvPackageAlert,
logPrefix: "RCU主机的通讯数量");
}
catch (Exception ex)
{
Logger.Error($"总接收包数量检查错误: {ex.Message}");
}
}
///
/// 通用 Redis 时序指标检查:
/// - 以前 baselineCount 个点的平均值作为基线
/// - 计算阈值(平均值 * thresholdRatio)
/// - 若后续两个点都低于阈值则触发告警
///
/// Redis 键
/// 基线样本数量
/// 阈值比例
/// 告警动作
/// 日志前缀
private void CheckRedisValue(string redisKey, int baselineCount, double thresholdRatio, Action> alertAction, string logPrefix)
{
try
{
// 从 Redis 读取 CSV 字符串
var valueString = CSRedisCacheHelper.redis1.Get(redisKey);
if (string.IsNullOrEmpty(valueString))
{
return;
}
var values = ParseCsvToIntList(valueString);
if (values.Count < 10)
{
return;
}
// 计算阈值
var average = values.Take(baselineCount).Average();
var threshold = average * thresholdRatio;
// 后续两个点均低于阈值才触发
if (values[baselineCount] < threshold && values[baselineCount + 1] < threshold)
{
alertAction(values);
}
}
catch (Exception ex)
{
Logger.Error($"{logPrefix}检查错误: {ex.Message}");
}
}
///
/// RCU 在线数量告警。
///
private void ExecuteRcuOnlineAlert(List values)
{
SendAlert(
smsContent: $"[BLV运维提示] RCU主机在线数量低于正常值,请立即检查。数据:{string.Join(",", values)}",
callContent: "BLV运维提示 RCU主机在线数量低于正常值 请立即检查",
alertType: "RCU-在线数量警报");
}
///
/// RCU 发送数量告警。
///
private void ExecuteTotalSendPackageAlert(List values)
{
SendAlert(
smsContent: $"[BLV运维提示] RCU发送数量低于预期值,请立即检查。数据:{string.Join(",", values)}",
callContent: "BLV运维提示 RCU发送数量低于预期值 请立即检查",
alertType: "RCU-通讯数量警报");
}
///
/// RCU 接收数量告警。
///
private void ExecuteTotalRecvPackageAlert(List values)
{
SendAlert(
smsContent: $"[BLV运维提示] RCU接收数量低于预期值,请立即检查。数据:{string.Join(",", values)}",
callContent: "BLV运维提示 RCU接收数量低于预期值 请立即检查",
alertType: "RCU-通讯数量警报");
}
///
/// CPU 告警。
///
private void ExecuteCpuAlert(List cpuMax, List cpuMin, List cpuAvg)
{
// 拼接 CPU 指标明细
var dataString = $"AVG:{string.Join(",", cpuAvg)},MAX:{string.Join(",", cpuMax)},MIN:{string.Join(",", cpuMin)}";
SendAlert(
smsContent: $"[BLV运维提示] RCU服务器的CPU使用率告警。{dataString}",
callContent: "BLV运维提示 RCU服务器的CPU使用率告警 请立即检查",
alertType: "RCU-CPU警报");
}
///
/// 监控程序失联告警。
///
/// 最后检测时间(当前版本仅用于语义传参)
private void ExecuteMonitorUnavailableAlert(DateTime detectTime)
{
SendAlert(
smsContent: "[BLV运维提示] RCU服务器的监控程序无法访问,请立即检查。",
callContent: "BLV运维提示 RCU服务器的监控程序无法访问 请立即检查",
alertType: "RCU-监控程序警报",
extendedDeadline: true);
}
///
/// 检查 PostgreSQL 心跳表,判断 Kafka 入库是否活跃。
///
private async Task CheckKafkaHeartbeatAsync()
{
try
{
// 从配置构建连接串
var connectionString = BuildPostgresConnectionString();
if (string.IsNullOrWhiteSpace(connectionString))
{
Logger.Error("Postgres配置缺失,无法检查Kafka入库心跳");
// 配置缺失等价于连接失败告警路径
ExecuteKafkaDbConnectionAlert();
return;
}
// 建立数据库连接
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
// 查询最近数据中的最新 write_ts_ms
const string sql = @"SELECT write_ts_ms
FROM (
SELECT write_ts_ms
FROM heartbeat.heartbeat_events
ORDER BY ts_ms DESC
LIMIT 3000
) AS recent_events
ORDER BY write_ts_ms DESC
LIMIT 1;";
await using var command = new NpgsqlCommand(sql, connection);
var result = await command.ExecuteScalarAsync();
// 空结果按数据库异常路径处理
if (result == null || result == DBNull.Value)
{
Logger.Error("Kafka入库心跳ts_ms查询结果为空");
ExecuteKafkaDbConnectionAlert();
return;
}
// 解析时间戳(毫秒)
if (!long.TryParse(result.ToString(), out var lastTsMs))
{
Logger.Error("Kafka入库心跳ts_ms解析失败");
ExecuteKafkaDbConnectionAlert();
return;
}
// 按“当前时间 - 最新入库时间”判断是否超时
var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (nowMs - lastTsMs > TimeSpan.FromMinutes(KafkaStaleMinutes).TotalMilliseconds)
{
Logger.Error($"Kafka入库心跳超过{KafkaStaleMinutes}分钟未更新");
ExecuteKafkaInactiveAlert();
}
}
catch (Exception ex)
{
Logger.Error($"Kafka入库心跳检查错误: {ex.Message}");
ExecuteKafkaDbConnectionAlert();
}
}
///
/// 从配置读取 Postgres 参数并生成连接字符串。
///
/// 可用连接串;若关键配置缺失则返回 null
private string? BuildPostgresConnectionString()
{
// 约定配置节点:Postgres
var section = _configuration.GetSection("Postgres");
var host = section["Host"];
var portString = section["Port"];
var database = section["Database"];
var username = section["User"];
var password = section["Password"];
var maxConnectionsString = section["MaxConnections"];
var idleTimeoutMsString = section["IdleTimeoutMs"];
// 必填项校验
if (string.IsNullOrWhiteSpace(host) ||
string.IsNullOrWhiteSpace(portString) ||
string.IsNullOrWhiteSpace(database) ||
string.IsNullOrWhiteSpace(username) ||
string.IsNullOrWhiteSpace(password))
{
return null;
}
// 端口格式校验
if (!int.TryParse(portString, out var port))
{
return null;
}
// 构建基础连接串
var builder = new NpgsqlConnectionStringBuilder
{
Host = host,
Port = port,
Database = database,
Username = username,
Password = password
};
// 连接池最大连接数(可选)
if (int.TryParse(maxConnectionsString, out var maxConnections) && maxConnections > 0)
{
builder.MaxPoolSize = maxConnections;
}
// 空闲连接生命周期(ms -> s)
if (int.TryParse(idleTimeoutMsString, out var idleTimeoutMs) && idleTimeoutMs > 0)
{
builder.ConnectionIdleLifetime = Math.Max(1, idleTimeoutMs / 1000);
}
return builder.ConnectionString;
}
///
/// Kafka 入库停滞告警。
///
private void ExecuteKafkaInactiveAlert()
{
SendAlert(
smsContent: "[BLV运维提示] BLS数据库3分钟内入库数据为0。",
callContent: "BLV运维提示 BLS数据库3分钟内入库数据为0",
alertType: "BLS-数据库入库警报",
extendedDeadline: true);
}
///
/// Kafka 数据库连接异常告警(带计数节流)。
///
private void ExecuteKafkaDbConnectionAlert()
{
// 每次失败计数 +1,累计到阈值再告警
_kafkaDbConnectionAlertCount++;
if (_kafkaDbConnectionAlertCount < KafkaDbAlertTriggerCount)
{
return;
}
// 触发一次后清零重新计数
SendAlert(
smsContent: "[BLV运维提示] 数据库连接失败!",
callContent: "[BLV运维提示] 数据库连接失败",
alertType: "BLS-数据库连接警报",
extendedDeadline: true);
_kafkaDbConnectionAlertCount = 0;
}
///
/// 将逗号分隔字符串解析为整型列表;解析失败项按 0 处理。
///
/// CSV 字符串
/// 整型列表
private static List ParseCsvToIntList(string valueString)
{
if (string.IsNullOrEmpty(valueString))
{
return [];
}
return valueString
.Split(',')
.Select(item => int.TryParse(item, out var number) ? number : 0)
.ToList();
}
///
/// 判断列表中是否至少有 requiredCount 个值达到 threshold。
///
/// 待检查值集合
/// 阈值
/// 最少命中数量
/// 满足返回 true
private static bool CheckThreshold(List values, int threshold, int requiredCount)
{
return values.Count >= requiredCount && values.Count(v => v >= threshold) >= requiredCount;
}
///
/// 统一告警发送入口(两条短信 + 一通电话)。
///
/// 短信内容
/// 电话播报内容
/// 告警类型(用于 caller 与日志)
/// 是否使用扩展过期时间
private void SendAlert(string smsContent, string callContent, string alertType, bool extendedDeadline = false)
{
// 计算任务过期时间
var nowSeconds = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var smsDeadline = nowSeconds + (extendedDeadline ? ExtendedSmsDeadlineSeconds : SmsDeadlineSeconds);
var callDeadline = nowSeconds + (extendedDeadline ? ExtendedCallDeadlineSeconds : CallDeadlineSeconds);
// 构建请求:两路短信(Mobile1/2)+ 一路电话(Mobile1)
var smsRequest1 = CreateSmsRequest("2", smsDeadline, Mobile1, alertType, smsContent);
var smsRequest2 = CreateSmsRequest("2", smsDeadline, Mobile2, alertType, smsContent);
var callRequest = CreateSmsRequest("1", callDeadline, Mobile1, alertType, callContent);
// 调用 API 投递任务
var smsResult1 = _callAndMsgController.SendToPhone(smsRequest1);
var smsResult2 = _callAndMsgController.SendToPhone(smsRequest2);
var callResult = _callAndMsgController.SendToPhone(callRequest);
// 任意一路失败都记录错误日志
if (!smsResult1.isok || !smsResult2.isok || !callResult.isok)
{
Logger.Error($"发送{alertType}通知失败: 短信1={smsResult1.message} 短信2={smsResult2.message} 电话={callResult.message}");
}
}
///
/// 创建短信/电话请求对象。
///
/// 1=电话,2=短信
/// 截止时间(Unix 秒)
/// 目标手机号
/// 任务标识/来电名称
/// 内容
/// SmsRequest 对象
private static SmsRequest CreateSmsRequest(string type, long deadline, string phone, string caller, string content)
{
return new SmsRequest
{
// 业务类型(1 电话 / 2 短信)
Type = type,
// 截止时间
DeadLine = deadline,
// 创建时间
StartingPoint = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
// 目标号码
PhoneNumber = phone,
// 任务标识
CallerName = caller,
// 消息内容
Content = content
};
}
}
}