[22.0] secondary storage resource limit for upload

This commit is contained in:
Abhisar Sinha 2026-03-09 10:19:56 +05:30 committed by Daan Hoogland
parent 5d5ee7b689
commit d722415105
4 changed files with 182 additions and 25 deletions

View File

@ -28,6 +28,7 @@ public class UploadStatusCommand extends Command {
}
private String entityUuid;
private EntityType entityType;
private Boolean abort;
protected UploadStatusCommand() {
}
@ -37,6 +38,11 @@ public class UploadStatusCommand extends Command {
this.entityType = entityType;
}
public UploadStatusCommand(String entityUuid, EntityType entityType, Boolean abort) {
this(entityUuid, entityType);
this.abort = abort;
}
public String getEntityUuid() {
return entityUuid;
}
@ -45,6 +51,10 @@ public class UploadStatusCommand extends Command {
return entityType;
}
public Boolean getAbort() {
return abort;
}
@Override
public boolean executeInSequence() {
return false;

View File

@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import com.cloud.agent.api.to.OVFInformationTO;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
@ -37,6 +36,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.TemplateService;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.reservation.dao.ReservationDao;
import org.apache.cloudstack.storage.command.UploadStatusAnswer;
import org.apache.cloudstack.storage.command.UploadStatusAnswer.UploadStatus;
import org.apache.cloudstack.storage.command.UploadStatusCommand;
@ -55,6 +55,7 @@ import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.to.OVFInformationTO;
import com.cloud.alert.AlertManager;
import com.cloud.api.query.dao.TemplateJoinDao;
import com.cloud.api.query.vo.TemplateJoinVO;
@ -65,12 +66,16 @@ import com.cloud.exception.ConnectionException;
import com.cloud.host.Host;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.resourcelimit.CheckedReservation;
import com.cloud.storage.Volume.Event;
import com.cloud.storage.dao.VMTemplateDao;
import com.cloud.storage.dao.VMTemplateZoneDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.ResourceLimitService;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.Transaction;
@ -117,6 +122,12 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
private TemplateJoinDao templateJoinDao;
@Inject
private DeployAsIsHelper deployAsIsHelper;
@Inject
private AccountDao accountDao;
@Inject
private AccountManager _accountMgr;
@Inject
private ReservationDao reservationDao;
private long _nodeId;
private ScheduledExecutorService _executor = null;
@ -205,6 +216,36 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
public UploadStatusCheck() {
}
private Answer sendUploadStatusCommandForVolume(EndPoint ep, UploadStatusCommand cmd, VolumeVO volume) {
Answer answer = null;
try {
answer = ep.sendMessage(cmd);
} catch (CloudRuntimeException e) {
logger.warn("Unable to get upload status for volume {}. Error details: {}", volume, e.getMessage());
answer = new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN, e.getMessage());
}
if (answer == null || !(answer instanceof UploadStatusAnswer)) {
logger.warn("No or invalid answer corresponding to UploadStatusCommand for volume {}", volume);
return null;
}
return answer;
}
private Answer sendUploadStatusCommandForTemplate(EndPoint ep, UploadStatusCommand cmd, VMTemplateVO template) {
Answer answer = null;
try {
answer = ep.sendMessage(cmd);
} catch (CloudRuntimeException e) {
logger.warn("Unable to get upload status for template {}. Error details: {}", template, e.getMessage());
answer = new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN, e.getMessage());
}
if (answer == null || !(answer instanceof UploadStatusAnswer)) {
logger.warn("No or invalid answer corresponding to UploadStatusCommand for template {}", template);
return null;
}
return answer;
}
@Override
protected void runInContext() {
// 1. Select all entries with download_state = Not_Downloaded or Download_In_Progress
@ -231,18 +272,17 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
UploadStatusCommand cmd = new UploadStatusCommand(volume.getUuid(), EntityType.Volume);
if (host != null && host.getManagementServerId() != null) {
if (_nodeId == host.getManagementServerId().longValue()) {
Answer answer = null;
try {
answer = ep.sendMessage(cmd);
} catch (CloudRuntimeException e) {
logger.warn("Unable to get upload status for volume {}. Error details: {}", volume, e.getMessage());
answer = new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN, e.getMessage());
}
if (answer == null || !(answer instanceof UploadStatusAnswer)) {
logger.warn("No or invalid answer corresponding to UploadStatusCommand for volume {}", volume);
Answer answer = sendUploadStatusCommandForVolume(ep, cmd, volume);
if (answer == null) {
continue;
}
handleVolumeStatusResponse((UploadStatusAnswer)answer, volume, volumeDataStore);
if (!handleVolumeStatusResponse((UploadStatusAnswer)answer, volume, volumeDataStore)) {
cmd = new UploadStatusCommand(volume.getUuid(), EntityType.Volume, true);
answer = sendUploadStatusCommandForVolume(ep, cmd, volume);
if (answer == null) {
logger.warn("Unable to abort upload for volume {}", volume);
}
}
}
} else {
String error = "Volume " + volume.getUuid() + " failed to upload as SSVM is either destroyed or SSVM agent not in 'Up' state";
@ -275,18 +315,17 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
UploadStatusCommand cmd = new UploadStatusCommand(template.getUuid(), EntityType.Template);
if (host != null && host.getManagementServerId() != null) {
if (_nodeId == host.getManagementServerId().longValue()) {
Answer answer = null;
try {
answer = ep.sendMessage(cmd);
} catch (CloudRuntimeException e) {
logger.warn("Unable to get upload status for template {}. Error details: {}", template, e.getMessage());
answer = new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN, e.getMessage());
}
if (answer == null || !(answer instanceof UploadStatusAnswer)) {
logger.warn("No or invalid answer corresponding to UploadStatusCommand for template {}", template);
Answer answer = sendUploadStatusCommandForTemplate(ep, cmd, template);
if (answer == null) {
continue;
}
handleTemplateStatusResponse((UploadStatusAnswer)answer, template, templateDataStore);
if (!handleTemplateStatusResponse((UploadStatusAnswer) answer, template, templateDataStore)) {
cmd = new UploadStatusCommand(template.getUuid(), EntityType.Template, true);
answer = sendUploadStatusCommandForTemplate(ep, cmd, template);
if (answer == null) {
logger.warn("Unable to abort upload for template {}", template);
}
}
}
} else {
String error = String.format(
@ -303,7 +342,41 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
}
}
private void handleVolumeStatusResponse(final UploadStatusAnswer answer, final VolumeVO volume, final VolumeDataStoreVO volumeDataStore) {
private Boolean checkAndUpdateSecondaryStorageResourceLimit(Long accountId, Long lastSize, Long currentSize) {
if (lastSize >= currentSize) {
return true;
}
Long usage = currentSize - lastSize;
try (CheckedReservation secStorageReservation = new CheckedReservation(_accountMgr.getAccount(accountId), Resource.ResourceType.secondary_storage, null, null, usage, reservationDao, _resourceLimitMgr)) {
_resourceLimitMgr.incrementResourceCount(accountId, Resource.ResourceType.secondary_storage, usage);
return true;
} catch (Exception e) {
_resourceLimitMgr.decrementResourceCount(accountId, Resource.ResourceType.secondary_storage, lastSize);
return false;
}
}
private Boolean checkAndUpdateVolumeResourceLimit(VolumeVO volume, VolumeDataStoreVO volumeDataStore, UploadStatusAnswer answer) {
boolean success = true;
Long currentSize = answer.getVirtualSize() != 0 ? answer.getVirtualSize() : answer.getPhysicalSize();
Long lastSize = volume.getSize() != null ? volume.getSize() : 0L;
if (!checkAndUpdateSecondaryStorageResourceLimit(volume.getAccountId(), volume.getSize(), currentSize)) {
volumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOAD_ERROR);
volumeDataStore.setState(State.Failed);
volumeDataStore.setErrorString("Storage Limit Reached");
Account owner = accountDao.findById(volume.getAccountId());
String msg = String.format("Upload of volume [%s] failed because its owner [%s] does not have enough secondary storage space available.", volume.getUuid(), owner.getUuid());
logger.error(msg);
success = false;
}
VolumeVO volumeUpdate = _volumeDao.findById(volume.getId());
volumeUpdate.setSize(currentSize);
_volumeDao.update(volumeUpdate.getId(), volumeUpdate);
return success;
}
private boolean handleVolumeStatusResponse(final UploadStatusAnswer answer, final VolumeVO volume, final VolumeDataStoreVO volumeDataStore) {
final boolean[] needAbort = new boolean[]{false};
final StateMachine2<Volume.State, Event, Volume> stateMachine = Volume.State.getStateMachine();
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
@ -315,6 +388,11 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
try {
switch (answer.getStatus()) {
case COMPLETED:
if (!checkAndUpdateVolumeResourceLimit(tmpVolume, tmpVolumeDataStore, answer)) {
stateMachine.transitTo(tmpVolume, Event.OperationFailed, null, _volumeDao);
sendAlert = true;
break;
}
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOADED);
tmpVolumeDataStore.setState(State.Ready);
tmpVolumeDataStore.setInstallPath(answer.getInstallPath());
@ -326,7 +404,6 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
volumeUpdate.setSize(answer.getVirtualSize());
_volumeDao.update(tmpVolume.getId(), volumeUpdate);
stateMachine.transitTo(tmpVolume, Event.OperationSucceeded, null, _volumeDao);
_resourceLimitMgr.incrementResourceCount(volume.getAccountId(), Resource.ResourceType.secondary_storage, answer.getVirtualSize());
// publish usage events
UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_UPLOAD, tmpVolume.getAccountId(),
@ -339,6 +416,12 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
}
break;
case IN_PROGRESS:
if (!checkAndUpdateVolumeResourceLimit(tmpVolume, tmpVolumeDataStore, answer)) {
stateMachine.transitTo(tmpVolume, Event.OperationFailed, null, _volumeDao);
sendAlert = true;
needAbort[0] = true;
break;
}
if (tmpVolume.getState() == Volume.State.NotUploaded) {
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOAD_IN_PROGRESS);
tmpVolumeDataStore.setDownloadPercent(answer.getDownloadPercent());
@ -387,10 +470,29 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
}
}
});
return !needAbort[0];
}
private void handleTemplateStatusResponse(final UploadStatusAnswer answer, final VMTemplateVO template, final TemplateDataStoreVO templateDataStore) {
private Boolean checkAndUpdateTemplateResourceLimit(VMTemplateVO template, TemplateDataStoreVO templateDataStore, UploadStatusAnswer answer) {
boolean success = true;
Long currentSize = answer.getVirtualSize() != 0 ? answer.getVirtualSize() : answer.getPhysicalSize();
Long lastSize = template.getSize() != null ? template.getSize() : 0L;
if (!checkAndUpdateSecondaryStorageResourceLimit(template.getAccountId(), lastSize, currentSize)) {
templateDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOAD_ERROR);
templateDataStore.setErrorString("Storage Limit Reached");
templateDataStore.setState(State.Failed);
Account owner = accountDao.findById(template.getAccountId());
String msg = String.format("Upload of template [%s] failed because its owner [%s] does not have enough secondary storage space available.", template.getUuid(), owner.getUuid());
logger.error(msg);
success = false;
}
templateDataStore.setSize(currentSize);
return success;
}
private boolean handleTemplateStatusResponse(final UploadStatusAnswer answer, final VMTemplateVO template, final TemplateDataStoreVO templateDataStore) {
final StateMachine2<VirtualMachineTemplate.State, VirtualMachineTemplate.Event, VirtualMachineTemplate> stateMachine = VirtualMachineTemplate.State.getStateMachine();
final boolean[] needAbort = new boolean[]{false};
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
@ -401,6 +503,11 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
try {
switch (answer.getStatus()) {
case COMPLETED:
if (!checkAndUpdateTemplateResourceLimit(tmpTemplate, tmpTemplateDataStore, answer)) {
stateMachine.transitTo(tmpTemplate, VirtualMachineTemplate.Event.OperationFailed, null, _templateDao);
sendAlert = true;
break;
}
tmpTemplateDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOADED);
tmpTemplateDataStore.setState(State.Ready);
tmpTemplateDataStore.setInstallPath(answer.getInstallPath());
@ -453,6 +560,12 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
}
break;
case IN_PROGRESS:
if (!checkAndUpdateTemplateResourceLimit(tmpTemplate, tmpTemplateDataStore, answer)) {
stateMachine.transitTo(tmpTemplate, VirtualMachineTemplate.Event.OperationFailed, null, _templateDao);
sendAlert = true;
needAbort[0] = true;
break;
}
if (tmpTemplate.getState() == VirtualMachineTemplate.State.NotUploaded) {
tmpTemplateDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOAD_IN_PROGRESS);
stateMachine.transitTo(tmpTemplate, VirtualMachineTemplate.Event.UploadRequested, null, _templateDao);
@ -502,6 +615,7 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
}
}
});
return !needAbort[0];
}
}

View File

@ -130,6 +130,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
if (decoder != null) {
decoder.cleanFiles();
}
storageResource.deregisterUploadChannel(uuid);
requestProcessed = false;
}
@ -182,6 +183,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
requestProcessed = true;
return;
}
storageResource.registerUploadChannel(uuid, ctx.channel());
//set the base directory to download the file
DiskFileUpload.baseDirectory = uploadEntity.getInstallPathPrefix();
this.processTimeout = uploadEntity.getProcessTimeout();

View File

@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -258,7 +259,8 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
protected String _parent = "/mnt/SecStorage";
final private String _tmpltpp = "template.properties";
protected String createTemplateFromSnapshotXenScript;
private HashMap<String, UploadEntity> uploadEntityStateMap = new HashMap<>();
private final Map<String, UploadEntity> uploadEntityStateMap = new ConcurrentHashMap<>();
private final Map<String, Channel> uploadChannelMap = new ConcurrentHashMap<>();
private String _ssvmPSK = null;
private long processTimeout;
@ -2399,6 +2401,20 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
String entityUuid = cmd.getEntityUuid();
if (uploadEntityStateMap.containsKey(entityUuid)) {
UploadEntity uploadEntity = uploadEntityStateMap.get(entityUuid);
if (Boolean.TRUE.equals(cmd.getAbort())) {
updateStateMapWithError(entityUuid, "Upload Entity aborted");
String errorMsg = uploadEntity.getErrorMessage();
if (errorMsg == null) {
errorMsg = "Upload aborted by management server";
}
Channel channel = uploadChannelMap.remove(entityUuid);
if (channel != null && channel.isActive()) {
logger.info("Closing upload channel for entity {}", entityUuid);
channel.close();
}
uploadEntityStateMap.remove(entityUuid);
return new UploadStatusAnswer(cmd, UploadStatus.ERROR, errorMsg);
}
if (uploadEntity.getUploadState() == UploadEntity.Status.ERROR) {
uploadEntityStateMap.remove(entityUuid);
return new UploadStatusAnswer(cmd, UploadStatus.ERROR, uploadEntity.getErrorMessage());
@ -2417,6 +2433,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
UploadStatusAnswer answer = new UploadStatusAnswer(cmd, UploadStatus.IN_PROGRESS);
long downloadedSize = FileUtils.sizeOfDirectory(new File(uploadEntity.getInstallPathPrefix()));
int downloadPercent = (int)(100 * downloadedSize / uploadEntity.getContentLength());
answer.setPhysicalSize(downloadedSize);
answer.setDownloadPercent(Math.min(downloadPercent, 100));
return answer;
}
@ -3446,6 +3463,10 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
public String postUpload(String uuid, String filename, long processTimeout) {
UploadEntity uploadEntity = uploadEntityStateMap.get(uuid);
if (uploadEntity == null) {
logger.warn("Upload entity not found for uuid: {}. Upload may have been aborted.", uuid);
return "Upload entity not found. Upload may have been aborted.";
}
int installTimeoutPerGig = 180 * 60 * 1000;
String resourcePath = uploadEntity.getInstallPathPrefix();
@ -3596,6 +3617,16 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
return _ssvmPSK;
}
public void registerUploadChannel(String uuid, Channel channel) {
uploadChannelMap.put(uuid, channel);
}
public void deregisterUploadChannel(String uuid) {
if (uuid != null) {
uploadChannelMap.remove(uuid);
}
}
public void updateStateMapWithError(String uuid, String errorMessage) {
UploadEntity uploadEntity = null;
if (uploadEntityStateMap.get(uuid) != null) {