From c0572d7992d7b3727b35ad3e8d525774c75e8017 Mon Sep 17 00:00:00 2001 From: vishesh92 Date: Wed, 8 Apr 2026 08:57:40 +0530 Subject: [PATCH] Update test_certauthority_root --- .../smoke/test_certauthority_root.py | 177 +++++++++++++++++- 1 file changed, 176 insertions(+), 1 deletion(-) diff --git a/test/integration/smoke/test_certauthority_root.py b/test/integration/smoke/test_certauthority_root.py index dc6420d6369..491b8abeb2e 100644 --- a/test/integration/smoke/test_certauthority_root.py +++ b/test/integration/smoke/test_certauthority_root.py @@ -15,9 +15,12 @@ # specific language governing permissions and limitations # under the License. +import re +from datetime import datetime, timedelta + from nose.plugins.attrib import attr from marvin.cloudstackTestCase import cloudstackTestCase -from marvin.lib.utils import cleanup_resources +from marvin.lib.utils import cleanup_resources, wait_until from marvin.lib.base import * from marvin.lib.common import list_hosts @@ -60,6 +63,29 @@ class TestCARootProvider(cloudstackTestCase): except Exception as e: print(f"Certificate verification failed: {e}") + + def parseCertificateChain(self, pem): + """Split a PEM blob containing one or more certificates into a list of x509 objects.""" + certs = [] + matches = re.findall( + r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', + pem, + re.DOTALL + ) + for match in matches: + certs.append(x509.load_pem_x509_certificate(match.encode(), default_backend())) + return certs + + + def assertSignatureValid(self, issuerCert, cert): + """Verify cert is signed by issuerCert; raise on failure.""" + issuerCert.public_key().verify( + cert.signature, + cert.tbs_certificate_bytes, + padding.PKCS1v15(), + cert.signature_hash_algorithm, + ) + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() @@ -224,3 +250,152 @@ class TestCARootProvider(cloudstackTestCase): self.assertTrue(len(hosts) == 1) else: self.fail("Failed to have systemvm host in Up state after cert provisioning") + + + @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False) + def test_ca_certificate_chain_validity(self): + """ + Tests that listCaCertificate returns a valid certificate chain. + When an intermediate CA is configured, the response is a PEM blob + containing multiple certificates. Each non-root cert must be signed + by the next cert in the chain, and the final cert must be self-signed. + """ + pem = self.getCaCertificate() + self.assertTrue(len(pem) > 0) + + chain = self.parseCertificateChain(pem) + self.assertTrue(len(chain) >= 1, "Expected at least one certificate in CA chain") + + # Each non-root cert must be signed by the next cert in the chain + for i in range(len(chain) - 1): + child = chain[i] + parent = chain[i + 1] + self.assertEqual( + child.issuer, parent.subject, + f"Chain break: cert[{i}] issuer does not match cert[{i + 1}] subject" + ) + try: + self.assertSignatureValid(parent, child) + except Exception as e: + self.fail(f"Signature verification failed for chain link {i} -> {i + 1}: {e}") + + # The last cert in the chain must be self-signed (root CA) + root = chain[-1] + self.assertEqual( + root.issuer, root.subject, + "Final cert in CA chain is not self-signed" + ) + try: + self.assertSignatureValid(root, root) + except Exception as e: + self.fail(f"Root CA self-signature verification failed: {e}") + + + @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False) + def test_issue_certificate_issuer_matches_ca(self): + """ + Tests that an issued certificate's issuer DN matches the subject DN + of the first cert in the returned CA chain, and that the signature + verifies against that cert's public key. + """ + cmd = issueCertificate.issueCertificateCmd() + cmd.domain = 'apache.org' + cmd.ipaddress = '10.1.1.1' + cmd.provider = 'root' + + response = self.apiclient.issueCertificate(cmd) + self.assertTrue(len(response.certificate) > 0) + self.assertTrue(len(response.cacertificates) > 0) + + leaf = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend()) + caChain = self.parseCertificateChain(response.cacertificates) + self.assertTrue(len(caChain) >= 1, "Expected at least one CA certificate in response") + + # The issuing CA is the first cert in the returned chain (intermediate + # if an intermediate CA is configured, otherwise the root). + issuingCa = caChain[0] + self.assertEqual( + leaf.issuer, issuingCa.subject, + "Leaf certificate issuer does not match issuing CA subject" + ) + try: + self.assertSignatureValid(issuingCa, leaf) + except Exception as e: + self.fail(f"Leaf certificate signature does not verify against issuing CA: {e}") + + + @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False) + def test_certificate_validity_period(self): + """ + Tests that an issued certificate has sensible validity bounds: + not_valid_before <= now <= not_valid_after, and validity duration + is at least 300 days (CloudStack default is 1 year). + """ + cmd = issueCertificate.issueCertificateCmd() + cmd.domain = 'apache.org' + cmd.provider = 'root' + + response = self.apiclient.issueCertificate(cmd) + self.assertTrue(len(response.certificate) > 0) + + cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend()) + + # cryptography >= 42 prefers the *_utc variants; fall back for older versions. + notBefore = getattr(cert, 'not_valid_before_utc', None) or cert.not_valid_before + notAfter = getattr(cert, 'not_valid_after_utc', None) or cert.not_valid_after + + now = datetime.now(notBefore.tzinfo) if notBefore.tzinfo else datetime.utcnow() + self.assertTrue(notBefore <= now, f"Certificate not_valid_before {notBefore} is in the future") + self.assertTrue(now <= notAfter, f"Certificate not_valid_after {notAfter} is in the past") + + duration = notAfter - notBefore + self.assertTrue( + duration >= timedelta(days=300), + f"Certificate validity duration {duration} is less than expected minimum of 300 days" + ) + + + def getUpKVMHosts(self, hostId=None): + hosts = list_hosts( + self.apiclient, + type='Routing', + hypervisor='KVM', + state='Up', + resourcestate='Enabled', + id=hostId + ) + return hosts + + + @attr(tags=['advanced'], required_hardware=True) + def test_provision_certificate_kvm(self): + """ + Tests certificate provisioning on a KVM host. + Exercises the keystore-cert-import + cloud.jks provisioning flow + against a real agent. Skipped when no KVM hosts are available. + """ + if self.hypervisor.lower() != 'kvm': + raise self.skipTest("Hypervisor is not KVM, skipping test") + + hosts = self.getUpKVMHosts() + if not hosts or len(hosts) < 1: + raise self.skipTest("No Up KVM hosts found, skipping test") + + host = hosts[0] + + cmd = provisionCertificate.provisionCertificateCmd() + cmd.hostid = host.id + cmd.reconnect = True + cmd.provider = 'root' + + response = self.apiclient.provisionCertificate(cmd) + self.assertTrue(response.success) + + def checkHostIsUp(hostId): + hosts = self.getUpKVMHosts(hostId) + return (hosts is not None and len(hosts) > 0), hosts + + result, hosts = wait_until(2, 30, checkHostIsUp, host.id) + if not result: + self.fail("KVM host did not return to Up state after certificate provisioning") + self.assertEqual(len(hosts), 1)