第一次提交至Git
This commit is contained in:
219
AUTS_DataService/ServiceTask/DbSyncServiceTask.vb
Normal file
219
AUTS_DataService/ServiceTask/DbSyncServiceTask.vb
Normal file
@@ -0,0 +1,219 @@
|
||||
Imports System.Threading
|
||||
Imports System.Xml.Serialization
|
||||
Imports UTS_Core.Database
|
||||
Imports UTS_Core.UTSModule
|
||||
Imports UTS_Core.UTSModule.Service
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 数据库同步类
|
||||
''' </summary>
|
||||
<XmlInclude(GetType(DbSyncServiceTask))>
|
||||
Public Class DbSyncServiceTask
|
||||
Inherits ServiceTask
|
||||
|
||||
Sub New()
|
||||
TaskType = ServiceTaskTypeEnum.DbSync
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
TaskName = "DbSync_Default"
|
||||
|
||||
Interval = 5
|
||||
|
||||
|
||||
LocalDbType = UtsDb.LocalDbType
|
||||
LocalConnString = UtsDb.LocalConnString
|
||||
|
||||
RemoteDbType = UtsDb.RemoteDbType
|
||||
RemoteConnString = UtsDb.RemoteConnString
|
||||
RemotePublicDb = UtsDb.RemotePublicDb
|
||||
RemotePrivateDb = UtsDb.RemotePrivateDb
|
||||
End Sub
|
||||
|
||||
Sub New(name As String, param As Dictionary(Of String, String))
|
||||
TaskType = ServiceTaskTypeEnum.DbSync
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
TaskName = name
|
||||
|
||||
LocalDbType = UtsDb.LocalDbType
|
||||
LocalConnString = UtsDb.LocalConnString
|
||||
|
||||
RemoteDbType = UtsDb.RemoteDbType
|
||||
RemoteConnString = UtsDb.RemoteConnString
|
||||
RemotePublicDb = UtsDb.RemotePublicDb
|
||||
RemotePrivateDb = UtsDb.RemotePrivateDb
|
||||
|
||||
SetParams(param)
|
||||
End Sub
|
||||
|
||||
|
||||
Private Sub StartCallback(stat As Object)
|
||||
While TaskStatus = ServiceTaskStatusEnum.Start
|
||||
|
||||
'同步任务
|
||||
Try
|
||||
Dim syncParam As DbSynchronizer.SyncParam
|
||||
syncParam.LocalConnString = LocalConnString
|
||||
syncParam.LocalType = LocalDbType
|
||||
syncParam.RemoteType = RemoteDbType
|
||||
syncParam.RemoteConnString = RemoteConnString
|
||||
syncParam.PublicDb = RemotePublicDb
|
||||
syncParam.PrivateDb = RemotePrivateDb
|
||||
|
||||
'开始数据库同步
|
||||
Dim sync As DbSynchronizer
|
||||
sync = New DbSynchronizer(syncParam)
|
||||
sync.SyncDatabase()
|
||||
ServiceLog.WriteInfoLog($"DbSynchronizer Succeeded!")
|
||||
|
||||
'更新数据库更新时间
|
||||
LastUpdateTime = $"{Now:yyyy-MM-dd HH:mm:ss}"
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"TaskName:{TaskName},SyncDatabase Error:{ex.Message}")
|
||||
End Try
|
||||
|
||||
'等待任务
|
||||
Dim lastTime As Date = Now
|
||||
While (Now - lastTime).TotalMinutes < Interval
|
||||
If TaskStatus = ServiceTaskStatusEnum.Stop Then
|
||||
Return
|
||||
End If
|
||||
|
||||
Thread.Sleep(1000)
|
||||
End While
|
||||
|
||||
End While
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub Start()
|
||||
TaskStatus = ServiceTaskStatusEnum.Start
|
||||
|
||||
If _syncThread IsNot Nothing AndAlso _syncThread.IsAlive Then
|
||||
ServiceLog.WriteDebugLog($"SyncThread IsAlive!")
|
||||
Return
|
||||
End If
|
||||
|
||||
_syncThread = New Thread(AddressOf StartCallback)
|
||||
_syncThread.IsBackground = True
|
||||
_syncThread.Start()
|
||||
|
||||
ServiceLog.WriteDebugLog($"DbSync Start!")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub [Stop]()
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
|
||||
Dim lastTime As Date = Now
|
||||
While _syncThread IsNot Nothing AndAlso _syncThread.IsAlive
|
||||
If (Now - lastTime).TotalMilliseconds > 10000 Then
|
||||
_syncThread.Abort()
|
||||
End If
|
||||
|
||||
Thread.Sleep(100)
|
||||
End While
|
||||
ServiceLog.WriteInfoLog($"DbSync Stop!")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub Restart()
|
||||
If _syncThread IsNot Nothing AndAlso _syncThread.IsAlive Then
|
||||
[Stop]()
|
||||
End If
|
||||
|
||||
|
||||
Start()
|
||||
ServiceLog.WriteInfoLog($"DbSync Restart!")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub SetParams(params As Dictionary(Of String, String))
|
||||
Dim tmpStatus As ServiceTaskStatusEnum = TaskStatus
|
||||
Dim tmpInterval As Integer = Interval
|
||||
|
||||
'将转换后的数据填充至临时缓存,一遍设置失败时可以保留上一次数据
|
||||
For Each param As KeyValuePair(Of String, String) In params
|
||||
Select Case param.Key
|
||||
Case "Interval"
|
||||
If Integer.TryParse(param.Value, tmpInterval) = False Then
|
||||
Throw New Exception($"Error Interval :{param.Value}")
|
||||
End If
|
||||
Case "LocalDbType"
|
||||
'If [Enum].TryParse(param.Value, tmpLocalType) = False Then
|
||||
' Throw New Exception($"Error LocalDbType :{param.Value}")
|
||||
'End If
|
||||
Case "LocalConnString"
|
||||
'tmpLocalConnString = UTS_Core.Security.Aes128.DecryptStr(param.Value, UTS_Core.Security.Aes128.ServerAesKey)
|
||||
Case "Status", "Type", "Name" '不处理的字段
|
||||
|
||||
|
||||
Case Else
|
||||
ServiceLog.WriteWarningLog($"DbSync Unknown param name :{param.Key}")
|
||||
End Select
|
||||
Next
|
||||
|
||||
TaskStatus = tmpStatus
|
||||
Interval = tmpInterval
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Function GetParams() As Dictionary(Of String, String)
|
||||
Dim params As New Dictionary(Of String, String)
|
||||
params.Add("Type", TaskType.ToString())
|
||||
params.Add("Name", TaskName)
|
||||
params.Add("Status", TaskStatus.ToString())
|
||||
params.Add("Interval", Interval.ToString())
|
||||
params.Add("LocalDbType", LocalDbType.ToString())
|
||||
params.Add("LocalConnString", UTS_Core.Security.Aes128.EncryptStr(LocalConnString, UTS_Core.Security.Aes128.ServerAesKey))
|
||||
|
||||
Return params
|
||||
End Function
|
||||
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 远程数据库的类型
|
||||
''' </summary>
|
||||
Private RemoteDbType As DbExecutor.DbTypeEnum
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 远程数据库的连接字符串
|
||||
''' </summary>
|
||||
Private RemoteConnString As String
|
||||
|
||||
''' <summary>
|
||||
''' 远端公共数据库名
|
||||
''' </summary>
|
||||
Private RemotePublicDb As String
|
||||
|
||||
''' <summary>
|
||||
''' 远端私有库名
|
||||
''' </summary>
|
||||
Private RemotePrivateDb As String
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 本地数据库的类型
|
||||
''' </summary>
|
||||
Private LocalDbType As DbExecutor.DbTypeEnum
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 本地数据库的连接字符串
|
||||
''' </summary>
|
||||
Private LocalConnString As String
|
||||
|
||||
''' <summary>
|
||||
''' 同步间隔,单位分钟,默认5分钟,最小值为1分钟
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Property Interval() As Integer
|
||||
|
||||
Private _syncThread As Thread
|
||||
|
||||
''' <summary>
|
||||
''' 最后一次更新的时间字符串
|
||||
''' </summary>
|
||||
Public Shared LastUpdateTime As String = ""
|
||||
End Class
|
||||
676
AUTS_DataService/ServiceTask/DbSynchronizer.vb
Normal file
676
AUTS_DataService/ServiceTask/DbSynchronizer.vb
Normal file
@@ -0,0 +1,676 @@
|
||||
Imports System.Data.Common
|
||||
Imports System.Text
|
||||
Imports MySql.Data.MySqlClient
|
||||
Imports UTS_Core.Database
|
||||
Imports UTS_Core.UTSModule.DbTableModel
|
||||
|
||||
''' <summary>
|
||||
''' 数据库同步器
|
||||
''' </summary>
|
||||
Public Class DbSynchronizer
|
||||
|
||||
''' <summary>
|
||||
''' 同步表内容
|
||||
''' </summary>
|
||||
Structure SyncTableScheme
|
||||
''' <summary>表名</summary>
|
||||
Public TableName As String
|
||||
''' <summary>同步类型</summary>
|
||||
Public SyncType As String
|
||||
''' <summary>是否存在当前数据表</summary>
|
||||
Public IsExistsTable As Boolean
|
||||
''' <summary>更新版本</summary>
|
||||
Public RevisionID As String
|
||||
''' <summary>上一次更新时间字符串</summary>
|
||||
Public LastSyncTime As String
|
||||
''' <summary>同步后更新的时间字符串</summary>
|
||||
Public NowSyncTime As String
|
||||
End Structure
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 同步数据库参数
|
||||
''' </summary>
|
||||
Structure SyncParam
|
||||
Public PublicDb As String
|
||||
|
||||
Public PrivateDb As String
|
||||
|
||||
Public RemoteType As DbExecutor.DbTypeEnum
|
||||
|
||||
Public RemoteConnString As String
|
||||
|
||||
Public LocalType As DbExecutor.DbTypeEnum
|
||||
|
||||
Public LocalConnString As String
|
||||
End Structure
|
||||
|
||||
|
||||
Sub New(param As SyncParam)
|
||||
Parameters = param
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 连接参数
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Property Parameters() As SyncParam
|
||||
|
||||
''' <summary>
|
||||
''' 同步数据库,开始同步
|
||||
''' </summary>
|
||||
Public Sub SyncDatabase()
|
||||
Using remoteDb As New DbExecutor(Parameters.RemoteType, Parameters.RemoteConnString)
|
||||
remoteDb.Open()
|
||||
|
||||
Using localDb As New DbExecutor(Parameters.LocalType, Parameters.LocalConnString)
|
||||
localDb.Open()
|
||||
|
||||
SyncDatabase(localDb, remoteDb, Parameters.PublicDb, Parameters.PrivateDb)
|
||||
|
||||
localDb.Close()
|
||||
End Using
|
||||
|
||||
remoteDb.Close()
|
||||
End Using
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 同步数据库执行过程
|
||||
''' </summary>
|
||||
''' <param name="localDb">本地数据库执行器</param>
|
||||
''' <param name="remoteDb">云端数据库执行器</param>
|
||||
''' <param name="publicDb">远端数据库公共库名</param>
|
||||
''' <param name="privateDb">远端数据库私有库名</param>
|
||||
Public Sub SyncDatabase(localDb As DbExecutor, remoteDb As DbExecutor, publicDb As String, privateDb As String)
|
||||
If remoteDb Is Nothing Then Return
|
||||
If localDb Is Nothing Then Return
|
||||
|
||||
'上传本地缓存数据
|
||||
remoteDb.BeginTransaction()
|
||||
UploadCacheData(remoteDb, localDb)
|
||||
remoteDb.CommitTransaction()
|
||||
ServiceLog.WriteDebugLog($"UploadCacheData Successded!")
|
||||
|
||||
'获取需要公有下载的数据表
|
||||
DownloadSyncTable(remoteDb, localDb, publicDb, Manage.SyncListTable.TableName)
|
||||
ServiceLog.WriteDebugLog($"Download Puclic SyncTable Successded!")
|
||||
|
||||
'获取需要私有下载的数据表
|
||||
DownloadSyncTable(remoteDb, localDb, privateDb, Customer.SyncListTable.TableName)
|
||||
ServiceLog.WriteDebugLog($"Download Private SyncTable Successded!")
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 上传本地缓存数据,DataTable的方式
|
||||
''' </summary>
|
||||
''' <param name="remoteDb">远程数据库执行器</param>
|
||||
''' <param name="localDb">本地数据库执行器</param>
|
||||
Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor)
|
||||
|
||||
CreateCacheTableWhenNotExists(localDb)
|
||||
|
||||
Dim dataTable As DataTable = SearchCacheData(localDb)
|
||||
|
||||
UploadCacheData(remoteDb, localDb, dataTable)
|
||||
|
||||
End Sub
|
||||
|
||||
#Region "上传本地缓存数据"
|
||||
''' <summary>
|
||||
''' 本地创建缓存记录表,如果不存在则创建
|
||||
''' </summary>
|
||||
''' <param name="localDb">本地数据库的执行器</param>
|
||||
Private Sub CreateCacheTableWhenNotExists(localDb As DbExecutor)
|
||||
localDb.ExecuteNonQuery(LocalPrivate.CacheTable.CreateTableString)
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 查询本地未上传的数据,单次查询5000条数据
|
||||
''' </summary>
|
||||
''' <param name="localDb">本地数据库的执行器</param>
|
||||
''' <returns></returns>
|
||||
Private Function SearchCacheData(localDb As DbExecutor) As DataTable
|
||||
Dim tableName As String = LocalPrivate.CacheTable.TableName
|
||||
Dim colNames As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID},{LocalPrivate.CacheTable.ColNamesEnum.SqlCmd}"
|
||||
Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.IsUpload} = 0 order by `{LocalPrivate.CacheTable.ColNamesEnum.ID}` Limit 5000"
|
||||
Dim cmdText As String = localDb.CmdHelper.Search(colNames, tableName, condition)
|
||||
Return localDb.ExecuteDataTable(cmdText)
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 上传本地数据至云端数据库
|
||||
''' </summary>
|
||||
''' <param name="remoteDb"></param>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="dataTable"></param>
|
||||
Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor, dataTable As DataTable)
|
||||
ServiceLog.WriteDebugLog($"Begin Upload Cache DataTable [{dataTable.Rows.Count}] Rows!")
|
||||
For Each row As DataRow In dataTable.Rows
|
||||
Try
|
||||
Dim cmdText As String = row(LocalPrivate.CacheTable.ColNamesEnum.SqlCmd.ToString()).ToString()
|
||||
remoteDb.ExecuteNonQuery(cmdText)
|
||||
|
||||
Catch ex As MySqlException
|
||||
'记录失败
|
||||
ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message},ErrorNumber:{ex.Number}")
|
||||
|
||||
Select Case ex.Number
|
||||
Case MySqlErrorCode.DuplicateFieldName '重复字段
|
||||
ServiceLog.WriteInfoLog($"DuplicateFieldName,Ignore the error!")
|
||||
Case Else
|
||||
ServiceLog.WriteWarningLog($"Unknown,Continue to perform!")
|
||||
Exit For
|
||||
End Select
|
||||
Catch ex As Exception
|
||||
'记录失败
|
||||
ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message}")
|
||||
|
||||
ServiceError.RecodeError("DbSync", $"UploadData Error:{ex.Message}")
|
||||
Exit For
|
||||
End Try
|
||||
|
||||
Try
|
||||
DeleteUploadData(localDb, row(LocalPrivate.CacheTable.ColNamesEnum.ID).ToString)
|
||||
Catch ex As Exception
|
||||
'记录失败
|
||||
ServiceLog.WriteErrorLog($"UpdateCacheTable Error:{ex.Message}")
|
||||
End Try
|
||||
Next
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 数据上传完成后,删除本地数据库中对应序号的记录
|
||||
''' </summary>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="sn"></param>
|
||||
Private Sub DeleteUploadData(localDb As DbExecutor, sn As String)
|
||||
Dim tableName As String = LocalPrivate.CacheTable.TableName
|
||||
Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID} = '{sn}'"
|
||||
localDb.ExecuteNonQuery(localDb.CmdHelper.DeleteRows(tableName, condition))
|
||||
End Sub
|
||||
#End Region
|
||||
|
||||
#Region "获取需要下载表集合"
|
||||
''' <summary>
|
||||
''' 比对本地与云端版本表,下载变化的数据表
|
||||
''' </summary>
|
||||
''' <param name="remoteDb"></param>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="dbName"></param>
|
||||
''' <param name="tbName"></param>
|
||||
Public Sub DownloadSyncTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tbName As String)
|
||||
Dim downloadTables As List(Of SyncTableScheme)
|
||||
downloadTables = CompareVersionTable(remoteDb, localDb, dbName, tbName)
|
||||
UpdateTables(remoteDb, localDb, dbName, downloadTables) '更新本地数据
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 比较本地与云端版本表的差异,获取需要下载的数据表
|
||||
''' </summary>
|
||||
''' <param name="remoteDb"></param>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="tableName"></param>
|
||||
''' <returns></returns>
|
||||
Private Function CompareVersionTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tableName As String) As List(Of SyncTableScheme)
|
||||
CreateVersionTableWhenNotExists(localDb)
|
||||
Dim localTable As DataTable = localDb.ExecuteDataTable(localDb.CmdHelper.SearchAll(Customer.SyncListTable.TableName))
|
||||
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, tableName))
|
||||
Return CompareVersionTable(localTable, remoteTable)
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 创建本地数据库表版本记录表
|
||||
''' </summary>
|
||||
''' <param name="localDb"></param>
|
||||
Private Sub CreateVersionTableWhenNotExists(localDb As DbExecutor)
|
||||
Try
|
||||
localDb.ExecuteNonQuery(Customer.SyncListTable.CreateTableString(localDb.DatabaseType))
|
||||
Catch ex As Exception
|
||||
'记录失败
|
||||
ServiceLog.WriteErrorLog($"CreateVersionTableWhenNotExists Error:{ex.Message}")
|
||||
End Try
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 比较本地与云端版本表的差异
|
||||
''' </summary>
|
||||
''' <param name="srcDataTable">源DataTable</param>
|
||||
''' <param name="destDataTable">目标DataTable</param>
|
||||
''' <returns>差异信息列表</returns>
|
||||
Private Function CompareVersionTable(srcDataTable As DataTable, destDataTable As DataTable) As List(Of SyncTableScheme)
|
||||
Dim result As New List(Of SyncTableScheme)
|
||||
Dim srcTableName As String
|
||||
Dim destTableName As String
|
||||
Dim isFind As Boolean
|
||||
Dim srcReVId As String = String.Empty
|
||||
Dim destRevId As String
|
||||
For i As Integer = 0 To destDataTable.Rows.Count - 1
|
||||
destTableName = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}"
|
||||
isFind = False
|
||||
|
||||
For j As Integer = 0 To srcDataTable.Rows.Count - 1
|
||||
srcTableName = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}"
|
||||
|
||||
If String.Compare(destTableName, srcTableName, True) = 0 Then
|
||||
srcReVId = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}"
|
||||
isFind = True
|
||||
Exit For
|
||||
End If
|
||||
Next
|
||||
|
||||
If isFind Then
|
||||
destRevId = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}"
|
||||
If String.Compare(srcReVId, destRevId, True) <> 0 Then '源表中存在记录且与目的表记录时间不相同
|
||||
result.Add(New SyncTableScheme() With {.TableName = destTableName,
|
||||
.SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}",
|
||||
.LastSyncTime = $"{srcDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncTime}")}",
|
||||
.RevisionID = destRevId,
|
||||
.IsExistsTable = True})
|
||||
End If
|
||||
Else
|
||||
result.Add(New SyncTableScheme() With {.TableName = destTableName,
|
||||
.SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}",
|
||||
.LastSyncTime = $"2000-01-01 00:00:00",
|
||||
.RevisionID = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}",
|
||||
.IsExistsTable = False})
|
||||
End If
|
||||
Next
|
||||
Return result
|
||||
End Function
|
||||
|
||||
#End Region
|
||||
|
||||
#Region "下载远程数据至本地"
|
||||
''' <summary>
|
||||
''' 更新下载数据表
|
||||
''' </summary>
|
||||
''' <param name="remoteDb">远程数据库执行器</param>
|
||||
''' <param name="localDb">本地数据库执行器</param>
|
||||
''' <param name="tables">需要同步的数据表信息集合</param>
|
||||
Private Sub UpdateTables(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tables As List(Of SyncTableScheme))
|
||||
Dim ts As Date
|
||||
Try
|
||||
ts = CDate(remoteDb.ExecuteScalar("select current_timestamp()"))
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"Get DB System Time Error:{ex.Message}")
|
||||
Return
|
||||
End Try
|
||||
|
||||
For Each table As SyncTableScheme In tables
|
||||
table.NowSyncTime = ts.ToString("yyyy-MM-dd HH:mm:ss")
|
||||
localDb.BeginTransaction()
|
||||
Try
|
||||
If String.Compare(table.SyncType, "all", True) = 0 Then
|
||||
UpdateTableForAll(remoteDb, localDb, dbName, table)
|
||||
ElseIf String.Compare(table.SyncType, "new", True) = 0 Then
|
||||
'增量更新Update限制,必须包含ID、UpdateTime字段,且ID字段为主键
|
||||
UpdateTableForNew(remoteDb, localDb, dbName, table)
|
||||
Else
|
||||
UpdateTableForAll(remoteDb, localDb, dbName, table)
|
||||
End If
|
||||
|
||||
UpdateVersionTable(localDb, table)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"UpdateTable {table.TableName} Error:{ex.Message}")
|
||||
End Try
|
||||
localDb.CommitTransaction()
|
||||
Next
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 下载修改全表修改部分的方式下载表
|
||||
''' </summary>
|
||||
''' <param name="remoteDb"></param>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="dbName"></param>
|
||||
''' <param name="table"></param>
|
||||
Private Sub UpdateTableForNew(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme)
|
||||
Dim maxID As Integer = 0
|
||||
If table.IsExistsTable = False Then
|
||||
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0"))
|
||||
Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句
|
||||
localDb.ExecuteNonQuery(createTableString)
|
||||
Else
|
||||
'查询本地最大ID,主键必须为ID
|
||||
Dim cmd As String = localDb.CmdHelper.SearchOrder(New List(Of String)() From {"ID"}, table.TableName, "ORDER BY [ID] DESC LIMIT 1")
|
||||
Dim maxIdObject As Object = localDb.ExecuteScalar(cmd)
|
||||
If IsDBNull(maxIdObject) = False Then
|
||||
maxID = CInt(maxIdObject)
|
||||
End If
|
||||
End If
|
||||
|
||||
'通过dataReader插入数据,查询远程更新时间后的所有内容
|
||||
Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, $"`UpdateTime` > '{table.LastSyncTime}'"))
|
||||
While reader.Read()
|
||||
If CInt(reader("ID")) > maxID Then '小于最大ID则更新,大于最大ID则插入
|
||||
InsertDataByDataReader(localDb, table.TableName, reader)
|
||||
Else
|
||||
UpdateDataByDataReader(localDb, table.TableName, reader)
|
||||
End If
|
||||
End While
|
||||
reader.Close()
|
||||
End Using
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 更新全表的方式下载表
|
||||
''' </summary>
|
||||
''' <param name="remoteDb"></param>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="table"></param>
|
||||
Private Sub UpdateTableForAll(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme)
|
||||
'删除本地表
|
||||
If table.IsExistsTable Then
|
||||
localDb.ExecuteNonQuery(localDb.CmdHelper.DropTable(table.TableName))
|
||||
End If
|
||||
|
||||
'新建表(考虑是否应该查询总数后,依此取1000行的方式下载,或考虑使用DataReader)
|
||||
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0"))
|
||||
Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句
|
||||
localDb.ExecuteNonQuery(createTableString)
|
||||
|
||||
'通过dataReader插入数据
|
||||
Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName))
|
||||
While reader.Read()
|
||||
InsertDataByDataReader(localDb, table.TableName, reader)
|
||||
End While
|
||||
reader.Close()
|
||||
End Using
|
||||
End Sub
|
||||
|
||||
Private Sub UpdateDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader)
|
||||
Dim colNames As New List(Of String)
|
||||
|
||||
Using comm As DbCommand = localDb.CreateCommand()
|
||||
For i As Integer = 0 To dbReader.FieldCount - 1
|
||||
colNames.Add(dbReader.GetName(i))
|
||||
|
||||
Dim param As DbParameter = comm.CreateParameter()
|
||||
param.DbType = DbType.Object
|
||||
param.ParameterName = $"@{dbReader.GetName(i)}"
|
||||
param.Value = dbReader.Item(i)
|
||||
|
||||
comm.Parameters.Add(param)
|
||||
Next
|
||||
|
||||
comm.CommandText = localDb.CmdHelper.UpdateParam(tableName, colNames, $"`ID` = {dbReader("ID")}")
|
||||
|
||||
comm.ExecuteNonQuery()
|
||||
End Using
|
||||
End Sub
|
||||
|
||||
|
||||
Private Sub InsertDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader)
|
||||
Dim colNames As New List(Of String)
|
||||
|
||||
Using comm As DbCommand = localDb.CreateCommand()
|
||||
For i As Integer = 0 To dbReader.FieldCount - 1
|
||||
colNames.Add(dbReader.GetName(i))
|
||||
|
||||
Dim param As DbParameter = comm.CreateParameter()
|
||||
param.DbType = DbType.Object
|
||||
param.ParameterName = $"@{dbReader.GetName(i)}"
|
||||
param.Value = dbReader.Item(i)
|
||||
|
||||
comm.Parameters.Add(param)
|
||||
Next
|
||||
|
||||
comm.CommandText = localDb.CmdHelper.InsertParam(tableName, colNames)
|
||||
|
||||
comm.ExecuteNonQuery()
|
||||
End Using
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 将目标DataTable的所有数据插入到指定数据表中,不检测是否需要新增列
|
||||
'''
|
||||
''' 后续插入字符需要区别数值与字符串
|
||||
''' </summary>
|
||||
''' <param name="localDb"></param>
|
||||
''' <param name="tableName"></param>
|
||||
''' <param name="dataTable"></param>
|
||||
Private Sub InsertDataByDataTable(localDb As DbExecutor, tableName As String, dataTable As DataTable)
|
||||
Dim colNameParams As New Dictionary(Of String, String)
|
||||
' Dim colParamValues As New Dictionary(Of String, Object)
|
||||
|
||||
For Each dtRow As DataRow In dataTable.Rows
|
||||
For Each dtCol As DataColumn In dataTable.Columns
|
||||
colNameParams.Add(dtCol.ColumnName, $"@{dtCol.ColumnName}")
|
||||
' colParamValues.Add($"@{dtCol.ColumnName}", dtRow(dtCol.ColumnName))
|
||||
|
||||
'存疑是否可以修改
|
||||
Dim param As DbParameter = localDb.Command.CreateParameter()
|
||||
param.DbType = DbType.Object
|
||||
param.ParameterName = $"@{dtCol.ColumnName}"
|
||||
param.Value = dtRow(dtCol.ColumnName)
|
||||
|
||||
localDb.Command.Parameters.Add(param)
|
||||
Next
|
||||
|
||||
Dim cmdText As String = localDb.CmdHelper.Insert(tableName, colNameParams)
|
||||
localDb.ExecuteNonQuery(cmdText) '执行插入
|
||||
localDb.Command.Parameters.Clear() '清空参数
|
||||
|
||||
|
||||
colNameParams.Clear() '复位
|
||||
Next
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 更新本地数据成功后,同步更新本地版本记录表
|
||||
''' </summary>
|
||||
''' <param name="localDb">本地数据库执行器</param>
|
||||
''' <param name="table">需要同步的数据表信息</param>
|
||||
Private Sub UpdateVersionTable(localDb As DbExecutor, table As SyncTableScheme)
|
||||
Dim tableName As String = Customer.SyncListTable.TableName
|
||||
Dim cmdText As String
|
||||
|
||||
If table.IsExistsTable Then
|
||||
Dim colNameValue As New Dictionary(Of String, String) From {
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID},
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime}
|
||||
}
|
||||
Dim condition As String = $"{Customer.SyncListTable.ColNamesEnum.TableName} = '{table.TableName}'"
|
||||
cmdText = localDb.CmdHelper.Update(tableName, colNameValue, condition)
|
||||
Else
|
||||
Dim colNameValue As New Dictionary(Of String, String) From {
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.TableName}", table.TableName},
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID},
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.SyncType}", table.SyncType},
|
||||
{$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime}
|
||||
}
|
||||
cmdText = localDb.CmdHelper.Insert(tableName, colNameValue)
|
||||
End If
|
||||
|
||||
Try
|
||||
localDb.ExecuteNonQuery(cmdText)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"UpdateVersionTable Error:{ex.Message}")
|
||||
End Try
|
||||
End Sub
|
||||
|
||||
#End Region
|
||||
|
||||
|
||||
|
||||
#Region "数据类型转换"
|
||||
Enum SystemTypeEnum
|
||||
[AnsiString]
|
||||
[AnsiStringFixedLength]
|
||||
Binary
|
||||
[Boolean]
|
||||
[Byte]
|
||||
[DateTime]
|
||||
[Decimal]
|
||||
[Double]
|
||||
[Guid]
|
||||
[Int16]
|
||||
[Int32]
|
||||
[Int64]
|
||||
[SByte]
|
||||
[Single]
|
||||
[String]
|
||||
[TimeSpan]
|
||||
[UInt16]
|
||||
[UInt32]
|
||||
[UInt64]
|
||||
End Enum
|
||||
|
||||
''' <summary>
|
||||
''' 将.net数据类型转换为Sqlite数据类型
|
||||
''' </summary>
|
||||
''' <param name="dataType">.net数据类型</param>
|
||||
''' <returns>转换后的Sqlite数据类型</returns>
|
||||
Public Shared Function ConvertSystemTypeToSqliteType(dataType As String) As String
|
||||
Dim reType As String
|
||||
|
||||
Select Case dataType
|
||||
Case SystemTypeEnum.AnsiString.ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString()
|
||||
|
||||
Case SystemTypeEnum.[AnsiStringFixedLength].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Nchar.ToString()
|
||||
|
||||
Case SystemTypeEnum.Binary.ToString(), SystemTypeEnum.Byte.ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Boolean].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Bit.ToString()
|
||||
|
||||
Case SystemTypeEnum.[DateTime].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Decimal].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.[Decimal].ToString()
|
||||
|
||||
Case SystemTypeEnum.[Double].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Real.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Guid].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.UniqueIdentifier.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Int16].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.SmallInt.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Int32].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString()
|
||||
|
||||
Case SystemTypeEnum.[Int64].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString()
|
||||
|
||||
Case SystemTypeEnum.[SByte].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.TinyInt.ToString()
|
||||
|
||||
Case SystemTypeEnum.[Single].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.[Single].ToString()
|
||||
|
||||
Case SystemTypeEnum.[String].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Nvarchar.ToString()
|
||||
|
||||
Case SystemTypeEnum.TimeSpan.ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString()
|
||||
|
||||
Case SystemTypeEnum.[UInt16].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.SmallUint.ToString()
|
||||
|
||||
Case SystemTypeEnum.[UInt32].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Uint.ToString()
|
||||
|
||||
Case SystemTypeEnum.[UInt64].ToString()
|
||||
reType = Sqlite.DataParam.DataTypeEnum.UnsignedInteger.ToString()
|
||||
|
||||
Case "Byte[]"
|
||||
reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString()
|
||||
|
||||
Case Else
|
||||
'reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString()
|
||||
Throw New Exception($"ConvertSystemTypeToSqliteType,UnDealType:{dataType}{vbNewLine}")
|
||||
End Select
|
||||
|
||||
Return reType
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 通过解析DbDataReader列名与列类型,暂不可用
|
||||
''' </summary>
|
||||
''' <param name="tableName"></param>
|
||||
''' <param name="reader"></param>
|
||||
''' <returns></returns>
|
||||
Private Function CreateTableStringByDataReader(tableName As String, reader As DbDataReader) As String
|
||||
Dim builder As New StringBuilder
|
||||
builder.Append($"CREATE TABLE If Not Exists [{tableName}] (")
|
||||
|
||||
For i As Integer = 0 To reader.FieldCount - 1
|
||||
If i = 0 Then
|
||||
'应该修改成通用的类型转换
|
||||
builder.Append($"[{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}")
|
||||
Else
|
||||
builder.Append($", [{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}")
|
||||
End If
|
||||
Next
|
||||
|
||||
builder.Append(" )")
|
||||
Return builder.ToString()
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 通过解析DataTable,获取建表语句
|
||||
''' </summary>
|
||||
''' <param name="tableName">数据表名</param>
|
||||
''' <param name="destTable">需要解析的内存数据表</param>
|
||||
''' <returns></returns>
|
||||
Private Function CreateTableStringByDataTable(tableName As String, destTable As DataTable) As String
|
||||
Dim builder As New StringBuilder
|
||||
builder.Append($"CREATE TABLE If Not Exists [{tableName}] (")
|
||||
For i As Integer = 0 To destTable.Columns.Count - 1
|
||||
If i = 0 Then
|
||||
'应该修改成通用的类型转换
|
||||
builder.Append($"[{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}")
|
||||
Else
|
||||
builder.Append($", [{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}")
|
||||
End If
|
||||
|
||||
'列名长度
|
||||
If destTable.Columns(i).MaxLength > -1 Then
|
||||
builder.Append($"({destTable.Columns(i).MaxLength})")
|
||||
End If
|
||||
|
||||
'列为空
|
||||
If destTable.Columns(i).AllowDBNull = False Then
|
||||
builder.Append($" NOT NULL")
|
||||
End If
|
||||
|
||||
'列自增(默认列自增为主键)
|
||||
If destTable.Columns(i).AutoIncrement Then
|
||||
builder.Append($" PRIMARY KEY AutoIncrement")
|
||||
'Else
|
||||
' '唯一值
|
||||
' If destTable.Columns(i).Unique Then
|
||||
' builder.Append($" Unique")
|
||||
' End If
|
||||
End If
|
||||
|
||||
''默认值
|
||||
'If destTable.Columns(i).DefaultValue IsNot Nothing Then
|
||||
' If String.IsNullOrEmpty(destTable.Columns(i).DefaultValue.ToString()) = False Then
|
||||
' builder.Append($" DEFAULT '{destTable.Columns(i).DefaultValue}'")
|
||||
' End If
|
||||
'End If
|
||||
|
||||
Next
|
||||
|
||||
builder.Append(" )")
|
||||
Return builder.ToString()
|
||||
End Function
|
||||
#End Region
|
||||
|
||||
End Class
|
||||
33
AUTS_DataService/ServiceTask/IServiceTask.vb
Normal file
33
AUTS_DataService/ServiceTask/IServiceTask.vb
Normal file
@@ -0,0 +1,33 @@
|
||||
''' <summary>
|
||||
''' 任务类型接口,每个任务必须实现的功能
|
||||
''' </summary>
|
||||
Public Interface IServiceTask
|
||||
''' <summary>
|
||||
''' 任务开始
|
||||
''' </summary>
|
||||
Sub Start()
|
||||
|
||||
''' <summary>
|
||||
''' 任务退出
|
||||
''' </summary>
|
||||
Sub [Stop]()
|
||||
|
||||
''' <summary>
|
||||
''' 重启任务
|
||||
''' </summary>
|
||||
Sub Restart()
|
||||
|
||||
''' <summary>
|
||||
''' 设置任务参数
|
||||
''' </summary>
|
||||
''' <param name="params"></param>
|
||||
Sub SetParams(params As Dictionary(Of String, String))
|
||||
|
||||
''' <summary>
|
||||
''' 获取任务参数
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Function GetParams() As Dictionary(Of String, String)
|
||||
|
||||
|
||||
End Interface
|
||||
127
AUTS_DataService/ServiceTask/JsonFileListen.vb
Normal file
127
AUTS_DataService/ServiceTask/JsonFileListen.vb
Normal file
@@ -0,0 +1,127 @@
|
||||
Imports System.IO
|
||||
Imports Newtonsoft.Json
|
||||
|
||||
Imports UTS_Core.Database
|
||||
Imports UTS_Core.UTSModule.DbConnect
|
||||
|
||||
Public Class JsonFileListen
|
||||
|
||||
''' <summary>
|
||||
''' 读取文件并将内容保存在本地数据库中
|
||||
''' </summary>
|
||||
''' <param name="path"></param>
|
||||
''' <returns></returns>
|
||||
Public Function ReadFile(db As DbExecutor, path As String) As Boolean
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin ReadFile!")
|
||||
|
||||
If File.Exists(path) = False Then
|
||||
Throw New Exception($"{path}不存在!")
|
||||
End If
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Get fileLength!")
|
||||
|
||||
Dim fileInfo As New FileInfo(path) '获取文件大小
|
||||
If fileInfo.Length = 0 Then Return True
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin Read Text String!")
|
||||
|
||||
Using reader As New StreamReader(path)
|
||||
While reader.Peek() <> -1
|
||||
Dim line As String = reader.ReadLine()
|
||||
If String.IsNullOrWhiteSpace(line) Then Continue While '过滤信息
|
||||
line = line.Replace("\", "\\") '处理转义字符
|
||||
|
||||
Dim tmpField As Dictionary(Of String, String)
|
||||
Try
|
||||
tmpField = CType(JsonConvert.DeserializeObject(line, GetType(Dictionary(Of String, String))), Dictionary(Of String, String))
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"FileListen JsonConvertToKeyValue Error:{ex.Message};JsonString:{line}")
|
||||
Continue While
|
||||
End Try
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Filter Text String!")
|
||||
|
||||
''过滤缺失字段字符串
|
||||
If tmpField.ContainsKey("ProjectName") = False OrElse tmpField.ContainsKey("StationName") = False Then
|
||||
ServiceLog.WriteErrorLog($"FileListen Unexist ProjectName Or StationName Filed,JsonString:{line}")
|
||||
Continue While
|
||||
End If
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Filter Text String Success!")
|
||||
|
||||
'数据处理,根据表名进行分类,增删必要字段
|
||||
Dim tableName As String
|
||||
Try
|
||||
Dim projectIndex As Integer = DbConnector.SearchProjectIndex(db, tmpField("ProjectName"))
|
||||
Dim stationIndex As Integer = DbConnector.SearchStationIndex(db, projectIndex, tmpField("StationName"))
|
||||
tableName = UTS_Core.UTSModule.DbTableModel.Customer.TestLogTable.TableName(projectIndex, stationIndex)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"FileListen GetTestLogTableName Error:{ex.Message};ProjectName:{tmpField("ProjectName")};StationName:{tmpField("StationName")}")
|
||||
Continue While
|
||||
End Try
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin CreateTestLogTable!")
|
||||
|
||||
Try '本地表格新建
|
||||
DbConnector.UtsCreateTestLogTableToLocal(db, tableName)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"FileListen CreateTestLogTable Error:{ex.Message}")
|
||||
Continue While
|
||||
End Try
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin CreateTestLogTable Success!")
|
||||
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin Filter Db Filed!")
|
||||
|
||||
|
||||
Dim fields As New Dictionary(Of String, String) '入库字段
|
||||
For Each valuePair As KeyValuePair(Of String, String) In tmpField
|
||||
If String.IsNullOrWhiteSpace(valuePair.Key) Then Continue For
|
||||
If String.Compare(valuePair.Key, "ProjectName", True) = 0 Then Continue For
|
||||
If String.Compare(valuePair.Key, "StationName", True) = 0 Then Continue For
|
||||
|
||||
If String.Compare(valuePair.Key, "ProductionLineID", True) = 0 Then
|
||||
If String.IsNullOrWhiteSpace(valuePair.Value) Then Continue For
|
||||
End If
|
||||
|
||||
If String.Compare(valuePair.Key, "OrderID", True) = 0 Then
|
||||
If String.IsNullOrWhiteSpace(valuePair.Value) Then Continue For
|
||||
End If
|
||||
|
||||
If String.Compare(valuePair.Key, "ServiceID", True) = 0 Then
|
||||
If String.IsNullOrWhiteSpace(valuePair.Value) Then Continue For
|
||||
End If
|
||||
|
||||
If String.Compare(valuePair.Key, "UserID", True) = 0 Then
|
||||
If String.IsNullOrWhiteSpace(valuePair.Value) Then Continue For
|
||||
End If
|
||||
|
||||
fields.Add(valuePair.Key, valuePair.Value) '过滤字段,保留有效字段
|
||||
Next
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin Filter Db Filed Success!")
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen SaveTestRecord!")
|
||||
Try '存储
|
||||
DbConnector.UtsInsertToLocal(db, UTS_Core.UTSModule.UtsDb.RemotePrivateDb, tableName, fields)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"FileListen SaveTestRecord Error:{ex.Message}")
|
||||
Continue While
|
||||
End Try
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen SaveTestRecord Success!")
|
||||
End While
|
||||
|
||||
reader.Close()
|
||||
End Using
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Begin Clear!")
|
||||
|
||||
File.WriteAllText(path, String.Empty) '清空文件内容,由于多地同时写的原因,有可能会失败
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Success!")
|
||||
|
||||
Return True
|
||||
End Function
|
||||
End Class
|
||||
162
AUTS_DataService/ServiceTask/ListenJsonFileServiceTask.vb
Normal file
162
AUTS_DataService/ServiceTask/ListenJsonFileServiceTask.vb
Normal file
@@ -0,0 +1,162 @@
|
||||
Imports System.Threading
|
||||
Imports System.Xml.Serialization
|
||||
Imports UTS_Core.UTSModule.Service
|
||||
|
||||
<XmlInclude(GetType(ListenJsonFileServiceTask))>
|
||||
Public Class ListenJsonFileServiceTask
|
||||
Inherits ServiceTask
|
||||
|
||||
Private ReadOnly _jsonFile As JsonFileListen
|
||||
|
||||
Sub New()
|
||||
TaskType = ServiceTaskTypeEnum.ListenJsonFile
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
TaskName = "ListenJsonFile"
|
||||
|
||||
_jsonFile = New JsonFileListen()
|
||||
End Sub
|
||||
|
||||
Sub New(name As String)
|
||||
TaskType = ServiceTaskTypeEnum.ListenJsonFile
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
TaskName = name
|
||||
|
||||
_jsonFile = New JsonFileListen()
|
||||
End Sub
|
||||
|
||||
Sub New(name As String, param As Dictionary(Of String, String))
|
||||
TaskType = ServiceTaskTypeEnum.ListenJsonFile
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
TaskName = name
|
||||
|
||||
_jsonFile = New JsonFileListen()
|
||||
|
||||
SetParams(param)
|
||||
|
||||
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 具体的执行过程
|
||||
''' </summary>
|
||||
''' <param name="stat">状态值,暂未启用</param>
|
||||
Private Sub StartCallback(stat As Object)
|
||||
While TaskStatus = ServiceTaskStatusEnum.Start
|
||||
ServiceLog.WriteInfoLog($"SyncTask Wait Timeout.")
|
||||
|
||||
'等待任务
|
||||
Dim lastTime As DateTime = Now
|
||||
While (Now - lastTime).TotalMinutes < Interval
|
||||
If TaskStatus = ServiceTaskStatusEnum.Stop Then
|
||||
Return
|
||||
End If
|
||||
|
||||
Thread.Sleep(1000)
|
||||
End While
|
||||
|
||||
|
||||
Using localDb As New UTS_Core.Database.DbExecutor(UTS_Core.UTSModule.UtsDb.LocalDbType, UTS_Core.UTSModule.UtsDb.LocalConnString) '连接本地字符串
|
||||
localDb.Open()
|
||||
localDb.BeginTransaction()
|
||||
Try
|
||||
_jsonFile.ReadFile(localDb, FilePath)
|
||||
Catch ex As Exception
|
||||
ServiceLog.WriteErrorLog($"_jsonFile.ReadFile TaskName:{TaskName},ListenJsonFile Error:{ex.Message}")
|
||||
End Try
|
||||
localDb.CommitTransaction()
|
||||
localDb.Close()
|
||||
End Using
|
||||
End While
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub Start()
|
||||
ServiceLog.WriteInfoLog($"FileListen Befor StartCallback")
|
||||
|
||||
TaskStatus = ServiceTaskStatusEnum.Start
|
||||
|
||||
If _syncThread IsNot Nothing AndAlso _syncThread.IsAlive Then
|
||||
ServiceLog.WriteInfoLog($"FileListenThread IsAlive!")
|
||||
Return
|
||||
End If
|
||||
|
||||
_syncThread = New Thread(AddressOf StartCallback)
|
||||
_syncThread.IsBackground = True
|
||||
_syncThread.Start()
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen After StartCallback")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub [Stop]()
|
||||
ServiceLog.WriteInfoLog($"FileListen Befor Stop!")
|
||||
TaskStatus = ServiceTaskStatusEnum.Stop
|
||||
|
||||
Dim lastTime As DateTime = Now
|
||||
While _syncThread IsNot Nothing AndAlso _syncThread.IsAlive
|
||||
If (Now - lastTime).TotalMilliseconds > 10000 Then
|
||||
_syncThread.Abort()
|
||||
End If
|
||||
|
||||
Thread.Sleep(10)
|
||||
End While
|
||||
|
||||
ServiceLog.WriteInfoLog($"FileListen Stop!")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub Restart()
|
||||
ServiceLog.WriteInfoLog($"FileListen Befor Restart!")
|
||||
If _syncThread IsNot Nothing AndAlso _syncThread.IsAlive Then
|
||||
[Stop]()
|
||||
|
||||
End If
|
||||
|
||||
Start()
|
||||
ServiceLog.WriteInfoLog($"FileListen Restart!")
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Sub SetParams(params As Dictionary(Of String, String))
|
||||
Dim tmpStatus As ServiceTaskStatusEnum
|
||||
Dim tmpInterval As Integer
|
||||
Dim tmpPath As String = String.Empty
|
||||
|
||||
'将转换后的数据填充至临时缓存,一遍设置失败时可以保留上一次数据
|
||||
For Each param As KeyValuePair(Of String, String) In params
|
||||
Select Case param.Key
|
||||
Case "Interval"
|
||||
If Integer.TryParse(param.Value, tmpInterval) = False Then
|
||||
Throw New Exception($"Error Interval :{param.Value}")
|
||||
End If
|
||||
Case "Path"
|
||||
tmpPath = param.Value
|
||||
Case "Status", "Type", "Name" '不处理的字段
|
||||
End Select
|
||||
Next
|
||||
|
||||
TaskStatus = tmpStatus
|
||||
Interval = tmpInterval
|
||||
FilePath = tmpPath
|
||||
End Sub
|
||||
|
||||
|
||||
Public Overrides Function GetParams() As Dictionary(Of String, String)
|
||||
Dim params As New Dictionary(Of String, String) From {
|
||||
{"Type", TaskType.ToString()},
|
||||
{"Name", TaskName},
|
||||
{"Status", TaskStatus.ToString()},
|
||||
{"Interval", Interval.ToString()},
|
||||
{"Path", FilePath}
|
||||
}
|
||||
Return params
|
||||
End Function
|
||||
|
||||
|
||||
Public Property Interval() As Integer
|
||||
|
||||
Public Property FilePath() As String
|
||||
|
||||
Private _syncThread As Thread
|
||||
End Class
|
||||
88
AUTS_DataService/ServiceTask/ServiceTask.vb
Normal file
88
AUTS_DataService/ServiceTask/ServiceTask.vb
Normal file
@@ -0,0 +1,88 @@
|
||||
Imports System.Xml.Serialization
|
||||
|
||||
|
||||
<Serializable>
|
||||
<XmlInclude(GetType(DbSyncServiceTask))>
|
||||
<XmlInclude(GetType(ListenJsonFileServiceTask))>
|
||||
Public MustInherit Class ServiceTask
|
||||
Implements IServiceTask
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务类型枚举集合
|
||||
''' </summary>
|
||||
Enum ServiceTaskTypeEnum
|
||||
''' <summary>
|
||||
''' 数据库同步
|
||||
''' </summary>
|
||||
DbSync
|
||||
|
||||
''' <summary>
|
||||
''' 监听Json文件
|
||||
''' </summary>
|
||||
ListenJsonFile
|
||||
End Enum
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务状态枚举值
|
||||
''' </summary>
|
||||
Enum ServiceTaskStatusEnum
|
||||
''' <summary>
|
||||
''' 启动状态
|
||||
''' </summary>
|
||||
Start
|
||||
|
||||
''' <summary>
|
||||
''' 停止状态
|
||||
''' </summary>
|
||||
[Stop]
|
||||
End Enum
|
||||
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务类型
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Property TaskType() As ServiceTaskTypeEnum
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务名,服务任务的唯一索引
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Property TaskName() As String
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务的状态
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Property TaskStatus() As ServiceTaskStatusEnum
|
||||
|
||||
''' <summary>
|
||||
''' 任务开启
|
||||
''' </summary>
|
||||
Public MustOverride Sub Start() Implements IServiceTask.Start
|
||||
|
||||
''' <summary>
|
||||
''' 任务停止
|
||||
''' </summary>
|
||||
Public MustOverride Sub [Stop]() Implements IServiceTask.[Stop]
|
||||
|
||||
''' <summary>
|
||||
''' 任务重启
|
||||
''' </summary>
|
||||
Public MustOverride Sub Restart() Implements IServiceTask.Restart
|
||||
|
||||
''' <summary>
|
||||
''' 任务参数集合设置
|
||||
''' </summary>
|
||||
''' <param name="params">任务参数键值对</param>
|
||||
Public MustOverride Sub SetParams(params As Dictionary(Of String, String)) Implements IServiceTask.SetParams
|
||||
|
||||
''' <summary>
|
||||
''' 任务参数集合获取
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public MustOverride Function GetParams() As Dictionary(Of String, String) Implements IServiceTask.GetParams
|
||||
|
||||
End Class
|
||||
230
AUTS_DataService/ServiceTask/ServiceTasks.vb
Normal file
230
AUTS_DataService/ServiceTask/ServiceTasks.vb
Normal file
@@ -0,0 +1,230 @@
|
||||
Imports UTS_Core.UTSModule.Service
|
||||
|
||||
''' <summary>
|
||||
''' 服务任务列表,管理服务的所有任务
|
||||
''' </summary>
|
||||
Public Class ServiceTasks
|
||||
Private ReadOnly _tasks As Dictionary(Of String, ServiceTask)
|
||||
|
||||
Public Event AddTask(task As Dictionary(Of String, String))
|
||||
Public Event UpdateTask(task As Dictionary(Of String, String))
|
||||
Public Event DelTask(task As String)
|
||||
Public Event ClearTask()
|
||||
|
||||
|
||||
|
||||
Sub New()
|
||||
_tasks = New Dictionary(Of String, ServiceTask)()
|
||||
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 任务总数
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Function Count() As Integer
|
||||
Return _tasks.Count
|
||||
End Function
|
||||
|
||||
''' <summary>
|
||||
''' 获取所有的服务任务
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Function GetAllServiceTasks() As List(Of ServiceTask)
|
||||
Return _tasks.Values.ToList()
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 添加任务,默认开启任务
|
||||
''' </summary>
|
||||
Public Sub Add(task As ServiceTask, Optional start As Boolean = True)
|
||||
If String.IsNullOrEmpty(task.TaskName) Then '无效的任务名
|
||||
Throw New Exception($"TaskName {task.TaskName} is invalid!")
|
||||
End If
|
||||
|
||||
If _tasks.ContainsKey(task.TaskName) Then '已存在的任务名
|
||||
Throw New Exception($"TaskName {task.TaskName} already exists!")
|
||||
End If
|
||||
|
||||
_tasks.Add(task.TaskName, task)
|
||||
If start Then task.Start() '默认添加任务时,开启任务
|
||||
RaiseEvent AddTask(task.GetParams())
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 添加任务,默认开启服务
|
||||
''' </summary>
|
||||
Public Sub Add(taskInfo As Dictionary(Of String, String), Optional start As Boolean = True)
|
||||
If taskInfo.ContainsKey("Type") AndAlso taskInfo.ContainsKey("Name") Then
|
||||
Dim task As ServiceTask
|
||||
Select Case taskInfo("Type")'根据类型进行不同任务的初始化
|
||||
Case $"{ ServiceTask.ServiceTaskTypeEnum.DbSync}"
|
||||
task = New DbSyncServiceTask(taskInfo("Name"), taskInfo)
|
||||
Add(task, start)
|
||||
ServiceLog.WriteInfoLog($"taskParam:{task.TaskName} {task.TaskStatus} {task.TaskType}")
|
||||
Case $"{ ServiceTask.ServiceTaskTypeEnum.ListenJsonFile}"
|
||||
task = New ListenJsonFileServiceTask(taskInfo("Name"), taskInfo)
|
||||
Add(task, start)
|
||||
Case Else
|
||||
Throw New Exception($"Unknown Type:{taskInfo("Type")}")
|
||||
End Select
|
||||
Else
|
||||
Throw New Exception($"AddTask Invalid TaskInfo")
|
||||
End If
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 添加任务列表
|
||||
''' </summary>
|
||||
''' <param name="tasks"></param>
|
||||
Public Sub AddRange(tasks As List(Of ServiceTask), Optional start As Boolean = True)
|
||||
For Each task As ServiceTask In tasks
|
||||
Add(task, start)
|
||||
Next
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 移除指定的服务任务
|
||||
''' </summary>
|
||||
''' <param name="task"></param>
|
||||
Sub Remove(task As ServiceTask)
|
||||
If _tasks.ContainsKey(task.TaskName) Then
|
||||
task.Stop()
|
||||
RaiseEvent DelTask(task.TaskName)
|
||||
_tasks.Remove(task.TaskName)
|
||||
End If
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 移除指定名称的服务任务
|
||||
''' </summary>
|
||||
Sub RemoveAt(taskName As String)
|
||||
If _tasks.ContainsKey(taskName) Then
|
||||
Remove(_tasks.Item(taskName))
|
||||
End If
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
''' 清空所有的服务任务
|
||||
''' </summary>
|
||||
Sub Clear()
|
||||
'关闭所有任务
|
||||
For Each task As KeyValuePair(Of String, ServiceTask) In _tasks
|
||||
task.Value.Stop()
|
||||
Next
|
||||
|
||||
'清空任务列表
|
||||
_tasks.Clear()
|
||||
RaiseEvent ClearTask()
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 设置指定任务的任务参数
|
||||
''' </summary>
|
||||
''' <param name="taskName"></param>
|
||||
''' <param name="param"></param>
|
||||
Public Sub SetTaskParams(taskName As String, param As Dictionary(Of String, String))
|
||||
If _tasks.ContainsKey(taskName) = False Then '不存在的任务
|
||||
Throw New Exception($"TaskName {taskName} is nonexistent!")
|
||||
End If
|
||||
_tasks.Item(taskName).SetParams(param)
|
||||
RaiseEvent UpdateTask(_tasks.Item(taskName).GetParams())
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 返回指定任务名称的任务参数
|
||||
''' </summary>
|
||||
''' <param name="taskName"></param>
|
||||
''' <returns></returns>
|
||||
Public Function GetTaskParams(taskName As String) As Dictionary(Of String, String)
|
||||
If _tasks.ContainsKey(taskName) = False Then '不存在的任务
|
||||
Throw New Exception($"TaskName {taskName} is nonexistent!")
|
||||
End If
|
||||
Return _tasks.Item(taskName).GetParams()
|
||||
End Function
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 获取所有任务的参数信息
|
||||
''' </summary>
|
||||
''' <returns></returns>
|
||||
Public Function GetAllTasksParam() As List(Of Dictionary(Of String, String))
|
||||
Dim allParams As New List(Of Dictionary(Of String, String))
|
||||
For Each task As KeyValuePair(Of String, ServiceTask) In _tasks
|
||||
allParams.Add(task.Value.GetParams())
|
||||
Next
|
||||
Return allParams
|
||||
End Function
|
||||
|
||||
Public Sub StartTask(taskName As String)
|
||||
If _tasks.ContainsKey(taskName) = False Then '不存在的任务
|
||||
Throw New Exception($"TaskName {taskName} is nonexistent!")
|
||||
End If
|
||||
|
||||
_tasks.Item(taskName).Start()
|
||||
RaiseEvent UpdateTask(_tasks.Item(taskName).GetParams())
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
'''开启所有任务
|
||||
''' </summary>
|
||||
Public Sub StartAllTasks()
|
||||
For Each task As KeyValuePair(Of String, ServiceTask) In _tasks
|
||||
task.Value.Start()
|
||||
RaiseEvent UpdateTask(task.Value.GetParams())
|
||||
Next
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 结束休眠,立即执行任务
|
||||
''' </summary>
|
||||
''' <param name="taskName"></param>
|
||||
Public Sub RestartTask(taskName As String)
|
||||
If _tasks.ContainsKey(taskName) = False Then '不存在的任务
|
||||
Throw New Exception($"TaskName {taskName} is nonexistent!")
|
||||
End If
|
||||
|
||||
_tasks.Item(taskName).Restart()
|
||||
RaiseEvent UpdateTask(_tasks.Item(taskName).GetParams())
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
'''开启所有任务
|
||||
''' </summary>
|
||||
Public Sub RestartAllTasks()
|
||||
For Each task As KeyValuePair(Of String, ServiceTask) In _tasks
|
||||
task.Value.Restart()
|
||||
RaiseEvent UpdateTask(task.Value.GetParams())
|
||||
Next
|
||||
End Sub
|
||||
|
||||
|
||||
''' <summary>
|
||||
''' 停止指定任务名的任务
|
||||
''' </summary>
|
||||
''' <param name="taskName"></param>
|
||||
Public Sub StopTask(taskName As String)
|
||||
If _tasks.ContainsKey(taskName) = False Then '不存在的任务
|
||||
Throw New Exception($"TaskName {taskName} is nonexistent!")
|
||||
End If
|
||||
|
||||
_tasks.Item(taskName).Stop()
|
||||
RaiseEvent UpdateTask(_tasks.Item(taskName).GetParams())
|
||||
End Sub
|
||||
|
||||
''' <summary>
|
||||
'''停止所有任务
|
||||
''' </summary>
|
||||
Public Sub StopAllTasks()
|
||||
For Each task As KeyValuePair(Of String, ServiceTask) In _tasks
|
||||
task.Value.Stop()
|
||||
RaiseEvent UpdateTask(task.Value.GetParams())
|
||||
Next
|
||||
End Sub
|
||||
End Class
|
||||
Reference in New Issue
Block a user