WIP: implement a work queue in which submitting a job for a VM that already has a pending job does not create a duplicate job

This commit is contained in:
Chiradeep Vittal 2011-08-23 18:17:28 -07:00
parent 6465ccff1b
commit 0cc3a1e3f8
4 changed files with 358 additions and 19 deletions

View File

@ -18,10 +18,12 @@
package com.cloud.network.security;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@ -35,40 +37,149 @@ import com.cloud.network.security.SecurityGroupWork.Step;
public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue {
protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class);
protected LinkedBlockingQueue<SecurityGroupWork> _queue = new LinkedBlockingQueue<SecurityGroupWork>();
protected TreeSet<SecurityGroupWork> _currentWork = new TreeSet<SecurityGroupWork>();
private final ReentrantLock _lock = new ReentrantLock();
private final Condition _notEmpty = _lock.newCondition();
private final AtomicInteger _count = new AtomicInteger(0);
public static class LocalSecurityGroupWork implements SecurityGroupWork, Comparable<LocalSecurityGroupWork> {
Long _logSequenceNumber;
Long _instanceId;
Step _step;
public LocalSecurityGroupWork(Long instanceId, Long logSequence, Step step){
this._instanceId = instanceId;
this._logSequenceNumber = logSequence;
this._step = step;
}
@Override
public Long getInstanceId() {
return _instanceId;
}
@Override
public Long getLogsequenceNumber() {
return _logSequenceNumber;
}
@Override
public Step getStep() {
return _step;
}
@Override
public void setStep(Step step) {
this._step = step;
}
@Override
public void setLogsequenceNumber(Long logsequenceNumber) {
this._logSequenceNumber = logsequenceNumber;
}
@Override
public int compareTo(LocalSecurityGroupWork o) {
return this._instanceId.compareTo(o.getInstanceId());
}
}
@Override
public void submitWorkForVm(long vmId, long sequenceNumber) {
SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
boolean result = _queue.offer(work);
if (!result) {
s_logger.warn("Failed to add work item into queue for vm id " + vmId);
_lock.lock();
try {
SecurityGroupWork work = new LocalSecurityGroupWork(vmId, sequenceNumber, Step.Scheduled);
boolean added = _currentWork.add(work);
if (added)
_count.incrementAndGet();
} finally {
_lock.unlock();
}
signalNotEmpty();
}
@Override
public void submitWorkForVms(Set<Long> vmIds) {
for (Long vmId: vmIds) {
SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
boolean result = _queue.offer(work);
if (!result) {
s_logger.warn("Failed to add work item into queue for vm id " + vmId);
public int submitWorkForVms(Set<Long> vmIds) {
_lock.lock();
int newWork = _count.get();
try {
for (Long vmId: vmIds) {
SecurityGroupWork work = new LocalSecurityGroupWork(vmId, null, SecurityGroupWork.Step.Scheduled);
boolean added = _currentWork.add(work);
if (added)
_count.incrementAndGet();
}
} finally {
newWork = _count.get() - newWork;
_lock.unlock();
}
signalNotEmpty();
return newWork;
}
@Override
public List<SecurityGroupWork> getWork(int numberOfWorkItems) {
List<SecurityGroupWork> work = new ArrayList<SecurityGroupWork>();
_queue.drainTo(work, numberOfWorkItems);
for (SecurityGroupWork w: work) {
w.setStep(Step.Processing);
List<SecurityGroupWork> work = new ArrayList<SecurityGroupWork>(numberOfWorkItems);
_lock.lock();
int i = 0;
try {
while (_count.get() == 0) {
_notEmpty.await();
}
int n = Math.min(numberOfWorkItems, _count.get());
while (i < n ) {
SecurityGroupWork w = _currentWork.first();
w.setStep(Step.Processing);
work.add(w);
_currentWork.remove(w);
++i;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
int c = _count.addAndGet(-i);
if (c > 0)
_notEmpty.signal();
_lock.unlock();
}
return work;
}
private void signalNotEmpty() {
_lock.lock();
try {
_notEmpty.signal();
} finally {
_lock.unlock();
}
}
@Override
public int size() {
return _count.get();
}
@Override
public void clear() {
_lock.lock();
try {
_currentWork.clear();
_count.set(0);
} finally {
_lock.unlock();
}
}
}

View File

@ -30,7 +30,11 @@ public interface SecurityGroupWorkQueue {
void submitWorkForVm(long vmId, long sequenceNumber);
void submitWorkForVms(Set<Long> vmIds);
int submitWorkForVms(Set<Long> vmIds);
List<SecurityGroupWork> getWork(int numberOfWorkItems);
int size();
void clear();
}

View File

@ -0,0 +1,165 @@
package com.cloud.network.security;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
/**
 * Exercises LocalSecurityGroupWorkQueue's per-VM de-duplication and blocking
 * dequeue behavior with concurrent producers and consumers.
 */
public class SecurityGroupQueueTest extends TestCase {
    public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue();

    /** Submits work for VM ids 1.._maxVmId and records how many items were newly queued. */
    public static class Producer implements Runnable {
        int _maxVmId = 0;
        int _newWorkQueued = 0;
        Set<Long> vmIds = new HashSet<Long>();

        public Producer(int maxVmId) {
            this._maxVmId = maxVmId;
            for (long i = 1; i <= _maxVmId; i++) {
                vmIds.add(i);
            }
        }

        public void run() {
            _newWorkQueued = queue.submitWorkForVms(vmIds);
        }

        public int getNewWork() {
            return _newWorkQueued;
        }

        public int getTotalWork() {
            return _maxVmId;
        }
    }

    /** Dequeues a fixed number of jobs and records how many were actually received. */
    public static class Consumer implements Runnable {
        private int _numJobsToDequeue = 0;
        private int _numJobsDequeued = 0;

        public Consumer(int numJobsToDequeue) {
            this._numJobsToDequeue = numJobsToDequeue;
        }

        public void run() {
            List<SecurityGroupWork> result = queue.getWork(_numJobsToDequeue);
            this._numJobsDequeued = result.size();
        }

        int getNumJobsToDequeue() {
            return _numJobsToDequeue;
        }

        int getNumJobsDequeued() {
            return _numJobsDequeued;
        }
    }

    /** Joins every thread, restoring the interrupt status if interrupted. */
    private static void joinAll(Thread[] threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** 50 producers over overlapping id ranges 1..i must yield exactly 50 distinct jobs. */
    public void testNumJobsEqToNumVms1() {
        queue.clear();
        final int numProducers = 50;
        Thread[] pThreads = new Thread[numProducers];
        Producer[] producers = new Producer[numProducers];
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(i + 1);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }
        joinAll(pThreads);
        System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size());
        // The union of ids 1..1, 1..2, ..., 1..50 is 1..50: despite ~1275 submissions
        // the queue must hold exactly one job per distinct VM id.
        assertEquals(numProducers, queue.size());
    }

    /** 50 producers submitting the identical id range must yield one job per id. */
    public void testNumJobsEqToNumVms2() {
        queue.clear();
        final int numProducers = 50;
        Thread[] pThreads = new Thread[numProducers];
        Producer[] producers = new Producer[numProducers];
        int maxVmId = 10000;
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(maxVmId);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }
        joinAll(pThreads);
        System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size());
        assertEquals(maxVmId, queue.size());
    }

    /** Each consumer dequeues exactly one job; queued minus dequeued must equal the residue. */
    public void testDequeueOneJob() {
        queue.clear();
        final int numProducers = 2;
        final int numConsumers = 5;
        final int maxVmId = 200;
        Thread[] pThreads = new Thread[numProducers];
        Thread[] cThreads = new Thread[numConsumers];
        Consumer[] consumers = new Consumer[numConsumers];
        Producer[] producers = new Producer[numProducers];
        for (int i = 0; i < numConsumers; i++) {
            consumers[i] = new Consumer(1);
            cThreads[i] = new Thread(consumers[i]);
            cThreads[i].start();
        }
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(maxVmId);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }
        joinAll(cThreads);
        // Join producers too before reading their results; the original read
        // getNewWork() without joining, which raced with producer completion.
        joinAll(pThreads);
        int totalDequeued = 0;
        for (int i = 0; i < numConsumers; i++) {
            totalDequeued += consumers[i].getNumJobsDequeued();
        }
        int totalQueued = 0;
        for (int i = 0; i < numProducers; i++) {
            totalQueued += producers[i].getNewWork();
        }
        System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size());
        // JUnit assertions run regardless of the -ea flag, unlike the bare
        // `assert` statements used previously (which are no-ops by default).
        assertEquals(numConsumers, totalDequeued);
        assertEquals(totalQueued - totalDequeued, queue.size());
    }
}

View File

@ -0,0 +1,59 @@
package com.cloud.utils.db;
import org.apache.log4j.Logger;
import junit.framework.Assert;
import com.cloud.utils.Profiler;
import com.cloud.utils.testcase.Log4jEnabledTestCase;
/**
 * Exercises GlobalLock.lock() timeout behavior by making many threads compete
 * for a single interned lock that each holder keeps for a fixed duration.
 */
public class GlobalLockTest extends Log4jEnabledTestCase {
    public static final Logger s_logger = Logger.getLogger(GlobalLockTest.class);
    private final static GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork");

    /** Acquires the shared lock with a timeout, holds it for a fixed duration, then releases it. */
    public static class Worker implements Runnable {
        int id = 0;
        int timeoutSeconds = 10;
        int jobDuration = 2;

        public Worker(int id, int timeout, int duration) {
            this.id = id;
            timeoutSeconds = timeout;
            jobDuration = duration;
        }

        public void run() {
            boolean locked = false;
            try {
                Profiler p = new Profiler();
                p.start();
                locked = _workLock.lock(timeoutSeconds);
                p.stop();
                System.out.println("Thread " + id + " waited " + p.getDuration() + " ms, locked=" + locked);
                if (locked) {
                    Thread.sleep(jobDuration * 1000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status rather than swallowing it silently.
                Thread.currentThread().interrupt();
            } finally {
                // Only release if we actually acquired the lock.
                if (locked) {
                    boolean unlocked = _workLock.unlock();
                    System.out.println("Thread " + id + " unlocked=" + unlocked);
                }
            }
        }
    }

    /**
     * Starts 50 workers that each wait up to 5s for a lock held 3s at a time;
     * most are expected to time out, which is logged rather than asserted.
     */
    public void testTimeout() {
        Thread[] pool = new Thread[50];
        for (int i = 0; i < pool.length; i++) {
            pool[i] = new Thread(new Worker(i, 5, 3));
        }
        for (int i = 0; i < pool.length; i++) {
            pool[i].start();
        }
        for (int i = 0; i < pool.length; i++) {
            try {
                pool[i].join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}