Added progress to upload Image Transfers

This commit is contained in:
Abhisar Sinha 2026-01-29 00:27:20 +05:30 committed by Abhishek Kumar
parent 7b45d2e118
commit 2350661ee3
12 changed files with 400 additions and 11 deletions

View File

@ -62,6 +62,10 @@ public class ImageTransferResponse extends BaseResponse {
@Param(description = "the image transfer direction: upload / download")
private String direction;
@SerializedName("progress")
@Param(description = "progress in percentage for the upload image transfer")
private Integer progress;
@SerializedName(ApiConstants.CREATED)
@Param(description = "the date created")
private Date created;
@ -98,6 +102,10 @@ public class ImageTransferResponse extends BaseResponse {
this.direction = direction;
}
public void setProgress(Integer progress) {
this.progress = progress;
}
public void setCreated(Date created) {
this.created = created;
}

View File

@ -29,13 +29,20 @@ import org.apache.cloudstack.api.command.admin.backup.StartBackupCmd;
import org.apache.cloudstack.api.response.BackupResponse;
import org.apache.cloudstack.api.response.CheckpointResponse;
import org.apache.cloudstack.api.response.ImageTransferResponse;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import com.cloud.utils.component.PluggableService;
/**
* Service for managing oVirt-style incremental backups using libvirt checkpoints
*/
public interface IncrementalBackupService extends PluggableService {
public interface IncrementalBackupService extends Configurable, PluggableService {
ConfigKey<Long> ImageTransferPollingInterval = new ConfigKey<>("Advanced", Long.class,
"image.transfer.polling.interval",
"10",
"The image transfer progress polling interval in seconds.", true, ConfigKey.Scope.Global);
/**
* Start a backup session for a VM

View File

@ -0,0 +1,47 @@
//Licensed to the Apache Software Foundation (ASF) under one
//or more contributor license agreements. See the NOTICE file
//distributed with this work for additional information
//regarding copyright ownership. The ASF licenses this file
//to you under the Apache License, Version 2.0 (the
//"License"); you may not use this file except in compliance
//the License. You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing,
//software distributed under the License is distributed on an
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
//KIND, either express or implied. See the License for the
//specific language governing permissions and limitations
//under the License.
package org.apache.cloudstack.backup;
import java.util.Map;
import com.cloud.agent.api.Answer;
public class GetImageTransferProgressAnswer extends Answer {
private Map<String, Integer> progressMap; // transferId -> progress percentage (0-100)
public GetImageTransferProgressAnswer() {
}
public GetImageTransferProgressAnswer(GetImageTransferProgressCommand cmd, boolean success, String details) {
super(cmd, success, details);
}
public GetImageTransferProgressAnswer(GetImageTransferProgressCommand cmd, boolean success, String details,
Map<String, Integer> progressMap) {
super(cmd, success, details);
this.progressMap = progressMap;
}
public Map<String, Integer> getProgressMap() {
return progressMap;
}
public void setProgressMap(Map<String, Integer> progressMap) {
this.progressMap = progressMap;
}
}

View File

@ -0,0 +1,67 @@
//Licensed to the Apache Software Foundation (ASF) under one
//or more contributor license agreements. See the NOTICE file
//distributed with this work for additional information
//regarding copyright ownership. The ASF licenses this file
//to you under the Apache License, Version 2.0 (the
//"License"); you may not use this file except in compliance
//the License. You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing,
//software distributed under the License is distributed on an
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
//KIND, either express or implied. See the License for the
//specific language governing permissions and limitations
//under the License.
package org.apache.cloudstack.backup;
import java.util.List;
import java.util.Map;
import com.cloud.agent.api.Command;
public class GetImageTransferProgressCommand extends Command {
private List<String> transferIds;
private Map<String, String> volumePaths; // transferId -> volume path
private Map<String, Long> volumeSizes; // transferId -> volume size
public GetImageTransferProgressCommand() {
}
public GetImageTransferProgressCommand(List<String> transferIds, Map<String, String> volumePaths, Map<String, Long> volumeSizes) {
this.transferIds = transferIds;
this.volumePaths = volumePaths;
this.volumeSizes = volumeSizes;
}
public List<String> getTransferIds() {
return transferIds;
}
public void setTransferIds(List<String> transferIds) {
this.transferIds = transferIds;
}
public Map<String, String> getVolumePaths() {
return volumePaths;
}
public void setVolumePaths(Map<String, String> volumePaths) {
this.volumePaths = volumePaths;
}
public Map<String, Long> getVolumeSizes() {
return volumeSizes;
}
public void setVolumeSizes(Map<String, Long> volumeSizes) {
this.volumeSizes = volumeSizes;
}
@Override
public boolean executeInSequence() {
return false;
}
}

View File

@ -68,6 +68,9 @@ public class ImageTransferVO implements ImageTransfer {
@Column(name = "signed_ticket_id")
private String signedTicketId;
@Column(name = "progress")
private Integer progress;
@Column(name = "account_id")
Long accountId;
@ -189,6 +192,15 @@ public class ImageTransferVO implements ImageTransfer {
this.signedTicketId = signedTicketId;
}
public Integer getProgress() {
return progress;
}
public void setProgress(Integer progress) {
this.progress = progress;
this.updated = new Date();
}
@Override
public Class<?> getEntityType() {
return ImageTransfer.class;

View File

@ -19,6 +19,7 @@ package org.apache.cloudstack.backup.dao;
import java.util.List;
import org.apache.cloudstack.backup.ImageTransfer;
import org.apache.cloudstack.backup.ImageTransferVO;
import com.cloud.utils.db.GenericDao;
@ -27,6 +28,6 @@ public interface ImageTransferDao extends GenericDao<ImageTransferVO, Long> {
List<ImageTransferVO> listByBackupId(Long backupId);
ImageTransferVO findByUuid(String uuid);
ImageTransferVO findByNbdPort(int port);
ImageTransferVO findByVolume(Long volumeId);
List<ImageTransferVO> listByPhaseAndDirection(ImageTransfer.Phase phase, ImageTransfer.Direction direction);
}

View File

@ -21,6 +21,7 @@ import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.cloudstack.backup.ImageTransfer;
import org.apache.cloudstack.backup.ImageTransferVO;
import org.springframework.stereotype.Component;
@ -35,6 +36,7 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
private SearchBuilder<ImageTransferVO> uuidSearch;
private SearchBuilder<ImageTransferVO> nbdPortSearch;
private SearchBuilder<ImageTransferVO> volumeSearch;
private SearchBuilder<ImageTransferVO> phaseDirectionSearch;
public ImageTransferDaoImpl() {
}
@ -56,6 +58,11 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
volumeSearch = createSearchBuilder();
volumeSearch.and("volumeId", volumeSearch.entity().getDiskId(), SearchCriteria.Op.EQ);
volumeSearch.done();
phaseDirectionSearch = createSearchBuilder();
phaseDirectionSearch.and("phase", phaseDirectionSearch.entity().getPhase(), SearchCriteria.Op.EQ);
phaseDirectionSearch.and("direction", phaseDirectionSearch.entity().getDirection(), SearchCriteria.Op.EQ);
phaseDirectionSearch.done();
}
@Override
@ -85,4 +92,12 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
sc.setParameters("volumeId", volumeId);
return findOneBy(sc);
}
@Override
public List<ImageTransferVO> listByPhaseAndDirection(ImageTransfer.Phase phase, ImageTransfer.Direction direction) {
SearchCriteria<ImageTransferVO> sc = phaseDirectionSearch.create();
sc.setParameters("phase", phase);
sc.setParameters("direction", direction);
return listBy(sc);
}
}

View File

@ -92,3 +92,4 @@ CALL `cloud`.`IDEMPOTENT_ADD_UNIQUE_KEY`('cloud.counter', 'uc_counter__provider_
UPDATE `cloud`.`configuration` SET `scope` = 2 WHERE `name` = 'use.https.to.upload';
-- Delete the configuration for 'use.https.to.upload' from StoragePool
DELETE FROM `cloud`.`storage_pool_details` WHERE `name` = 'use.https.to.upload';

View File

@ -143,6 +143,7 @@ CREATE TABLE IF NOT EXISTS `cloud`.`image_transfer`(
`transfer_url` varchar(255) COMMENT 'ImageIO transfer URL',
`phase` varchar(20) NOT NULL COMMENT 'Transfer phase: initializing, transferring, finished, failed',
`direction` varchar(20) NOT NULL COMMENT 'Direction: upload, download',
`progress` int COMMENT 'Transfer progress percentage (0-100)',
`signed_ticket_id` varchar(255) COMMENT 'Signed ticket ID from ImageIO',
`created` datetime NOT NULL COMMENT 'date created',
`updated` datetime COMMENT 'date updated if not null',

View File

@ -0,0 +1,102 @@
//Licensed to the Apache Software Foundation (ASF) under one
//or more contributor license agreements. See the NOTICE file
//distributed with this work for additional information
//regarding copyright ownership. The ASF licenses this file
//to you under the Apache License, Version 2.0 (the
//"License"); you may not use this file except in compliance
//the License. You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing,
//software distributed under the License is distributed on an
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
//KIND, either express or implied. See the License for the
//specific language governing permissions and limitations
//under the License.
package com.cloud.hypervisor.kvm.resource.wrapper;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cloudstack.backup.GetImageTransferProgressAnswer;
import org.apache.cloudstack.backup.GetImageTransferProgressCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.cloud.agent.api.Answer;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.hypervisor.kvm.storage.KVMPhysicalDisk;
import com.cloud.resource.CommandWrapper;
import com.cloud.resource.ResourceWrapper;
@ResourceWrapper(handles = GetImageTransferProgressCommand.class)
public class LibvirtGetImageTransferProgressCommandWrapper extends CommandWrapper<GetImageTransferProgressCommand, Answer, LibvirtComputingResource> {
protected Logger logger = LogManager.getLogger(getClass());
@Override
public Answer execute(GetImageTransferProgressCommand cmd, LibvirtComputingResource resource) {
try {
List<String> transferIds = cmd.getTransferIds();
Map<String, String> volumePaths = cmd.getVolumePaths();
Map<String, Long> volumeSizes = cmd.getVolumeSizes();
Map<String, Integer> progressMap = new HashMap<>();
if (transferIds == null || transferIds.isEmpty()) {
return new GetImageTransferProgressAnswer(cmd, true, "No transfers to check", progressMap);
}
for (String transferId : transferIds) {
String volumePath = volumePaths.get(transferId);
Long volumeSize = volumeSizes.get(transferId);
if (volumePath == null || volumeSize == null || volumeSize == 0) {
logger.warn("Missing volume path or size for transferId: " + transferId);
progressMap.put(transferId, 0);
continue;
}
try {
File file = new File(volumePath);
if (!file.exists()) {
logger.warn("Volume file does not exist: " + volumePath);
progressMap.put(transferId, 0);
continue;
}
long currentSize = file.length();
if (volumePath.endsWith(".qcow2") || volumePath.endsWith(".qcow")) {
try {
long virtualSize = KVMPhysicalDisk.getVirtualSizeFromFile(volumePath);
currentSize = virtualSize;
} catch (Exception e) {
logger.warn("Failed to get virtual size for qcow2 file: " + volumePath + ", using physical size", e);
}
}
int progress = 0;
if (volumeSize > 0) {
progress = (int) Math.min(100, Math.max(0, (currentSize * 100) / volumeSize));
}
progressMap.put(transferId, progress);
logger.debug("Transfer {} progress: {}% (current: {}, total: {})", transferId, progress, currentSize, volumeSize);
} catch (Exception e) {
logger.error("Error getting progress for transferId: " + transferId + ", path: " + volumePath, e);
progressMap.put(transferId, 0);
}
}
return new GetImageTransferProgressAnswer(cmd, true, "Progress retrieved successfully", progressMap);
} catch (Exception e) {
logger.error("Error executing GetImageTransferProgressCommand", e);
return new GetImageTransferProgressAnswer(cmd, false, "Error getting transfer progress: " + e.getMessage());
}
}
}

View File

@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.stream.Collectors;
@ -41,6 +43,8 @@ import org.apache.cloudstack.api.response.ImageTransferResponse;
import org.apache.cloudstack.backup.dao.BackupDao;
import org.apache.cloudstack.backup.dao.BackupOfferingDao;
import org.apache.cloudstack.backup.dao.ImageTransferDao;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
@ -96,6 +100,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
@Inject
EndPointSelector _epSelector;
private Timer imageTransferTimer;
private static final int NBD_PORT_RANGE_START = 10809;
private static final int NBD_PORT_RANGE_END = 10909;
private static final boolean DATAPLANE_PROXY_MODE = true;
@ -204,7 +210,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
} catch (AgentUnavailableException | OperationTimedoutException e) {
backupDao.remove(backup.getId());
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
}
@ -269,7 +275,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
return true;
} catch (AgentUnavailableException | OperationTimedoutException e) {
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
}
@ -324,7 +330,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
return imageTransfer;
} catch (AgentUnavailableException | OperationTimedoutException e) {
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
}
@ -363,7 +369,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
try {
nbdServerAnswer = (StartNBDServerAnswer) agentManager.send(host.getId(), nbdServerCmd);
} catch (AgentUnavailableException | OperationTimedoutException e) {
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
if (!nbdServerAnswer.getResult()) {
throw new CloudRuntimeException("Failed to start the NBD server");
@ -392,7 +398,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
volume.getId(),
host.getId(),
nbdPort,
ImageTransferVO.Phase.initializing,
ImageTransferVO.Phase.transferring,
ImageTransfer.Direction.upload,
volume.getAccountId(),
volume.getDomainId(),
@ -463,7 +469,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
}
} catch (AgentUnavailableException | OperationTimedoutException e) {
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
}
@ -477,7 +483,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
try {
answer = agentManager.send(imageTransfer.getHostId(), stopNbdServerCommand);
} catch (AgentUnavailableException | OperationTimedoutException e) {
throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e);
throw new CloudRuntimeException("Failed to communicate with agent", e);
}
if (!answer.getResult()) {
throw new CloudRuntimeException("Failed to stop the nbd server");
@ -602,9 +608,131 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
Volume volume = volumeDao.findById(volumeId);
response.setDiskId(volume.getUuid());
response.setTransferUrl(imageTransferVO.getTransferUrl());
response.setPhase(ImageTransferVO.Phase.initializing.toString());
response.setPhase(imageTransferVO.getPhase().toString());
response.setProgress(imageTransferVO.getProgress());
response.setDirection(imageTransferVO.getDirection().toString());
response.setCreated(imageTransferVO.getCreated());
return response;
}
@Override
public boolean start() {
final TimerTask imageTransferPollTask = new ManagedContextTimerTask() {
@Override
protected void runInContext() {
try {
pollImageTransferProgress();
} catch (final Throwable t) {
logger.warn("Catch throwable in image transfer poll task ", t);
}
}
};
imageTransferTimer = new Timer("ImageTransferPollTask");
long pollingInterval = ImageTransferPollingInterval.value() * 1000L;
imageTransferTimer.schedule(imageTransferPollTask, pollingInterval, pollingInterval);
return true;
}
@Override
public boolean stop() {
if (imageTransferTimer != null) {
imageTransferTimer.cancel();
imageTransferTimer = null;
}
return true;
}
private void pollImageTransferProgress() {
try {
List<ImageTransferVO> transferringTransfers = imageTransferDao.listByPhaseAndDirection(
ImageTransfer.Phase.transferring, ImageTransfer.Direction.upload);
if (transferringTransfers == null || transferringTransfers.isEmpty()) {
return;
}
Map<Long, List<ImageTransferVO>> transfersByHost = transferringTransfers.stream()
.collect(Collectors.groupingBy(ImageTransferVO::getHostId));
for (Map.Entry<Long, List<ImageTransferVO>> entry : transfersByHost.entrySet()) {
Long hostId = entry.getKey();
List<ImageTransferVO> hostTransfers = entry.getValue();
try {
List<String> transferIds = new ArrayList<>();
Map<String, String> volumePaths = new HashMap<>();
Map<String, Long> volumeSizes = new HashMap<>();
for (ImageTransferVO transfer : hostTransfers) {
VolumeVO volume = volumeDao.findById(transfer.getDiskId());
if (volume == null) {
logger.warn("Volume not found for image transfer: " + transfer.getUuid());
continue;
}
String transferId = transfer.getUuid();
transferIds.add(transferId);
String volumePath = volume.getPath();
if (volumePath == null) {
logger.warn("Volume path is null for image transfer: " + transfer.getUuid());
continue;
}
StoragePoolVO storagePool = primaryDataStoreDao.findById(volume.getPoolId());
volumePath = String.format("/mnt/%s/%s", storagePool.getUuid(), volumePath);
volumePaths.put(transferId, volumePath);
volumeSizes.put(transferId, volume.getSize());
}
if (transferIds.isEmpty()) {
continue;
}
GetImageTransferProgressCommand cmd = new GetImageTransferProgressCommand(transferIds, volumePaths, volumeSizes);
GetImageTransferProgressAnswer answer = (GetImageTransferProgressAnswer) agentManager.send(hostId, cmd);
if (answer != null && answer.getResult() && answer.getProgressMap() != null) {
for (ImageTransferVO transfer : hostTransfers) {
String transferId = transfer.getUuid();
Integer progress = answer.getProgressMap().get(transferId);
if (progress != null) {
transfer.setProgress(progress);
if (progress == 100) {
transfer.setPhase(ImageTransfer.Phase.finished);
logger.debug("Updated phase for image transfer {} to finished", transferId);
}
imageTransferDao.update(transfer.getId(), transfer);
logger.debug("Updated progress for image transfer {}: {}%", transferId, progress);
}
}
} else {
logger.warn("Failed to get progress for transfers on host {}: {}", hostId,
answer != null ? answer.getDetails() : "null answer");
}
} catch (AgentUnavailableException | OperationTimedoutException e) {
logger.warn("Failed to communicate with host {} for image transfer progress", hostId);
} catch (Exception e) {
logger.error("Error polling image transfer progress for host " + hostId, e);
}
}
} catch (Exception e) {
logger.error("Error in pollImageTransferProgress", e);
}
}
@Override
public String getConfigComponentName() {
return IncrementalBackupService.class.getSimpleName();
}
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey[]{
ImageTransferPollingInterval
};
}
}

View File

@ -3826,7 +3826,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
// Open firewall port for image server
if (_inSystemVM) {
String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", imageServerPort);
IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, true, rule,
IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, false, rule,
String.format("Error in opening up image server port %d", imageServerPort));
}