feat: 添加Npgsql包支持,更新Redis连接配置,增加Kafka心跳检查功能,完善Postgres配置,新增发布配置文件

This commit is contained in:
2026-02-09 19:31:55 +08:00
parent 4d37bf5a09
commit 7a60c62166
6 changed files with 160 additions and 5 deletions

View File

@@ -3,6 +3,8 @@ using Common;
using System.Threading;
using NLog;
using System.Diagnostics;
using Npgsql;
using Microsoft.Extensions.Configuration;
namespace AutoNotificatPhone.Models
{
@@ -17,6 +19,12 @@ namespace AutoNotificatPhone.Models
private Dictionary<DateTime, bool> _executedTasks = [];
// 消息控制器实例
private readonly CallAndMsgController _callAndMsgController = new();
private readonly IConfiguration _configuration;
public TimerClass(IConfiguration configuration)
{
_configuration = configuration;
}
/// <summary>
/// 后台服务主执行方法
@@ -48,6 +56,7 @@ namespace AutoNotificatPhone.Models
await CheckRcuOnlineAsync();
await CheckTotalSendPackageAsync();
await CheckTotalRecvPackageAsync();
await CheckKafkaHeartbeatAsync();
}
catch (TaskCanceledException)
{
@@ -262,7 +271,7 @@ namespace AutoNotificatPhone.Models
//logger.Error($"RCU服务器的CPU使用率-AVG:{cpuAvg},MAX:{cpuMax}");
// 检查是否超过阈值
if (CheckThreshold(cpuAvgValues, 70, 6))//(CheckThreshold(cpuMaxValues, 90, 6) || CheckThreshold(cpuAvgValues, 85, 6))
if (CheckThreshold(cpuAvgValues, 80, 6))//(CheckThreshold(cpuMaxValues, 90, 6) || CheckThreshold(cpuAvgValues, 85, 6))
{
// 触发CPU警报
ExecuteCpuAlert(cpuMaxValues, ParseCpuValues(CSRedisCacheHelper.redis1.Get<string>("UDPPackage_CPUMin")), cpuAvgValues);
@@ -419,6 +428,115 @@ namespace AutoNotificatPhone.Models
);
}
/// <summary>
/// 检查Kafka入库心跳PostgreSQL
/// </summary>
private async Task CheckKafkaHeartbeatAsync()
{
try
{
string? connectionString = BuildPostgresConnectionString();
if (string.IsNullOrWhiteSpace(connectionString))
{
logger.Error("Postgres配置缺失无法检查Kafka入库心跳");
return;
}
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
const string sql = "select ts_ms from heartbeat.heartbeat_events order by ts_ms desc limit 1";
await using var command = new NpgsqlCommand(sql, connection);
object? result = await command.ExecuteScalarAsync();
if (result == null || result == DBNull.Value)
{
ExecuteKafkaInactiveAlert();
return;
}
if (!long.TryParse(result.ToString(), out long lastTsMs))
{
logger.Error("Kafka入库心跳ts_ms解析失败");
return;
}
long nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (nowMs - lastTsMs > TimeSpan.FromMinutes(5).TotalMilliseconds)
{
ExecuteKafkaInactiveAlert();
}
}
catch (Exception ex)
{
logger.Error($"Kafka入库心跳检查错误: {ex.Message}");
}
await Task.CompletedTask;
}
/// <summary>
/// 生成PostgreSQL连接字符串
/// </summary>
private string? BuildPostgresConnectionString()
{
IConfigurationSection section = _configuration.GetSection("Postgres");
string? host = section["Host"];
string? portString = section["Port"];
string? database = section["Database"];
string? username = section["User"];
string? password = section["Password"];
string? maxConnectionsString = section["MaxConnections"];
string? idleTimeoutMsString = section["IdleTimeoutMs"];
if (string.IsNullOrWhiteSpace(host) ||
string.IsNullOrWhiteSpace(portString) ||
string.IsNullOrWhiteSpace(database) ||
string.IsNullOrWhiteSpace(username) ||
string.IsNullOrWhiteSpace(password))
{
return null;
}
if (!int.TryParse(portString, out int port))
{
return null;
}
var builder = new NpgsqlConnectionStringBuilder
{
Host = host,
Port = port,
Database = database,
Username = username,
Password = password
};
if (int.TryParse(maxConnectionsString, out int maxConnections) && maxConnections > 0)
{
builder.MaxPoolSize = maxConnections;
}
if (int.TryParse(idleTimeoutMsString, out int idleTimeoutMs) && idleTimeoutMs > 0)
{
builder.ConnectionIdleLifetime = Math.Max(1, idleTimeoutMs / 1000);
}
return builder.ConnectionString;
}
/// <summary>
/// 执行Kafka入库失活警报
/// </summary>
private void ExecuteKafkaInactiveAlert()
{
SendAlert(
smsContent: "[BLV运维提示] BLS数据库5分钟内入库数据为0。",
callContent: "BLV运维提示 BLS数据库5分钟内入库数据为0",
alertType: "BLS-数据库入库警报",
extendedDeadline: true
);
}
/// <summary>
/// 解析CPU值字符串为整数列表
/// </summary>