Skip to content

Commit 1098001

Browse files
committed
Add remote file copy for cisco nxos devices
1 parent b21fb64 commit 1098001

1 file changed

Lines changed: 220 additions & 0 deletions

File tree

pyntc/devices/nxos_device.py

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import re
55
import time
66

7+
from urllib.parse import urlparse
8+
from netmiko import ConnectHandler
9+
from pyntc.utils.models import FileCopyModel
10+
711
from pynxos.device import Device as NXOSNative
812
from pynxos.errors import CLIError
913
from pynxos.features.file_copy import FileTransferError as NXOSFileTransferError
@@ -300,6 +304,222 @@ def file_copy_remote_exists(self, src, dest=None, file_system="bootflash:"):
300304
)
301305
return self.native.file_copy_remote_exists(src, dest, file_system=file_system)
302306

307+
###
308+
# MM Begin
309+
###
310+
def check_file_exists(self, filename, file_system=None):
311+
"""Check if a remote file exists by filename.
312+
313+
Args:
314+
filename (str): The name of the file to check for on the remote device.
315+
file_system (str): Supported only for Arista. The file system for the
316+
remote file. If no file_system is provided, then the `get_file_system`
317+
method is used to determine the correct file system to use.
318+
319+
Returns:
320+
(bool): True if the remote file exists, False if it doesn't.
321+
322+
Raises:
323+
CommandError: If there is an error in executing the command to check if the file exists.
324+
"""
325+
exists = False
326+
327+
file_system = file_system or self._get_file_system()
328+
command = f"dir {file_system}/{filename}"
329+
result = self.native_ssh.send_command(command, read_timeout=30)
330+
331+
log.debug(
332+
"Host %s: Checking if file %s exists on remote with command '%s' and result: %s",
333+
self.host,
334+
filename,
335+
command,
336+
result,
337+
)
338+
339+
# Check for error patterns
340+
if re.search(r"% Error listing directory|No such file|No files found|Path does not exist", result):
341+
log.debug("Host %s: File %s does not exist on remote.", self.host, filename)
342+
exists = False
343+
elif re.search(rf"Directory of .*{filename}", result):
344+
log.debug("Host %s: File %s exists on remote.", self.host, filename)
345+
exists = True
346+
else:
347+
raise CommandError(command, f"Unable to determine if file {filename} exists on remote: {result}")
348+
349+
return exists
350+
351+
def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs):
352+
# Validate hashing algorithm
353+
if hashing_algorithm not in {"md5", "sha512"}:
354+
log.error(
355+
"Host %s: Unsupported hashing algorithm %s. Supported algorithms are: md5, sha512",
356+
self.host,
357+
hashing_algorithm,
358+
)
359+
raise ValueError(f"hashing_algorithm must be either 'md5' or 'sha512', got '{hashing_algorithm}'")
360+
361+
# Determine file system
362+
if file_system is None:
363+
file_system = self._get_file_system()
364+
365+
# Normalize file_system
366+
if not file_system.startswith("/") and not file_system.endswith(":"):
367+
file_system = f"{file_system}:"
368+
369+
# Use NXOS verify command to get the checksum
370+
# Example: show file bootflash:nautobot.png sha512sum
371+
command = f"show file {file_system}/{filename} {hashing_algorithm}sum"
372+
373+
try:
374+
result = self.native_ssh.send_command(command, read_timeout=30)
375+
log.debug(
376+
"Host %s: Getting remote checksum for file %s with command '%s' and result: %s",
377+
self.host,
378+
filename,
379+
command,
380+
result,
381+
)
382+
383+
# Parse the output to extract the checksum
384+
match = re.search(r"=\s*([a-fA-F0-9]+)", result)
385+
if match:
386+
remote_checksum = match.group(1).lower()
387+
log.debug("Host %s: Remote checksum for %s: %s", self.host, filename, remote_checksum)
388+
return remote_checksum
389+
390+
log.error("Host %s: Could not parse checksum from output: %s", self.host, result)
391+
raise CommandError(command, f"Could not parse checksum from output: {result}")
392+
393+
except Exception as e:
394+
log.error("Host %s: Error getting remote checksum: %s", self.host, str(e))
395+
raise CommandError(command, f"Error getting remote checksum: {str(e)}")
396+
397+
def remote_file_copy(self, src: FileCopyModel, dest=None, file_system=None, **kwargs):
398+
"""Copy a file from remote source to device.
399+
400+
Args:
401+
src (FileCopyModel): The source file model with transfer parameters.
402+
dest (str): Destination filename (defaults to src.file_name).
403+
file_system (str): Device filesystem (auto-detected if not provided).
404+
**kwargs (Any): Passible parameters such as file_system.
405+
406+
Raises:
407+
TypeError: If src is not a FileCopyModel.
408+
FileTransferError: If transfer or verification fails.
409+
FileSystemNotFoundError: If filesystem cannot be determined.
410+
"""
411+
# Validate input
412+
if not isinstance(src, FileCopyModel):
413+
raise TypeError("src must be an instance of FileCopyModel")
414+
415+
# Determine file system
416+
if file_system is None:
417+
file_system = self._get_file_system()
418+
419+
# Determine destination
420+
if dest is None:
421+
dest = src.file_name
422+
423+
# Validate scheme
424+
supported_schemes = ["http", "https", "scp", "ftp", "sftp", "tftp"]
425+
if src.scheme not in supported_schemes:
426+
raise ValueError(f"Unsupported scheme: {src.scheme}")
427+
428+
log.debug(
429+
"Host %s: Verifying file %s exists before attempting a copy",
430+
self.host,
431+
dest,
432+
)
433+
if not self.verify_file(src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system):
434+
current_prompt = self.native.find_prompt()
435+
prompt_answers = {
436+
r"Password": src.token or "",
437+
r"Source username": src.username or "",
438+
r"yes/no|Are you sure you want to continue connecting": "yes",
439+
r"(confirm|Address or name of remote host|Source filename|Destination filename)": "",
440+
}
441+
keys = list(prompt_answers.keys()) + [re.escape(current_prompt)]
442+
expect_regex = f"({'|'.join(keys)})"
443+
444+
# Construct copy command
445+
cmd = f"copy {src.clean_url} {file_system}"
446+
if src.vrf:
447+
cmd += f" vrf {src.vrf}"
448+
449+
log.debug("Host %s: Starting remote file copy for %s to %s/%s", self.host, src.file_name, file_system, dest)
450+
try:
451+
result = self.native_netmiko.send_command_timing(cmd, read_timeout=src.timeout)
452+
except Exception as e:
453+
log.error(
454+
"Host %s: File transfer error for file %s: %s",
455+
self.host,
456+
dest,
457+
str(e),
458+
)
459+
raise FileTransferError(str(e))
460+
461+
log.debug(
462+
"Host %s: Copy command output: %s",
463+
self.host,
464+
result,
465+
)
466+
467+
# Check for success indicators
468+
if "copy complete" not in result.lower():
469+
log.error(
470+
"Host %s: File transfer did not complete successfully for file %s",
471+
self.host,
472+
dest,
473+
)
474+
raise FileTransferError("Copy command did not complete successfully")
475+
476+
# Verify file after transfer
477+
if not self.verify_file(src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system):
478+
log.error(
479+
"Host %s: File verification failed after transfer for file %s",
480+
self.host,
481+
dest,
482+
)
483+
raise FileTransferError("File verification failed after transfer")
484+
485+
log.info(
486+
"Host %s: File %s transferred successfully.",
487+
self.host,
488+
dest,
489+
)
490+
491+
def verify_file(self, checksum, filename, hashing_algorithm="md5", **kwargs):
492+
"""Verify a file on the device by comparing checksums.
493+
494+
Args:
495+
checksum (str): The expected checksum of the file.
496+
filename (str): The name of the file on the device.
497+
hashing_algorithm (str): The hashing algorithm to use (default: "md5").
498+
**kwargs (Any): Passible parameters such as file_system.
499+
500+
Returns:
501+
(bool): True if the file is verified successfully, False otherwise.
502+
"""
503+
exists = self.check_file_exists(filename, **kwargs)
504+
device_checksum = (
505+
self.get_remote_checksum(filename, hashing_algorithm=hashing_algorithm, **kwargs) if exists else None
506+
)
507+
if checksum == device_checksum:
508+
log.debug("Host %s: Checksum verification successful for file %s", self.host, filename)
509+
return True
510+
511+
log.debug(
512+
"Host %s: Checksum verification failed for file %s - Expected: %s, Actual: %s",
513+
self.host,
514+
filename,
515+
checksum,
516+
device_checksum,
517+
)
518+
return False
519+
###
520+
# MM End
521+
###
522+
303523
def install_os(self, image_name, **vendor_specifics):
304524
"""Upgrade device with provided image.
305525

0 commit comments

Comments
 (0)