Skip to content

Conversation

@winfredLIN
Copy link
Collaborator

@winfredLIN winfredLIN commented Jan 15, 2026

User description

改动内容和原因

由于审核没有传递数据源名称,导致sqled无法定位数据源,从而审核报错,导致SQL执行操作在sqle处开始中断,但后续错误信息并未准确传递,也未打印日志导致难以排查。暂时补充日志消息以便后续排查。

Due to the failure to pass the data source name during the audit, sqled was unable to locate the data source, resulting in an audit error and causing SQL execution to start interrupting at sqle. However, subsequent error messages were not accurately transmitted and logs were not printed, making it difficult to troubleshoot.Temporarily supplement log messages for future troubleshooting.

✅测试结果:在基于DMS开发的ODC上能够在SQLServer数据源上执行SQL语句

image

关联的 issue

https://github.com/actiontech/dms-ee/issues/712?open_in_browser=true

描述你的变更

  1. ODC工作台出发SQL审核动作时,补充数据源名称
  2. 补充日志

确认项(pr提交后操作)

Tip

请在指定复审人之前,确认并完成以下事项,完成后✅


  • [ X ] 我已完成自测
  • [ X ] 我已记录完整日志方便进行诊断
  • [ X ] 我已在关联的issue里补充了实现方案
  • [ X ] 我已在关联的issue里补充了测试影响面
  • [ X ] 我已确认了变更的兼容性,如果不兼容则在issue里标记 not_compatible
  • [ X ] 我已确认了是否要更新文档,如果要更新则在issue里标记 need_update_doc


Description

  • 增加详细错误日志记录

  • 补充数据源名称信息传递

  • 优化 SQL 审核错误提示


Diagram Walkthrough

flowchart LR
  A["请求拦截与处理"]
  B["读取与解析请求体"]
  C["错误检测及日志记录"]
  D["构建审核请求并传递数据源名称"]
  A -- "读取请求体" --> B
  B -- "检测到错误" --> C
  C -- "记录详细日志" --> D
Loading

File Walkthrough

Relevant files
Bug fix
sql_workbench_service.go
增强错误日志记录及数据源名称补充                                                                                 

internal/sql_workbench/service/sql_workbench_service.go

  • 新增读取请求体失败日志
  • 新增解析请求体失败日志
  • 新增获取用户与DBService失败日志
  • 在构建审核请求中补充数据源名称
+10/-0   

Due to the failure to pass the data source name during the audit, sqled was unable to locate the data source, resulting in an audit error and causing SQL execution to start interrupting at sqle. However, subsequent error messages were not accurately transmitted and logs were not printed, making it difficult to troubleshoot.Temporarily supplement log messages for future troubleshooting.
@github-actions
Copy link

PR Reviewer Guide 🔍

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

日志记录重复

在多个错误处理分支中新增了错误日志记录,同时又直接返回错误,可能导致同一错误在日志中被重复记录。建议统一日志记录策略,避免重复打印,以便后续问题追踪和日志分析。

				sqlWorkbenchService.log.Errorf("failed to read request body: %v", err)
				return fmt.Errorf("failed to read request body: %w", err)
			}
			// 恢复请求体,供后续处理使用
			c.Request().Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

			// 解析请求体获取 SQL 和 datasource ID
			sql, datasourceID, err := sqlWorkbenchService.parseStreamExecuteRequest(bodyBytes)
			if err != nil {
				sqlWorkbenchService.log.Errorf("failed to parse streamExecute request, skipping audit: %v", err)
				return fmt.Errorf("failed to parse streamExecute request, skipping audit: %v", err)
			}

			if sql == "" || datasourceID == "" {
				sqlWorkbenchService.log.Debugf("SQL or datasource ID is empty, skipping audit")
				return fmt.Errorf("SQL or datasource ID is empty, skipping audit")
			}

			// 获取当前用户 ID
			dmsUserId, err := sqlWorkbenchService.getDMSUserIdFromRequest(c)
			if err != nil {
				sqlWorkbenchService.log.Errorf("failed to get DMS user ID: %v", err)
				return fmt.Errorf("failed to get DMS user ID: %v", err)
			}

			// 从缓存表获取 dms_db_service_id
			dmsDBServiceID, err := sqlWorkbenchService.getDMSDBServiceIDFromCache(c.Request().Context(), datasourceID, dmsUserId)
			if err != nil {
				sqlWorkbenchService.log.Errorf("failed to get dms_db_service_id from cache: %v", err)
				return fmt.Errorf("failed to get dms_db_service_id from cache: %v", err)
			}

			if dmsDBServiceID == "" {
				sqlWorkbenchService.log.Debugf("dms_db_service_id not found in cache for datasource: %s", datasourceID)
				return fmt.Errorf("dms_db_service_id not found in cache for datasource: %s", datasourceID)
			}

			// 获取 DBService 信息
			dbService, err := sqlWorkbenchService.dbServiceUsecase.GetDBService(c.Request().Context(), dmsDBServiceID)
			if err != nil {
				sqlWorkbenchService.log.Errorf("failed to get DBService: %v", err)
				return fmt.Errorf("failed to get DBService: %v", err)
			}

			// 检查是否启用 SQL 审核
			if !sqlWorkbenchService.isEnableSQLAudit(dbService) {
				sqlWorkbenchService.log.Debugf("SQL audit is not enabled for DBService: %s", dmsDBServiceID)
				return fmt.Errorf("SQL audit is not enabled for DBService: %s", dmsDBServiceID)
			}

			// 调用 SQLE 审核接口
			auditResult, err := sqlWorkbenchService.callSQLEAudit(c.Request().Context(), sql, dbService)
			if err != nil {
				sqlWorkbenchService.log.Errorf("call SQLE audit failed: %v", err)
				return fmt.Errorf("call SQLE audit failed: %v", err)
			}

			// 拦截响应并添加审核结果
			return sqlWorkbenchService.interceptAndAddAuditResult(c, next, dmsUserId, auditResult, dbService)
		}
	}
}

// parseStreamExecuteRequest 解析 streamExecute 请求体,提取 SQL 和 datasource ID
func (sqlWorkbenchService *SqlWorkbenchService) parseStreamExecuteRequest(bodyBytes []byte) (sql string, datasourceID string, err error) {
	var requestBody map[string]interface{}
	if err := json.Unmarshal(bodyBytes, &requestBody); err != nil {
		return "", "", fmt.Errorf("failed to unmarshal request body: %v", err)
	}

	// 从 sql 字段获取 SQL
	if sqlVal, ok := requestBody["sql"]; ok {
		if sqlStr, ok := sqlVal.(string); ok {
			sql = sqlStr
		}
	}

	// 从 sid 字段解析 datasource ID
	// sid 格式: sid:{base64编码的JSON}:d:dms
	// base64 JSON 包含: {"dbId":623,"dsId":28,"from":"192.168.21.47","logicalSession":false,"realId":"ee9b8ab276"}
	if sidVal, ok := requestBody["sid"]; ok {
		if sidStr, ok := sidVal.(string); ok {
			dsId, parseErr := sqlWorkbenchService.parseSidToDatasourceID(sidStr)
			if parseErr != nil {
				sqlWorkbenchService.log.Debugf("Failed to parse sid to datasource ID: %v", parseErr)
			} else {
				datasourceID = dsId
			}
		}
	}

	return sql, datasourceID, nil
}

// parseSidToDatasourceID 从 sid 字符串中解析出 datasource ID
// sid 格式: sid:{base64编码的JSON}:d:dms
func (sqlWorkbenchService *SqlWorkbenchService) parseSidToDatasourceID(sid string) (string, error) {
	// 检查 sid 格式: sid:...:d:dms
	if !strings.HasPrefix(sid, "sid:") {
		return "", fmt.Errorf("invalid sid format, missing 'sid:' prefix")
	}

	// 移除 "sid:" 前缀
	sid = strings.TrimPrefix(sid, "sid:")

	// 查找最后一个 ":d" 后缀并移除从 ":d" 开始的所有字符
	if idx := strings.LastIndex(sid, ":d"); idx != -1 {
		sid = sid[:idx]
	}

	// 解码 base64
	decodedBytes, err := base64.StdEncoding.DecodeString(sid)
	if err != nil {
		return "", fmt.Errorf("failed to decode base64 sid: %v", err)
	}

	// 解析 JSON
	var sidData struct {
		DbId           int    `json:"dbId"`
		DsId           int    `json:"dsId"`
		From           string `json:"from"`
		LogicalSession bool   `json:"logicalSession"`
		RealId         string `json:"realId"`
	}

	if err := json.Unmarshal(decodedBytes, &sidData); err != nil {
		return "", fmt.Errorf("failed to unmarshal sid JSON: %v", err)
	}

	// 返回 dsId 作为字符串
	return fmt.Sprintf("%d", sidData.DsId), nil
}

// getDMSUserIdFromRequest 从请求中获取 DMS 用户 ID
func (sqlWorkbenchService *SqlWorkbenchService) getDMSUserIdFromRequest(c echo.Context) (string, error) {
	var dmsToken string
	for _, cookie := range c.Cookies() {
		if cookie.Name == pkgConst.DMSToken {
			dmsToken = cookie.Value
			break
		}
	}

	if dmsToken == "" {
		return "", fmt.Errorf("dms token is empty")
	}

	dmsUserId, err := jwt.ParseUidFromJwtTokenStr(dmsToken)
	if err != nil {
		return "", fmt.Errorf("failed to parse dms user id from token: %v", err)
	}

	return dmsUserId, nil
}

// getDMSDBServiceIDFromCache 从 sql_workbench_datasource_caches 表获取 dms_db_service_id
func (sqlWorkbenchService *SqlWorkbenchService) getDMSDBServiceIDFromCache(ctx context.Context, datasourceID, dmsUserID string) (string, error) {
	// 尝试将 datasourceID 转换为 int64(ODC 的 datasource ID 通常是数字)
	var sqlWorkbenchDatasourceID int64
	if _, err := fmt.Sscanf(datasourceID, "%d", &sqlWorkbenchDatasourceID); err != nil {
		// 如果转换失败,尝试直接使用字符串作为 datasource ID
		sqlWorkbenchService.log.Debugf("Failed to convert datasourceID to int64, trying to find by string: %s", datasourceID)
	}

	// 从缓存表中查找,需要根据 sql_workbench_datasource_id 和 dms_user_id 查找
	// 由于缓存表可能没有直接存储 sql_workbench_datasource_id,我们需要通过其他方式查找
	// 这里先尝试通过用户 ID 获取所有数据源,然后匹配
	datasources, err := sqlWorkbenchService.sqlWorkbenchDatasourceRepo.GetSqlWorkbenchDatasourcesByUserID(ctx, dmsUserID)
	if err != nil {
		return "", fmt.Errorf("failed to get datasources by user id: %v", err)
	}

	// 如果 datasourceID 是数字,尝试匹配 SqlWorkbenchDatasourceID
	if sqlWorkbenchDatasourceID > 0 {
		for _, ds := range datasources {
			if ds.SqlWorkbenchDatasourceID == sqlWorkbenchDatasourceID {
				return ds.DMSDBServiceID, nil
			}
		}
	}

	// 如果找不到,返回第一个匹配的数据源(临时方案,后续可能需要更精确的匹配逻辑)
	if len(datasources) > 0 {
		// 这里可以根据实际业务逻辑选择合适的数据源
		// 暂时返回第一个数据源的 dms_db_service_id
		return datasources[0].DMSDBServiceID, nil
	}

	return "", fmt.Errorf("no datasource found for datasourceID: %s, userID: %s", datasourceID, dmsUserID)
}

// isEnableSQLAudit 检查是否启用 SQL 审核
func (sqlWorkbenchService *SqlWorkbenchService) isEnableSQLAudit(dbService *biz.DBService) bool {
	if dbService.SQLEConfig == nil || dbService.SQLEConfig.SQLQueryConfig == nil {
		return false
	}
	return dbService.SQLEConfig.AuditEnabled && dbService.SQLEConfig.SQLQueryConfig.AuditEnabled
}

// callSQLEAudit 调用 SQLE 直接审核接口
func (sqlWorkbenchService *SqlWorkbenchService) callSQLEAudit(ctx context.Context, sql string, dbService *biz.DBService) (*cloudbeaver.AuditSQLReply, error) {
	// 获取 SQLE 服务地址
	target, err := sqlWorkbenchService.proxyTargetRepo.GetProxyTargetByName(ctx, _const.SqleComponentName)
	if err != nil {
		return nil, fmt.Errorf("failed to get SQLE proxy target: %v", err)
	}

	sqleAddr := fmt.Sprintf("%s/v2/sql_audit", target.URL.String())

	// 构建审核请求
	auditReq := cloudbeaver.AuditSQLReq{
		InstanceType:     dbService.DBType,
		SQLContent:       sql,
		SQLType:          "sql",
		ProjectId:        dbService.ProjectUID,
		InstanceName:     dbService.Name,

@github-actions
Copy link

PR Code Suggestions ✨

No code suggestions found for the PR.

@winfredLIN winfredLIN self-assigned this Jan 15, 2026
@LordofAvernus LordofAvernus merged commit 1708548 into main Jan 15, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants