Skip to content

Commit ddb48d2

Browse files
committed
update code
1 parent fdbf511 commit ddb48d2

5 files changed

Lines changed: 46 additions & 32 deletions

File tree

src/main/java/com/codingapi/dbstream/listener/SQLRunningState.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.codingapi.dbstream.proxy.ConnectionProxy;
44
import com.codingapi.dbstream.query.JdbcQuery;
55
import com.codingapi.dbstream.scanner.DBMetaData;
6-
import com.codingapi.dbstream.scanner.DBScanner;
76
import com.codingapi.dbstream.scanner.DbColumn;
87
import com.codingapi.dbstream.scanner.DbTable;
98
import lombok.Getter;
@@ -356,10 +355,6 @@ public String getJdbcKey() {
356355
* @param tableName 表名
357356
*/
358357
public void triggerDBMetaData(String tableName) throws SQLException {
359-
// 当前表需要更新时,将会连同所有带更新的表一次性全部更新
360-
if (this.metaData.isSubjectUpdate(tableName)) {
361-
DBScanner dbScanner = new DBScanner(connectionProxy.getConnection(), getDriverProperties());
362-
dbScanner.updateMetadata(this.metaData);
363-
}
358+
this.metaData.triggerSubscribeListUpdate(connectionProxy,tableName);
364359
}
365360
}

src/main/java/com/codingapi/dbstream/scanner/DBMetaData.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.codingapi.dbstream.scanner;
22

3+
import com.codingapi.dbstream.proxy.ConnectionProxy;
34
import com.codingapi.dbstream.serializable.DBTableSerializableHelper;
45
import lombok.Getter;
56

7+
import java.sql.SQLException;
68
import java.util.*;
79

810
/**
@@ -17,7 +19,7 @@ public class DBMetaData {
1719
* 待更新的table meta数据
1820
*/
1921
@Getter
20-
private final List<String> subjectTableNameList = new ArrayList<>();
22+
private final List<String> subscribeTableNameList = new ArrayList<>();
2123

2224
/**
2325
* 数据记录时间
@@ -50,8 +52,8 @@ public DBMetaData(Properties properties) {
5052
*/
5153
public void addUpdateSubscribe(String tableName) {
5254
String upTableName = tableName.toUpperCase();
53-
if (!this.subjectTableNameList.contains(upTableName)) {
54-
this.subjectTableNameList.add(upTableName);
55+
if (!this.subscribeTableNameList.contains(upTableName)) {
56+
this.subscribeTableNameList.add(upTableName);
5557
}
5658
}
5759

@@ -99,7 +101,7 @@ void cleanSerializable() {
99101
DBTableSerializableHelper tableSerializableHelper = new DBTableSerializableHelper(this.getKeyJdbcKey());
100102
tableSerializableHelper.remove();
101103
this.tables.clear();
102-
this.subjectTableNameList.clear();
104+
this.subscribeTableNameList.clear();
103105
}
104106

105107
/**
@@ -132,16 +134,16 @@ public String getKeyJdbcKey() {
132134
* @param tableName 表名称
133135
* @return true 是
134136
*/
135-
public boolean isSubjectUpdate(String tableName) {
136-
return this.subjectTableNameList.contains(tableName.toUpperCase());
137+
public boolean isSubscribeUpdate(String tableName) {
138+
return this.subscribeTableNameList.contains(tableName.toUpperCase());
137139
}
138140

139141
/**
140142
* 更新DbTable数据
141143
*
142144
* @param updateList 更新的表元数据信息
143145
*/
144-
void updateDbTable(List<DbTable> updateList) {
146+
private void updateDbTable(List<DbTable> updateList) {
145147
if (this.tables == null || this.tables.isEmpty()) {
146148
this.tables = new ArrayList<>(updateList);
147149
return;
@@ -152,7 +154,7 @@ void updateDbTable(List<DbTable> updateList) {
152154
for (DbTable update : updateList) {
153155
String tableName = update.getName().toUpperCase();
154156
updateDbTables.put(tableName, update);
155-
this.subjectTableNameList.remove(tableName);
157+
this.subscribeTableNameList.remove(tableName);
156158
}
157159

158160
for (DbTable dbTable : tables) {
@@ -182,4 +184,17 @@ void updateDbTable(List<DbTable> updateList) {
182184

183185
this.tables = list;
184186
}
187+
188+
/**
189+
* 触发订阅表元数据检测更新
190+
*/
191+
public void triggerSubscribeListUpdate(ConnectionProxy connectionProxy, String tableName) throws SQLException {
192+
if (this.isSubscribeUpdate(tableName)) {
193+
DBScanner dbScanner = new DBScanner(connectionProxy.getConnection(), this.properties);
194+
// 当前表需要更新时,将会连同所有带更新的表一次性全部更新
195+
List<DbTable> tables = dbScanner.findTableMetadata(this.subscribeTableNameList);
196+
this.updateDbTable(tables);
197+
this.success();
198+
}
199+
}
185200
}

src/main/java/com/codingapi/dbstream/scanner/DBScanner.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
public class DBScanner {
1919

2020
private final Connection connection;
21-
private final DBMetaData dbMetaData;
21+
private final Properties info;
2222
private final DatabaseMetaData metaData;
2323
private final String catalog;
2424
private final String schema;
@@ -29,21 +29,21 @@ public DBScanner(Connection connection, Properties info) throws SQLException {
2929
this.schema = connection.getSchema();
3030
this.metaData = connection.getMetaData();
3131
this.catalog = connection.getCatalog();
32-
this.dbMetaData = new DBMetaData(info);
32+
this.info = info;
3333
String jdbcKey = JdbcPropertyUtils.getOrGenerateJdbcKey(info,this.schema);
3434
this.dbTableSerializableHelper = new DBTableSerializableHelper(jdbcKey);
3535
}
3636

3737

3838
private void loadDbTableInfo(String tableName, DbTable tableInfo) throws SQLException {
3939
String dbTableName = tableInfo.getName();
40-
List<String> keys = dbTableSerializableHelper.loadPrimaryKeyByLocalFile(tableInfo.getName());
40+
List<String> keys = dbTableSerializableHelper.loadTablePrimaryKeysByKeyFile(tableInfo.getName());
4141
if (dbTableSerializableHelper.hasSerialize(dbTableName)) {
4242
DbTable dbTableCache = dbTableSerializableHelper.deserialize(dbTableName);
4343
tableInfo.setColumns(dbTableCache.getColumns());
4444
tableInfo.setPrimaryKeys(dbTableCache.getPrimaryKeys());
45-
tableInfo.loadLocalPrimaryKeys(keys);
46-
tableInfo.reloadPrimaryColumns();
45+
tableInfo.validateAndAddPrimaryKeys(keys);
46+
tableInfo.reloadPrimaryKeyColumns();
4747
return;
4848
}
4949

@@ -67,8 +67,8 @@ private void loadDbTableInfo(String tableName, DbTable tableInfo) throws SQLExce
6767
tableInfo.addPrimaryKey(pkColumn);
6868
}
6969
pkRs.close();
70-
tableInfo.loadLocalPrimaryKeys(keys);
71-
tableInfo.reloadPrimaryColumns();
70+
tableInfo.validateAndAddPrimaryKeys(keys);
71+
tableInfo.reloadPrimaryKeyColumns();
7272

7373
dbTableSerializableHelper.serialize(tableInfo);
7474
}
@@ -77,6 +77,7 @@ private void loadDbTableInfo(String tableName, DbTable tableInfo) throws SQLExce
7777
* 扫描数据库中的所有表、字段和主键信息,并缓存
7878
*/
7979
public DBMetaData loadMetadata() throws SQLException {
80+
DBMetaData dbMetaData = new DBMetaData(info);
8081
DatabaseMetaData metaData = connection.getMetaData();
8182
String catalog = connection.getCatalog();
8283
String schema = connection.getSchema();
@@ -98,29 +99,28 @@ public DBMetaData loadMetadata() throws SQLException {
9899

99100

100101
/**
101-
* 更新对应表的元数据信息
102+
* 获取元数据表信息
102103
*
103-
* @param dbMetaData 数据库下的所有元数据信息
104+
* @param tableNames 查询的表名称
104105
* @throws SQLException SQLException
105106
*/
106-
public void updateMetadata(DBMetaData dbMetaData) throws SQLException {
107+
public List<DbTable> findTableMetadata(List<String> tableNames) throws SQLException {
107108
DatabaseMetaData metaData = connection.getMetaData();
108109
String catalog = connection.getCatalog();
109110
String schema = connection.getSchema();
110111
ResultSet tables = metaData.getTables(catalog, schema, "%", new String[]{"TABLE"});
111-
List<DbTable> updateList = new ArrayList<>();
112+
List<DbTable> tableList = new ArrayList<>();
112113
while (tables.next()) {
113114
String tableName = tables.getString("TABLE_NAME");
114115
String remarks = tables.getString("REMARKS");
115-
if (dbMetaData.isSubjectUpdate(tableName)) {
116+
if (tableNames.contains(tableName.toUpperCase())) {
116117
DbTable tableInfo = new DbTable(tableName, remarks);
117118
this.loadDbTableInfo(tableName, tableInfo);
118-
updateList.add(tableInfo);
119+
tableList.add(tableInfo);
119120
}
120121
}
121122
tables.close();
122-
dbMetaData.updateDbTable(updateList);
123-
dbMetaData.success();
123+
return tableList;
124124
}
125125

126126

src/main/java/com/codingapi/dbstream/scanner/DbTable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public DbTable(String name, String comment) {
4141
/**
4242
* 加载主键的字段
4343
*/
44-
void reloadPrimaryColumns() {
44+
void reloadPrimaryKeyColumns() {
4545
if (this.primaryKeys.isEmpty()) {
4646
return;
4747
}
@@ -96,7 +96,11 @@ void setPrimaryKeys(List<String> primaryKeys) {
9696
this.primaryKeys.addAll(primaryKeys);
9797
}
9898

99-
public void loadLocalPrimaryKeys(List<String> primaryKeys) {
99+
/**
100+
* 校验并添加主键字段
101+
* @param primaryKeys 主键字段list
102+
*/
103+
void validateAndAddPrimaryKeys(List<String> primaryKeys) {
100104
if (primaryKeys != null && !primaryKeys.isEmpty()) {
101105
for (String primaryKey : primaryKeys) {
102106
DbColumn column = this.getColumnByName(primaryKey);

src/main/java/com/codingapi/dbstream/serializable/DBTableSerializableHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void remove() {
5050
/**
5151
* 通过表名称读取手动配置主键信息
5252
*/
53-
public List<String> loadPrimaryKeyByLocalFile(String tableName) {
53+
public List<String> loadTablePrimaryKeysByKeyFile(String tableName) {
5454
File file = new File(this.path + "/" + tableName + ".key");
5555
List<String> data = FileReaderUtils.read(file);
5656
if (data != null && !data.isEmpty()) {

0 commit comments

Comments
 (0)