Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions libsubmit/channels/channel_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABCMeta, abstractmethod, abstractproperty
import six


class Channel(metaclass=ABCMeta):
@six.add_metaclass(ABCMeta)
class Channel():
""" Define the interface to all channels. Channels are usually called via the execute_wait function.
For channels that execute remotely, a push_file function allows you to copy over files.

Expand Down
14 changes: 7 additions & 7 deletions libsubmit/channels/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BadHostKeyException(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(BadHostKeyException, self).__init__()
self.reason = "SSH channel could not be created since server's host keys could not be verified"
self.hostname = hostname
self.e = e
Expand All @@ -41,7 +41,7 @@ class BadScriptPath(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(BadScriptPath, self).__init__()
self.reason = "Inaccessible remote script dir. Specify script_dir"
self.hostname = hostname
self.e = e
Expand All @@ -57,7 +57,7 @@ class BadPermsScriptPath(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(BadPermsScriptPath, self).__init__()
self.reason = "User does not have permissions to access the script_dir"
self.hostname = hostname
self.e = e
Expand All @@ -74,7 +74,7 @@ class FileExists(ChannelError):
'''

def __init__(self, e, hostname, filename=None):
super().__init__()
super(FileExists, self).__init__()
self.reason = "File name collision in channel transport phase:" + filename
self.hostname = hostname
self.e = e
Expand All @@ -90,7 +90,7 @@ class AuthException(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(AuthException, self).__init__()
self.reason = "Authentication to remote server failed"
self.hostname = hostname
self.e = e
Expand All @@ -106,7 +106,7 @@ class SSHException(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(SSHException, self).__init__()
self.reason = "Error connecting or establishing an SSH session"
self.hostname = hostname
self.e = e
Expand All @@ -122,7 +122,7 @@ class FileCopyException(ChannelError):
'''

def __init__(self, e, hostname):
super().__init__()
super(FileCopyException, self).__init__()
self.reason = "File copy failed due to {0}".format(e)
self.hostname = hostname
self.e = e
4 changes: 3 additions & 1 deletion libsubmit/launchers/launchers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from abc import ABCMeta, abstractmethod
import six

from libsubmit.utils import RepresentationMixin


class Launcher(RepresentationMixin, metaclass=ABCMeta):
@six.add_metaclass(ABCMeta)
class Launcher(RepresentationMixin):
""" Launcher base class to enforce launcher interface
"""
@abstractmethod
Expand Down
5 changes: 3 additions & 2 deletions libsubmit/providers/aws/aws.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: future_fstrings -*-
import json
import logging
import os
Expand Down Expand Up @@ -593,8 +594,8 @@ def submit(self, command='sleep 1', blocksize=1, job_name="parsl.auto"):
wrapped_cmd = self.launcher(command,
self.tasks_per_node,
self.nodes_per_block)
[instance, *rest] = self.spin_up_instance(command=wrapped_cmd, job_name=job_name)

instance_data = self.spin_up_instance(command=wrapped_cmd, job_name=job_name)
instance, rest = instance_data[0], instance_data[1:]
if not instance:
logger.error("Failed to submit request to EC2")
return None
Expand Down
3 changes: 2 additions & 1 deletion libsubmit/providers/azure/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def submit(self, command='sleep 1', blocksize=1, job_name="parsl.auto"):
"""

job_name = "parsl.auto.{0}".format(time.time())
[instance, *rest] = self.deployer.deploy(command=command, job_name=job_name, blocksize=1)
instance_data = self.deployer.deploy(command=command, job_name=job_name, blocksize=1)
instance, rest = instance_data[0], instance_data[1:]

if not instance:
logger.error("Failed to submit request to Azure")
Expand Down
8 changes: 4 additions & 4 deletions libsubmit/providers/cobalt/cobalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self,
overrides='',
launcher=AprunLauncher(),
cmd_timeout=10):
super().__init__(label,
super(CobaltProvider, self).__init__(label,
channel=channel,
script_dir=script_dir,
nodes_per_block=nodes_per_block,
Expand Down Expand Up @@ -104,7 +104,7 @@ def _status(self):

jobs_missing = list(self.resources.keys())

retcode, stdout, stderr = super().execute_wait("qstat -u $USER")
retcode, stdout, stderr = super(CobaltProvider, self).execute_wait("qstat -u $USER")

# Execute_wait failed. Do no update.
if retcode != 0:
Expand Down Expand Up @@ -193,7 +193,7 @@ def submit(self, command, blocksize, job_name="parsl.auto"):
self.nodes_per_block, queue_opt, wtime_to_minutes(self.walltime), account_opt, channel_script_path)
logger.debug("Executing {}".format(command))

retcode, stdout, stderr = super().execute_wait(command)
retcode, stdout, stderr = super(CobaltProvider, self).execute_wait(command)

# TODO : FIX this block
if retcode != 0:
Expand Down Expand Up @@ -226,7 +226,7 @@ def cancel(self, job_ids):
"""

job_id_list = ' '.join(job_ids)
retcode, stdout, stderr = super().execute_wait("qdel {0}".format(job_id_list))
retcode, stdout, stderr = super(CobaltProvider, self).execute_wait("qdel {0}".format(job_id_list))
rets = None
if retcode == 0:
for jid in job_ids:
Expand Down
6 changes: 3 additions & 3 deletions libsubmit/providers/condor/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self,
launcher=SingleNodeLauncher(),
requirements=''):

super().__init__(label,
super(CondorProvider, self).__init__(label,
channel,
script_dir,
nodes_per_block,
Expand Down Expand Up @@ -115,7 +115,7 @@ def _status(self):

job_id_list = ' '.join(self.resources.keys())
cmd = "condor_q {0} -af:jr JobStatus".format(job_id_list)
retcode, stdout, stderr = super().execute_wait(cmd)
retcode, stdout, stderr = super(CondorProvider, self).execute_wait(cmd)
"""
Example output:

Expand Down Expand Up @@ -230,7 +230,7 @@ def submit(self, command, blocksize, job_name="parsl.auto"):
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

cmd = "condor_submit {0}".format(channel_script_path)
retcode, stdout, stderr = super().execute_wait(cmd, 3)
retcode, stdout, stderr = super(CondorProvider, self).execute_wait(cmd, 3)
logger.debug("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())

job_id = []
Expand Down
8 changes: 4 additions & 4 deletions libsubmit/providers/grid_engine/grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self,
walltime="00:10:00",
overrides='',
launcher=SingleNodeLauncher()):
super().__init__(label,
super(GridEngineProvider, self).__init__(label,
channel,
script_dir,
nodes_per_block,
Expand Down Expand Up @@ -151,7 +151,7 @@ def submit(self, command="", blocksize=1, job_name="parsl.auto"):

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
cmd = "qsub -terse {0}".format(channel_script_path)
retcode, stdout, stderr = super().execute_wait(cmd, 10)
retcode, stdout, stderr = super(GridEngineProvider, self).execute_wait(cmd, 10)

if retcode == 0:
for line in stdout.split('\n'):
Expand Down Expand Up @@ -179,7 +179,7 @@ def _status(self):

cmd = "qstat"

retcode, stdout, stderr = super().execute_wait(cmd)
retcode, stdout, stderr = super(GridEngineProvider, self).execute_wait(cmd)

# Execute_wait failed. Do no update
if retcode != 0:
Expand Down Expand Up @@ -217,7 +217,7 @@ def cancel(self, job_ids):

job_id_list = ' '.join(job_ids)
cmd = "qdel {}".format(job_id_list)
retcode, stdout, stderr = super().execute_wait(cmd, 3)
retcode, stdout, stderr = super(GridEngineProvider, self).execute_wait(cmd, 3)

rets = None
if retcode == 0:
Expand Down
6 changes: 6 additions & 0 deletions libsubmit/providers/kubernetes/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
from libsubmit.error import *
from libsubmit.providers.provider_base import ExecutionProvider

# Compatibility with python2.7, FileNotFoundError is not defined
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError

try:
from kubernetes import client, config
config.load_kube_config()
Expand Down
4 changes: 3 additions & 1 deletion libsubmit/providers/provider_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABCMeta, abstractmethod, abstractproperty
import six


class ExecutionProvider(metaclass=ABCMeta):
@six.add_metaclass(ABCMeta)
class ExecutionProvider():
""" Define the strict interface for all Execution Provider

.. code:: python
Expand Down
9 changes: 5 additions & 4 deletions libsubmit/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: future_fstrings -*-
import logging
import os
import time
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(self,
overrides='',
cmd_timeout=10,
launcher=SingleNodeLauncher()):
super().__init__(label,
super(SlurmProvider, self).__init__(label,
channel,
script_dir,
nodes_per_block,
Expand All @@ -110,7 +111,7 @@ def _status(self):
job_id_list = ','.join(self.resources.keys())
cmd = "squeue --job {0}".format(job_id_list)

retcode, stdout, stderr = super().execute_wait(cmd)
retcode, stdout, stderr = super(SlurmProvider, self).execute_wait(cmd)

# Execute_wait failed. Do no update
if retcode != 0:
Expand Down Expand Up @@ -179,7 +180,7 @@ def submit(self, command, blocksize, job_name="parsl.auto"):

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

retcode, stdout, stderr = super().execute_wait("sbatch {0}".format(channel_script_path))
retcode, stdout, stderr = super(SlurmProvider, self).execute_wait("sbatch {0}".format(channel_script_path))

job_id = None
if retcode == 0:
Expand All @@ -203,7 +204,7 @@ def cancel(self, job_ids):
'''

job_id_list = ' '.join(job_ids)
retcode, stdout, stderr = super().execute_wait("scancel {0}".format(job_id_list))
retcode, stdout, stderr = super(SlurmProvider, self).execute_wait("scancel {0}".format(job_id_list))
rets = None
if retcode == 0:
for jid in job_ids:
Expand Down
2 changes: 1 addition & 1 deletion libsubmit/providers/torque/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self,
parallelism=1,
launcher=AprunLauncher(),
walltime="00:20:00"):
super().__init__(label,
super(TorqueProvider, self).__init__(label,
channel,
script_dir,
nodes_per_block,
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ google-api-python-client
google-auth
nbsphinx
kubernetes>=6.0.0
six
configparser
future-fstrings
9 changes: 8 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
exec(f.read())

install_requires = [
'paramiko'
'paramiko',
'six',
'configparser',
'future-fstrings',
]

tests_require = [
'paramiko',
'six',
'configparser',
'future-fstrings',
'mock>=1.0.0',
'nose',
'pytest'
Expand Down Expand Up @@ -40,6 +46,7 @@
# Licence, must match with licence above
'License :: OSI Approved :: Apache Software License',
# Python versions supported
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
],
Expand Down