mirror of https://github.com/apache/cloudstack.git
CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
This commit is contained in:
parent
cd8501e263
commit
8db0d83d1a
File diff suppressed because it is too large
Load Diff
|
|
@ -23,9 +23,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
|
|||
import com.cloud.utils.db.GenericDao;
|
||||
|
||||
public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
|
||||
public SyncQueueItemVO getNextQueueItem(long queueId);
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
|
||||
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
|
||||
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
|
||||
public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
|
||||
public SyncQueueItemVO getNextQueueItem(long queueId);
|
||||
public int getActiveQueueItemCount(long queueId);
|
||||
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
|
||||
|
||||
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
|
||||
|
||||
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
|
||||
|
||||
public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase;
|
|||
import com.cloud.utils.db.GenericSearchBuilder;
|
||||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
import com.cloud.utils.db.SearchCriteria.Func;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.TransactionLegacy;
|
||||
|
||||
|
|
@ -43,7 +44,8 @@ import com.cloud.utils.db.TransactionLegacy;
|
|||
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
|
||||
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
|
||||
final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
|
||||
|
||||
final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch;
|
||||
|
||||
public SyncQueueItemDaoImpl() {
|
||||
super();
|
||||
queueIdSearch = createSearchBuilder(Long.class);
|
||||
|
|
@ -51,37 +53,52 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
|||
queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
|
||||
queueIdSearch.selectFields(queueIdSearch.entity().getId());
|
||||
queueIdSearch.done();
|
||||
|
||||
queueActiveItemSearch = createSearchBuilder(Integer.class);
|
||||
queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ);
|
||||
queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL);
|
||||
queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId());
|
||||
queueActiveItemSearch.done();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncQueueItemVO getNextQueueItem(long queueId) {
|
||||
|
||||
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
|
||||
@Override
|
||||
public SyncQueueItemVO getNextQueueItem(long queueId) {
|
||||
|
||||
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
|
||||
sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
|
||||
sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
|
||||
sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
|
||||
sb.done();
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sb.create();
|
||||
sc.setParameters("queueId", queueId);
|
||||
|
||||
Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sb.create();
|
||||
sc.setParameters("queueId", queueId);
|
||||
|
||||
Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
|
||||
List<SyncQueueItemVO> l = listBy(sc, filter);
|
||||
if(l != null && l.size() > 0)
|
||||
return l.get(0);
|
||||
|
||||
return null;
|
||||
}
|
||||
return l.get(0);
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
|
||||
List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
|
||||
|
||||
String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
|
||||
" FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getActiveQueueItemCount(long queueId) {
|
||||
SearchCriteria<Integer> sc = queueActiveItemSearch.create();
|
||||
sc.setParameters("queueId", queueId);
|
||||
|
||||
List<Integer> count = customSearch(sc, null);
|
||||
return count.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
|
||||
List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
|
||||
|
||||
String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
|
||||
" FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
|
||||
" WHERE i.queue_proc_number IS NULL " +
|
||||
" GROUP BY q.id " +
|
||||
" ORDER BY i.id " +
|
||||
" LIMIT 0, ?";
|
||||
" GROUP BY q.id " +
|
||||
" ORDER BY i.id " +
|
||||
" LIMIT 0, ?";
|
||||
|
||||
TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
PreparedStatement pstmt = null;
|
||||
|
|
@ -90,54 +107,54 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
|||
pstmt.setInt(1, maxItems);
|
||||
ResultSet rs = pstmt.executeQuery();
|
||||
while(rs.next()) {
|
||||
SyncQueueItemVO item = new SyncQueueItemVO();
|
||||
item.setId(rs.getLong(1));
|
||||
item.setQueueId(rs.getLong(2));
|
||||
item.setContentType(rs.getString(3));
|
||||
item.setContentId(rs.getLong(4));
|
||||
item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
|
||||
l.add(item);
|
||||
SyncQueueItemVO item = new SyncQueueItemVO();
|
||||
item.setId(rs.getLong(1));
|
||||
item.setQueueId(rs.getLong(2));
|
||||
item.setContentType(rs.getString(3));
|
||||
item.setContentId(rs.getLong(4));
|
||||
item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
|
||||
l.add(item);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
s_logger.error("Unexpected sql excetpion, ", e);
|
||||
s_logger.error("Unexpected sql excetpion, ", e);
|
||||
} catch (Throwable e) {
|
||||
s_logger.error("Unexpected excetpion, ", e);
|
||||
s_logger.error("Unexpected excetpion, ", e);
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
|
||||
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
|
||||
return l;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
|
||||
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
|
||||
sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
|
||||
SearchCriteria.Op.EQ);
|
||||
SearchCriteria.Op.EQ);
|
||||
sb.done();
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sb.create();
|
||||
sc.setParameters("lastProcessMsid", msid);
|
||||
|
||||
Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
|
||||
|
||||
if(exclusive)
|
||||
return lockRows(sc, filter, true);
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sb.create();
|
||||
sc.setParameters("lastProcessMsid", msid);
|
||||
|
||||
Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
|
||||
|
||||
if (exclusive)
|
||||
return lockRows(sc, filter, true);
|
||||
return listBy(sc, filter);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
|
||||
|
||||
SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
|
||||
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
|
||||
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
|
||||
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
|
||||
sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
|
||||
|
||||
|
||||
sbItem.done();
|
||||
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
|
||||
sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
|
||||
|
||||
|
||||
if(exclusive)
|
||||
return lockRows(sc, null, true);
|
||||
return listBy(sc, null);
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import java.util.Date;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
|
@ -88,12 +87,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||
|
||||
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
|
||||
|
||||
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
|
||||
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
|
||||
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
|
||||
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
|
||||
|
||||
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
|
||||
private static final int HEARTBEAT_INTERVAL = 2000;
|
||||
private static final int GC_INTERVAL = 10000; // 10 seconds
|
||||
private static final int GC_INTERVAL = 10000; // 10 seconds
|
||||
|
||||
@Inject
|
||||
private SyncQueueItemDao _queueItemDao;
|
||||
|
|
@ -362,38 +361,38 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||
// I removed the temporary solution already. I think my changes should fix the deadlock.
|
||||
|
||||
/*
|
||||
------------------------
|
||||
LATEST DETECTED DEADLOCK
|
||||
------------------------
|
||||
130625 20:03:10
|
||||
*** (1) TRANSACTION:
|
||||
TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
|
||||
mysql tables in use 2, locked 1
|
||||
LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
|
||||
MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing
|
||||
UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9)
|
||||
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting
|
||||
Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
------------------------
|
||||
LATEST DETECTED DEADLOCK
|
||||
------------------------
|
||||
130625 20:03:10
|
||||
*** (1) TRANSACTION:
|
||||
TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
|
||||
mysql tables in use 2, locked 1
|
||||
LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
|
||||
MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing
|
||||
UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9)
|
||||
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting
|
||||
Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
|
||||
*** (2) TRANSACTION:
|
||||
TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
|
||||
mysql tables in use 2, locked 1
|
||||
3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
|
||||
MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing
|
||||
UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8)
|
||||
*** (2) HOLDS THE LOCK(S):
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
|
||||
Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
*** (2) TRANSACTION:
|
||||
TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
|
||||
mysql tables in use 2, locked 1
|
||||
3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
|
||||
MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing
|
||||
UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8)
|
||||
*** (2) HOLDS THE LOCK(S):
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
|
||||
Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
|
||||
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting
|
||||
Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
|
||||
RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting
|
||||
Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
|
||||
0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
|
||||
|
||||
*** WE ROLL BACK TRANSACTION (2)
|
||||
*** WE ROLL BACK TRANSACTION (2)
|
||||
*/
|
||||
|
||||
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
|
||||
|
|
@ -406,23 +405,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||
}
|
||||
|
||||
SyncQueueVO queue = null;
|
||||
|
||||
// to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
|
||||
// we retry five times until we throw an exception
|
||||
Random random = new Random();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
|
||||
if (queue != null) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(1000 + random.nextInt(5000));
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
|
||||
if (queue == null)
|
||||
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
|
||||
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
|
||||
|
||||
|
|
@ -146,18 +147,18 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
|||
processNumber = new Long(1);
|
||||
else
|
||||
processNumber = processNumber + 1;
|
||||
|
||||
|
||||
Date dt = DateUtil.currentGMTTime();
|
||||
queueVO.setLastProcessNumber(processNumber);
|
||||
queueVO.setLastUpdated(dt);
|
||||
queueVO.setQueueSize(queueVO.getQueueSize() + 1);
|
||||
_syncQueueDao.update(queueVO.getId(), queueVO);
|
||||
|
||||
|
||||
itemVO.setLastProcessMsid(msid);
|
||||
itemVO.setLastProcessNumber(processNumber);
|
||||
itemVO.setLastProcessTime(dt);
|
||||
_syncQueueItemDao.update(item.getId(), itemVO);
|
||||
|
||||
|
||||
resultList.add(item);
|
||||
}
|
||||
}
|
||||
|
|
@ -183,9 +184,9 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
|||
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
|
||||
if(itemVO != null) {
|
||||
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
|
||||
|
||||
|
||||
_syncQueueItemDao.expunge(itemVO.getId());
|
||||
|
||||
|
||||
// if item is active, reset queue information
|
||||
if (itemVO.getLastProcessMsid() != null) {
|
||||
queueVO.setLastUpdated(DateUtil.currentGMTTime());
|
||||
|
|
@ -239,18 +240,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
|||
}
|
||||
|
||||
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
|
||||
return true;
|
||||
|
||||
//
|
||||
// TODO
|
||||
//
|
||||
// Need to disable concurrency disable at queue level due to the need to support
|
||||
// job wake-up dispatching task
|
||||
//
|
||||
// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
|
||||
//
|
||||
|
||||
// return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
|
||||
int nActiveItems = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
|
||||
if (nActiveItems < queueVO.getQueueSizeLimit())
|
||||
return true;
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId()
|
||||
+ "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId()
|
||||
+ ") is reaching concurrency limit " + queueVO.getQueueSizeLimit());
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue