Skip to content
Open
26 changes: 12 additions & 14 deletions fireworks/core/launchpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from collections import defaultdict
from itertools import chain


import gridfs
from bson import ObjectId
from monty.os.path import zpath
Expand All @@ -20,6 +21,8 @@
from tqdm import tqdm

from fireworks.core.firework import Firework, FWAction, Launch, Tracker, Workflow
from fireworks.utilities import fw_id_from_reservation_id
from fireworks.utilities import reservation_id_from_fw_id
from fireworks.fw_config import MongoClient
from fireworks.fw_config import (
GRIDFS_FALLBACK_COLLECTION,
Expand Down Expand Up @@ -1192,18 +1195,17 @@ def reserve_fw(self, fworker, launch_dir, host=None, ip=None, fw_id=None):
"""
return self.checkout_fw(fworker, launch_dir, host=host, ip=ip, fw_id=fw_id, state="RESERVED")

def get_fw_ids_from_reservation_id(self, reservation_id):
def get_fw_id_from_reservation_id(self, reservation_id):
"""
Given the reservation id, return the list of firework ids.

Args:
reservation_id (int)

Returns:
[int]: list of firework ids.
[int]: Return the firework id.
"""
l_id = self.launches.find_one({"state_history.reservation_id": reservation_id}, {"launch_id": 1})["launch_id"]
return [fw["fw_id"] for fw in self.fireworks.find({"launches": l_id}, {"fw_id": 1})]
fw_id=fw_id_from_reservation_id.get_fwid(reservation_id)

return fw_id

def cancel_reservation_by_reservation_id(self, reservation_id) -> None:
"""Given the reservation id, cancel the reservation and rerun the corresponding fireworks."""
Expand All @@ -1217,14 +1219,10 @@ def cancel_reservation_by_reservation_id(self, reservation_id) -> None:

def get_reservation_id_from_fw_id(self, fw_id):
"""Given the firework id, return the reservation id."""
fw = self.fireworks.find_one({"fw_id": fw_id}, {"launches": 1})
if fw:
for launch in self.launches.find({"launch_id": {"$in": fw["launches"]}}, {"state_history": 1}):
for d in launch["state_history"]:
if "reservation_id" in d:
return d["reservation_id"]
return None
return None
jobid=reservation_id_from_fw_id.main(fw_id)
if jobid==None:
print('No matching fw_id-JobID pair. The firework may be a lost run')
return jobid

def cancel_reservation(self, launch_id) -> None:
"""Given the launch id, cancel the reservation and rerun the fireworks."""
Expand Down
169 changes: 169 additions & 0 deletions fireworks/utilities/fw_id_from_reservation_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import os
import subprocess
import json
import paramiko
import getpass
import re

# Function to execute local shell commands and return the output
def execute_command(command):
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
#raise Exception(f"Command failed: {command}\n{result.stderr}")
raise Exception('Running fireworks locally')
ssh=None
return result.stdout.strip(),ssh
except Exception as e:
result,ssh=ssh_login(command)
print(e)
return result,ssh


def extract_username_hostname(input_string):
# Define the regex pattern
pattern = r'(?P<username>[^@]+)@(?P<hostname>.+)'

# Search for the pattern in the input string
match = re.match(pattern, input_string)

if match:
# Extract username and hostname from named groups
username = match.group('username')
hostname = match.group('hostname')
return username, hostname
else:
raise ValueError("The input does not match the required format 'username@hostname'.")

# Get user input

# SSH login and execute remote command
def ssh_login(command):
input_string = input("Enter username@hostname: ").strip()
username, hostname = extract_username_hostname(input_string)
password = getpass.getpass('Enter password+OTP: ')

# Create an SSH client
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
# Connect to the server
ssh.connect(hostname, username=username, password=password)
# Execute the command
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()

if errors:
raise Exception(f"Command failed: {command}\n{errors}")

except Exception as e:
print(e)
return output, ssh


def get_fwid(jobid):
job_info,ssh = execute_command(f"scontrol show jobid {jobid}")
if ssh !=None:
fw_id=find_worker(job_info,ssh)
ssh.close()
else:
fw_id=find_worker(job_info,ssh)

return fw_id


def find_worker(job_info, ssh):
stdout_dir = ""
for line in job_info.splitlines():
if "StdOut=" in line:
stdout_dir = line.split("=", 1)[1]
break

if not stdout_dir:
print("StdOut path not found in job information")
return

base_dir = os.path.dirname(stdout_dir)

if ssh!=None:
# Change directory to the base directory on the remote server
stdin, stdout, stderr = ssh.exec_command(f"cd {base_dir} && pwd")
current_dir = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to change directory: {errors}")

print(f"Changed directory to: {current_dir}")

stdin, stdout, stderr = ssh.exec_command(f"find {current_dir} -type d -name 'launcher_*'")
launch_dirs = stdout.read().decode('utf-8').splitlines()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to find launch directories: {errors}")

largest_dir = max(launch_dirs, key=lambda d: d.split('_')[-1])

# Change to the largest directory
stdin, stdout, stderr = ssh.exec_command(f"cd {largest_dir} && pwd")
final_dir = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to change directory to {largest_dir}: {errors}")

print(f"Changed directory to: {final_dir}")

# Check for the JSON file in the directory
stdin, stdout, stderr = ssh.exec_command(f"cat {final_dir}/FW.json")
json_data = stdout.read().decode('utf-8').strip()
errors = stderr.read().decode('utf-8').strip()
if errors:
raise Exception(f"Failed to read FW.json: {errors}")

data = json.loads(json_data)
spec_mpid = data.get('spec', {}).get('MPID', 'N/A')
fw_id = data.get('fw_id', 'N/A')

print(f"spec.MPID: {spec_mpid}")
print(f"fw_id: {fw_id}")
else:
# Change directory to the extracted base directory
try:
os.chdir(base_dir)
except OSError:
print(f"Failed to change directory to {base_dir}")
exit(1)

# Print the current directory to confirm
print(f"Changed directory to: {os.getcwd()}")

# Find the largest directory with the pattern "launcher_*"
launch_dirs = subprocess.check_output(f"find {os.getcwd()} -type d -name 'launcher_*'", shell=True).decode().splitlines()
largest_dir = max(launch_dirs, key=lambda d: d.split('_')[-1])

try:
os.chdir(largest_dir)
except OSError:
print(f"Failed to change directory to {largest_dir}")
exit(1)

print(f"Changed directory to: {os.getcwd()}")

json_file = os.path.join(os.getcwd(), "FW.json")

# Check if the JSON file exists
if os.path.isfile(json_file):
with open(json_file, 'r') as f:
data = json.load(f)
spec_mpid = data.get('spec', {}).get('MPID', 'N/A')
fw_id = data.get('fw_id', 'N/A')

# Output the extracted values
print(f"spec.MPID: {spec_mpid}")
print(f"fw_id: {fw_id}")
else:
print(f"FW.json not found in {largest_dir}")

return fw_id
return fw_id
Loading