Imports System.Data.Common
Imports System.Text
Imports MySql.Data.MySqlClient
Imports UTS_Core.Database
Imports UTS_Core.UTSModule.DbTableModel
'''
''' 数据库同步器
'''
Public Class DbSynchronizer
'''
''' 同步表内容
'''
Structure SyncTableScheme
''' 表名
Public TableName As String
''' 同步类型
Public SyncType As String
''' 是否存在当前数据表
Public IsExistsTable As Boolean
''' 更新版本
Public RevisionID As String
''' 上一次更新时间字符串
Public LastSyncTime As String
''' 同步后更新的时间字符串
Public NowSyncTime As String
End Structure
'''
''' 同步数据库参数
'''
Structure SyncParam
Public PublicDb As String
Public PrivateDb As String
Public RemoteType As DbExecutor.DbTypeEnum
Public RemoteConnString As String
Public LocalType As DbExecutor.DbTypeEnum
Public LocalConnString As String
End Structure
Sub New(param As SyncParam)
Parameters = param
End Sub
'''
''' 连接参数
'''
'''
Public Property Parameters() As SyncParam
'''
''' 同步数据库,开始同步
'''
Public Sub SyncDatabase()
Using remoteDb As New DbExecutor(Parameters.RemoteType, Parameters.RemoteConnString)
remoteDb.Open()
Using localDb As New DbExecutor(Parameters.LocalType, Parameters.LocalConnString)
localDb.Open()
SyncDatabase(localDb, remoteDb, Parameters.PublicDb, Parameters.PrivateDb)
localDb.Close()
End Using
remoteDb.Close()
End Using
End Sub
'''
''' 同步数据库执行过程
'''
''' 本地数据库执行器
''' 云端数据库执行器
''' 远端数据库公共库名
''' 远端数据库私有库名
Public Sub SyncDatabase(localDb As DbExecutor, remoteDb As DbExecutor, publicDb As String, privateDb As String)
If remoteDb Is Nothing Then Return
If localDb Is Nothing Then Return
'上传本地缓存数据
remoteDb.BeginTransaction()
UploadCacheData(remoteDb, localDb)
remoteDb.CommitTransaction()
ServiceLog.WriteDebugLog($"UploadCacheData Successded!")
'获取需要公有下载的数据表
DownloadSyncTable(remoteDb, localDb, publicDb, Manage.SyncListTable.TableName)
ServiceLog.WriteDebugLog($"Download Puclic SyncTable Successded!")
'获取需要私有下载的数据表
DownloadSyncTable(remoteDb, localDb, privateDb, Customer.SyncListTable.TableName)
ServiceLog.WriteDebugLog($"Download Private SyncTable Successded!")
End Sub
'''
''' 上传本地缓存数据,DataTable的方式
'''
''' 远程数据库执行器
''' 本地数据库执行器
Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor)
CreateCacheTableWhenNotExists(localDb)
Dim dataTable As DataTable = SearchCacheData(localDb)
UploadCacheData(remoteDb, localDb, dataTable)
End Sub
#Region "上传本地缓存数据"
'''
''' 本地创建缓存记录表,如果不存在则创建
'''
''' 本地数据库的执行器
Private Sub CreateCacheTableWhenNotExists(localDb As DbExecutor)
localDb.ExecuteNonQuery(LocalPrivate.CacheTable.CreateTableString)
End Sub
'''
''' 查询本地未上传的数据,单次查询5000条数据
'''
''' 本地数据库的执行器
'''
Private Function SearchCacheData(localDb As DbExecutor) As DataTable
Dim tableName As String = LocalPrivate.CacheTable.TableName
Dim colNames As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID},{LocalPrivate.CacheTable.ColNamesEnum.SqlCmd}"
Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.IsUpload} = 0 order by `{LocalPrivate.CacheTable.ColNamesEnum.ID}` Limit 5000"
Dim cmdText As String = localDb.CmdHelper.Search(colNames, tableName, condition)
Return localDb.ExecuteDataTable(cmdText)
End Function
'''
''' 上传本地数据至云端数据库
'''
'''
'''
'''
Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor, dataTable As DataTable)
ServiceLog.WriteDebugLog($"Begin Upload Cache DataTable [{dataTable.Rows.Count}] Rows!")
For Each row As DataRow In dataTable.Rows
Try
Dim cmdText As String = row(LocalPrivate.CacheTable.ColNamesEnum.SqlCmd.ToString()).ToString()
remoteDb.ExecuteNonQuery(cmdText)
Catch ex As MySqlException
'记录失败
ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message},ErrorNumber:{ex.Number}")
Select Case ex.Number
Case MySqlErrorCode.DuplicateFieldName '重复字段
ServiceLog.WriteInfoLog($"DuplicateFieldName,Ignore the error!")
Case Else
ServiceLog.WriteWarningLog($"Unknown,Continue to perform!")
Exit For
End Select
Catch ex As Exception
'记录失败
ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message}")
ServiceError.RecodeError("DbSync", $"UploadData Error:{ex.Message}")
Exit For
End Try
Try
DeleteUploadData(localDb, row(LocalPrivate.CacheTable.ColNamesEnum.ID).ToString)
Catch ex As Exception
'记录失败
ServiceLog.WriteErrorLog($"UpdateCacheTable Error:{ex.Message}")
End Try
Next
End Sub
'''
''' 数据上传完成后,删除本地数据库中对应序号的记录
'''
'''
'''
Private Sub DeleteUploadData(localDb As DbExecutor, sn As String)
Dim tableName As String = LocalPrivate.CacheTable.TableName
Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID} = '{sn}'"
localDb.ExecuteNonQuery(localDb.CmdHelper.DeleteRows(tableName, condition))
End Sub
#End Region
#Region "获取需要下载表集合"
'''
''' 比对本地与云端版本表,下载变化的数据表
'''
'''
'''
'''
'''
Public Sub DownloadSyncTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tbName As String)
Dim downloadTables As List(Of SyncTableScheme)
downloadTables = CompareVersionTable(remoteDb, localDb, dbName, tbName)
UpdateTables(remoteDb, localDb, dbName, downloadTables) '更新本地数据
End Sub
'''
''' 比较本地与云端版本表的差异,获取需要下载的数据表
'''
'''
'''
'''
'''
Private Function CompareVersionTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tableName As String) As List(Of SyncTableScheme)
CreateVersionTableWhenNotExists(localDb)
Dim localTable As DataTable = localDb.ExecuteDataTable(localDb.CmdHelper.SearchAll(Customer.SyncListTable.TableName))
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, tableName))
Return CompareVersionTable(localTable, remoteTable)
End Function
'''
''' 创建本地数据库表版本记录表
'''
'''
Private Sub CreateVersionTableWhenNotExists(localDb As DbExecutor)
Try
localDb.ExecuteNonQuery(Customer.SyncListTable.CreateTableString(localDb.DatabaseType))
Catch ex As Exception
'记录失败
ServiceLog.WriteErrorLog($"CreateVersionTableWhenNotExists Error:{ex.Message}")
End Try
End Sub
'''
''' 比较本地与云端版本表的差异
'''
''' 源DataTable
''' 目标DataTable
''' 差异信息列表
Private Function CompareVersionTable(srcDataTable As DataTable, destDataTable As DataTable) As List(Of SyncTableScheme)
Dim result As New List(Of SyncTableScheme)
Dim srcTableName As String
Dim destTableName As String
Dim isFind As Boolean
Dim srcReVId As String = String.Empty
Dim destRevId As String
For i As Integer = 0 To destDataTable.Rows.Count - 1
destTableName = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}"
isFind = False
For j As Integer = 0 To srcDataTable.Rows.Count - 1
srcTableName = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}"
If String.Compare(destTableName, srcTableName, True) = 0 Then
srcReVId = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}"
isFind = True
Exit For
End If
Next
If isFind Then
destRevId = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}"
If String.Compare(srcReVId, destRevId, True) <> 0 Then '源表中存在记录且与目的表记录时间不相同
result.Add(New SyncTableScheme() With {.TableName = destTableName,
.SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}",
.LastSyncTime = $"{srcDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncTime}")}",
.RevisionID = destRevId,
.IsExistsTable = True})
End If
Else
result.Add(New SyncTableScheme() With {.TableName = destTableName,
.SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}",
.LastSyncTime = $"2000-01-01 00:00:00",
.RevisionID = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}",
.IsExistsTable = False})
End If
Next
Return result
End Function
#End Region
#Region "下载远程数据至本地"
'''
''' 更新下载数据表
'''
''' 远程数据库执行器
''' 本地数据库执行器
''' 需要同步的数据表信息集合
Private Sub UpdateTables(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tables As List(Of SyncTableScheme))
Dim ts As Date
Try
ts = CDate(remoteDb.ExecuteScalar("select current_timestamp()"))
Catch ex As Exception
ServiceLog.WriteErrorLog($"Get DB System Time Error:{ex.Message}")
Return
End Try
For Each table As SyncTableScheme In tables
table.NowSyncTime = ts.ToString("yyyy-MM-dd HH:mm:ss")
localDb.BeginTransaction()
Try
If String.Compare(table.SyncType, "all", True) = 0 Then
UpdateTableForAll(remoteDb, localDb, dbName, table)
ElseIf String.Compare(table.SyncType, "new", True) = 0 Then
'增量更新Update限制,必须包含ID、UpdateTime字段,且ID字段为主键
UpdateTableForNew(remoteDb, localDb, dbName, table)
Else
UpdateTableForAll(remoteDb, localDb, dbName, table)
End If
UpdateVersionTable(localDb, table)
Catch ex As Exception
ServiceLog.WriteErrorLog($"UpdateTable {table.TableName} Error:{ex.Message}")
End Try
localDb.CommitTransaction()
Next
End Sub
'''
''' 下载修改全表修改部分的方式下载表
'''
'''
'''
'''
'''
Private Sub UpdateTableForNew(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme)
Dim maxID As Integer = 0
If table.IsExistsTable = False Then
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0"))
Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句
localDb.ExecuteNonQuery(createTableString)
Else
'查询本地最大ID,主键必须为ID
Dim cmd As String = localDb.CmdHelper.SearchOrder(New List(Of String)() From {"ID"}, table.TableName, "ORDER BY [ID] DESC LIMIT 1")
Dim maxIdObject As Object = localDb.ExecuteScalar(cmd)
If IsDBNull(maxIdObject) = False Then
maxID = CInt(maxIdObject)
End If
End If
'通过dataReader插入数据,查询远程更新时间后的所有内容
Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, $"`UpdateTime` > '{table.LastSyncTime}'"))
While reader.Read()
If CInt(reader("ID")) > maxID Then '小于最大ID则更新,大于最大ID则插入
InsertDataByDataReader(localDb, table.TableName, reader)
Else
UpdateDataByDataReader(localDb, table.TableName, reader)
End If
End While
reader.Close()
End Using
End Sub
'''
''' 更新全表的方式下载表
'''
'''
'''
'''
Private Sub UpdateTableForAll(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme)
'删除本地表
If table.IsExistsTable Then
localDb.ExecuteNonQuery(localDb.CmdHelper.DropTable(table.TableName))
End If
'新建表(考虑是否应该查询总数后,依此取1000行的方式下载,或考虑使用DataReader)
Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0"))
Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句
localDb.ExecuteNonQuery(createTableString)
'通过dataReader插入数据
Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName))
While reader.Read()
InsertDataByDataReader(localDb, table.TableName, reader)
End While
reader.Close()
End Using
End Sub
Private Sub UpdateDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader)
Dim colNames As New List(Of String)
Using comm As DbCommand = localDb.CreateCommand()
For i As Integer = 0 To dbReader.FieldCount - 1
colNames.Add(dbReader.GetName(i))
Dim param As DbParameter = comm.CreateParameter()
param.DbType = DbType.Object
param.ParameterName = $"@{dbReader.GetName(i)}"
param.Value = dbReader.Item(i)
comm.Parameters.Add(param)
Next
comm.CommandText = localDb.CmdHelper.UpdateParam(tableName, colNames, $"`ID` = {dbReader("ID")}")
comm.ExecuteNonQuery()
End Using
End Sub
Private Sub InsertDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader)
Dim colNames As New List(Of String)
Using comm As DbCommand = localDb.CreateCommand()
For i As Integer = 0 To dbReader.FieldCount - 1
colNames.Add(dbReader.GetName(i))
Dim param As DbParameter = comm.CreateParameter()
param.DbType = DbType.Object
param.ParameterName = $"@{dbReader.GetName(i)}"
param.Value = dbReader.Item(i)
comm.Parameters.Add(param)
Next
comm.CommandText = localDb.CmdHelper.InsertParam(tableName, colNames)
comm.ExecuteNonQuery()
End Using
End Sub
'''
''' 将目标DataTable的所有数据插入到指定数据表中,不检测是否需要新增列
'''
''' 后续插入字符需要区别数值与字符串
'''
'''
'''
'''
Private Sub InsertDataByDataTable(localDb As DbExecutor, tableName As String, dataTable As DataTable)
Dim colNameParams As New Dictionary(Of String, String)
' Dim colParamValues As New Dictionary(Of String, Object)
For Each dtRow As DataRow In dataTable.Rows
For Each dtCol As DataColumn In dataTable.Columns
colNameParams.Add(dtCol.ColumnName, $"@{dtCol.ColumnName}")
' colParamValues.Add($"@{dtCol.ColumnName}", dtRow(dtCol.ColumnName))
'存疑是否可以修改
Dim param As DbParameter = localDb.Command.CreateParameter()
param.DbType = DbType.Object
param.ParameterName = $"@{dtCol.ColumnName}"
param.Value = dtRow(dtCol.ColumnName)
localDb.Command.Parameters.Add(param)
Next
Dim cmdText As String = localDb.CmdHelper.Insert(tableName, colNameParams)
localDb.ExecuteNonQuery(cmdText) '执行插入
localDb.Command.Parameters.Clear() '清空参数
colNameParams.Clear() '复位
Next
End Sub
'''
''' 更新本地数据成功后,同步更新本地版本记录表
'''
''' 本地数据库执行器
''' 需要同步的数据表信息
Private Sub UpdateVersionTable(localDb As DbExecutor, table As SyncTableScheme)
Dim tableName As String = Customer.SyncListTable.TableName
Dim cmdText As String
If table.IsExistsTable Then
Dim colNameValue As New Dictionary(Of String, String) From {
{$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID},
{$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime}
}
Dim condition As String = $"{Customer.SyncListTable.ColNamesEnum.TableName} = '{table.TableName}'"
cmdText = localDb.CmdHelper.Update(tableName, colNameValue, condition)
Else
Dim colNameValue As New Dictionary(Of String, String) From {
{$"{Customer.SyncListTable.ColNamesEnum.TableName}", table.TableName},
{$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID},
{$"{Customer.SyncListTable.ColNamesEnum.SyncType}", table.SyncType},
{$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime}
}
cmdText = localDb.CmdHelper.Insert(tableName, colNameValue)
End If
Try
localDb.ExecuteNonQuery(cmdText)
Catch ex As Exception
ServiceLog.WriteErrorLog($"UpdateVersionTable Error:{ex.Message}")
End Try
End Sub
#End Region
#Region "数据类型转换"
Enum SystemTypeEnum
[AnsiString]
[AnsiStringFixedLength]
Binary
[Boolean]
[Byte]
[DateTime]
[Decimal]
[Double]
[Guid]
[Int16]
[Int32]
[Int64]
[SByte]
[Single]
[String]
[TimeSpan]
[UInt16]
[UInt32]
[UInt64]
End Enum
'''
''' 将.net数据类型转换为Sqlite数据类型
'''
''' .net数据类型
''' 转换后的Sqlite数据类型
Public Shared Function ConvertSystemTypeToSqliteType(dataType As String) As String
Dim reType As String
Select Case dataType
Case SystemTypeEnum.AnsiString.ToString()
reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString()
Case SystemTypeEnum.[AnsiStringFixedLength].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Nchar.ToString()
Case SystemTypeEnum.Binary.ToString(), SystemTypeEnum.Byte.ToString()
reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString()
Case SystemTypeEnum.[Boolean].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Bit.ToString()
Case SystemTypeEnum.[DateTime].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString()
Case SystemTypeEnum.[Decimal].ToString()
reType = Sqlite.DataParam.DataTypeEnum.[Decimal].ToString()
Case SystemTypeEnum.[Double].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Real.ToString()
Case SystemTypeEnum.[Guid].ToString()
reType = Sqlite.DataParam.DataTypeEnum.UniqueIdentifier.ToString()
Case SystemTypeEnum.[Int16].ToString()
reType = Sqlite.DataParam.DataTypeEnum.SmallInt.ToString()
Case SystemTypeEnum.[Int32].ToString()
reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString()
Case SystemTypeEnum.[Int64].ToString()
reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString()
Case SystemTypeEnum.[SByte].ToString()
reType = Sqlite.DataParam.DataTypeEnum.TinyInt.ToString()
Case SystemTypeEnum.[Single].ToString()
reType = Sqlite.DataParam.DataTypeEnum.[Single].ToString()
Case SystemTypeEnum.[String].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Nvarchar.ToString()
Case SystemTypeEnum.TimeSpan.ToString()
reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString()
Case SystemTypeEnum.[UInt16].ToString()
reType = Sqlite.DataParam.DataTypeEnum.SmallUint.ToString()
Case SystemTypeEnum.[UInt32].ToString()
reType = Sqlite.DataParam.DataTypeEnum.Uint.ToString()
Case SystemTypeEnum.[UInt64].ToString()
reType = Sqlite.DataParam.DataTypeEnum.UnsignedInteger.ToString()
Case "Byte[]"
reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString()
Case Else
'reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString()
Throw New Exception($"ConvertSystemTypeToSqliteType,UnDealType:{dataType}{vbNewLine}")
End Select
Return reType
End Function
'''
''' 通过解析DbDataReader列名与列类型,暂不可用
'''
'''
'''
'''
Private Function CreateTableStringByDataReader(tableName As String, reader As DbDataReader) As String
Dim builder As New StringBuilder
builder.Append($"CREATE TABLE If Not Exists [{tableName}] (")
For i As Integer = 0 To reader.FieldCount - 1
If i = 0 Then
'应该修改成通用的类型转换
builder.Append($"[{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}")
Else
builder.Append($", [{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}")
End If
Next
builder.Append(" )")
Return builder.ToString()
End Function
'''
''' 通过解析DataTable,获取建表语句
'''
''' 数据表名
''' 需要解析的内存数据表
'''
Private Function CreateTableStringByDataTable(tableName As String, destTable As DataTable) As String
Dim builder As New StringBuilder
builder.Append($"CREATE TABLE If Not Exists [{tableName}] (")
For i As Integer = 0 To destTable.Columns.Count - 1
If i = 0 Then
'应该修改成通用的类型转换
builder.Append($"[{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}")
Else
builder.Append($", [{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}")
End If
'列名长度
If destTable.Columns(i).MaxLength > -1 Then
builder.Append($"({destTable.Columns(i).MaxLength})")
End If
'列为空
If destTable.Columns(i).AllowDBNull = False Then
builder.Append($" NOT NULL")
End If
'列自增(默认列自增为主键)
If destTable.Columns(i).AutoIncrement Then
builder.Append($" PRIMARY KEY AutoIncrement")
'Else
' '唯一值
' If destTable.Columns(i).Unique Then
' builder.Append($" Unique")
' End If
End If
''默认值
'If destTable.Columns(i).DefaultValue IsNot Nothing Then
' If String.IsNullOrEmpty(destTable.Columns(i).DefaultValue.ToString()) = False Then
' builder.Append($" DEFAULT '{destTable.Columns(i).DefaultValue}'")
' End If
'End If
Next
builder.Append(" )")
Return builder.ToString()
End Function
#End Region
End Class