5 changes: 3 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "rda_python_common"
-version = "1.0.18"
+version = "1.0.20"
 authors = [
   { name="Zaihua Ji", email="zji@ucar.edu" },
 ]
@@ -18,7 +18,8 @@ classifiers = [
"Development Status :: 5 - Production/Stable",
]
dependencies = [
"psycopg2-binary"
"psycopg2-binary",
"rda-python-globus @ git+https://github.com/NCAR/rda-python-globus.git@main",
]

[project.urls]
2 changes: 2 additions & 0 deletions requirements.txt
@@ -3,3 +3,5 @@ packaging==24.2
 pluggy==1.5.0
 psycopg2-binary==2.9.10
 pytest==8.3.5
+rda-python-globus @ git+https://github.com/NCAR/rda-python-globus.git@main
+
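The same dependency is added to both pyproject.toml and requirements.txt as a PEP 508 direct reference, which pip resolves by cloning the repository at the named branch; note that @main tracks the branch head rather than a fixed revision. A minimal sketch of how such a line parses, using the packaging library already pinned above:

    from packaging.requirements import Requirement

    # Parse the direct-reference line added in this PR
    req = Requirement("rda-python-globus @ git+https://github.com/NCAR/rda-python-globus.git@main")
    print(req.name)  # rda-python-globus
    print(req.url)   # git+https://github.com/NCAR/rda-python-globus.git@main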

51 changes: 33 additions & 18 deletions src/rda_python_common/PgFile.py
@@ -67,7 +67,7 @@
 BHOST = PgLOG.PGLOG['BACKUPNM']
 DHOST = PgLOG.PGLOG['DRDATANM']
 OBJCTCMD = "isd_s3_cli"
-BACKCMD = "dsglobus"
+BACKCMD = "dsglobus"

 HLIMIT = 0 # HTAR file count limit
 BLIMIT = 2 # minimum back tar file size in DB
@@ -338,17 +338,28 @@ def local_copy_object(tofile, fromfile, bucket = None, meta = None, logact = 0):
 def quasar_multiple_trasnfer(tofiles, fromfiles, topoint, frompoint, logact = 0):

     ret = PgLOG.FAILURE
-    qstr = '{"action":"transfer","label":"%s","verify_checksum":true,' % ENDPOINTS[topoint]
-#    qstr = '{"action":"transfer","label":"%s",' % ENDPOINTS[topoint]
-    qstr += '"source_endpoint":"%s","destination_endpoint":"%s","files":[\n' % (frompoint, topoint)
     fcnt = len(fromfiles)
-    bstr = ''
+    transfer_files = {"files": []}
     for i in range(fcnt):
-        qstr += '%s{"source_file":"%s","destination_file":"%s"}' % (bstr, fromfiles[i], tofiles[i])
-        if i == 0: bstr = ',\n'
-    qstr += ']}'
-
-    task = submit_globus_task(BACKCMD, topoint, logact, qstr)
+        transfer_files["files"].append({
+            "source_file": fromfiles[i],
+            "destination_file": tofiles[i]
+        })
+    qstr = json.dumps(transfer_files)
+
+    action = 'transfer'
+    source_endpoint = frompoint
+    destination_endpoint = topoint
+    label = f"{ENDPOINTS[frompoint]} to {ENDPOINTS[topoint]} {action}"
+    verify_checksum = True
+
+    cmd = f'{BACKCMD} {action} -se {source_endpoint} -de {destination_endpoint} --label "{label}"'
+    if verify_checksum:
+        cmd += ' -vc'
+    cmd += ' --batch -'
+
+    task = submit_globus_task(cmd, topoint, logact, qstr)
     if task['stat'] == 'S':
         ret = PgLOG.SUCCESS
     elif task['stat'] == 'A':
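The rewrite replaces hand-built JSON strings with a dict serialized via json.dumps (this assumes json is imported at the top of PgFile.py), and the document is passed to the dsglobus subcommand on stdin, per the trailing '--batch -'. A minimal sketch of the batch document the loop produces, with hypothetical file paths:

    import json

    # Hypothetical source/destination pairs; the real lists come from the caller
    fromfiles = ["/data/a.tar", "/data/b.tar"]
    tofiles = ["/backup/a.tar", "/backup/b.tar"]

    transfer_files = {"files": []}
    for i in range(len(fromfiles)):
        transfer_files["files"].append({
            "source_file": fromfiles[i],
            "destination_file": tofiles[i]
        })

    print(json.dumps(transfer_files, indent=2))
    # {
    #   "files": [
    #     {
    #       "source_file": "/data/a.tar",
    #       "destination_file": "/backup/a.tar"
    #     },
    #     ...
    #   ]
    # }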
@@ -379,7 +390,9 @@ def endpoint_copy_endpoint(tofile, fromfile, topoint, frompoint, logact = 0):
     if tinfo and tinfo['data_size'] > 0:
         return PgLOG.pglog("{}-{}: file exists already".format(topoint, tofile), logact)

-    cmd = "{} -t -vc -se {} -de {} -sf {} -df {}".format(BACKCMD, frompoint, topoint, fromfile, tofile)
+    action = 'transfer'
+    cmd = f'{BACKCMD} {action} -se {frompoint} -de {topoint} -sf {fromfile} -df {tofile} -vc'
+
     task = submit_globus_task(cmd, topoint, logact)
     if task['stat'] == 'S':
         ret = PgLOG.SUCCESS
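The single-file copy now uses the same subcommand interface, with the old '-t' action flag replaced by an explicit 'transfer' subcommand. A sketch of the command string the new f-string assembles, with hypothetical endpoint aliases and paths:

    BACKCMD = "dsglobus"
    frompoint, topoint = "rda-quasar", "rda-glade"   # hypothetical endpoint aliases
    fromfile = "/d999000/data.tar"                   # hypothetical source path
    tofile = "/backup/d999000/data.tar"              # hypothetical destination path

    action = 'transfer'
    cmd = f'{BACKCMD} {action} -se {frompoint} -de {topoint} -sf {fromfile} -df {tofile} -vc'
    print(cmd)
    # dsglobus transfer -se rda-quasar -de rda-glade -sf /d999000/data.tar -df /backup/d999000/data.tar -vc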
@@ -435,7 +448,8 @@ def check_globus_status(taskid, endpoint = None, logact = 0):
     if not taskid: return ret
     if not endpoint: endpoint = PgLOG.PGLOG['BACKUPEP']
     mp = r'Status:\s+({})'.format('|'.join(QSTATS.values()))
-    cmd = "{} -gt --task-id {}".format(BACKCMD, taskid)
+
+    cmd = f"{BACKCMD} get-task {taskid}"
     astats = ['OK', 'Queued']

     for loop in range(2):
@@ -452,7 +466,7 @@
             if logact&PgLOG.NOWAIT:
                 errmsg = "{}: Cancel Task due to {}:\n{}".format(taskid, detail, buf)
                 errlog(errmsg, 'B', 1, logact)
-                ccmd = "{} -ct --task-id {}".format(BACKCMD, taskid)
+                ccmd = f"{BACKCMD} cancel-task {taskid}"
                 PgLOG.pgsystem(ccmd, logact, 7)
             else:
                 time.sleep(PgSIG.PGSIG['ETIME'])
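check_globus_status scans the get-task output with a regex built from QSTATS, and cancels the task via the new cancel-task subcommand when NOWAIT is set. A sketch of the status match, assuming a hypothetical QSTATS mapping and output format (the real mapping is defined elsewhere in PgFile.py):

    import re

    QSTATS = {'A': 'ACTIVE', 'F': 'FAILED', 'S': 'SUCCEEDED'}   # hypothetical mapping
    mp = r'Status:\s+({})'.format('|'.join(QSTATS.values()))

    buf = "Label: daily backup\nStatus: SUCCEEDED\n"   # hypothetical get-task output
    ms = re.search(mp, buf)
    if ms:
        print(ms.group(1))   # SUCCEEDED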
@@ -759,7 +773,7 @@ def delete_backup_file(file, endpoint = None, logact = 0):
     info = check_backup_file(file, endpoint, 0, logact)
     if not info: return PgLOG.FAILURE

-    cmd = "{} -d -ep {} -tf {}".format(BACKCMD, endpoint, file)
+    cmd = f"{BACKCMD} delete -ep {endpoint} -tf {file}"
     task = submit_globus_task(cmd, endpoint, logact)
     if task['stat'] == 'S':
         return PgLOG.SUCCESS
@@ -1028,7 +1042,7 @@ def move_backup_file(tofile, fromfile, endpoint = None, logact = 0):
     elif tinfo != None:
         return ret

-    cmd = "{} --rename -ep {} --oldpath {} --newpath {}".format(BACKCMD, endpoint, fromfile, tofile)
+    cmd = f"{BACKCMD} rename -ep {endpoint} --old-path {fromfile} --new-path {tofile}"
     loop = 0
     while loop < 2:
         buf = PgLOG.pgsystem(cmd, logact, CMDRET)
@@ -1158,7 +1172,7 @@ def make_one_backup_directory(dir, odir, endpoint = None, logact = 0):
     if not odir: odir = dir
     if not make_one_backup_directory(op.dirname(dir), odir, endpoint, logact): return PgLOG.FAILURE

-    cmd = "{} --mkdir -ep {} -p {}".format(BACKCMD, endpoint, dir)
+    cmd = f"{BACKCMD} mkdir -ep {endpoint} -p {dir}"
     for loop in range(2):
         buf = PgLOG.pgsystem(cmd, logact, CMDRET)
         syserr = PgLOG.PGLOG['SYSERR']
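make_one_backup_directory recurses on op.dirname before running its own mkdir, so parent directories are created first. A sketch of the resulting creation order for a hypothetical path:

    import os.path as op

    def creation_order(path):
        # Parent-first order, mirroring the recursive helper above
        if not path or path == '/':
            return []
        return creation_order(op.dirname(path)) + [path]

    print(creation_order('/backup/ds999.0/2024'))
    # ['/backup', '/backup/ds999.0', '/backup/ds999.0/2024']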
@@ -1843,7 +1857,7 @@ def check_backup_file(file, endpoint = None, opt = 0, logact = 0):
     if not endpoint: endpoint = PgLOG.PGLOG['BACKUPEP']
     bdir = op.dirname(file)
     bfile = op.basename(file)
-    cmd = "{} -ls -ep {} -p {} --filter {}".format(BACKCMD, endpoint, bdir, bfile)
+    cmd = f"{BACKCMD} ls -ep {endpoint} -p {bdir} --filter {bfile}"
     ccnt = loop = 0
     while loop < 2:
         buf = PgLOG.pgsystem(cmd, logact, CMDRET)
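check_backup_file scopes the listing to the parent directory and filters on the basename, so only the one file comes back. A sketch of the split and the resulting ls command, with a hypothetical path and endpoint alias:

    import os.path as op

    file = '/ds999.0/backups/data.tar'   # hypothetical backup file path
    bdir = op.dirname(file)              # /ds999.0/backups
    bfile = op.basename(file)            # data.tar
    cmd = f"dsglobus ls -ep rda-quasar -p {bdir} --filter {bfile}"
    print(cmd)
    # dsglobus ls -ep rda-quasar -p /ds999.0/backups --filter data.tar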
@@ -2208,7 +2222,8 @@ def backup_glob(dir, endpoint = None, opt = 0, logact = 0):

     if not dir: return None
     if not endpoint: endpoint = PgLOG.PGLOG['BACKUPEP']
-    cmd = "{} -ls -ep {} -p {}".format(BACKCMD, endpoint, dir)
+
+    cmd = f"{BACKCMD} ls -ep {endpoint} -p {dir}"
     flist = {}
     for loop in range(2):
         buf = PgLOG.pgsystem(cmd, logact, CMDRET)