200 lines
6.7 KiB
C#
200 lines
6.7 KiB
C#
|
|
using CSRedis;
|
||
|
|
using System;
|
||
|
|
using System.Collections.Generic;
|
||
|
|
using System.Configuration;
|
||
|
|
using System.Linq;
|
||
|
|
using System.Text;
|
||
|
|
using static CSRedis.CSRedisClient;
|
||
|
|
|
||
|
|
namespace Common
|
||
|
|
{
|
||
|
|
/// <summary>
|
||
|
|
/// Redis缓存辅助类
|
||
|
|
/// </summary>
|
||
|
|
public class CSRedisCacheHelper
|
||
|
|
{
|
||
|
|
public static CSRedisClient? redis;
|
||
|
|
public static CSRedisClient? redis14;
|
||
|
|
public static CSRedisClient? redis15;
|
||
|
|
|
||
|
|
private const string ip = "127.0.0.1";
|
||
|
|
//private const string port = "6379";
|
||
|
|
private const string port = "6800";
|
||
|
|
static CSRedisCacheHelper()
|
||
|
|
{
|
||
|
|
var redisHostStr = string.Format("{0}:{1}", ip, port);
|
||
|
|
if (!string.IsNullOrEmpty(redisHostStr))
|
||
|
|
{
|
||
|
|
redis = new CSRedisClient(redisHostStr + ",password=,defaultDatabase=0");
|
||
|
|
redis15 = new CSRedisClient(redisHostStr + ",password=,defaultDatabase=15");
|
||
|
|
var DingYueMsg = ("CellCorelDRAWUser", new Action<SubscribeMessageEventArgs>(async (args) =>
|
||
|
|
{
|
||
|
|
string body = args.Body;
|
||
|
|
}));
|
||
|
|
|
||
|
|
CSRedisCacheHelper.redis.Subscribe(DingYueMsg);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
/// <summary>
|
||
|
|
/// 添加缓存
|
||
|
|
/// </summary>
|
||
|
|
/// <typeparam name="T"></typeparam>
|
||
|
|
/// <param name="key"></param>
|
||
|
|
/// <param name="value"></param>
|
||
|
|
public static void Set<T>(string key, T value, int ExpireTime)
|
||
|
|
{
|
||
|
|
redis?.Set(key, value, ExpireTime * 60);
|
||
|
|
}
|
||
|
|
|
||
|
|
public static T Get<T>(string key)
|
||
|
|
{
|
||
|
|
return redis.Get<T>(key);
|
||
|
|
}
|
||
|
|
|
||
|
|
public static void Forever<T>(string key, T value)
|
||
|
|
{
|
||
|
|
redis.Set(key, value, -1);
|
||
|
|
}
|
||
|
|
public static void Del(string key)
|
||
|
|
{
|
||
|
|
redis.Del(key);
|
||
|
|
}
|
||
|
|
public static void ListPush<T>(string key, T value)
|
||
|
|
{
|
||
|
|
redis.LPush(key, value);
|
||
|
|
}
|
||
|
|
|
||
|
|
/// <summary>
|
||
|
|
/// 判断是否存在
|
||
|
|
/// </summary>
|
||
|
|
/// <param name="key"></param>
|
||
|
|
/// <returns></returns>
|
||
|
|
|
||
|
|
public static bool Contains(string key)
|
||
|
|
{
|
||
|
|
bool result = redis.Exists(key);
|
||
|
|
return result;
|
||
|
|
}
|
||
|
|
|
||
|
|
public static string? XAdd(string key, params (string, string)[] fieldValues)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
var result = redis.XAdd(key, fieldValues);
|
||
|
|
return result;
|
||
|
|
}
|
||
|
|
catch (Exception ex)
|
||
|
|
{
|
||
|
|
System.Diagnostics.Debug.WriteLine($"XAdd error: {ex.Message}");
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public static string? XReadGroup(string key, string group, string consumer, int count = 1, string id = null)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
id = id ?? ">";
|
||
|
|
var result = redis.XReadGroup(group, consumer, count, 0, (key, id));
|
||
|
|
|
||
|
|
if (result != null && result.Length > 0)
|
||
|
|
{
|
||
|
|
// 处理消息
|
||
|
|
var messages = new List<Dictionary<string, object>>();
|
||
|
|
|
||
|
|
foreach (var streamResult in result)
|
||
|
|
{
|
||
|
|
foreach (var entry in streamResult.data)
|
||
|
|
{
|
||
|
|
var message = new Dictionary<string, object>
|
||
|
|
{
|
||
|
|
["Id"] = entry.id,
|
||
|
|
["Values"] = entry.items
|
||
|
|
};
|
||
|
|
|
||
|
|
messages.Add(message);
|
||
|
|
|
||
|
|
// 确认消息已处理
|
||
|
|
redis.XAck(key, group, entry.id);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return System.Text.Json.JsonSerializer.Serialize(messages);
|
||
|
|
}
|
||
|
|
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
catch (Exception ex)
|
||
|
|
{
|
||
|
|
System.Diagnostics.Debug.WriteLine($"XReadGroup error: {ex.Message}");
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public static (string key, (string id, string items)[] data)[] XReadGroup(string group, string consumer, long count, long block, params (string key, string id)[] streams)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
var result = redis.XReadGroup(group, consumer, count, block, streams);
|
||
|
|
|
||
|
|
if (result != null && result.Length > 0)
|
||
|
|
{
|
||
|
|
// 处理消息并确认已处理
|
||
|
|
var processedResults = new List<(string key, (string id, string items)[] data)>();
|
||
|
|
|
||
|
|
foreach (var streamResult in result)
|
||
|
|
{
|
||
|
|
var messages = new List<(string id, string items)>();
|
||
|
|
|
||
|
|
foreach (var entry in streamResult.data)
|
||
|
|
{
|
||
|
|
// 确认消息已处理
|
||
|
|
redis.XAck(streamResult.key, group, entry.id);
|
||
|
|
|
||
|
|
// entry是一个元组 (string id, string[] items)
|
||
|
|
// 我们需要将string[] items转换为string items
|
||
|
|
var itemsArray = entry.items;
|
||
|
|
var itemsString = string.Join(",", itemsArray);
|
||
|
|
messages.Add((entry.id, itemsString));
|
||
|
|
}
|
||
|
|
|
||
|
|
processedResults.Add((streamResult.key, messages.ToArray()));
|
||
|
|
}
|
||
|
|
|
||
|
|
return processedResults.ToArray();
|
||
|
|
}
|
||
|
|
|
||
|
|
return Array.Empty<(string key, (string id, string items)[] data)>();
|
||
|
|
}
|
||
|
|
catch (Exception ex)
|
||
|
|
{
|
||
|
|
System.Diagnostics.Debug.WriteLine($"XReadGroup error: {ex.Message}");
|
||
|
|
return Array.Empty<(string key, (string id, string items)[] data)>();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public static bool XGroupCreate(string key, string group, string id = "0")
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
redis.XGroupCreate(key, group, id, true);
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
catch
|
||
|
|
{
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// <summary>
|
||
|
|
/// 发布消息
|
||
|
|
/// </summary>
|
||
|
|
/// <param name="Topic"></param>
|
||
|
|
/// <param name="Payload"></param>
|
||
|
|
public static void Publish(string Topic, string Payload)
|
||
|
|
{
|
||
|
|
CSRedisCacheHelper.redis.PublishNoneMessageId(Topic, Payload);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|