Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 65 additions & 52 deletions connect_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@

We can use an instance of this class into our request processor, like this: ::

class ProductFulfillment(FulfillmentAutomation):
class PurchaseFlow(Flow):
def __init__(self):
self.migration_handler = connect_migration.MigrationHandler({
super().__init__(lambda request: request.type == 'purchase')
self.step('Checking if migration is need', PurchaseFlow._need_migration)
self.step('Provision real purchase order', PurchaseFlow._purchase)
self.step('Approving request', PurchaseFlow._approve_request)



def _need_migration(self):
self.getRequest().migration_handler = MigrationHandler({
'email': lambda data, request_id: data['teamAdminEmail'].upper(),
'team_id': lambda data, request_id: data['teamId'].upper(),
'team_name': lambda data, request_id: data['teamName'].upper(),
'num_licensed_users': lambda data, request_id: int(data['licNumber']) * 10
})

def process_request(request):
if request.type == 'purchase':
request = self.migration_handler.migrate(request)

# The migrate() method returns a new request object with
# the parameter values updated, we must update the parameters
# and approve the fulfillment
})

self.update_parameters(request.id, request.asset.params)
return ActivationTileResponse('The data has been migrated :)')
if self.getRequest().needsMigration():
req = self.migration_handler.migrate(self.getRequest())
self.getRequest().asset.params = req.asset.params
self._approve_request()

"""

Expand All @@ -36,9 +38,8 @@ def process_request(request):
import six
from typing import List

from connect.exceptions import SkipRequest
from connect.logger import logger
from connect.models import Fulfillment
from connect import Env
from connect.models import AssetRequest


class MigrationAbortError(Exception):
Expand All @@ -64,7 +65,8 @@ class MigrationHandler(object):
in the migration data on direct assignation flow. Default value is ``False``.
"""

def __init__(self, transformations=None, migration_key='migration_info', serialize=False):
def __init__(self, transformations=None, migration_key='migration_info',
serialize=False):
self._transformations = transformations or {}
self._migration_key = migration_key
self._serialize = serialize
Expand Down Expand Up @@ -97,28 +99,30 @@ def serialize(self):
def migrate(self, request):
""" Call this function to perform migration of one request.

:param Fulfillment request: The request to migrate.
:param AssetRequest request: The request to migrate.
:return: A new request object with the parameter values updated.
:rtype: Fulfillment
:raises SkipRequest: Raised if migration fails for some reason.
:rtype: AssetRequest
:raises MigrationParamError: Raised if the value for a parameter is not a string.
"""
if request.needs_migration(self.migration_key):
logger.info('[MIGRATION::{}] Running migration operations for request {}'
.format(request.id, request.id))
if request.needsMigration(self.migration_key):
Env.getLogger().info(
'[MIGRATION::{}] Running migration operations for request {}'
.format(request.id, request.id))
request_copy = copy.deepcopy(request)

raw_data = request.asset.get_param_by_id(self.migration_key).value
logger.debug('[MIGRATION::{}] Migration data `{}`: {}'
.format(request.id, self.migration_key, raw_data))
Env.getLogger().debug('[MIGRATION::{}] Migration data `{}`: {}'
.format(request.id, self.migration_key,
raw_data))

try:
try:
parsed_data = json.loads(raw_data)
except ValueError as ex:
raise MigrationAbortError(str(ex))
logger.debug('[MIGRATION::{}] Migration data `{}` parsed correctly'
.format(request.id, self.migration_key))
Env.getLogger().debug(
'[MIGRATION::{}] Migration data `{}` parsed correctly'
.format(request.id, self.migration_key))

# These will keep track of processing status
processed_params = []
Expand All @@ -127,64 +131,73 @@ def migrate(self, request):
skipped_params = []

# Exclude param for migration_info from process list
params = [param for param in request_copy.asset.params
params = [param for param in request_copy.asset.params.toArray()
if param.id != self.migration_key]

for param in params:
# Try to process the param and report success or failure
try:
if param.id in self.transformations:
# Transformation is defined, so apply it
logger.info('[MIGRATION::{}] Running transformation for parameter {}'
.format(request.id, param.id))
param.value = self.transformations[param.id](parsed_data, request.id)
Env.getLogger().info(
'[MIGRATION::{}] Running transformation for parameter {}'
.format(request.id, param.id))
param.value = self.transformations[param.id](
parsed_data, request.id)
succeeded_params.append(param.id)
elif param.id in parsed_data:
# Parsed data contains the key, so assign it
if not isinstance(parsed_data[param.id], six.string_types):
if not isinstance(parsed_data[param.id],
six.string_types):
if self.serialize:
parsed_data[param.id] = json.dumps(parsed_data[param.id])
parsed_data[param.id] = json.dumps(
parsed_data[param.id])
else:
type_name = type(parsed_data[param.id]).__name__
type_name = type(
parsed_data[param.id]).__name__
raise MigrationParamError(
'Parameter {} type must be str, but {} was given'
.format(param.id, type_name))
.format(param.id, type_name))
param.value = parsed_data[param.id]
succeeded_params.append(param.id)
else:
skipped_params.append(param.id)
except MigrationParamError as ex:
logger.error('[MIGRATION::{}] {}'.format(request.id, ex))
Env.getLogger().error(
'[MIGRATION::{}] {}'.format(request.id, ex))
failed_params.append(param.id)

# Report processed param
processed_params.append(param.id)

logger.info('[MIGRATION::{}] {} processed, {} succeeded{}, {} failed{}, '
'{} skipped{}.'
.format(
request.id,
len(processed_params),
len(succeeded_params),
self._format_params(succeeded_params),
len(failed_params),
self._format_params(failed_params),
len(skipped_params),
self._format_params(skipped_params)))
Env.getLogger().info(
'[MIGRATION::{}] {} processed, {} succeeded{}, {} failed{}, '
'{} skipped{}.'
.format(
request.id,
len(processed_params),
len(succeeded_params),
self._format_params(succeeded_params),
len(failed_params),
self._format_params(failed_params),
len(skipped_params),
self._format_params(skipped_params)))

# Raise abort if any params failed
if failed_params:
raise MigrationAbortError(
'Processing of parameters {} failed, unable to complete migration.'
.format(', '.join(failed_params)))
.format(', '.join(failed_params)))
except MigrationAbortError as ex:
logger.error('[MIGRATION::{}] {}'.format(request.id, ex))
raise SkipRequest('Migration failed.')
Env.getLogger().error(
'[MIGRATION::{}] {}'.format(request.id, ex))
raise

return request_copy
else:
logger.info('[MIGRATION::{}] Request does not need migration.'
.format(request.id))
Env.getLogger().info(
'[MIGRATION::{}] AssetRequest does not need migration.'
.format(request.id))
return request

@staticmethod
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
connect-sdk>=17.4
connect-sdk-haxe-port==18.0.1
six==1.12.0
typing==3.6.6
mock == 3.0.5
pytest == 5.3.0
Loading