Fix test failures

This commit is contained in:
vishesh92 2026-03-05 17:50:18 +05:30
parent 30891ee545
commit 2c49e92632
No known key found for this signature in database
GPG Key ID: 4E395186CBFA790B
6 changed files with 177 additions and 217 deletions

View File

@ -30,4 +30,6 @@ public interface KMSKeyDao extends GenericDao<KMSKeyVO, Long> {
List<KMSKeyVO> listByZone(Long zoneId, KeyPurpose purpose, Boolean enabled);
long countByHsmProfileId(Long hsmProfileId);
KMSKeyVO findByNameAndAccountId(String name, long accountId);
}

View File

@ -33,6 +33,7 @@ public class KMSKeyDaoImpl extends GenericDaoBase<KMSKeyVO, Long> implements KMS
public KMSKeyDaoImpl() {
allFieldSearch = createSearchBuilder();
allFieldSearch.and("name", allFieldSearch.entity().getName(), SearchCriteria.Op.EQ);
allFieldSearch.and("kekLabel", allFieldSearch.entity().getKekLabel(), SearchCriteria.Op.EQ);
allFieldSearch.and("domainId", allFieldSearch.entity().getDomainId(), SearchCriteria.Op.EQ);
allFieldSearch.and("accountId", allFieldSearch.entity().getAccountId(), SearchCriteria.Op.EQ);
@ -71,4 +72,12 @@ public class KMSKeyDaoImpl extends GenericDaoBase<KMSKeyVO, Long> implements KMS
Integer count = getCount(sc);
return count != null ? count : 0;
}
@Override
public KMSKeyVO findByNameAndAccountId(String name, long accountId) {
SearchCriteria<KMSKeyVO> sc = allFieldSearch.create();
sc.setParameters("name", name);
sc.setParameters("accountId", accountId);
return findOneBy(sc);
}
}

View File

@ -164,7 +164,7 @@ CREATE TABLE IF NOT EXISTS `cloud`.`kms_keys` (
`name` VARCHAR(255) NOT NULL COMMENT 'User-friendly name',
`description` VARCHAR(1024) COMMENT 'User description',
`kek_label` VARCHAR(255) NOT NULL COMMENT 'Provider-specific KEK label/ID',
`purpose` VARCHAR(32) NOT NULL COMMENT 'Key purpose (VOLUME_ENCRYPTION, TLS_CERT, CONFIG_SECRET)',
`purpose` VARCHAR(32) NOT NULL COMMENT 'Key purpose (VOLUME_ENCRYPTION, TLS_CERT)',
`account_id` BIGINT UNSIGNED NOT NULL COMMENT 'Owning account',
`domain_id` BIGINT UNSIGNED NOT NULL COMMENT 'Owning domain',
`zone_id` BIGINT UNSIGNED NOT NULL COMMENT 'Zone where key is valid',
@ -261,7 +261,7 @@ CREATE TABLE IF NOT EXISTS `cloud`.`kms_database_kek_objects` (
`always_sensitive` BOOLEAN NOT NULL DEFAULT TRUE COMMENT 'PKCS#11 CKA_ALWAYS_SENSITIVE - key was always sensitive',
`never_extractable` BOOLEAN NOT NULL DEFAULT TRUE COMMENT 'PKCS#11 CKA_NEVER_EXTRACTABLE - key was never extractable',
-- Key Metadata
`purpose` VARCHAR(32) NOT NULL COMMENT 'Key purpose (VOLUME_ENCRYPTION, TLS_CERT, CONFIG_SECRET)',
`purpose` VARCHAR(32) NOT NULL COMMENT 'Key purpose (VOLUME_ENCRYPTION, TLS_CERT)',
`key_bits` INT NOT NULL COMMENT 'Key size in bits (128, 192, 256)',
`algorithm` VARCHAR(64) NOT NULL DEFAULT 'AES/GCM/NoPadding' COMMENT 'Encryption algorithm',
-- Validity Dates (PKCS#11 CKA_START_DATE, CKA_END_DATE)

View File

@ -392,6 +392,9 @@ public class KMSManagerImpl extends ManagerBase implements KMSManager, Pluggable
if (profile == null) {
throw KMSException.invalidParameter("HSM Profile not found");
}
if (kmsKeyDao.findByNameAndAccountId(name, accountId) != null) {
throw new InvalidParameterValueException("A KMS key with name " + name + " already exists in this account");
}
KMSKeyVO kmsKey = new KMSKeyVO(name, description, "", purpose,
accountId, domainId, zoneId, "AES/GCM/NoPadding", keyBits);

View File

@ -38,12 +38,14 @@ from marvin.lib.base import (
Domain,
HSMProfile,
KMSKey,
VirtualMachine,
ServiceOffering,
Volume,
)
from marvin.lib.common import get_zone, get_domain
from marvin.lib.common import get_zone, get_domain, get_template
from marvin.lib.utils import cleanup_resources
from nose.plugins.attrib import attr
def _random_name(prefix="test-kms"):
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"{prefix}-{suffix}"
@ -63,9 +65,22 @@ class TestKMSLifecycle(cloudstackTestCase):
cls.apiclient = cls.test_client.getApiClient()
cls.zone = get_zone(cls.apiclient, cls.test_client.getZoneForTests())
cls.domain = get_domain(cls.apiclient)
cls.logger = cls.test_client.getLogger()
cls._cleanup = []
cls.default_profile = None
profiles = HSMProfile.list(cls.apiclient, name="default")
if profiles and len(profiles) > 0:
cls.default_profile = profiles[0]
if hasattr(cls.default_profile, 'enabled') and not cls.default_profile.enabled:
hsm = HSMProfile({"id": cls.default_profile.id})
hsm.update(cls.apiclient, enabled=True)
# Re-fetch to get updated state
profiles = HSMProfile.list(cls.apiclient, name="default")
if profiles and len(profiles) > 0:
cls.default_profile = profiles[0]
@classmethod
def tearDownClass(cls):
super(TestKMSLifecycle, cls).tearDownClass()
@ -79,6 +94,7 @@ class TestKMSLifecycle(cloudstackTestCase):
self._create_domain_and_account()
def tearDown(self):
self.cleanup.reverse()
cleanup_resources(self.apiclient, self.cleanup)
def _create_domain_and_account(self, is_domain_admin=False):
@ -90,7 +106,6 @@ class TestKMSLifecycle(cloudstackTestCase):
)
self.cleanup.append(self.child_domain)
acct_type = 2 if is_domain_admin else 0 # 2 = DomainAdmin, 0 = User
self.user_account = Account.create(
self.apiclient,
{
@ -101,7 +116,7 @@ class TestKMSLifecycle(cloudstackTestCase):
"password": "password",
},
domainid=self.child_domain.id,
accounttype=acct_type,
admin=is_domain_admin,
)
self.cleanup.append(self.user_account)
@ -111,8 +126,34 @@ class TestKMSLifecycle(cloudstackTestCase):
DomainName=self.child_domain.name,
)
def _create_kms_key(self, name, profile_id, apiclient=None, zoneid=None, purpose=None):
api_client = apiclient or self.apiclient
zone_id = zoneid or self.zone.id
key = KMSKey.create(
api_client,
name=name,
zoneid=zone_id,
hsmprofileid=profile_id,
purpose=purpose
)
self.cleanup.append(key)
return key
def _create_hsm_profile(self, name, protocol="database", system=True, zoneid=None):
zone_id = zoneid or self.zone.id
profile = HSMProfile.create(
self.apiclient,
name=name,
protocol=protocol,
system=system,
zoneid=zone_id,
)
self.cleanup.append(profile)
return profile
# ==================================================================
# HSM Profile tests (tests 01 03 and 13 14)
# HSM Profile lifecycle tests (01 03, 11, 13)
# ==================================================================
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
@ -120,15 +161,8 @@ class TestKMSLifecycle(cloudstackTestCase):
def test_01_add_hsm_profile_admin(self):
"""Test: admin creates a system-wide database HSM profile."""
profile_name = _random_name("hsm-prof")
profile = HSMProfile.create(
self.apiclient,
name=profile_name,
protocol="database",
system=True,
zoneid=self.zone.id,
)
profile = self._create_hsm_profile(name=profile_name)
self.assertIsNotNone(profile, "HSM profile creation returned None")
self.cleanup.append(profile)
self.assertEqual(
profile.name, profile_name,
@ -144,14 +178,7 @@ class TestKMSLifecycle(cloudstackTestCase):
def test_02_list_hsm_profiles(self):
"""Test: list HSM profiles and verify a created profile is present."""
profile_name = _random_name("hsm-list")
profile = HSMProfile.create(
self.apiclient,
name=profile_name,
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
profile = self._create_hsm_profile(name=profile_name)
profiles = HSMProfile.list(self.apiclient, id=profile.id)
self.assertIsNotNone(profiles, "listHSMProfiles returned None")
@ -162,14 +189,7 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_03_update_hsm_profile(self):
"""Test: update the name of an existing HSM profile."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-upd"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
profile = self._create_hsm_profile(name=_random_name("hsm-upd"))
new_name = _random_name("hsm-renamed")
updated = profile.update(self.apiclient, name=new_name)
@ -188,25 +208,13 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_04_create_kms_key_admin(self):
"""Test: admin creates a KMS key in the zone, verifies fields."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-for-key"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key_name = _random_name("kms-key")
key = KMSKey.create(
self.apiclient,
name=key_name,
zoneid=self.zone.id,
hsmprofileid=profile.id,
purpose="volume",
)
key = self._create_kms_key(name=key_name, profile_id=profile.id, purpose="volume")
self.assertIsNotNone(key, "createKMSKey returned None")
self.cleanup.append(key)
self.assertEqual(key.name, key_name, "Key name does not match")
self.assertEqual(
@ -220,22 +228,11 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_05_list_kms_keys(self):
"""Test: list KMS keys filtered by zone and by id."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key = KMSKey.create(
self.apiclient,
name=_random_name("key"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key)
key = self._create_kms_key(name=_random_name("key"), profile_id=profile.id)
# Filter by explicit key ID
keys_by_id = KMSKey.list(self.apiclient, id=key.id)
@ -253,22 +250,11 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_06_update_kms_key(self):
"""Test: update key name, description, and enabled status."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key = KMSKey.create(
self.apiclient,
name=_random_name("key-upd"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key)
key = self._create_kms_key(name=_random_name("key-upd"), profile_id=profile.id)
new_name = _random_name("key-renamed")
new_desc = "Updated description"
@ -289,24 +275,13 @@ class TestKMSLifecycle(cloudstackTestCase):
def test_07_create_kms_key_user(self):
"""Test: domain user creates their own KMS key; verifies ownership."""
# Admin creates the system HSM profile first
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-sys"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key_name = _random_name("user-key")
key = KMSKey.create(
self.user_apiclient,
name=key_name,
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
key = self._create_kms_key(name=key_name, profile_id=profile.id, apiclient=self.user_apiclient)
self.assertIsNotNone(key, "User-level createKMSKey returned None")
self.cleanup.append(key)
self.assertEqual(key.name, key_name, "Key name does not match")
self.assertEqual(
@ -318,23 +293,12 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_08_list_kms_keys_user_isolation(self):
"""Test: User A's keys are NOT visible to User B."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-iso"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
# User A key (self.user_account)
key_a = KMSKey.create(
self.user_apiclient,
name=_random_name("key-a"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key_a)
key_a = self._create_kms_key(name=_random_name("key-a"), profile_id=profile.id, apiclient=self.user_apiclient)
# Create User B in a separate child domain
domain_b = Domain.create(
@ -353,7 +317,7 @@ class TestKMSLifecycle(cloudstackTestCase):
"password": "password",
},
domainid=domain_b.id,
accounttype=0,
admin=False,
)
self.cleanup.append(account_b)
apiclient_b = self.test_client.getUserApiClient(
@ -373,21 +337,11 @@ class TestKMSLifecycle(cloudstackTestCase):
required_hardware="false")
def test_09_delete_kms_key(self):
"""Test: delete a KMS key that is not in use; verify it is gone."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-del"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key = KMSKey.create(
self.apiclient,
name=_random_name("key-del"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
key = self._create_kms_key(name=_random_name("key-del"), profile_id=profile.id)
key.delete(self.apiclient)
@ -399,29 +353,18 @@ class TestKMSLifecycle(cloudstackTestCase):
)
# ==================================================================
# Key rotation (test 11)
# Key rotation (test 10)
# ==================================================================
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="false")
def test_11_rotate_kms_key(self):
def test_10_rotate_kms_key(self):
"""Test: rotate a KMS key; verify the key version increments."""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-rot"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
if not self.default_profile:
self.skipTest("Default HSM profile 'default' not found")
profile = self.default_profile
key = KMSKey.create(
self.apiclient,
name=_random_name("key-rot"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key)
key = self._create_kms_key(name=_random_name("key-rot"), profile_id=profile.id)
initial_version = key.version
@ -441,101 +384,100 @@ class TestKMSLifecycle(cloudstackTestCase):
)
# ==================================================================
# Negative tests (tests 10, 12, 13)
# Negative tests (11)
# ==================================================================
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="true")
def test_10_delete_kms_key_in_use_negative(self):
"""
Negative test: deleting a KMS key that is attached to an encrypted
volume should be rejected.
Marked required_hardware="true" because it needs a running hypervisor
and storage to deploy a VM with an encrypted volume.
"""
# This test requires a deployed VM with an encrypted volume that
# references this key. Defer to environment with actual hypervisor.
self.skipTest(
"Skipped: requires a hypervisor with volume encryption support. "
"Run manually in an advanced zone."
)
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="false")
def test_12_create_kms_key_duplicate_name_negative(self):
"""
Negative test: creating two KMS keys with the same name in the same
account should raise an exception.
"""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-dup"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
key_name = _random_name("key-dup")
key_first = KMSKey.create(
self.user_apiclient,
name=key_name,
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key_first)
with self.assertRaises(Exception,
msg="Duplicate key name in same account should raise an exception"):
dupe = KMSKey.create(
self.user_apiclient,
name=key_name,
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
# If creation somehow succeeded, register for cleanup and fail
self.cleanup.append(dupe)
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="false")
def test_13_delete_hsm_profile_with_keys_negative(self):
def test_11_delete_hsm_profile_with_keys_negative(self):
"""
Negative test: deleting an HSM profile that still has associated KMS
keys should be rejected.
"""
profile = HSMProfile.create(
self.apiclient,
name=_random_name("hsm-with-key"),
protocol="database",
system=True,
zoneid=self.zone.id,
)
self.cleanup.append(profile)
profile = self._create_hsm_profile(name=_random_name("hsm-with-key"))
key = KMSKey.create(
self.apiclient,
name=_random_name("key-blocks-del"),
zoneid=self.zone.id,
hsmprofileid=profile.id,
)
self.cleanup.append(key)
key = self._create_kms_key(name=_random_name("key-blocks-del"), profile_id=profile.id)
with self.assertRaises(Exception,
msg="Deleting HSM profile with active keys should fail"):
profile.delete(self.apiclient)
# ==================================================================
# VM Encryption tests (12)
# ==================================================================
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="false")
def test_14_delete_hsm_profile(self):
"""Test: delete an HSM profile that has no associated keys; verify it is gone."""
profile = HSMProfile.create(
def test_12_deploy_vm_with_root_disk_encryption(self):
"""
Test: deploy a VM with its root disk encrypted using a KMS key.
Verify that the VM starts and the root volume has the KMS key ID.
"""
# 1. Create a KMS key for the user
key = self._create_kms_key(name=_random_name("vm-root-key"), profile_id=self.default_profile.id, apiclient=self.user_apiclient)
# 2. Get a template and a service offering
template = get_template(
self.apiclient,
name=_random_name("hsm-gone"),
protocol="database",
system=True,
zoneid=self.zone.id,
self.zone.id,
self.test_client.getParsedTestDataConfig().get("ostype", "CentOS 7.0 (64-bit)")
)
if template == -1 or template is None:
self.fail("Check for template failed")
service_offering = ServiceOffering.create(
self.apiclient,
self.test_client.getParsedTestDataConfig()["service_offering"]
)
self.cleanup.append(service_offering)
# 3. Deploy VM with root disk encryption
vm = VirtualMachine.create(
self.user_apiclient,
self.test_client.getParsedTestDataConfig()["virtual_machine"],
templateid=template.id,
accountid=self.user_account.name,
domainid=self.child_domain.id,
serviceofferingid=service_offering.id,
zoneid=self.zone.id,
rootdiskkmskeyid=key.id
)
self.cleanup.append(vm)
self.assertEqual(
vm.state,
"Running",
"VM should be in Running state after deployment"
)
# 4. Verify the root volume has the KMS key ID
volumes = Volume.list(
self.user_apiclient,
virtualmachineid=vm.id,
type='ROOT',
listall=True
)
self.assertTrue(
volumes and len(volumes) > 0,
"VM should have at least one ROOT volume"
)
root_volume = volumes[0]
self.assertEqual(
str(root_volume.kmskeyid),
str(key.id),
f"Root volume should have KMS key ID {key.id}, found {root_volume.kmskeyid}"
)
# ==================================================================
# HSM Profile cleanup (13)
# ==================================================================
@attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="false")
def test_13_delete_hsm_profile(self):
"""Test: delete an HSM profile that has no associated keys; verify it is gone."""
profile = self._create_hsm_profile(name=_random_name("hsm-gone"))
profile.delete(self.apiclient)

View File

@ -567,7 +567,8 @@ class VirtualMachine:
rootdiskcontroller=None, vpcid=None, macaddress=None, datadisktemplate_diskoffering_list={},
properties=None, nicnetworklist=None, bootmode=None, boottype=None, dynamicscalingenabled=None,
userdataid=None, userdatadetails=None, extraconfig=None, size=None, overridediskofferingid=None,
leaseduration=None, leaseexpiryaction=None, volumeid=None, snapshotid=None):
leaseduration=None, leaseexpiryaction=None, volumeid=None, snapshotid=None,
rootdiskkmskeyid=None):
"""Create the instance"""
@ -744,6 +745,9 @@ class VirtualMachine:
if snapshotid:
cmd.snapshotid = snapshotid
if rootdiskkmskeyid:
cmd.rootdiskkmskeyid = rootdiskkmskeyid
virtual_machine = apiclient.deployVirtualMachine(cmd, method=method)
if 'password' in list(virtual_machine.__dict__.keys()):