From 0cc3a1e3f83425224690a02b0bcaa503dbae711b Mon Sep 17 00:00:00 2001 From: Chiradeep Vittal Date: Tue, 23 Aug 2011 18:17:28 -0700 Subject: [PATCH] WIP: implement a queue where inserting a job for an existing vm does not create a new job --- .../security/LocalSecurityGroupWorkQueue.java | 147 ++++++++++++++-- .../security/SecurityGroupWorkQueue.java | 6 +- .../security/SecurityGroupQueueTest.java | 165 ++++++++++++++++++ .../com/cloud/utils/db/GlobalLockTest.java | 59 +++++++ 4 files changed, 358 insertions(+), 19 deletions(-) create mode 100644 server/test/com/cloud/network/security/SecurityGroupQueueTest.java create mode 100644 server/test/com/cloud/utils/db/GlobalLockTest.java diff --git a/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java index e3a2ebb14ef..42cd42f430a 100644 --- a/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java +++ b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java @@ -18,10 +18,12 @@ package com.cloud.network.security; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -35,40 +37,149 @@ import com.cloud.network.security.SecurityGroupWork.Step; public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue { protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class); - protected LinkedBlockingQueue _queue = new LinkedBlockingQueue(); + protected TreeSet _currentWork = new TreeSet(); + private final ReentrantLock _lock = new ReentrantLock(); + private final Condition _notEmpty = _lock.newCondition(); + private final AtomicInteger _count = new AtomicInteger(0); + + public static class LocalSecurityGroupWork implements SecurityGroupWork, Comparable { + Long _logSequenceNumber; + Long _instanceId; + Step _step; + + public LocalSecurityGroupWork(Long instanceId, Long logSequence, Step step){ + this._instanceId = instanceId; + this._logSequenceNumber = logSequence; + this._step = step; + } + + @Override + public Long getInstanceId() { + return _instanceId; + } + + @Override + public Long getLogsequenceNumber() { + return _logSequenceNumber; + } + + @Override + public Step getStep() { + return _step; + } + + @Override + public void setStep(Step step) { + this._step = step; + } + + @Override + public void setLogsequenceNumber(Long logsequenceNumber) { + this._logSequenceNumber = logsequenceNumber; + + } + + @Override + public int compareTo(LocalSecurityGroupWork o) { + return this._instanceId.compareTo(o.getInstanceId()); + } + + } + @Override public void submitWorkForVm(long vmId, long sequenceNumber) { - - SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null); - boolean result = _queue.offer(work); - if (!result) { - s_logger.warn("Failed to add work item into queue for vm id " + vmId); + _lock.lock(); + try { + SecurityGroupWork work = new LocalSecurityGroupWork(vmId, sequenceNumber, Step.Scheduled); + boolean added = _currentWork.add(work); + if (added) + _count.incrementAndGet(); + } finally { + _lock.unlock(); } + signalNotEmpty(); } @Override - public void submitWorkForVms(Set vmIds) { - for (Long vmId: vmIds) { - SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null); - boolean result = _queue.offer(work); - if (!result) { - s_logger.warn("Failed to add work item into queue for vm id " + vmId); + public int submitWorkForVms(Set vmIds) { + _lock.lock(); + int newWork = _count.get(); + try { + for (Long vmId: vmIds) { + SecurityGroupWork work = new LocalSecurityGroupWork(vmId, null, SecurityGroupWork.Step.Scheduled); + boolean added = _currentWork.add(work); + if (added) + _count.incrementAndGet(); } + } finally { + newWork = _count.get() - newWork; + _lock.unlock(); } + signalNotEmpty(); + return newWork; } @Override public List getWork(int numberOfWorkItems) { - List work = new ArrayList(); - _queue.drainTo(work, numberOfWorkItems); - for (SecurityGroupWork w: work) { - w.setStep(Step.Processing); + List work = new ArrayList(numberOfWorkItems); + _lock.lock(); + int i = 0; + try { + while (_count.get() == 0) { + _notEmpty.await(); + } + int n = Math.min(numberOfWorkItems, _count.get()); + while (i < n ) { + SecurityGroupWork w = _currentWork.first(); + w.setStep(Step.Processing); + work.add(w); + _currentWork.remove(w); + ++i; + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + int c = _count.addAndGet(-i); + if (c > 0) + _notEmpty.signal(); + _lock.unlock(); } return work; + + } + + private void signalNotEmpty() { + _lock.lock(); + try { + _notEmpty.signal(); + } finally { + _lock.unlock(); + } } + + @Override + public int size() { + return _count.get(); + } + + + @Override + public void clear() { + _lock.lock(); + try { + _currentWork.clear(); + _count.set(0); + } finally { + _lock.unlock(); + } + + } + + } diff --git a/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java index 5667c92b461..0fb0773d558 100644 --- a/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java +++ b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java @@ -30,7 +30,11 @@ public interface SecurityGroupWorkQueue { void submitWorkForVm(long vmId, long sequenceNumber); - void submitWorkForVms(Set vmIds); + int submitWorkForVms(Set vmIds); List getWork(int numberOfWorkItems); + + int size(); + + void clear(); } diff --git a/server/test/com/cloud/network/security/SecurityGroupQueueTest.java b/server/test/com/cloud/network/security/SecurityGroupQueueTest.java new file mode 100644 index 00000000000..8cf0da187f7 --- /dev/null +++ b/server/test/com/cloud/network/security/SecurityGroupQueueTest.java @@ -0,0 +1,165 @@ +package com.cloud.network.security; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import junit.framework.TestCase; + +public class SecurityGroupQueueTest extends TestCase { + public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue(); + + + public static class Producer implements Runnable { + int _maxVmId = 0; + int _newWorkQueued=0; + Set vmIds = new HashSet(); + + public Producer(int maxVmId) { + this._maxVmId = maxVmId; + for (long i=1; i <= _maxVmId; i++) { + vmIds.add(i); + } + } + + public void run() { + _newWorkQueued = queue.submitWorkForVms(vmIds); + } + + public int getNewWork() { + return _newWorkQueued; + } + + public int getTotalWork() { + return _maxVmId; + } + } + + public static class Consumer implements Runnable { + private int _numJobsToDequeue = 0; + private int _numJobsDequeued = 0; + + public Consumer(int numJobsToDequeu) { + this._numJobsToDequeue = numJobsToDequeu; + } + + public void run() { + List result = queue.getWork(_numJobsToDequeue); + this._numJobsDequeued = result.size(); + } + + int getNumJobsToDequeue() { + return _numJobsToDequeue; + } + + int getNumJobsDequeued() { + return _numJobsDequeued; + } + } + + public void testNumJobsEqToNumVms1() { + queue.clear(); + final int numProducers = 50; + Thread [] pThreads = new Thread[numProducers]; + + Producer [] producers = new Producer[numProducers]; + int numProduced = 0; + + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(i+1); + pThreads[i] = new Thread(producers[i]); + numProduced += i+1; + pThreads[i].start(); + } + for (int i=0; i < numProducers ; i++) { + try { + pThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } + System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size()); + assert(numProducers == queue.size()); + } + + public void testNumJobsEqToNumVms2() { + queue.clear(); + + final int numProducers = 50; + Thread [] pThreads = new Thread[numProducers]; + + Producer [] producers = new Producer[numProducers]; + int numProduced = 0; + int maxVmId = 10000; + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(maxVmId); + pThreads[i] = new Thread(producers[i]); + numProduced += i+1; + pThreads[i].start(); + } + for (int i=0; i < numProducers ; i++) { + try { + pThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } + System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size()); + assert(maxVmId == queue.size()); + } + + public void testDequeueOneJob() { + queue.clear(); + + final int numProducers = 2; + final int numConsumers = 5; + final int maxVmId = 200; + + Thread [] pThreads = new Thread[numProducers]; + Thread [] cThreads = new Thread[numConsumers]; + + + Consumer [] consumers = new Consumer[numConsumers]; + Producer [] producers = new Producer[numProducers]; + + int numProduced = 0; + for (int i=0; i < numConsumers; i++) { + consumers[i] = new Consumer(1); + cThreads[i] = new Thread(consumers[i]); + cThreads[i].start(); + } + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(maxVmId); + pThreads[i] = new Thread(producers[i]); + numProduced += maxVmId; + pThreads[i].start(); + } + for (int i=0; i < numConsumers ; i++) { + try { + cThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } +// try { +// Thread.sleep(2000); +// } catch (InterruptedException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } + int totalDequeued = 0; + for (int i=0; i < numConsumers; i++) { + //System.out.println("Consumer " + i + " ask to dequeue " + consumers[i].getNumJobsToDequeue() + ", dequeued " + consumers[i].getNumJobsDequeued()); + totalDequeued += consumers[i].getNumJobsDequeued(); + } + int totalQueued = 0; + for (int i=0; i < numProducers; i++) { + //System.out.println("Producer " + i + " ask to queue " + producers[i].getTotalWork() + ", queued " + producers[i].getNewWork()); + totalQueued += producers[i].getNewWork(); + } + System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size()); + assert(totalDequeued == numConsumers); + assert(totalQueued - totalDequeued == queue.size()); + } + +} diff --git a/server/test/com/cloud/utils/db/GlobalLockTest.java b/server/test/com/cloud/utils/db/GlobalLockTest.java new file mode 100644 index 00000000000..274b9015faf --- /dev/null +++ b/server/test/com/cloud/utils/db/GlobalLockTest.java @@ -0,0 +1,59 @@ +package com.cloud.utils.db; + +import org.apache.log4j.Logger; + +import junit.framework.Assert; + +import com.cloud.utils.Profiler; +import com.cloud.utils.testcase.Log4jEnabledTestCase; + +public class GlobalLockTest extends Log4jEnabledTestCase{ + public static final Logger s_logger = Logger.getLogger(GlobalLockTest.class); + private final static GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork"); + public static class Worker implements Runnable { + int id = 0; + int timeoutSeconds = 10; + int jobDuration = 2; + public Worker(int id, int timeout, int duration) { + this.id = id; + timeoutSeconds = timeout; + jobDuration = duration; + } + public void run() { + boolean locked = false; + try { + Profiler p = new Profiler(); + p.start(); + locked = _workLock.lock(timeoutSeconds); + p.stop(); + System.out.println("Thread " + id + " waited " + p.getDuration() + " ms, locked=" + locked); + if (locked) { + Thread.sleep(jobDuration*1000); + } + } catch (InterruptedException e) { + } finally { + if (locked) { + boolean unlocked = _workLock.unlock(); + System.out.println("Thread " + id + " unlocked=" + unlocked); + } + } + } + } + + public void testTimeout() { + Thread [] pool = new Thread[50]; + for (int i=0; i < pool.length; i++) { + pool[i] = new Thread(new Worker(i, 5, 3)); + } + for (int i=0; i < pool.length; i++) { + pool[i].start(); + } + for (int i=0; i < pool.length; i++) { + try { + pool[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +}