CLOUDSTACK-9544: Check access on account trying to generate user API keys

This fixes CVE-2016-6813

Signed-off-by: Marc-Aurèle Brothier <m@brothier.org>
Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
(cherry picked from commit 158497d68a)
Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
Marc-Aurèle Brothier 2016-10-18 15:33:38 +02:00 committed by Rohit Yadav
parent 9127af61e4
commit 0f89a8939f
2 changed files with 165 additions and 0 deletions

View File

@ -2231,6 +2231,7 @@ public class AccountManagerImpl extends ManagerBase implements AccountManager, M
@DB
@ActionEvent(eventType = EventTypes.EVENT_REGISTER_FOR_SECRET_API_KEY, eventDescription = "register for the developer API keys")
public String[] createApiKeyAndSecretKey(RegisterCmd cmd) {
Account caller = CallContext.current().getCallingAccount();
final Long userId = cmd.getId();
User user = getUserIncludingRemoved(userId);
@ -2238,6 +2239,9 @@ public class AccountManagerImpl extends ManagerBase implements AccountManager, M
throw new InvalidParameterValueException("unable to find user by id");
}
Account account = _accountDao.findById(user.getAccountId());
checkAccess(caller, null, true, account);
// don't allow updating system user
if (user.getId() == User.UID_SYSTEM) {
throw new PermissionDeniedException("user id : " + user.getId() + " is system account, update is not allowed");

View File

@ -1531,6 +1531,167 @@ class TestUserLogin(cloudstackTestCase):
return
class TestUserAPIKeys(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.testClient = super(TestUserAPIKeys, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
cls.services = Services().services
cls.domain = get_domain(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
cls.services['mode'] = cls.zone.networktype
# Create an account, domain etc
cls.domain = Domain.create(
cls.api_client,
cls.services["domain"],
)
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=False,
domainid=cls.domain.id
)
cls.domain_2 = Domain.create(
cls.api_client,
cls.services["domain"],
)
cls.account_2 = Account.create(
cls.api_client,
cls.services["account"],
admin=False,
domainid=cls.domain_2.id
)
cls._cleanup = [
cls.account,
cls.domain,
cls.account_2,
cls.domain_2
]
return
@classmethod
def tearDownClass(cls):
try:
# Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
# Clean up, terminate the created network offerings
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@attr(tags=[
"role",
"accounts",
"simulator",
"advanced",
"advancedns",
"basic",
"eip",
"sg"
])
def test_user_key_renew_same_account(self):
# Create an User associated with the account
user_1 = User.create(
self.apiclient,
self.services["user"],
account=self.account.name,
domainid=self.domain.id
)
self.cleanup.append(user_1)
account_response = list_accounts(
self.apiclient,
id=self.account.id
)[0]
self.assertEqual(
hasattr(account_response, 'user'),
True,
"Users are included in account response")
account_users = account_response.user
self.assertEqual(
isinstance(account_users, list),
True,
"Check account for valid data"
)
self.assertNotEqual(
len(account_users),
0,
"Check number of User in Account")
[user] = [u for u in account_users if u.username == user_1.username]
self.assertEqual(
user.apikey,
None,
"Check that the user don't have an API key yet")
self.debug("Register API keys for user")
userkeys = User.registerUserKeys(self.apiclient, user_1.id)
users = list_accounts(
self.apiclient,
id=self.account.id
)[0].user
[user] = [u for u in users if u.id == user_1.id]
self.assertEqual(
user.apikey,
userkeys.apikey,
"Check User api key")
self.assertEqual(
user.secretkey,
userkeys.secretkey,
"Check User having secret key")
self.debug("Get test client with user keys")
cs_api = self.testClient.getUserApiClient(
UserName=self.account.name,
DomainName=self.account.domain)
self.debug("Renew API keys for user using current keys")
new_keys = User.registerUserKeys(cs_api, user_1.id)
self.assertNotEqual(
userkeys.apikey,
new_keys.apikey,
"Check API key is different")
self.assertNotEqual(
userkeys.secretkey,
new_keys.secretkey,
"Check secret key is different")
@attr(tags=[
"role",
"accounts",
"simulator",
"advanced",
"advancedns",
"basic",
"eip",
"sg"
])
def test_user_cannot_renew_other_keys(self):
cs_api = self.testClient.getUserApiClient(
UserName=self.account.name,
DomainName=self.account.domain)
self.debug("Try to change API key of an account in another domain")
users = list_accounts(
self.apiclient,
id=self.account_2.id
)[0].user
with self.assertRaises(CloudstackAPIException) as e:
User.registerUserKeys(cs_api, users[0].id)
class TestDomainForceRemove(cloudstackTestCase):
@classmethod