Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 124 additions & 24 deletions WaterNetworkAnalysis/WaterNetworkAnalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def extract_waters_from_trajectory(
HW: str | None = None,
extract_only_O: bool = False,
save_file: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
"""Extract waters for clustering analysis.

Calculates water (oxygen and hydrogen) coordinates for all the
Expand Down Expand Up @@ -425,7 +425,7 @@ def extract_waters_from_trajectory(
be saved. If none doesn't save to a file. Defaults to None.

Returns:
tuple[np.ndarray, np.ndarray]:
np.ndarray or tuple[np.ndarray, np.ndarray]:
returns xyz numpy arrays that contain coordinates of oxygens,
and combined array of hydrogen 1 and hydrogen 2 coordinates.
If ``extract_only_O`` is True, returns only oxygen coordinates.
Expand Down Expand Up @@ -467,29 +467,106 @@ def extract_waters_from_trajectory(
if extract_only_O is False:
coordsH = []
coordsO = []
# loop over
for _ in u.trajectory[::every]:
oxygens = u.select_atoms(
(
f"{oxygen_selection} and {water_selection} and point "
f"{selection_center[0]} {selection_center[1]} {selection_center[2]}"
f" {dist}"
),
)
for oxygen in oxygens:
coordsO.append(oxygen.position)
if extract_only_O is False:
hydrogens = u.select_atoms(
f"resid {oxygen.resid} and ({hydrogen_selection})"
)
if len(hydrogens) != 2:
exception_string: str = (
f"Water {oxygen.resid} has too many"
f" hydrogens ({len(hydrogens)})."
mismatch_detected = False
if is_pdb_file(trajectory):
# Detect if any frame has a different number of atoms
expected_n_atoms = len(u.atoms)
try:
for ts in u.trajectory:
if ts.n_atoms != expected_n_atoms:
mismatch_detected = True
break
except ValueError:
mismatch_detected = True
if mismatch_detected:
model_counter = 0
inside_model = False
buffer_lines = []

with open(trajectory) as f:
for line in f:
# Start of a new MODEL block
if line.startswith("MODEL"):
if model_counter % every == 0:
inside_model = True
model_counter += 1
buffer_lines = [line]
continue

# End of that MODEL block
if inside_model and line.startswith("ENDMDL"):
buffer_lines.append(line)
# We have collected a full MODEL … ENDMDL block in buffer_lines

# Write it to a temp PDB file
frame_name = f"_temp_frame_{model_counter:03d}.pdb"
with open(frame_name, "w") as tmpf:
tmpf.writelines(buffer_lines)
tmpf.close()

# Load and process that single frame PDB
u_frame = mda.Universe(frame_name)
oxygens = u_frame.select_atoms(
f"{oxygen_selection} and {water_selection} and point "
f"{selection_center[0]} {selection_center[1]} "
f"{selection_center[2]} {dist}"
)
for oxygen in oxygens:
coordsO.append(oxygen.position.copy())
if not extract_only_O:
hydrogens = u_frame.select_atoms(
f"resid {oxygen.resid} and "
f"({hydrogen_selection}) and "
f"({water_selection})"
)
if len(hydrogens) != 2:
msg = (
f"Water {oxygen.resid} has too many hydrogens "
f"({len(hydrogens)}) in frame {model_counter}."
)
os.remove(frame_name)
raise Exception(msg)
for hpos in hydrogens.positions:
coordsH.append(hpos.copy())

# Delete the temp file before reading the next model
os.remove(frame_name)

# Reset for next MODEL block
inside_model = False
buffer_lines = []
continue

# If we are inside a MODEL…ENDMDL block, keep adding lines
if inside_model:
buffer_lines.append(line)

else:
# loop over
for _ in u.trajectory[::every]:
oxygens = u.select_atoms(
(
f"{oxygen_selection} and {water_selection} and point "
f"{selection_center[0]} {selection_center[1]} {selection_center[2]}"
f" {dist}"
),
)
for oxygen in oxygens:
coordsO.append(oxygen.position)
if extract_only_O is False:
hydrogens = u.select_atoms(
f"resid {oxygen.resid} and "
f"({hydrogen_selection}) and "
f"({water_selection})"
)
raise Exception(exception_string)
for hydrogen_positions in hydrogens.positions:
coordsH.append(hydrogen_positions)
if len(hydrogens) != 2:
exception_string: str = (
f"Water {oxygen.resid} has too many"
f" hydrogens ({len(hydrogens)})."
)
raise Exception(exception_string)
for hydrogen_positions in hydrogens.positions:
coordsH.append(hydrogen_positions)
Odata: np.ndarray = np.asarray(coordsO)
if extract_only_O is False:
coordsH = np.asarray(coordsH)
Expand Down Expand Up @@ -987,3 +1064,26 @@ def read_results_and_make_pdb(
output_fname=output_fname,
mode=mode,
)


def is_pdb_file(filepath: str) -> bool:
"""
Return True if `filepath` is probably a PDB (by extension or by peeking at contents).
If it’s binary (like XTC/DCD), this will return False safely.
"""
ext = os.path.splitext(filepath)[1].lower()
if ext in {".pdb", ".ent", ".pdb1", ".pdb2"}:
return True

# If extension isn't a known PDB, peek at first few lines:
try:
with open(filepath, "r", encoding="utf-8") as f:
for _ in range(10):
line = next(f)
if line.startswith(("ATOM ", "HETATM", "HEADER", "TITLE")):
return True
except (UnicodeDecodeError, IOError, StopIteration):
# If it fails to decode (binary), or can't open, it’s not a PDB.
return False

return False
Loading