Imports System.Data.Common Imports System.Text Imports MySql.Data.MySqlClient Imports UTS_Core.Database Imports UTS_Core.UTSModule.DbTableModel ''' ''' 数据库同步器 ''' Public Class DbSynchronizer ''' ''' 同步表内容 ''' Structure SyncTableScheme ''' 表名 Public TableName As String ''' 同步类型 Public SyncType As String ''' 是否存在当前数据表 Public IsExistsTable As Boolean ''' 更新版本 Public RevisionID As String ''' 上一次更新时间字符串 Public LastSyncTime As String ''' 同步后更新的时间字符串 Public NowSyncTime As String End Structure ''' ''' 同步数据库参数 ''' Structure SyncParam Public PublicDb As String Public PrivateDb As String Public RemoteType As DbExecutor.DbTypeEnum Public RemoteConnString As String Public LocalType As DbExecutor.DbTypeEnum Public LocalConnString As String End Structure Sub New(param As SyncParam) Parameters = param End Sub ''' ''' 连接参数 ''' ''' Public Property Parameters() As SyncParam ''' ''' 同步数据库,开始同步 ''' Public Sub SyncDatabase() Using remoteDb As New DbExecutor(Parameters.RemoteType, Parameters.RemoteConnString) remoteDb.Open() Using localDb As New DbExecutor(Parameters.LocalType, Parameters.LocalConnString) localDb.Open() SyncDatabase(localDb, remoteDb, Parameters.PublicDb, Parameters.PrivateDb) localDb.Close() End Using remoteDb.Close() End Using End Sub ''' ''' 同步数据库执行过程 ''' ''' 本地数据库执行器 ''' 云端数据库执行器 ''' 远端数据库公共库名 ''' 远端数据库私有库名 Public Sub SyncDatabase(localDb As DbExecutor, remoteDb As DbExecutor, publicDb As String, privateDb As String) If remoteDb Is Nothing Then Return If localDb Is Nothing Then Return '上传本地缓存数据 remoteDb.BeginTransaction() UploadCacheData(remoteDb, localDb) remoteDb.CommitTransaction() ServiceLog.WriteDebugLog($"UploadCacheData Successded!") '获取需要公有下载的数据表 DownloadSyncTable(remoteDb, localDb, publicDb, Manage.SyncListTable.TableName) ServiceLog.WriteDebugLog($"Download Puclic SyncTable Successded!") '获取需要私有下载的数据表 DownloadSyncTable(remoteDb, localDb, privateDb, Customer.SyncListTable.TableName) ServiceLog.WriteDebugLog($"Download Private SyncTable Successded!") End Sub ''' ''' 上传本地缓存数据,DataTable的方式 ''' ''' 远程数据库执行器 ''' 本地数据库执行器 Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor) CreateCacheTableWhenNotExists(localDb) Dim dataTable As DataTable = SearchCacheData(localDb) UploadCacheData(remoteDb, localDb, dataTable) End Sub #Region "上传本地缓存数据" ''' ''' 本地创建缓存记录表,如果不存在则创建 ''' ''' 本地数据库的执行器 Private Sub CreateCacheTableWhenNotExists(localDb As DbExecutor) localDb.ExecuteNonQuery(LocalPrivate.CacheTable.CreateTableString) End Sub ''' ''' 查询本地未上传的数据,单次查询5000条数据 ''' ''' 本地数据库的执行器 ''' Private Function SearchCacheData(localDb As DbExecutor) As DataTable Dim tableName As String = LocalPrivate.CacheTable.TableName Dim colNames As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID},{LocalPrivate.CacheTable.ColNamesEnum.SqlCmd}" Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.IsUpload} = 0 order by `{LocalPrivate.CacheTable.ColNamesEnum.ID}` Limit 5000" Dim cmdText As String = localDb.CmdHelper.Search(colNames, tableName, condition) Return localDb.ExecuteDataTable(cmdText) End Function ''' ''' 上传本地数据至云端数据库 ''' ''' ''' ''' Private Sub UploadCacheData(remoteDb As DbExecutor, localDb As DbExecutor, dataTable As DataTable) ServiceLog.WriteDebugLog($"Begin Upload Cache DataTable [{dataTable.Rows.Count}] Rows!") For Each row As DataRow In dataTable.Rows Try Dim cmdText As String = row(LocalPrivate.CacheTable.ColNamesEnum.SqlCmd.ToString()).ToString() remoteDb.ExecuteNonQuery(cmdText) Catch ex As MySqlException '记录失败 ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message},ErrorNumber:{ex.Number}") Select Case ex.Number Case MySqlErrorCode.DuplicateFieldName '重复字段 ServiceLog.WriteInfoLog($"DuplicateFieldName,Ignore the error!") Case Else ServiceLog.WriteWarningLog($"Unknown,Continue to perform!") Exit For End Select Catch ex As Exception '记录失败 ServiceLog.WriteErrorLog($"UploadData Error:{ex.Message}") ServiceError.RecodeError("DbSync", $"UploadData Error:{ex.Message}") Exit For End Try Try DeleteUploadData(localDb, row(LocalPrivate.CacheTable.ColNamesEnum.ID).ToString) Catch ex As Exception '记录失败 ServiceLog.WriteErrorLog($"UpdateCacheTable Error:{ex.Message}") End Try Next End Sub ''' ''' 数据上传完成后,删除本地数据库中对应序号的记录 ''' ''' ''' Private Sub DeleteUploadData(localDb As DbExecutor, sn As String) Dim tableName As String = LocalPrivate.CacheTable.TableName Dim condition As String = $"{LocalPrivate.CacheTable.ColNamesEnum.ID} = '{sn}'" localDb.ExecuteNonQuery(localDb.CmdHelper.DeleteRows(tableName, condition)) End Sub #End Region #Region "获取需要下载表集合" ''' ''' 比对本地与云端版本表,下载变化的数据表 ''' ''' ''' ''' ''' Public Sub DownloadSyncTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tbName As String) Dim downloadTables As List(Of SyncTableScheme) downloadTables = CompareVersionTable(remoteDb, localDb, dbName, tbName) UpdateTables(remoteDb, localDb, dbName, downloadTables) '更新本地数据 End Sub ''' ''' 比较本地与云端版本表的差异,获取需要下载的数据表 ''' ''' ''' ''' ''' Private Function CompareVersionTable(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tableName As String) As List(Of SyncTableScheme) CreateVersionTableWhenNotExists(localDb) Dim localTable As DataTable = localDb.ExecuteDataTable(localDb.CmdHelper.SearchAll(Customer.SyncListTable.TableName)) Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, tableName)) Return CompareVersionTable(localTable, remoteTable) End Function ''' ''' 创建本地数据库表版本记录表 ''' ''' Private Sub CreateVersionTableWhenNotExists(localDb As DbExecutor) Try localDb.ExecuteNonQuery(Customer.SyncListTable.CreateTableString(localDb.DatabaseType)) Catch ex As Exception '记录失败 ServiceLog.WriteErrorLog($"CreateVersionTableWhenNotExists Error:{ex.Message}") End Try End Sub ''' ''' 比较本地与云端版本表的差异 ''' ''' 源DataTable ''' 目标DataTable ''' 差异信息列表 Private Function CompareVersionTable(srcDataTable As DataTable, destDataTable As DataTable) As List(Of SyncTableScheme) Dim result As New List(Of SyncTableScheme) Dim srcTableName As String Dim destTableName As String Dim isFind As Boolean Dim srcReVId As String = String.Empty Dim destRevId As String For i As Integer = 0 To destDataTable.Rows.Count - 1 destTableName = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}" isFind = False For j As Integer = 0 To srcDataTable.Rows.Count - 1 srcTableName = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.TableName}")}" If String.Compare(destTableName, srcTableName, True) = 0 Then srcReVId = $"{srcDataTable.Rows(j)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}" isFind = True Exit For End If Next If isFind Then destRevId = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}" If String.Compare(srcReVId, destRevId, True) <> 0 Then '源表中存在记录且与目的表记录时间不相同 result.Add(New SyncTableScheme() With {.TableName = destTableName, .SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}", .LastSyncTime = $"{srcDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncTime}")}", .RevisionID = destRevId, .IsExistsTable = True}) End If Else result.Add(New SyncTableScheme() With {.TableName = destTableName, .SyncType = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.SyncType}")}", .LastSyncTime = $"2000-01-01 00:00:00", .RevisionID = $"{destDataTable.Rows(i)($"{Customer.SyncListTable.ColNamesEnum.RevisionID}")}", .IsExistsTable = False}) End If Next Return result End Function #End Region #Region "下载远程数据至本地" ''' ''' 更新下载数据表 ''' ''' 远程数据库执行器 ''' 本地数据库执行器 ''' 需要同步的数据表信息集合 Private Sub UpdateTables(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, tables As List(Of SyncTableScheme)) Dim ts As Date Try ts = CDate(remoteDb.ExecuteScalar("select current_timestamp()")) Catch ex As Exception ServiceLog.WriteErrorLog($"Get DB System Time Error:{ex.Message}") Return End Try For Each table As SyncTableScheme In tables table.NowSyncTime = ts.ToString("yyyy-MM-dd HH:mm:ss") localDb.BeginTransaction() Try If String.Compare(table.SyncType, "all", True) = 0 Then UpdateTableForAll(remoteDb, localDb, dbName, table) ElseIf String.Compare(table.SyncType, "new", True) = 0 Then '增量更新Update限制,必须包含ID、UpdateTime字段,且ID字段为主键 UpdateTableForNew(remoteDb, localDb, dbName, table) Else UpdateTableForAll(remoteDb, localDb, dbName, table) End If UpdateVersionTable(localDb, table) Catch ex As Exception ServiceLog.WriteErrorLog($"UpdateTable {table.TableName} Error:{ex.Message}") End Try localDb.CommitTransaction() Next End Sub ''' ''' 下载修改全表修改部分的方式下载表 ''' ''' ''' ''' ''' Private Sub UpdateTableForNew(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme) Dim maxID As Integer = 0 If table.IsExistsTable = False Then Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0")) Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句 localDb.ExecuteNonQuery(createTableString) Else '查询本地最大ID,主键必须为ID Dim cmd As String = localDb.CmdHelper.SearchOrder(New List(Of String)() From {"ID"}, table.TableName, "ORDER BY [ID] DESC LIMIT 1") Dim maxIdObject As Object = localDb.ExecuteScalar(cmd) If IsDBNull(maxIdObject) = False Then maxID = CInt(maxIdObject) End If End If '通过dataReader插入数据,查询远程更新时间后的所有内容 Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, $"`UpdateTime` > '{table.LastSyncTime}'")) While reader.Read() If CInt(reader("ID")) > maxID Then '小于最大ID则更新,大于最大ID则插入 InsertDataByDataReader(localDb, table.TableName, reader) Else UpdateDataByDataReader(localDb, table.TableName, reader) End If End While reader.Close() End Using End Sub ''' ''' 更新全表的方式下载表 ''' ''' ''' ''' Private Sub UpdateTableForAll(remoteDb As DbExecutor, localDb As DbExecutor, dbName As String, table As SyncTableScheme) '删除本地表 If table.IsExistsTable Then localDb.ExecuteNonQuery(localDb.CmdHelper.DropTable(table.TableName)) End If '新建表(考虑是否应该查询总数后,依此取1000行的方式下载,或考虑使用DataReader) Dim remoteTable As DataTable = remoteDb.ExecuteDataTable(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName, "1 = 0")) Dim createTableString As String = CreateTableStringByDataTable(table.TableName, remoteTable) '获取建表语句 localDb.ExecuteNonQuery(createTableString) '通过dataReader插入数据 Using reader As DbDataReader = remoteDb.ExecuteReader(remoteDb.CmdHelper.DbSearchAll(dbName, table.TableName)) While reader.Read() InsertDataByDataReader(localDb, table.TableName, reader) End While reader.Close() End Using End Sub Private Sub UpdateDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader) Dim colNames As New List(Of String) Using comm As DbCommand = localDb.CreateCommand() For i As Integer = 0 To dbReader.FieldCount - 1 colNames.Add(dbReader.GetName(i)) Dim param As DbParameter = comm.CreateParameter() param.DbType = DbType.Object param.ParameterName = $"@{dbReader.GetName(i)}" param.Value = dbReader.Item(i) comm.Parameters.Add(param) Next comm.CommandText = localDb.CmdHelper.UpdateParam(tableName, colNames, $"`ID` = {dbReader("ID")}") comm.ExecuteNonQuery() End Using End Sub Private Sub InsertDataByDataReader(localDb As DbExecutor, tableName As String, dbReader As DbDataReader) Dim colNames As New List(Of String) Using comm As DbCommand = localDb.CreateCommand() For i As Integer = 0 To dbReader.FieldCount - 1 colNames.Add(dbReader.GetName(i)) Dim param As DbParameter = comm.CreateParameter() param.DbType = DbType.Object param.ParameterName = $"@{dbReader.GetName(i)}" param.Value = dbReader.Item(i) comm.Parameters.Add(param) Next comm.CommandText = localDb.CmdHelper.InsertParam(tableName, colNames) comm.ExecuteNonQuery() End Using End Sub ''' ''' 将目标DataTable的所有数据插入到指定数据表中,不检测是否需要新增列 ''' ''' 后续插入字符需要区别数值与字符串 ''' ''' ''' ''' Private Sub InsertDataByDataTable(localDb As DbExecutor, tableName As String, dataTable As DataTable) Dim colNameParams As New Dictionary(Of String, String) ' Dim colParamValues As New Dictionary(Of String, Object) For Each dtRow As DataRow In dataTable.Rows For Each dtCol As DataColumn In dataTable.Columns colNameParams.Add(dtCol.ColumnName, $"@{dtCol.ColumnName}") ' colParamValues.Add($"@{dtCol.ColumnName}", dtRow(dtCol.ColumnName)) '存疑是否可以修改 Dim param As DbParameter = localDb.Command.CreateParameter() param.DbType = DbType.Object param.ParameterName = $"@{dtCol.ColumnName}" param.Value = dtRow(dtCol.ColumnName) localDb.Command.Parameters.Add(param) Next Dim cmdText As String = localDb.CmdHelper.Insert(tableName, colNameParams) localDb.ExecuteNonQuery(cmdText) '执行插入 localDb.Command.Parameters.Clear() '清空参数 colNameParams.Clear() '复位 Next End Sub ''' ''' 更新本地数据成功后,同步更新本地版本记录表 ''' ''' 本地数据库执行器 ''' 需要同步的数据表信息 Private Sub UpdateVersionTable(localDb As DbExecutor, table As SyncTableScheme) Dim tableName As String = Customer.SyncListTable.TableName Dim cmdText As String If table.IsExistsTable Then Dim colNameValue As New Dictionary(Of String, String) From { {$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID}, {$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime} } Dim condition As String = $"{Customer.SyncListTable.ColNamesEnum.TableName} = '{table.TableName}'" cmdText = localDb.CmdHelper.Update(tableName, colNameValue, condition) Else Dim colNameValue As New Dictionary(Of String, String) From { {$"{Customer.SyncListTable.ColNamesEnum.TableName}", table.TableName}, {$"{Customer.SyncListTable.ColNamesEnum.RevisionID}", table.RevisionID}, {$"{Customer.SyncListTable.ColNamesEnum.SyncType}", table.SyncType}, {$"{Customer.SyncListTable.ColNamesEnum.SyncTime}", table.NowSyncTime} } cmdText = localDb.CmdHelper.Insert(tableName, colNameValue) End If Try localDb.ExecuteNonQuery(cmdText) Catch ex As Exception ServiceLog.WriteErrorLog($"UpdateVersionTable Error:{ex.Message}") End Try End Sub #End Region #Region "数据类型转换" Enum SystemTypeEnum [AnsiString] [AnsiStringFixedLength] Binary [Boolean] [Byte] [DateTime] [Decimal] [Double] [Guid] [Int16] [Int32] [Int64] [SByte] [Single] [String] [TimeSpan] [UInt16] [UInt32] [UInt64] End Enum ''' ''' 将.net数据类型转换为Sqlite数据类型 ''' ''' .net数据类型 ''' 转换后的Sqlite数据类型 Public Shared Function ConvertSystemTypeToSqliteType(dataType As String) As String Dim reType As String Select Case dataType Case SystemTypeEnum.AnsiString.ToString() reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString() Case SystemTypeEnum.[AnsiStringFixedLength].ToString() reType = Sqlite.DataParam.DataTypeEnum.Nchar.ToString() Case SystemTypeEnum.Binary.ToString(), SystemTypeEnum.Byte.ToString() reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString() Case SystemTypeEnum.[Boolean].ToString() reType = Sqlite.DataParam.DataTypeEnum.Bit.ToString() Case SystemTypeEnum.[DateTime].ToString() reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString() Case SystemTypeEnum.[Decimal].ToString() reType = Sqlite.DataParam.DataTypeEnum.[Decimal].ToString() Case SystemTypeEnum.[Double].ToString() reType = Sqlite.DataParam.DataTypeEnum.Real.ToString() Case SystemTypeEnum.[Guid].ToString() reType = Sqlite.DataParam.DataTypeEnum.UniqueIdentifier.ToString() Case SystemTypeEnum.[Int16].ToString() reType = Sqlite.DataParam.DataTypeEnum.SmallInt.ToString() Case SystemTypeEnum.[Int32].ToString() reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString() Case SystemTypeEnum.[Int64].ToString() reType = Sqlite.DataParam.DataTypeEnum.[Integer].ToString() Case SystemTypeEnum.[SByte].ToString() reType = Sqlite.DataParam.DataTypeEnum.TinyInt.ToString() Case SystemTypeEnum.[Single].ToString() reType = Sqlite.DataParam.DataTypeEnum.[Single].ToString() Case SystemTypeEnum.[String].ToString() reType = Sqlite.DataParam.DataTypeEnum.Nvarchar.ToString() Case SystemTypeEnum.TimeSpan.ToString() reType = Sqlite.DataParam.DataTypeEnum.Datetime.ToString() Case SystemTypeEnum.[UInt16].ToString() reType = Sqlite.DataParam.DataTypeEnum.SmallUint.ToString() Case SystemTypeEnum.[UInt32].ToString() reType = Sqlite.DataParam.DataTypeEnum.Uint.ToString() Case SystemTypeEnum.[UInt64].ToString() reType = Sqlite.DataParam.DataTypeEnum.UnsignedInteger.ToString() Case "Byte[]" reType = Sqlite.DataParam.DataTypeEnum.Blob.ToString() Case Else 'reType = Sqlite.DataParam.DataTypeEnum.Varchar.ToString() Throw New Exception($"ConvertSystemTypeToSqliteType,UnDealType:{dataType}{vbNewLine}") End Select Return reType End Function ''' ''' 通过解析DbDataReader列名与列类型,暂不可用 ''' ''' ''' ''' Private Function CreateTableStringByDataReader(tableName As String, reader As DbDataReader) As String Dim builder As New StringBuilder builder.Append($"CREATE TABLE If Not Exists [{tableName}] (") For i As Integer = 0 To reader.FieldCount - 1 If i = 0 Then '应该修改成通用的类型转换 builder.Append($"[{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}") Else builder.Append($", [{reader.GetName(i)}] {ConvertSystemTypeToSqliteType(reader.GetDataTypeName(i))}") End If Next builder.Append(" )") Return builder.ToString() End Function ''' ''' 通过解析DataTable,获取建表语句 ''' ''' 数据表名 ''' 需要解析的内存数据表 ''' Private Function CreateTableStringByDataTable(tableName As String, destTable As DataTable) As String Dim builder As New StringBuilder builder.Append($"CREATE TABLE If Not Exists [{tableName}] (") For i As Integer = 0 To destTable.Columns.Count - 1 If i = 0 Then '应该修改成通用的类型转换 builder.Append($"[{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}") Else builder.Append($", [{destTable.Columns(i).ColumnName}] {ConvertSystemTypeToSqliteType(destTable.Columns(i).DataType.Name)}") End If '列名长度 If destTable.Columns(i).MaxLength > -1 Then builder.Append($"({destTable.Columns(i).MaxLength})") End If '列为空 If destTable.Columns(i).AllowDBNull = False Then builder.Append($" NOT NULL") End If '列自增(默认列自增为主键) If destTable.Columns(i).AutoIncrement Then builder.Append($" PRIMARY KEY AutoIncrement") 'Else ' '唯一值 ' If destTable.Columns(i).Unique Then ' builder.Append($" Unique") ' End If End If ''默认值 'If destTable.Columns(i).DefaultValue IsNot Nothing Then ' If String.IsNullOrEmpty(destTable.Columns(i).DefaultValue.ToString()) = False Then ' builder.Append($" DEFAULT '{destTable.Columns(i).DefaultValue}'") ' End If 'End If Next builder.Append(" )") Return builder.ToString() End Function #End Region End Class