166 lines
6.0 KiB
VB.net
166 lines
6.0 KiB
VB.net
|
|
Imports StackExchange.Redis
|
|||
|
|
|
|||
|
|
|
|||
|
|
''这段代码提供了一个完整的 Redis 订阅实现,包含以下功能:
|
|||
|
|
|
|||
|
|
''通过构造函数初始化 Redis 连接
|
|||
|
|
''订阅指定频道并处理接收到的消息
|
|||
|
|
''取消订阅功能
|
|||
|
|
''关闭连接功能
|
|||
|
|
''使用示例
|
|||
|
|
''你可以根据需要修改 messageHandler 来自定义消息处理逻辑。这个实现使用了异步回调来处理接收到的消息,确保不会阻塞主线程。
|
|||
|
|
|
|||
|
|
''记得在使用前替换 connectionString 为你的 Redis 服务器地址。如果你的 Redis 服务器设置了密码,连接字符串应该类似这样:localhost : 6379,password=yourpassword。
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
'Public Class RedisSubscriber
|
|||
|
|
' Private ReadOnly connection As ConnectionMultiplexer
|
|||
|
|
' Private ReadOnly db As IDatabase
|
|||
|
|
|
|||
|
|
' ' 构造函数,初始化 Redis 连接
|
|||
|
|
' Public Sub New(ByVal connectionString As String)
|
|||
|
|
' connection = ConnectionMultiplexer.Connect(connectionString)
|
|||
|
|
' db = connection.GetDatabase()
|
|||
|
|
' End Sub
|
|||
|
|
|
|||
|
|
' ' 订阅频道并处理消息
|
|||
|
|
' Public Sub SubscribeToChannel(ByVal channelName As String, ByVal messageHandler As Action(Of RedisChannel, RedisValue))
|
|||
|
|
' ' 创建订阅者
|
|||
|
|
' Dim subscriber = connection.GetSubscriber()
|
|||
|
|
|
|||
|
|
' ' 订阅频道并指定消息处理程序
|
|||
|
|
' subscriber.Subscribe(channelName, Function(channel, message) As Task
|
|||
|
|
' messageHandler(channel, message)
|
|||
|
|
' Return Task.CompletedTask
|
|||
|
|
' End Function)
|
|||
|
|
|
|||
|
|
' 'Console.WriteLine($"已订阅频道: {channelName}")
|
|||
|
|
' End Sub
|
|||
|
|
|
|||
|
|
' ' 取消订阅
|
|||
|
|
' Public Sub UnsubscribeFromChannel(ByVal channelName As String)
|
|||
|
|
' Dim subscriber = connection.GetSubscriber()
|
|||
|
|
' subscriber.Unsubscribe(channelName)
|
|||
|
|
' 'Console.WriteLine($"已取消订阅频道: {channelName}")
|
|||
|
|
' End Sub
|
|||
|
|
|
|||
|
|
' ' 关闭连接
|
|||
|
|
' Public Sub CloseConnection()
|
|||
|
|
' connection.Close()
|
|||
|
|
' End Sub
|
|||
|
|
'End Class
|
|||
|
|
|
|||
|
|
'' 使用示例
|
|||
|
|
'Public Class Program
|
|||
|
|
' Public Shared Sub Main()
|
|||
|
|
' ' 替换为你的 Redis 连接字符串
|
|||
|
|
' Dim connectionString = "localhost:6379"
|
|||
|
|
' Dim subscriber = New RedisSubscriber(connectionString)
|
|||
|
|
|
|||
|
|
' ' 定义消息处理程序
|
|||
|
|
' Dim messageHandler As Action(Of RedisChannel, RedisValue) = Sub(channel, message)
|
|||
|
|
' 'Console.WriteLine($"接收到来自 {channel} 的消息: {message}")
|
|||
|
|
' End Sub
|
|||
|
|
|
|||
|
|
' ' 订阅频道
|
|||
|
|
' subscriber.SubscribeToChannel("testChannel", messageHandler)
|
|||
|
|
|
|||
|
|
' ' 保持程序运行以接收消息
|
|||
|
|
' 'Console.WriteLine("按任意键退出...")
|
|||
|
|
' Console.ReadKey()
|
|||
|
|
|
|||
|
|
' ' 取消订阅并关闭连接
|
|||
|
|
' subscriber.UnsubscribeFromChannel("testChannel")
|
|||
|
|
' subscriber.CloseConnection()
|
|||
|
|
' End Sub
|
|||
|
|
'End Class
|
|||
|
|
|
|||
|
|
|
|||
|
|
'这个实现添加了以下功能:
|
|||
|
|
|
|||
|
|
'发布消息功能:
|
|||
|
|
|
|||
|
|
'PublishMessage 方法允许你向指定频道发布消息
|
|||
|
|
'返回值表示有多少个订阅者接收到了这条消息
|
|||
|
|
'完整的发布/ 订阅功能
|
|||
|
|
|
|||
|
|
'保留了之前的订阅功能
|
|||
|
|
'将发布和订阅功能整合到一个类中
|
|||
|
|
'使用示例:
|
|||
|
|
|
|||
|
|
'展示了如何发布消息
|
|||
|
|
'展示了如何订阅频道并接收消息
|
|||
|
|
'展示了完整的初始化和清理流程
|
|||
|
|
'你可以通过以下方式使用这个类:
|
|||
|
|
|
|||
|
|
'创建 RedisPubSub 实例,传入 Redis 连接字符串
|
|||
|
|
'使用 PublishMessage 发布消息到频道
|
|||
|
|
'使用 SubscribeToChannel 订阅频道并处理消息
|
|||
|
|
'使用 UnsubscribeFromChannel 取消订阅
|
|||
|
|
'使用 CloseConnection 关闭连接
|
|||
|
|
'这个实现是线程安全的, 适合在多线程环境中使用。如果你的 Redis 服务器需要密码, 确保在连接字符串中包含密码信息, 例如: localhost : 6379,password=yourpassword。
|
|||
|
|
|
|||
|
|
|
|||
|
|
Public Class RedisSubscriber
|
|||
|
|
Private ReadOnly connection As ConnectionMultiplexer
|
|||
|
|
Private ReadOnly db As IDatabase
|
|||
|
|
Private ReadOnly subscriber As ISubscriber
|
|||
|
|
|
|||
|
|
' 构造函数,初始化 Redis 连接
|
|||
|
|
Public Sub New(ip As String, port As Integer, password As String)
|
|||
|
|
Dim config As ConfigurationOptions = New ConfigurationOptions()
|
|||
|
|
config.EndPoints.Add(ip, port) ' 设置Redis服务器地址和端口
|
|||
|
|
config.Password = password ' 设置Redis密码
|
|||
|
|
config.AbortOnConnectFail = False ' 连接失败时不中止
|
|||
|
|
|
|||
|
|
connection = ConnectionMultiplexer.Connect(config)
|
|||
|
|
|
|||
|
|
db = connection.GetDatabase()
|
|||
|
|
subscriber = connection.GetSubscriber()
|
|||
|
|
End Sub
|
|||
|
|
' Public Sub New(ByVal connectionString As String)
|
|||
|
|
' connection = ConnectionMultiplexer.Connect(connectionString)
|
|||
|
|
' db = connection.GetDatabase()
|
|||
|
|
' End Sub
|
|||
|
|
|
|||
|
|
' 发布消息到指定频道
|
|||
|
|
Public Function PublishMessage(ByVal channelName As String, ByVal message As String) As Long
|
|||
|
|
Dim result = subscriber.Publish(channelName, message)
|
|||
|
|
|
|||
|
|
'Console.WriteLine($"已发布消息到 {channelName}: {message}, 被订阅者接收次数: {result}")
|
|||
|
|
Return result
|
|||
|
|
End Function
|
|||
|
|
|
|||
|
|
' 订阅频道并处理消息
|
|||
|
|
Public Sub SubscribeToChannel(ByVal channelName As String, ByVal messageHandler As Action(Of RedisChannel, RedisValue))
|
|||
|
|
subscriber.Subscribe(channelName, Function(channel, message) As Task
|
|||
|
|
messageHandler(channel, message)
|
|||
|
|
Return Task.CompletedTask
|
|||
|
|
End Function)
|
|||
|
|
'Console.WriteLine($"已订阅频道: {channelName}")
|
|||
|
|
End Sub
|
|||
|
|
|
|||
|
|
' 取消订阅
|
|||
|
|
Public Sub UnsubscribeFromChannel(ByVal channelName As String)
|
|||
|
|
subscriber.Unsubscribe(channelName)
|
|||
|
|
'Console.WriteLine($"已取消订阅频道: {channelName}")
|
|||
|
|
End Sub
|
|||
|
|
|
|||
|
|
' 关闭连接
|
|||
|
|
Public Sub CloseConnection()
|
|||
|
|
connection.Close()
|
|||
|
|
End Sub
|
|||
|
|
|
|||
|
|
'判断是否连接
|
|||
|
|
Public Function IsConnected() As Boolean
|
|||
|
|
Return connection.IsConnected
|
|||
|
|
End Function
|
|||
|
|
|
|||
|
|
End Class
|
|||
|
|
|
|||
|
|
|
|||
|
|
|