Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kmip/core/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,10 +1248,10 @@ def __repr__(self):
def __str__(self):
value = ", ".join(
[
'"application_namespace": {}'.format(
'"application_namespace": "{}"'.format(
self.application_namespace
),
'"application_data": {}'.format(self.application_data)
'"application_data": "{}"'.format(self.application_data)
]
)
return "{" + value + "}"
Expand Down
27 changes: 6 additions & 21 deletions kmip/core/factories/attribute_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,28 +277,13 @@ def _create_object_group(self, group):
return attributes.ObjectGroup(group)

def _create_application_specific_information(self, info):
if info is None:
return attributes.ApplicationSpecificInformation()
if info:
return attributes.ApplicationSpecificInformation(
application_namespace=info.get("application_namespace"),
application_data=info.get("application_data")
)
else:
application_namespace = info.get('application_namespace')
application_data = info.get('application_data')

if not isinstance(application_namespace, str):
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_namespace))
raise TypeError(msg)

if not isinstance(application_data, str):
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_data))
raise TypeError(msg)

return attributes.ApplicationSpecificInformation.create(
application_namespace, application_data)
return attributes.ApplicationSpecificInformation()

def _create_contact_information(self, info):
if info is None:
Expand Down
40 changes: 38 additions & 2 deletions kmip/services/server/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,16 @@ def _get_attribute_from_managed_object(self, managed_object, attr_name):
return None
elif attr_name == 'Link':
return None
elif attr_name == 'Application Specific Information':
return None
elif attr_name == "Application Specific Information":
values = []
for info in managed_object.app_specific_info:
values.append(
{
"application_namespace": info.application_namespace,
"application_data": info.application_data
}
)
return values
elif attr_name == 'Contact Information':
return None
elif attr_name == 'Last Change Date':
Expand Down Expand Up @@ -781,6 +789,14 @@ def _set_attribute_on_managed_object(self, managed_object, attribute):
raise exceptions.InvalidField(
"Cannot set duplicate name values."
)
elif attribute_name == "Application Specific Information":
for value in attribute_value:
managed_object.app_specific_info.append(
objects.ApplicationSpecificInformation(
application_namespace=value.application_namespace,
application_data=value.application_data
)
)
else:
# TODO (peterhamilton) Remove when all attributes are supported
raise exceptions.InvalidField(
Expand Down Expand Up @@ -1662,6 +1678,26 @@ def _process_locate(self, payload):
)
if attribute is None:
continue
elif name == "Application Specific Information":
application_namespace = value.application_namespace
application_data = value.application_data
v = {
"application_namespace": application_namespace,
"application_data": application_data
}
if v not in attribute:
self._logger.debug(
"Failed match: "
"the specified application specific "
"information ('{}', '{}') does not match any "
"of the object's associated application "
"specific information attributes.".format(
v.get("application_namespace"),
v.get("application_data")
)
)
add_object = False
break
elif name == "Name":
if value not in attribute:
self._logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def test_str(self):
("application_data", "www.example.com")
]
value = "{}".format(
", ".join(['"{}": {}'.format(arg[0], arg[1]) for arg in args])
", ".join(['"{}": "{}"'.format(arg[0], arg[1]) for arg in args])
)
self.assertEqual(
"{" + value + "}",
Expand Down
50 changes: 49 additions & 1 deletion kmip/tests/unit/core/factories/test_attribute_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,55 @@ def test_create_application_specific_information(self):
"""
Test that an ApplicationSpecificInformation attribute can be created.
"""
self.skipTest('')
attribute = self.factory.create_attribute_value(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertEqual("ssl", attribute.application_namespace)
self.assertEqual("www.example.com", attribute.application_data)

attribute = self.factory.create_attribute_value(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
None
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertIsNone(attribute.application_namespace)
self.assertIsNone(attribute.application_data)

attribute = self.factory.create_attribute_value_by_enum(
enums.Tags.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertEqual("ssl", attribute.application_namespace)
self.assertEqual("www.example.com", attribute.application_data)

attribute = self.factory.create_attribute_value_by_enum(
enums.Tags.APPLICATION_SPECIFIC_INFORMATION,
None
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertIsNone(attribute.application_namespace)
self.assertIsNone(attribute.application_data)

def test_create_contact_information(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,15 @@ def test_get(self):
session.add(app_specific_info)
session.commit()

# Grab the ID now before making a new session to avoid a Detached error
# See http://sqlalche.me/e/bhk3 for more info.
app_specific_info_id = app_specific_info.id

session = sqlalchemy.orm.sessionmaker(bind=engine)()
retrieved_info = session.query(
objects.ApplicationSpecificInformation
).filter(
objects.ApplicationSpecificInformation.id == app_specific_info.id
objects.ApplicationSpecificInformation.id == app_specific_info_id
).one()
session.commit()

Expand Down
138 changes: 137 additions & 1 deletion kmip/tests/unit/services/server/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ def test_get_attribute_from_managed_object(self):
symmetric_key,
'Application Specific Information'
)
self.assertEqual(None, result)
self.assertEqual([], result)

result = e._get_attribute_from_managed_object(
symmetric_key,
Expand Down Expand Up @@ -2423,6 +2423,13 @@ def test_create(self):
attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
),
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
]
)
Expand Down Expand Up @@ -2473,6 +2480,15 @@ def test_create(self):
self.assertEqual('test', symmetric_key.operation_policy_name)
self.assertIsNotNone(symmetric_key.initial_date)
self.assertNotEqual(0, symmetric_key.initial_date)
self.assertEqual(1, len(symmetric_key.app_specific_info))
self.assertEqual(
"ssl",
symmetric_key.app_specific_info[0].application_namespace
)
self.assertEqual(
"www.example.com",
symmetric_key.app_specific_info[0].application_data
)

self.assertEqual(uid, e._id_placeholder)

Expand Down Expand Up @@ -5599,6 +5615,126 @@ def test_locate_with_operation_policy_name(self):
)
self.assertEqual(0, len(response_payload.unique_identifiers))

def test_locate_with_application_specific_information(self):
"""
Test the Locate operation when the 'Application Specific Information'
attribute is given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()

key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)

app_specific_info_a = pie_objects.ApplicationSpecificInformation(
application_namespace="ssl",
application_data="www.example.com"
)
app_specific_info_b = pie_objects.ApplicationSpecificInformation(
application_namespace="ssl",
application_data="www.test.com"
)

obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
obj_a.app_specific_info.append(app_specific_info_a)
obj_a.app_specific_info.append(app_specific_info_b)
obj_b = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
obj_b.app_specific_info.append(app_specific_info_a)
obj_c = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)

e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.add(obj_c)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)

attribute_factory = factory.AttributeFactory()

# Locate the symmetric key objects based on their shared application
# specific information attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified application specific "
"information ('ssl', 'www.example.com') does not match any "
"of the object's associated application "
"specific information attributes."
)
self.assertEqual(2, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)

# Locate a single symmetric key object based on its unique application
# specific information attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.test.com"
}
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()

e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified application specific "
"information ('ssl', 'www.test.com') does not match any "
"of the object's associated application "
"specific information attributes."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)

def test_get(self):
"""
Test that a Get request can be processed correctly.
Expand Down