Update test_certauthority_root

This commit is contained in:
vishesh92 2026-04-08 08:57:40 +05:30
parent e8c4ca943e
commit c0572d7992
No known key found for this signature in database
GPG Key ID: 4E395186CBFA790B
1 changed files with 176 additions and 1 deletions

View File

@ -15,9 +15,12 @@
# specific language governing permissions and limitations
# under the License.
import re
from datetime import datetime, timedelta
from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.utils import cleanup_resources
from marvin.lib.utils import cleanup_resources, wait_until
from marvin.lib.base import *
from marvin.lib.common import list_hosts
@ -60,6 +63,29 @@ class TestCARootProvider(cloudstackTestCase):
except Exception as e:
print(f"Certificate verification failed: {e}")
def parseCertificateChain(self, pem):
"""Split a PEM blob containing one or more certificates into a list of x509 objects."""
certs = []
matches = re.findall(
r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
pem,
re.DOTALL
)
for match in matches:
certs.append(x509.load_pem_x509_certificate(match.encode(), default_backend()))
return certs
def assertSignatureValid(self, issuerCert, cert):
"""Verify cert is signed by issuerCert; raise on failure."""
issuerCert.public_key().verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm,
)
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
@ -224,3 +250,152 @@ class TestCARootProvider(cloudstackTestCase):
self.assertTrue(len(hosts) == 1)
else:
self.fail("Failed to have systemvm host in Up state after cert provisioning")
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_ca_certificate_chain_validity(self):
"""
Tests that listCaCertificate returns a valid certificate chain.
When an intermediate CA is configured, the response is a PEM blob
containing multiple certificates. Each non-root cert must be signed
by the next cert in the chain, and the final cert must be self-signed.
"""
pem = self.getCaCertificate()
self.assertTrue(len(pem) > 0)
chain = self.parseCertificateChain(pem)
self.assertTrue(len(chain) >= 1, "Expected at least one certificate in CA chain")
# Each non-root cert must be signed by the next cert in the chain
for i in range(len(chain) - 1):
child = chain[i]
parent = chain[i + 1]
self.assertEqual(
child.issuer, parent.subject,
f"Chain break: cert[{i}] issuer does not match cert[{i + 1}] subject"
)
try:
self.assertSignatureValid(parent, child)
except Exception as e:
self.fail(f"Signature verification failed for chain link {i} -> {i + 1}: {e}")
# The last cert in the chain must be self-signed (root CA)
root = chain[-1]
self.assertEqual(
root.issuer, root.subject,
"Final cert in CA chain is not self-signed"
)
try:
self.assertSignatureValid(root, root)
except Exception as e:
self.fail(f"Root CA self-signature verification failed: {e}")
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_issue_certificate_issuer_matches_ca(self):
"""
Tests that an issued certificate's issuer DN matches the subject DN
of the first cert in the returned CA chain, and that the signature
verifies against that cert's public key.
"""
cmd = issueCertificate.issueCertificateCmd()
cmd.domain = 'apache.org'
cmd.ipaddress = '10.1.1.1'
cmd.provider = 'root'
response = self.apiclient.issueCertificate(cmd)
self.assertTrue(len(response.certificate) > 0)
self.assertTrue(len(response.cacertificates) > 0)
leaf = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
caChain = self.parseCertificateChain(response.cacertificates)
self.assertTrue(len(caChain) >= 1, "Expected at least one CA certificate in response")
# The issuing CA is the first cert in the returned chain (intermediate
# if an intermediate CA is configured, otherwise the root).
issuingCa = caChain[0]
self.assertEqual(
leaf.issuer, issuingCa.subject,
"Leaf certificate issuer does not match issuing CA subject"
)
try:
self.assertSignatureValid(issuingCa, leaf)
except Exception as e:
self.fail(f"Leaf certificate signature does not verify against issuing CA: {e}")
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_certificate_validity_period(self):
"""
Tests that an issued certificate has sensible validity bounds:
not_valid_before <= now <= not_valid_after, and validity duration
is at least 300 days (CloudStack default is 1 year).
"""
cmd = issueCertificate.issueCertificateCmd()
cmd.domain = 'apache.org'
cmd.provider = 'root'
response = self.apiclient.issueCertificate(cmd)
self.assertTrue(len(response.certificate) > 0)
cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
# cryptography >= 42 prefers the *_utc variants; fall back for older versions.
notBefore = getattr(cert, 'not_valid_before_utc', None) or cert.not_valid_before
notAfter = getattr(cert, 'not_valid_after_utc', None) or cert.not_valid_after
now = datetime.now(notBefore.tzinfo) if notBefore.tzinfo else datetime.utcnow()
self.assertTrue(notBefore <= now, f"Certificate not_valid_before {notBefore} is in the future")
self.assertTrue(now <= notAfter, f"Certificate not_valid_after {notAfter} is in the past")
duration = notAfter - notBefore
self.assertTrue(
duration >= timedelta(days=300),
f"Certificate validity duration {duration} is less than expected minimum of 300 days"
)
def getUpKVMHosts(self, hostId=None):
hosts = list_hosts(
self.apiclient,
type='Routing',
hypervisor='KVM',
state='Up',
resourcestate='Enabled',
id=hostId
)
return hosts
@attr(tags=['advanced'], required_hardware=True)
def test_provision_certificate_kvm(self):
"""
Tests certificate provisioning on a KVM host.
Exercises the keystore-cert-import + cloud.jks provisioning flow
against a real agent. Skipped when no KVM hosts are available.
"""
if self.hypervisor.lower() != 'kvm':
raise self.skipTest("Hypervisor is not KVM, skipping test")
hosts = self.getUpKVMHosts()
if not hosts or len(hosts) < 1:
raise self.skipTest("No Up KVM hosts found, skipping test")
host = hosts[0]
cmd = provisionCertificate.provisionCertificateCmd()
cmd.hostid = host.id
cmd.reconnect = True
cmd.provider = 'root'
response = self.apiclient.provisionCertificate(cmd)
self.assertTrue(response.success)
def checkHostIsUp(hostId):
hosts = self.getUpKVMHosts(hostId)
return (hosts is not None and len(hosts) > 0), hosts
result, hosts = wait_until(2, 30, checkHostIsUp, host.id)
if not result:
self.fail("KVM host did not return to Up state after certificate provisioning")
self.assertEqual(len(hosts), 1)