Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 210 additions & 9 deletions coriolis/tests/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,33 @@
from keystoneauth1 import session as ks_session
from keystoneauth1 import token_endpoint
from oslo_config import cfg
from oslo_db.sqlalchemy import models
from oslo_log import log as logging
import oslo_messaging as messaging

from coriolis import constants
from coriolis import context
from coriolis.db import api as db_api
from coriolis.providers import factory as providers_factory
from coriolis.tests.integration import harness
from coriolis.tests.integration import utils as test_utils
from coriolis.tests import test_base

CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Statuses that represent a completed allocation attempt.
MINION_ALLOCATED_TERMINAL = {
constants.MINION_POOL_STATUS_ALLOCATED,
constants.MINION_POOL_STATUS_ERROR,
}

# Statuses that represent a completed deallocation attempt.
MINION_DEALLOCATED_TERMINAL = {
constants.MINION_POOL_STATUS_DEALLOCATED,
constants.MINION_POOL_STATUS_ERROR,
}


class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
"""Base class for integration tests."""
Expand All @@ -55,7 +70,36 @@ def setUpClass(cls):
def setUp(self):
super().setUp()

patcher = mock.patch("psutil.Process.send_signal")
to_patch = [
# Prevent the test runner from being killed by Coriolis sending
# SIGINT to in-process workers.
"psutil.Process.send_signal",
# When removing minion pools, this is also called.
# There is no Keystone, so it needs to be mocked.
"coriolis.keystone.delete_trust",
]
for thing in to_patch:
patcher = mock.patch(thing)
patcher.start()
self.addCleanup(patcher.stop)

# fake:// oslo_messaging doesn't serialize objects. After calls, some
# actions may remain as objects, yet they are expected to be dicts.
self._patch_rpc_client_method("call")
self._patch_rpc_client_method("cast")

def _patch_rpc_client_method(self, method):
original_call = getattr(messaging.RPCClient, method)

def _call(self, ctxt, method, **kwargs):
for key, value in kwargs.items():
if isinstance(value, models.ModelBase):
kwargs[key] = dict(value.items())

return original_call(self, ctxt, method, **kwargs)

patcher = mock.patch.object(
messaging.RPCClient, method, _call)
patcher.start()
self.addCleanup(patcher.stop)

Expand Down Expand Up @@ -83,7 +127,7 @@ def _create_endpoint(self, **kwargs):

return endpoint

def _create_transfer(self, src_id, dst_id, instances):
def _create_transfer(self, src_id, dst_id, instances, **kwargs):
"""Create a Replica transfer object and return its ID."""
transfer = self._client.transfers.create(
origin_endpoint_id=src_id,
Expand All @@ -96,11 +140,78 @@ def _create_transfer(self, src_id, dst_id, instances):
storage_mappings={},
notes="integration test replica",
skip_os_morphing=True,
**kwargs,
)
self.addCleanup(self._client.transfers.delete, transfer.id)

return transfer

def _create_pool(
self, endpoint_id, name="test-pool", skip_allocation=True):
pool = self._client.minion_pools.create(
name=name,
endpoint=endpoint_id,
platform=constants.PROVIDER_PLATFORM_DESTINATION,
os_type=constants.OS_TYPE_LINUX,
environment_options={},
minimum_minions=1,
maximum_minions=1,
minion_max_idle_time=3600,
minion_retention_strategy=(
constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE),
skip_allocation=skip_allocation,
)
self.addCleanup(self._safe_delete_pool, pool.id)

return pool

def _safe_delete_pool(self, pool_id):
"""Delete pool, force-deallocating first if needed."""
try:
pool = self._client.minion_pools.get(pool_id)
except Exception:
return

if pool.status not in MINION_DEALLOCATED_TERMINAL:
try:
self._client.minion_pools.deallocate_minion_pool(
pool_id, force=True)
self._wait_for_pool(pool_id, MINION_DEALLOCATED_TERMINAL)
except Exception:
pass

try:
self._client.minion_pools.delete(pool_id)
except Exception:
pass

def _wait_for_pool(self, pool_id, terminal_statuses, timeout=180):
"""Poll the DB until *pool_id* reaches one of *terminal_statuses*.

:returns: minion pool ORM object.
:raises: AssertionError on timeout.
"""
ctxt = self._get_db_context()
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
pool = db_api.get_minion_pool(ctxt, pool_id)
if pool and pool.status in terminal_statuses:
return pool
time.sleep(1)
pool = db_api.get_minion_pool(ctxt, pool_id)
last = pool.status if pool else "not found"
self.fail(
"Pool %s did not reach one of %r within %ds (last: %s)"
% (pool_id, terminal_statuses, timeout, last)
)

def _get_db_context(self):
return context.RequestContext(
user='int-test',
project_id=harness._TEST_PROJECT_ID,
is_admin=True,
)

@staticmethod
def _ignoreExc(func, ignored_exc=Exception):
"""Wrap the given function, ignoring exceptions."""
Expand All @@ -116,6 +227,7 @@ def f(*args, **kwargs):
class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):

_CREATE_SCSI_DBG_DEVS = True
_CREATE_MINION_POOLS = False

@classmethod
def setUpClass(cls):
Expand All @@ -138,6 +250,7 @@ def setUp(self):

self._src_device = None
self._dst_device = None
self._pool_id = None

if self._CREATE_SCSI_DBG_DEVS:
self._src_device = test_utils.add_scsi_debug_device()
Expand Down Expand Up @@ -169,11 +282,26 @@ def setUp(self):
},
)

# Create minion pool if needed.
if self._CREATE_MINION_POOLS:
pool = self._create_pool(
self._dst_endpoint.id, "transfer-pool", skip_allocation=False)
self._pool_id = pool.id

pool_obj = self._wait_for_pool(pool.id, MINION_ALLOCATED_TERMINAL)

self.assertEqual(
constants.MINION_POOL_STATUS_ALLOCATED,
pool_obj.status,
"Pool did not reach ALLOCATED (got %s)" % pool_obj.status,
)

# Create transfer replica.
self._transfer = self._create_transfer(
self._src_endpoint.id,
self._dst_endpoint.id,
instances=[self._src_device],
destination_minion_pool_id=self._pool_id,
)

# mock a few commands that are going to be ran through ssh; they won't
Expand All @@ -196,13 +324,6 @@ def _execute_and_wait(self, transfer_id, timeout=300):
transfer_id, shutdown_instances=False)
self.assertExecutionCompleted(execution.id, timeout=timeout)

def _get_db_context(self):
return context.RequestContext(
user='int-test',
project_id=harness._TEST_PROJECT_ID,
is_admin=True,
)

def wait_for_execution(self, execution_id, timeout=300,
desired_statuses=None):
"""Block until *execution_id* reaches a terminal state.
Expand Down Expand Up @@ -324,3 +445,83 @@ def _slow_call(*args, **kwargs):
)
patcher.start()
self.addCleanup(patcher.stop)


class MinionPoolTestBase(CoriolisIntegrationTestBase):
"""Base class for minion pool integration tests.

Skips the entire test class when the import provider does not advertise
``PROVIDER_TYPE_DESTINATION_MINION_POOL`` support.
"""

@classmethod
def setUpClass(cls):
super().setUpClass()

available = providers_factory.get_available_providers()
imp_types = available.get(cls._imp_platform, {}).get("types", [])
if constants.PROVIDER_TYPE_DESTINATION_MINION_POOL not in imp_types:
raise unittest.SkipTest(
"Import provider '%s' does not support minion pools"
% cls._imp_platform
)


class MinionPoolReplicaTestBase(
MinionPoolTestBase, ReplicaIntegrationTestBase):
"""Base class for replica integration tests using minion pools.

Extends the assertions to also verify that the minions in the pool have
been used, and that the minions and the pool returns to an available state.
"""

_CREATE_MINION_POOLS = True

def _execute_and_wait(self, transfer_id, timeout=300):
super()._execute_and_wait(transfer_id, timeout=timeout)
self.assertPoolAllocated(self._pool_id)
self.assertMachinesAvailable(self._pool_id)

def assertExecutionCompleted(self, execution_id, timeout=300):
super().assertExecutionCompleted(execution_id, timeout=timeout)
self.assertPoolAllocated(self._pool_id)
self.assertMachinesAvailable(self._pool_id)

def assertDeploymentCompleted(self, deployment_id, timeout=300):
super().assertDeploymentCompleted(deployment_id, timeout=timeout)
self.assertPoolAllocated(self._pool_id)
self.assertMachinesAvailable(self._pool_id)

def assertPoolAllocated(self, pool_id):
"""Assert the pool is healthy and still in ALLOCATED status."""
ctxt = self._get_db_context()
pool = db_api.get_minion_pool(ctxt, pool_id)
self.assertIsNotNone(pool, "Pool %s not found" % pool_id)
self.assertEqual(
constants.MINION_POOL_STATUS_ALLOCATED,
pool.status,
"Pool %s is not ALLOCATED (got %s)" % (pool_id, pool.status),
)

def assertMachinesAvailable(self, pool_id):
"""Assert all machines in the pool are AVAILABLE and have been used."""
ctxt = self._get_db_context()
pool = db_api.get_minion_pool(ctxt, pool_id, include_machines=True)
self.assertIsNotNone(pool, "Pool %s not found" % pool_id)
self.assertTrue(
pool.minion_machines,
"Pool %s has no minion machines" % pool_id,
)
for machine in pool.minion_machines:
self.assertEqual(
constants.MINION_MACHINE_STATUS_AVAILABLE,
machine.allocation_status,
"Machine %s in pool %s is not AVAILABLE (got %s)"
% (machine.id, pool_id, machine.allocation_status),
)
self.assertIsNotNone(
machine.last_used_at,
"Machine %s in pool %s has no last_used_at; "
"it may not have been used by the transfer"
% (machine.id, pool_id),
)
Loading
Loading