Add logging
@@ -32,21 +32,22 @@ namespace BLWData.Entity {
"bHZlGAogASgFIoABChZTaW5nbGVQb3dlckNoYW5uZWxEYXRhEg8KB2FkZHJl",
"c3MYASABKAkSDgoGZGlhbnlhGAIgASgBEg8KB2RpYW5saXUYAyABKAESDgoG",
"Z29uZ2x2GAQgASgBEg8KB25lbmdoYW8YBSABKAESEwoLem9uZ25lbmdoYW8Y",
"BiABKAEihAMKEUVuZXJneUNvbnN1bXB0aW9uEhEKCUhvdGVsQ29kZRgBIAEo",
"BiABKAEilgMKEUVuZXJneUNvbnN1bXB0aW9uEhEKCUhvdGVsQ29kZRgBIAEo",
"AxISCgpIb3N0TnVtYmVyGAIgASgJEgsKA01hYxgDIAEoCRIQCghFbmRQb2lu",
"dBgEIAEoCRIPCgdWZXJzaW9uGAUgASgJEhIKCklzVGFrZUNhcmQYBiABKAgS",
"dBgEIAEoCRIPCgdWZXJzaW9uGAUgASgJEhIKCklzVGFrZUNhcmQYBiABKAUS",
"QAoQUG93ZXJDaGFubmVsTGlzdBgHIAMoCzImLkJMV0RhdGEuRW50aXR5LlNp",
"bmdsZVBvd2VyQ2hhbm5lbERhdGESEgoKQ3JlYXRlVGltZRgIIAEoAxISCgpS",
"b29tTnVtYmVyGAkgASgJEhEKCUNhcmJvblZJUBgKIAEoBRIUCgxJZGVudGl0",
"eUluZm8YCyABKAUSNAoQRGV2aWNlU3RhdHVzTGlzdBgMIAMoCzIaLkJMV0Rh",
"dGEuRW50aXR5LkRldmljZURhdGESEQoJQ2FyZEV2ZW50GA0gASgFEhQKDElz",
"SW5zZXJ0Q2FyZBgOIAEoBRISCgpQTVNfU3RhdHVzGA8gASgJYgZwcm90bzM="));
"SW5zZXJ0Q2FyZBgOIAEoBRISCgpQTVNfU3RhdHVzGA8gASgJEhAKCEJyaWdo",
"dF9HGBAgASgFYgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::BLWData.Entity.DeviceData), global::BLWData.Entity.DeviceData.Parser, new[]{ "HostID", "DeviceType", "Address", "Status", "Brightness", "CurrentTemp", "SettingTemp", "FanSpeed", "Mode", "Valve" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::BLWData.Entity.SinglePowerChannelData), global::BLWData.Entity.SinglePowerChannelData.Parser, new[]{ "Address", "Dianya", "Dianliu", "Gonglv", "Nenghao", "Zongnenghao" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::BLWData.Entity.EnergyConsumption), global::BLWData.Entity.EnergyConsumption.Parser, new[]{ "HotelCode", "HostNumber", "Mac", "EndPoint", "Version", "IsTakeCard", "PowerChannelList", "CreateTime", "RoomNumber", "CarbonVIP", "IdentityInfo", "DeviceStatusList", "CardEvent", "IsInsertCard", "PMSStatus" }, null, null, null, null)
new pbr::GeneratedClrTypeInfo(typeof(global::BLWData.Entity.EnergyConsumption), global::BLWData.Entity.EnergyConsumption.Parser, new[]{ "HotelCode", "HostNumber", "Mac", "EndPoint", "Version", "IsTakeCard", "PowerChannelList", "CreateTime", "RoomNumber", "CarbonVIP", "IdentityInfo", "DeviceStatusList", "CardEvent", "IsInsertCard", "PMSStatus", "BrightG" }, null, null, null, null)
}));
}
#endregion
@@ -1023,6 +1024,7 @@ namespace BLWData.Entity {
cardEvent_ = other.cardEvent_;
isInsertCard_ = other.isInsertCard_;
pMSStatus_ = other.pMSStatus_;
brightG_ = other.brightG_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}

@@ -1094,10 +1096,10 @@ namespace BLWData.Entity {

/// <summary>Field number for the "IsTakeCard" field.</summary>
public const int IsTakeCardFieldNumber = 6;
private bool isTakeCard_;
private int isTakeCard_;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool IsTakeCard {
public int IsTakeCard {
get { return isTakeCard_; }
set {
isTakeCard_ = value;
@@ -1210,6 +1212,18 @@ namespace BLWData.Entity {
}
}

/// <summary>Field number for the "Bright_G" field.</summary>
public const int BrightGFieldNumber = 16;
private int brightG_;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int BrightG {
get { return brightG_; }
set {
brightG_ = value;
}
}

[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
@@ -1240,6 +1254,7 @@ namespace BLWData.Entity {
if (CardEvent != other.CardEvent) return false;
if (IsInsertCard != other.IsInsertCard) return false;
if (PMSStatus != other.PMSStatus) return false;
if (BrightG != other.BrightG) return false;
return Equals(_unknownFields, other._unknownFields);
}

@@ -1252,7 +1267,7 @@ namespace BLWData.Entity {
if (Mac.Length != 0) hash ^= Mac.GetHashCode();
if (EndPoint.Length != 0) hash ^= EndPoint.GetHashCode();
if (Version.Length != 0) hash ^= Version.GetHashCode();
if (IsTakeCard != false) hash ^= IsTakeCard.GetHashCode();
if (IsTakeCard != 0) hash ^= IsTakeCard.GetHashCode();
hash ^= powerChannelList_.GetHashCode();
if (CreateTime != 0L) hash ^= CreateTime.GetHashCode();
if (RoomNumber.Length != 0) hash ^= RoomNumber.GetHashCode();
@@ -1262,6 +1277,7 @@ namespace BLWData.Entity {
if (CardEvent != 0) hash ^= CardEvent.GetHashCode();
if (IsInsertCard != 0) hash ^= IsInsertCard.GetHashCode();
if (PMSStatus.Length != 0) hash ^= PMSStatus.GetHashCode();
if (BrightG != 0) hash ^= BrightG.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
@@ -1300,9 +1316,9 @@ namespace BLWData.Entity {
output.WriteRawTag(42);
output.WriteString(Version);
}
if (IsTakeCard != false) {
if (IsTakeCard != 0) {
output.WriteRawTag(48);
output.WriteBool(IsTakeCard);
output.WriteInt32(IsTakeCard);
}
powerChannelList_.WriteTo(output, _repeated_powerChannelList_codec);
if (CreateTime != 0L) {
@@ -1334,6 +1350,10 @@ namespace BLWData.Entity {
output.WriteRawTag(122);
output.WriteString(PMSStatus);
}
if (BrightG != 0) {
output.WriteRawTag(128, 1);
output.WriteInt32(BrightG);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
@@ -1364,9 +1384,9 @@ namespace BLWData.Entity {
output.WriteRawTag(42);
output.WriteString(Version);
}
if (IsTakeCard != false) {
if (IsTakeCard != 0) {
output.WriteRawTag(48);
output.WriteBool(IsTakeCard);
output.WriteInt32(IsTakeCard);
}
powerChannelList_.WriteTo(ref output, _repeated_powerChannelList_codec);
if (CreateTime != 0L) {
@@ -1398,6 +1418,10 @@ namespace BLWData.Entity {
output.WriteRawTag(122);
output.WriteString(PMSStatus);
}
if (BrightG != 0) {
output.WriteRawTag(128, 1);
output.WriteInt32(BrightG);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
@@ -1423,8 +1447,8 @@ namespace BLWData.Entity {
if (Version.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Version);
}
if (IsTakeCard != false) {
size += 1 + 1;
if (IsTakeCard != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(IsTakeCard);
}
size += powerChannelList_.CalculateSize(_repeated_powerChannelList_codec);
if (CreateTime != 0L) {
@@ -1449,6 +1473,9 @@ namespace BLWData.Entity {
if (PMSStatus.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(PMSStatus);
}
if (BrightG != 0) {
size += 2 + pb::CodedOutputStream.ComputeInt32Size(BrightG);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
@@ -1476,7 +1503,7 @@ namespace BLWData.Entity {
if (other.Version.Length != 0) {
Version = other.Version;
}
if (other.IsTakeCard != false) {
if (other.IsTakeCard != 0) {
IsTakeCard = other.IsTakeCard;
}
powerChannelList_.Add(other.powerChannelList_);
@@ -1502,6 +1529,9 @@ namespace BLWData.Entity {
if (other.PMSStatus.Length != 0) {
PMSStatus = other.PMSStatus;
}
if (other.BrightG != 0) {
BrightG = other.BrightG;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}

@@ -1542,7 +1572,7 @@ namespace BLWData.Entity {
break;
}
case 48: {
IsTakeCard = input.ReadBool();
IsTakeCard = input.ReadInt32();
break;
}
case 58: {
@@ -1581,6 +1611,10 @@ namespace BLWData.Entity {
PMSStatus = input.ReadString();
break;
}
case 128: {
BrightG = input.ReadInt32();
break;
}
}
}
#endif
@@ -1621,7 +1655,7 @@ namespace BLWData.Entity {
break;
}
case 48: {
IsTakeCard = input.ReadBool();
IsTakeCard = input.ReadInt32();
break;
}
case 58: {
@@ -1660,6 +1694,10 @@ namespace BLWData.Entity {
PMSStatus = input.ReadString();
break;
}
case 128: {
BrightG = input.ReadInt32();
break;
}
}
}
}

BLWLogProduce/Redis_to_Kafka_System_Documentation.md (new file, 307 lines)
@@ -0,0 +1,307 @@
# Redis-to-Kafka Message Forwarding: Design and Implementation

## 1. Project Overview

This project is a message-forwarding service: it subscribes to various messages from Redis, serializes them, and publishes them to Kafka queues. To save network bandwidth, the system uses two serialization schemes depending on the message type:
- Most messages are serialized with MessagePack
- Specific messages are serialized with Google Protocol Buffers

## 2. System Architecture

### 2.1 Core Components

| Component | Responsibility | Implementation |
|------|---------|---------|
| KafkaProduce | Background service handling Redis subscriptions and Kafka publishing | Services/KafkaProduce.cs |
| MyPublishRedis | Redis publish/subscribe management | Models/MyPublishRedis.cs |
| MessagePack serialization | Lightweight serialization scheme | Implemented via MyMessagePacker |
| Protobuf serialization | Compact binary serialization scheme | EnergyConsumption.cs, NewActionChangeDataPush.cs |

### 2.2 Data Flow

```mermaid
flowchart TD
    subgraph Data sources
        A[Redis message publishers]
    end

    subgraph Message processing
        B[Redis subscription] --> C[Message type dispatch]
        C --> D1[MessagePack serialization]
        C --> D2[Protobuf serialization]
    end

    subgraph Message delivery
        D1 --> E[Kafka producer]
        D2 --> E
    end

    subgraph Data destination
        E --> F[Kafka queue]
    end
```

## 3. Core Implementation

### 3.1 Redis Subscription Management

The system uses `CSRedisClient` to subscribe to multiple Redis channels, each carrying a different message type:

```csharp
// Subscribe to UDP package data
var DingYue = ("redis-udppackage", new Action<SubscribeMessageEventArgs>(async (args) =>
{
    try
    {
        string body = args.Body;
        UDPPackage? usa = System.Text.Json.JsonSerializer.Deserialize<UDPPackage>(body);
        // Process the data...
        byte[] bytes = MyMessagePacker.FastSerialize(usa);
        // Send to Kafka...
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        Console.WriteLine(ex.StackTrace);
    }
}));

// Subscribe to other channels...

// Activate the subscriptions
CSRedisCacheHelper.redis3.Subscribe(DingYue);
CSRedisCacheHelper.redis3.Subscribe(DingYue1);
// More subscriptions...
```
Source: `Services/KafkaProduce.cs`

### 3.2 Serialization Strategy

#### 3.2.1 MessagePack Serialization

Most message types are serialized with MessagePack through the `MyMessagePacker.FastSerialize` helper:

```csharp
// Serialize UDP package data
byte[] bytes = MyMessagePacker.FastSerialize(usa);
var dr = await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = bytes });

// Serialize room-status data
StepInfo? usa = System.Text.Json.JsonSerializer.Deserialize<StepInfo>(body);
byte[] bytes = MyMessagePacker.FastSerialize(usa);
await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = bytes });
```
Source: `Services/KafkaProduce.cs`

#### 3.2.2 Protobuf Serialization

Specific message types (such as energy-consumption data and device action data) are serialized with Protobuf:

```csharp
// Energy-consumption data is serialized with Protobuf
EnergyConsumption ese = new EnergyConsumption();
ese.HotelCode = poo.HotelCode;
ese.HostNumber = poo.HostNumber;
// Set the remaining fields...
byte[] data = ese.ToByteArray();
await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = data });

// Device action data is serialized with Protobuf
NewActionChangeDataPush ese = new NewActionChangeDataPush();
ese.Code = poo.code;
ese.RoomNumber = poo.roomNumber;
// Set the remaining fields...
byte[] data = ese.ToByteArray();
await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = data });
```
Source: `Services/KafkaProduce.cs`

### 3.3 Kafka Publishing

The system uses the Confluent.Kafka client library to publish messages to Kafka:

```csharp
// Initialize the Kafka producer
string? ipport = Configuration["Kafka:EndPoint"];
string? user = Configuration["Kafka:UserName"];
string? pwd = Configuration["Kafka:PassWord"];
var config = new ProducerConfig
{
    BootstrapServers = ipport,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = user,
    SaslPassword = pwd
};
var p = new ProducerBuilder<string, byte[]>(config).Build();

// Send a message
var dr = await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = bytes });
```
Source: `Services/KafkaProduce.cs`

## 4. Message Types and Processing

### 4.1 Main Message Types

| Message type | Redis channel | Serialization | Kafka topic | Purpose |
|---------|----------|-----------|----------|------|
| UDP package data | redis-udppackage | MessagePack | BLWLog_RCU_Topic | Device communication data |
| Room status monitoring | redis-roomstatus-monitor | MessagePack | BLWLog_RCU_Topic | Room status changes |
| Bypass system data | redis-forksystemdata | MessagePack | BLWLog_RCU_Topic | Bypass system information |
| Energy consumption | redis-power | Protobuf | BLWLog4BaoJing_RCU_Topic | Energy statistics |
| Device action data | redis-action-data | Protobuf | BLWLog4BaoJing_RCU_Topic | Device operation records |
| Online/offline status | redis-on_off_line | Protobuf | BLWLog4BaoJing_RCU_Topic | Device online status |
| Service status changes | redis-serviceinfo_change | MessagePack | BLWLog_RCU_Topic | Service status monitoring |
| Third-party API calls | redis-invoke_httpinterface | MessagePack | BLWLog_RCU_Topic | API call logs |
| New-protocol data | redis-rcu-* | MessagePack | BLWLog_RCU_Topic | New-protocol device data |
| Room-selection (XuanZhu) data | Redis-XuanZhuKafka | Protobuf | BLWLog4XuanZhu_RCU_Topic | Room-selection system data |

### 4.2 Message Processing Flow

1. **Receive**: messages arrive through Redis subscriptions
2. **Parse**: the JSON payload is deserialized into the corresponding data model
3. **Process**: the data is transformed and enriched as needed
4. **Serialize**: the serialization scheme is chosen according to the message type
5. **Publish**: the serialized bytes are sent to the matching Kafka topic (a generic sketch of this pipeline follows below)

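The same five steps repeat for every channel handler in `KafkaProduce.cs`. As a rough illustration only, the pattern could be factored into a generic helper like the sketch below; `MessageForwarder`, `ForwardAsync`, and the `serialize` delegate are hypothetical names introduced here, not existing members of the codebase.

```csharp
// Sketch (assumed helper, not present in the project): one generic
// "parse -> serialize -> produce" pipeline covering steps 1-5 above.
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

static class MessageForwarder
{
    public static async Task ForwardAsync<T>(
        IProducer<string, byte[]> producer,
        string body,                 // JSON payload received from the Redis channel
        string topic,                // target Kafka topic
        string key,                  // Kafka message key
        Func<T, byte[]> serialize)   // MessagePack or Protobuf, chosen per message type
        where T : class
    {
        T? model = JsonSerializer.Deserialize<T>(body);  // step 2: parse into the data model
        if (model is null) return;                       // step 3: minimal validation hook
        byte[] payload = serialize(model);               // step 4: serialize with the chosen scheme
        await producer.ProduceAsync(topic,               // step 5: publish to Kafka
            new Message<string, byte[]> { Key = key, Value = payload });
    }
}
```
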
## 5. Serialization Scheme Comparison

### 5.1 MessagePack vs Protocol Buffer

| Aspect | MessagePack | Protocol Buffer |
|------|-------------|-----------------|
| Serialization speed | Very fast | Fast |
| Payload size | Small | Very small |
| Development effort | Low | Medium (requires .proto definitions) |
| Type safety | Medium | High |
| Typical use | General messages | Structured data, performance-sensitive paths |

### 5.2 Selection Strategy

The system chooses a serialization scheme according to the following rules:
- **MessagePack**: for most general-purpose messages, where development is simple and performance is sufficient
- **Protocol Buffer**: for structured data (such as energy-consumption and device action data) and for scenarios with stricter performance and payload-size requirements

## 6. Suggested Code Improvements

### 6.1 Exception Handling

Exception handling in the current code is minimal; it only prints the exception:

```csharp
try
{
    // processing logic
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
    Console.WriteLine(ex.StackTrace);
}
```

**Suggestions**:
- Record detailed exception information with a logging framework such as NLog
- Add a retry mechanism to improve reliability (see the sketch below)
- Handle different exception categories separately

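A minimal sketch of the first two suggestions, assuming a hypothetical `ReliableProducer.ProduceWithRetryAsync` helper (the name does not exist in the project):

```csharp
// Sketch only: NLog logging plus a bounded retry around ProduceAsync.
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using NLog;

static class ReliableProducer
{
    private static readonly Logger logger = LogManager.GetCurrentClassLogger();

    public static async Task ProduceWithRetryAsync(
        IProducer<string, byte[]> producer,
        string topic,
        Message<string, byte[]> message,
        int maxAttempts = 3)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await producer.ProduceAsync(topic, message);
                return;
            }
            catch (ProduceException<string, byte[]> ex) when (attempt < maxAttempts)
            {
                // Likely transient broker/network error: log with context, back off, retry.
                logger.Warn(ex, "Kafka produce failed (attempt {0}/{1}), topic={2}", attempt, maxAttempts, topic);
                await Task.Delay(TimeSpan.FromSeconds(attempt));
            }
            catch (Exception ex)
            {
                // Final or non-retryable failure: record it and let the caller decide.
                logger.Error(ex, "Kafka produce failed permanently, topic={0}", topic);
                throw;
            }
        }
    }
}
```
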
### 6.2 Configuration Management

Kafka settings are currently read from `Configuration` as loose string keys:

```csharp
string? ipport = Configuration["Kafka:EndPoint"];
string? user = Configuration["Kafka:UserName"];
string? pwd = Configuration["Kafka:PassWord"];
```

**Suggestions**:
- Use a strongly typed options class to improve maintainability (see the sketch below)
- Support hot-reloading of configuration
- Validate configuration values at startup

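One possible shape for the strongly typed options, assuming a hypothetical `KafkaOptions` class bound to the existing `Kafka` section (the key names match the current appsettings usage):

```csharp
// Sketch only: bind and validate the "Kafka" section once instead of
// reading loose string keys throughout the service.
using System;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public sealed class KafkaOptions
{
    public string EndPoint { get; set; } = "";
    public string UserName { get; set; } = "";
    public string PassWord { get; set; } = "";
}

public static class KafkaProducerFactory
{
    public static IProducer<string, byte[]> Create(IConfiguration configuration)
    {
        KafkaOptions options = configuration.GetSection("Kafka").Get<KafkaOptions>()
            ?? throw new InvalidOperationException("Missing 'Kafka' configuration section.");

        var config = new ProducerConfig
        {
            BootstrapServers = options.EndPoint,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = options.UserName,
            SaslPassword = options.PassWord
        };
        return new ProducerBuilder<string, byte[]>(config).Build();
    }
}
```
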
### 6.3 Serialization Wrapper

Serialization calls are currently scattered across the handlers:

```csharp
// MessagePack serialization
byte[] bytes = MyMessagePacker.FastSerialize(usa);

// Protobuf serialization
byte[] data = ese.ToByteArray();
```

**Suggestions**:
- Wrap serialization in a single utility that picks the scheme from the message type (see the sketch below)
- Add serialization performance metrics
- Cache serialized payloads to speed up repeated messages

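One way such a wrapper could look, assuming a hypothetical `UnifiedSerializer` class (the Protobuf branch relies on the generated classes implementing `IMessage`):

```csharp
// Sketch only: a single entry point that chooses the serialization scheme.
using Google.Protobuf;

public static class UnifiedSerializer
{
    public static byte[] Serialize<T>(T value)
    {
        // Protobuf-generated types (EnergyConsumption, NewActionChangeDataPush, ...)
        // implement IMessage, so they go through Google.Protobuf.
        if (value is IMessage protoMessage)
        {
            return protoMessage.ToByteArray();
        }

        // Everything else (UDPPackage, StepInfo, ...) keeps using the existing
        // in-house MessagePack wrapper, assumed to accept any serializable object
        // as in the handlers above.
        return MyMessagePacker.FastSerialize(value);
    }
}
```
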
## 7. Performance and Extensibility

### 7.1 Performance

- **Batching**: batch similar messages to reduce Kafka network round-trips (see the configuration sketch below)
- **Connection pooling**: reuse Redis and Kafka connections to avoid connection-setup overhead
- **Asynchronous processing**: use the async programming model throughout to raise throughput
- **Memory management**: keep allocations low to reduce GC pressure

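In Confluent.Kafka, batching is largely a producer-configuration concern: the client buffers messages in the background and ships them in batches. A rough sketch with illustrative values (the endpoint and values below are placeholders, not the project's settings):

```csharp
// Sketch only: batching-oriented producer settings and fire-and-forget Produce().
using System;
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "kafka:9092",        // placeholder endpoint
    LingerMs = 20,                          // wait up to 20 ms so more messages share a batch
    BatchSize = 64 * 1024,                  // maximum batch size in bytes
    CompressionType = CompressionType.Lz4   // trade a little CPU for smaller payloads
};

using var producer = new ProducerBuilder<string, byte[]>(config).Build();

// Fire-and-forget Produce() keeps throughput high; the delivery handler reports failures.
producer.Produce("BLWLog_RCU_Topic",
    new Message<string, byte[]> { Key = "demo", Value = new byte[] { 1, 2, 3 } },
    report =>
    {
        if (report.Error.IsError)
        {
            Console.WriteLine($"Delivery failed: {report.Error.Reason}");
        }
    });

producer.Flush(TimeSpan.FromSeconds(5));    // drain outstanding messages before shutdown
```
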
### 7.2 Extensibility

- **Modularity**: keep message-handling logic modular so new message types are easy to add
- **Plugin architecture**: allow new serialization schemes to be plugged in
- **Multi-tenancy**: support per-tenant configuration for different deployment scenarios
- **Metrics**: expose detailed runtime metrics for monitoring and tuning

## 8. Technology Stack

| Technology | Version / Library | Purpose |
|------|---------|------|
| .NET Core | 6.0+ | Runtime |
| Redis | CSRedisClient | Message subscription |
| Kafka | Confluent.Kafka | Message publishing |
| MessagePack | MyMessagePacker | Serialization |
| Protocol Buffer | Google.Protobuf | Serialization |
| NLog | NLog | Logging |

## 9. Deployment and Operations

### 9.1 Deployment

- **Containerization**: deploy with Docker for easy horizontal scaling
- **Orchestration**: manage the container cluster with Kubernetes
- **Configuration management**: manage settings centrally through a configuration center

### 9.2 Monitoring and Alerting

- **Health checks**: expose a service health-check endpoint
- **Metrics collection**: track key indicators such as Redis subscription and Kafka publishing rates
- **Alerting**: raise alerts promptly on abnormal conditions

### 9.3 Log Management

- **Structured logging**: use a structured log format for easier analysis
- **Log aggregation**: ship logs to an aggregation system such as ELK
- **Log rotation**: rotate logs to avoid exhausting disk space

## 10. Summary

This project implements an efficient Redis-to-Kafka forwarding service. By choosing the serialization scheme per message type, it saves network bandwidth while keeping performance high. The architecture is clear, the modules are well separated, and the system is easy to extend and maintain.

### 10.1 Key Strengths

1. **Serialization strategy**: the scheme is chosen per message type, balancing performance and development effort
2. **Asynchronous processing**: the async programming model is used throughout to raise throughput
3. **Modular design**: message-handling logic is modular and easy to extend and maintain
4. **Reliability**: exception handling and logging improve robustness

### 10.2 Typical Use Cases

The system fits the following scenarios:
- **IoT device data**: collecting and forwarding large volumes of device-generated data
- **Monitoring systems**: forwarding monitoring metrics in real time
- **Log aggregation**: centrally collecting and processing logs from distributed systems
- **Data synchronization**: keeping data in sync between systems

Beyond solving the Redis-to-Kafka forwarding problem itself, the implementation can serve as a reference for similar message-processing scenarios.

@@ -9,6 +9,7 @@ using Google.Protobuf;
using MessagePack;
using Microsoft.CodeAnalysis.Host.Mef;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.ObjectPool;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
@@ -374,14 +375,14 @@ namespace BLWLogProduce.Services
{
return;
}
logger.Error("能耗:" + body);
//logger.Error("能耗:" + body);
//string str= Newtonsoft.Json.JsonConvert.SerializeObject(poo);
//Console.WriteLine("收到了"+str);
byte[] qf = MyMessagePacker.FastSerialize(poo);
//byte[] qf = MyMessagePacker.FastSerialize(poo);

string TopicKey = KafkaKey.BLWLog_RCU_Topic;
string DetailKey = KafkaKey.UDPPackagePowerMonitor;
await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = qf });
//string TopicKey = KafkaKey.BLWLog_RCU_Topic;
//string DetailKey = KafkaKey.UDPPackagePowerMonitor;
//await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = qf });


#region 宝镜系统使用的能耗数据结构
@@ -440,6 +441,7 @@ namespace BLWLogProduce.Services
ese.IdentityInfo = poo.IdentityInfo;
ese.CardEvent = poo.CardEvent;
ese.PMSStatus = poo.PMS_Status;
ese.BrightG = poo.Bright_G;

byte[] data = ese.ToByteArray();
string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic;
@@ -514,6 +516,10 @@ namespace BLWLogProduce.Services
NewVersionLog? poo = System.Text.Json.JsonSerializer.Deserialize<NewVersionLog>(body);
poo.ts_ms = Tools.GetUnixTime_MS();

if (string.IsNullOrEmpty(poo.hotel_id) || string.IsNullOrEmpty(poo.device_id))
{
return;
}
string TopicKey1 = KafkaKey.BLWLog4NodeJs_RCU_Topic;
string DetailKey1 = poo.comm_seq.ToString();

@@ -529,6 +535,35 @@ namespace BLWLogProduce.Services
}));


var TSLog_DingYue_0X36 = ("redis-0X36-0X0F", new Action<SubscribeMessageEventArgs>(async (args) =>
{
string body = args.Body;

try
{
DeviceActionData? poo = System.Text.Json.JsonSerializer.Deserialize<DeviceActionData>(body);
poo.ts_ms = Tools.GetUnixTime_MS();

if (string.IsNullOrEmpty(poo.hotel_id) || string.IsNullOrEmpty(poo.device_id))
{
return;
}
string TopicKey1 = KafkaKey.BLWLog4NodeJs_RCU_Action_Topic;
string DetailKey1 = poo.frame_id.ToString();

var jsonstr = JsonConvert.SerializeObject(poo);
await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = Encoding.UTF8.GetBytes(jsonstr) });

}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
}
}));



#region 碳达人
//var DingYue3_1 = ("redis-carbon_trigger", new Action<SubscribeMessageEventArgs>(async (args) =>
//{
@@ -630,6 +665,8 @@ namespace BLWLogProduce.Services
string body = args.Body;
CommonEntity.OnOffLineData? usa = System.Text.Json.JsonSerializer.Deserialize<CommonEntity.OnOffLineData>(body);

//poo.ts_ms = Tools.GetUnixTime_MS();
usa.UnixTime = Tools.GetUnixTime_MS();
if (string.IsNullOrEmpty(usa.EndPoint))
{
logger.Error("RCUOnOffLine:" + body);
@@ -640,6 +677,12 @@ namespace BLWLogProduce.Services
logger.Error("RCUOnOffLine:" + body);
return;
}

string Key1 = KafkaKey.BLWLog4NodeJs_RCU_OnOffLine_Topic;
string Key2 = usa.HostNumber;
byte[] nnn = Encoding.UTF8.GetBytes(body);
var dr = await p.ProduceAsync(Key1, new Message<string, byte[]> { Key = Key2, Value = nnn });

BLWData.Entity.OnOffLineData ese = new BLWData.Entity.OnOffLineData();
ese.HotelCode = usa.HotelCode;
ese.HostNumber = usa.HostNumber;
@@ -649,7 +692,6 @@ namespace BLWLogProduce.Services
ese.Mac = usa.MAC ?? "";
ese.CurrentTime = usa.CurrentTime.ToString("yyyy-MM-dd HH:mm:ss.fff");


byte[] data = ese.ToByteArray();
string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic;
string DetailKey1 = KafkaKey.RCUOnLineStatus;
@@ -913,6 +955,7 @@ namespace BLWLogProduce.Services

//新版本日志
CSRedisCacheHelper.redis3.Subscribe(TSLog_DingYue);
CSRedisCacheHelper.redis3.Subscribe(TSLog_DingYue_0X36);
}
catch (Exception ex)
{