mirror of https://github.com/apache/cloudstack.git
Optimize the DB updates to use bulk UPDATE instead of row-level locks. (#13349)
Co-authored-by: Aaron Chung <aaron_chung@apple.com>
This commit is contained in:
parent
c2c855b9b1
commit
21e4475d96
|
|
@ -20,6 +20,7 @@ import java.util.Date;
|
|||
import java.util.List;
|
||||
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.cloud.alert.AlertVO;
|
||||
|
|
@ -28,7 +29,7 @@ import com.cloud.utils.db.GenericDaoBase;
|
|||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.TransactionLegacy;
|
||||
import com.cloud.utils.db.UpdateBuilder;
|
||||
|
||||
@Component
|
||||
public class AlertDaoImpl extends GenericDaoBase<AlertVO, Long> implements AlertDao {
|
||||
|
|
@ -107,25 +108,20 @@ public class AlertDaoImpl extends GenericDaoBase<AlertVO, Long> implements Alert
|
|||
}
|
||||
sc.setParameters("archived", false);
|
||||
|
||||
boolean result = true;
|
||||
;
|
||||
List<AlertVO> alerts = listBy(sc);
|
||||
if (ids != null && alerts.size() < ids.size()) {
|
||||
result = false;
|
||||
return result;
|
||||
return false;
|
||||
}
|
||||
if (alerts != null && !alerts.isEmpty()) {
|
||||
TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
txn.start();
|
||||
for (AlertVO alert : alerts) {
|
||||
alert = lockRow(alert.getId(), true);
|
||||
alert.setArchived(true);
|
||||
update(alert.getId(), alert);
|
||||
txn.commit();
|
||||
}
|
||||
txn.close();
|
||||
|
||||
if (CollectionUtils.isEmpty(alerts)) {
|
||||
return true;
|
||||
}
|
||||
return result;
|
||||
|
||||
AlertVO alertForUpdate = createForUpdate();
|
||||
alertForUpdate.setArchived(true);
|
||||
UpdateBuilder ub = getUpdateBuilder(alertForUpdate);
|
||||
update(ub, sc, null);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -18,8 +18,10 @@ package com.cloud.event.dao;
|
|||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.cloud.event.Event.State;
|
||||
|
|
@ -29,12 +31,13 @@ import com.cloud.utils.db.GenericDaoBase;
|
|||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.TransactionLegacy;
|
||||
import com.cloud.utils.db.UpdateBuilder;
|
||||
|
||||
@Component
|
||||
public class EventDaoImpl extends GenericDaoBase<EventVO, Long> implements EventDao {
|
||||
protected final SearchBuilder<EventVO> CompletedEventSearch;
|
||||
protected final SearchBuilder<EventVO> ToArchiveOrDeleteEventSearch;
|
||||
protected final SearchBuilder<EventVO> ArchiveByIdsSearch;
|
||||
|
||||
public EventDaoImpl() {
|
||||
CompletedEventSearch = createSearchBuilder();
|
||||
|
|
@ -51,6 +54,10 @@ public class EventDaoImpl extends GenericDaoBase<EventVO, Long> implements Event
|
|||
ToArchiveOrDeleteEventSearch.and("createdDateL", ToArchiveOrDeleteEventSearch.entity().getCreateDate(), Op.LTEQ);
|
||||
ToArchiveOrDeleteEventSearch.and("archived", ToArchiveOrDeleteEventSearch.entity().getArchived(), Op.EQ);
|
||||
ToArchiveOrDeleteEventSearch.done();
|
||||
|
||||
ArchiveByIdsSearch = createSearchBuilder();
|
||||
ArchiveByIdsSearch.and("id", ArchiveByIdsSearch.entity().getId(), Op.IN);
|
||||
ArchiveByIdsSearch.done();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -100,16 +107,16 @@ public class EventDaoImpl extends GenericDaoBase<EventVO, Long> implements Event
|
|||
|
||||
@Override
|
||||
public void archiveEvents(List<EventVO> events) {
|
||||
if (events != null && !events.isEmpty()) {
|
||||
TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
txn.start();
|
||||
for (EventVO event : events) {
|
||||
event = lockRow(event.getId(), true);
|
||||
event.setArchived(true);
|
||||
update(event.getId(), event);
|
||||
txn.commit();
|
||||
}
|
||||
txn.close();
|
||||
if (CollectionUtils.isEmpty(events)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Long> ids = events.stream().map(EventVO::getId).collect(Collectors.toList());
|
||||
SearchCriteria<EventVO> sc = ArchiveByIdsSearch.create();
|
||||
sc.setParameters("id", ids.toArray(new Object[ids.size()]));
|
||||
EventVO eventForUpdate = createForUpdate();
|
||||
eventForUpdate.setArchived(true);
|
||||
UpdateBuilder ub = getUpdateBuilder(eventForUpdate);
|
||||
update(ub, sc, null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -646,16 +646,22 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
|
|||
sc.setParameters("lastPinged", lastPingSecondsAfter);
|
||||
sc.setParameters("status", Status.Disconnected, Status.Down, Status.Alert);
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
List<HostVO> hosts = lockRows(sc, null, true); // exclusive lock
|
||||
for (HostVO host : hosts) {
|
||||
host.setManagementServerId(null);
|
||||
update(host.getId(), host);
|
||||
sb.append(host.getId());
|
||||
sb.append(" ");
|
||||
// SELECT before bulk UPDATE to preserve per-host-ID trace logging — the bulk UPDATE
|
||||
// cannot return which rows it matched since the WHERE column is being set to NULL
|
||||
if (logger.isTraceEnabled()) {
|
||||
List<HostVO> hosts = listBy(sc);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (HostVO host : hosts) {
|
||||
sb.append(host.getId());
|
||||
sb.append(" ");
|
||||
}
|
||||
logger.trace("Following hosts will be reset: {}", sb);
|
||||
}
|
||||
|
||||
logger.trace("Following hosts got reset: {}", sb);
|
||||
HostVO host = createForUpdate();
|
||||
host.setManagementServerId(null);
|
||||
UpdateBuilder ub = getUpdateBuilder(host);
|
||||
update(ub, sc, null);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
|
|||
//ensure that there is no job in Processing state for the same VM
|
||||
processing = true;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Security Group work take: found a job in Scheduled and Processing vmid=" + work.getInstanceId());
|
||||
logger.trace("Security Group work take: found a job in Scheduled and Processing vmid={}", work.getInstanceId());
|
||||
}
|
||||
}
|
||||
work.setServerId(serverId);
|
||||
|
|
@ -141,26 +141,16 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
|
|||
}
|
||||
|
||||
@Override
|
||||
@DB
|
||||
public void updateStep(Long vmId, Long logSequenceNumber, Step step) {
|
||||
final TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
txn.start();
|
||||
SearchCriteria<SecurityGroupWorkVO> sc = VmIdSeqNumSearch.create();
|
||||
sc.setParameters("vmId", vmId);
|
||||
sc.setParameters("seqno", logSequenceNumber);
|
||||
|
||||
final Filter filter = new Filter(SecurityGroupWorkVO.class, null, true, 0l, 1l);
|
||||
|
||||
final List<SecurityGroupWorkVO> vos = lockRows(sc, filter, true);
|
||||
if (vos.size() == 0) {
|
||||
txn.commit();
|
||||
return;
|
||||
}
|
||||
SecurityGroupWorkVO work = vos.get(0);
|
||||
work.setStep(step);
|
||||
update(work.getId(), work);
|
||||
|
||||
txn.commit();
|
||||
SecurityGroupWorkVO workForUpdate = createForUpdate();
|
||||
workForUpdate.setStep(step);
|
||||
// LIMIT 1 preserves the original single-row semantics: op_nwgrp_work has no
|
||||
// uniqueness on (instance_id, seq_no), so without it duplicate rows would all be updated.
|
||||
update(workForUpdate, sc, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -172,21 +162,10 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
|
|||
}
|
||||
|
||||
@Override
|
||||
@DB
|
||||
public void updateStep(Long workId, Step step) {
|
||||
final TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
txn.start();
|
||||
|
||||
SecurityGroupWorkVO work = lockRow(workId, true);
|
||||
if (work == null) {
|
||||
txn.commit();
|
||||
return;
|
||||
}
|
||||
work.setStep(step);
|
||||
update(work.getId(), work);
|
||||
|
||||
txn.commit();
|
||||
|
||||
SecurityGroupWorkVO workForUpdate = createForUpdate();
|
||||
workForUpdate.setStep(step);
|
||||
update(workId, workForUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue