134 changes: 133 additions & 1 deletion backend/apps/datasource/api/datasource.py
@@ -1,14 +1,18 @@
import asyncio
import hashlib
import io
import os
import traceback
import uuid
from io import StringIO
from typing import List
from urllib.parse import quote

import orjson
import pandas as pd
from fastapi import APIRouter, File, UploadFile, HTTPException, Path
from fastapi.responses import StreamingResponse
from sqlalchemy import and_

from apps.db.db import get_schema
from apps.db.engine import get_engine_conn
@@ -81,7 +85,6 @@ def inner():
await asyncio.to_thread(inner)



@router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update")
@require_permissions(permission=SqlbotPermission(type='ds', keyExpression="ds.id"))
async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource):
@@ -360,3 +363,132 @@ def insert_pg(df, tableName, engine):
finally:
cursor.close()
conn.close()


# Labels used in the schema export/import workbook (Chinese UI strings)
t_sheet = "数据表列表"  # sheet name: "table list"
t_n_col = "表名"  # column header: "table name"
t_c_col = "表备注"  # column header: "table comment"
f_n_col = "字段名"  # column header: "field name"
f_c_col = "字段备注"  # column header: "field comment"


@router.get("/exportDsSchema/{id}")
async def export_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")):
    # Each entry of df_list describes one worksheet to export:
    # {
    #     'sheet': '',  # sheet name
    #     'c1_h': '',   # header of column 1
    #     'c2_h': '',   # header of column 2
    #     'c1': [],     # data of column 1
    #     'c2': [],     # data of column 2
    # }
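    # e.g. {'sheet': 'users', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ['id'], 'c2': ['用户id']}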
def inner():
if id == 0: # download template
            file_name = '批量上传备注'  # template file name: "bulk upload comments"
            # example rows below use Chinese comments, matching the expected upload format
            df_list = [
{'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': ["user", "score"],
'c2': ["用来存放用户信息的数据表", "用来存放用户课程信息的数据表"]},
{'sheet': '数据表1', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["id", "name"],
'c2': ["用户id", "用户姓名"]},
{'sheet': '数据表2', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["course", "user_id", "score"],
'c2': ["课程名称", "用户ID", "课程得分"]},
]
else:
            ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first()
            if ds is None:
                raise HTTPException(400, "Datasource not found")
            file_name = ds.name
tables = session.query(CoreTable).filter(CoreTable.ds_id == id).all()
if len(tables) == 0:
raise HTTPException(400, "No tables")

df_list = []
df1 = {'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': [], 'c2': []}
df_list.append(df1)
for table in tables:
df1['c1'].append(table.table_name)
df1['c2'].append(table.custom_comment)

fields = session.query(CoreField).filter(CoreField.table_id == table.id).all()
df_fields = {'sheet': table.table_name, 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': [], 'c2': []}
for field in fields:
df_fields['c1'].append(field.field_name)
df_fields['c2'].append(field.custom_comment)
df_list.append(df_fields)

# build dataframe and export
output = io.BytesIO()

with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
for df in df_list:
pd.DataFrame({df['c1_h']: df['c1'], df['c2_h']: df['c2']}).to_excel(writer, sheet_name=df['sheet'],
index=False)

output.seek(0)

        # hand the workbook bytes and the download filename back to the caller
        filename = f'{file_name}.xlsx'
        return io.BytesIO(output.getvalue()), filename

    result, filename = await asyncio.to_thread(inner)
    encoded_filename = quote(filename)
    return StreamingResponse(
        result,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_filename}"}
    )


@router.post("/uploadDsSchema/{id}")
async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id"),
file: UploadFile = File(...)):
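    # Expected workbook layout mirrors the export above: one "数据表列表" sheet with
    # table names and comments, plus one sheet per table listing field names and comments.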
    # note: only .xlsx can actually be parsed below, since openpyxl does not read legacy .xls files
    ALLOWED_EXTENSIONS = (".xlsx", ".xls")
    if not file.filename.lower().endswith(ALLOWED_EXTENSIONS):
        raise HTTPException(400, "Only support .xlsx/.xls")

try:
contents = await file.read()
excel_file = io.BytesIO(contents)

        # open the workbook once and reuse it for every sheet
        workbook = pd.ExcelFile(excel_file, engine="openpyxl")

        field_sheets = []
        table_sheet = None  # rows of the table-list sheet, if present
        for sheet in workbook.sheet_names:
            df = workbook.parse(sheet_name=sheet)
if sheet == t_sheet:
table_sheet = df.where(pd.notnull(df), None).to_dict(orient="records")
else:
field_sheets.append(
{'sheet_name': sheet, 'data': df.where(pd.notnull(df), None).to_dict(orient="records")})

# print(field_sheets)

# get data and update
# update table comment
        if table_sheet:
for table in table_sheet:
session.query(CoreTable).filter(
and_(CoreTable.ds_id == id, CoreTable.table_name == table[t_n_col])).update(
{'custom_comment': table[t_c_col]})

# update field comment
        if field_sheets:
            for fields in field_sheets:
                if fields['data']:
# get table id
table = session.query(CoreTable).filter(
and_(CoreTable.ds_id == id, CoreTable.table_name == fields['sheet_name'])).first()
if table:
for field in fields['data']:
session.query(CoreField).filter(
and_(CoreField.ds_id == id,
CoreField.table_id == table.id,
CoreField.field_name == field[f_n_col])).update(
{'custom_comment': field[f_c_col]})
session.commit()

return True
except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to parse Excel: {str(e)}")
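
For reference, a minimal client sketch of the two endpoints added here. This is a sketch only: it assumes the router is mounted under an /api/datasource prefix and that authentication is handled via a bearer token; the base URL and token below are placeholders, not values from this PR.

import requests

BASE = "http://localhost:8000/api/datasource"  # assumed mount path; adjust to the real prefix
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth header

# download the schema workbook for datasource 1 (use id=0 to get the empty template)
resp = requests.get(f"{BASE}/exportDsSchema/1", headers=HEADERS)
resp.raise_for_status()
with open("schema.xlsx", "wb") as f:
    f.write(resp.content)

# edit the comments in schema.xlsx, then upload it back to the same datasource
with open("schema.xlsx", "rb") as f:
    resp = requests.post(
        f"{BASE}/uploadDsSchema/1",
        headers=HEADERS,
        files={"file": ("schema.xlsx", f,
                        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")},
    )
resp.raise_for_status()
print(resp.json())  # True on success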