Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 100 additions & 91 deletions backend/apps/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from apps.datasource.utils.utils import aes_decrypt
from apps.db.constant import DB, ConnectType
from apps.db.engine import get_engine_config
from apps.system.crud.assistant import get_ds_engine
from apps.system.crud.assistant import get_out_ds_conf
from apps.system.schemas.system_schema import AssistantOutDsSchema
from common.core.deps import Trans
from common.utils.utils import SQLBotLogUtil, equals_ignore_case
Expand Down Expand Up @@ -146,92 +146,25 @@ def get_engine(ds: CoreDatasource, timeout: int = 0) -> Engine:


def get_session(ds: CoreDatasource | AssistantOutDsSchema):
engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds)
# engine = get_engine(ds) if isinstance(ds, CoreDatasource) else get_ds_engine(ds)
if isinstance(ds, AssistantOutDsSchema):
out_conf = get_out_ds_conf(ds, 30)
ds.configuration = out_conf

engine = get_engine(ds)
session_maker = sessionmaker(bind=engine)
session = session_maker()
return session


def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDsSchema, is_raise: bool = False):
if isinstance(ds, CoreDatasource):
db = DB.get_db(ds.type)
if db.connect_type == ConnectType.sqlalchemy:
conn = get_engine(ds, 10)
try:
with conn.connect() as connection:
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
else:
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
extra_config_dict = get_extra_config(conf)
if equals_ignore_case(ds.type, 'dm'):
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1', timeout=10).fetchall()
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'doris', 'starrocks'):
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
port=conf.port, db=conf.database, connect_timeout=10,
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'redshift'):
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database,
user=conf.username,
password=conf.password,
timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'kingbase'):
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database,
user=conf.username,
password=conf.password,
connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'es'):
es_conn = get_es_connect(conf)
if es_conn.ping():
SQLBotLogUtil.info("success")
return True
else:
SQLBotLogUtil.info("failed")
return False
else:
conn = get_ds_engine(ds)
if isinstance(ds, AssistantOutDsSchema):
out_conf = get_out_ds_conf(ds, 10)
ds.configuration = out_conf

db = DB.get_db(ds.type)
if db.connect_type == ConnectType.sqlalchemy:
conn = get_engine(ds, 10)
try:
with conn.connect() as connection:
SQLBotLogUtil.info("success")
Expand All @@ -241,26 +174,102 @@ def check_connection(trans: Optional[Trans], ds: CoreDatasource | AssistantOutDs
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
else:
conf = DatasourceConf(**json.loads(aes_decrypt(ds.configuration)))
extra_config_dict = get_extra_config(conf)
if equals_ignore_case(ds.type, 'dm'):
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1', timeout=10).fetchall()
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'doris', 'starrocks'):
with pymysql.connect(user=conf.username, passwd=conf.password, host=conf.host,
port=conf.port, db=conf.database, connect_timeout=10,
read_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'redshift'):
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database,
user=conf.username,
password=conf.password,
timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'kingbase'):
with psycopg2.connect(host=conf.host, port=conf.port, database=conf.database,
user=conf.username,
password=conf.password,
connect_timeout=10, **extra_config_dict) as conn, conn.cursor() as cursor:
try:
cursor.execute('select 1')
SQLBotLogUtil.info("success")
return True
except Exception as e:
SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
if is_raise:
raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
return False
elif equals_ignore_case(ds.type, 'es'):
es_conn = get_es_connect(conf)
if es_conn.ping():
SQLBotLogUtil.info("success")
return True
else:
SQLBotLogUtil.info("failed")
return False
# else:
# conn = get_ds_engine(ds)
# try:
# with conn.connect() as connection:
# SQLBotLogUtil.info("success")
# return True
# except Exception as e:
# SQLBotLogUtil.error(f"Datasource {ds.id} connection failed: {e}")
# if is_raise:
# raise HTTPException(status_code=500, detail=trans('i18n_ds_invalid') + f': {e.args}')
# return False

return False


def get_version(ds: CoreDatasource | AssistantOutDsSchema):
version = ''
conf = None
if isinstance(ds, CoreDatasource):
conf = DatasourceConf(
**json.loads(aes_decrypt(ds.configuration))) if not equals_ignore_case(ds.type,
"excel") else get_engine_config()
if isinstance(ds, AssistantOutDsSchema):
conf = DatasourceConf()
conf.host = ds.host
conf.port = ds.port
conf.username = ds.user
conf.password = ds.password
conf.database = ds.dataBase
conf.dbSchema = ds.db_schema
conf.timeout = 10
else:
conf = DatasourceConf(**json.loads(aes_decrypt(get_out_ds_conf(ds, 10))))
# if isinstance(ds, AssistantOutDsSchema):
# conf = DatasourceConf()
# conf.host = ds.host
# conf.port = ds.port
# conf.username = ds.user
# conf.password = ds.password
# conf.database = ds.dataBase
# conf.dbSchema = ds.db_schema
# conf.timeout = 10
db = DB.get_db(ds.type)
sql = get_version_sql(ds, conf)
try:
Expand Down
17 changes: 17 additions & 0 deletions backend/apps/system/crud/assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# from apps.datasource.embedding.table_embedding import get_table_embedding
from apps.datasource.models.datasource import CoreDatasource, DatasourceConf
from apps.datasource.utils.utils import aes_encrypt
from apps.system.models.system_model import AssistantModel
from apps.system.schemas.auth import CacheName, CacheNamespace
from apps.system.schemas.system_schema import AssistantHeader, AssistantOutDsSchema, UserInfoDTO
Expand Down Expand Up @@ -266,3 +267,19 @@ def get_ds_engine(ds: AssistantOutDsSchema) -> Engine:
else:
engine = create_engine(uri, connect_args={"connect_timeout": timeout}, pool_timeout=timeout)
return engine


def get_out_ds_conf(ds: AssistantOutDsSchema, timeout:int=30) -> str:
conf = {
"host":ds.host,
"port":ds.port,
"username":ds.user,
"password":ds.password,
"database":ds.dataBase,
"driver":'',
"extraJdbc":ds.extraParams or '',
"dbSchema":ds.db_schema or '',
"timeout":timeout
}
conf.extraJdbc = ''
return aes_encrypt(json.dumps(conf))
1 change: 1 addition & 0 deletions backend/apps/system/schemas/system_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class AssistantOutDsBase(BaseModel):
type_name: Optional[str] = None
comment: Optional[str] = None
description: Optional[str] = None
configuration: Optional[str] = None


class AssistantOutDsSchema(AssistantOutDsBase):
Expand Down