CLOUDSTACK-6743: Use edge-triggering in MessageDetector to handle bogus wakeup gracefully. Level triggering plus bogus wakeup can cause a tight loop to spin

This commit is contained in:
Kelven Yang 2014-05-21 16:14:14 -07:00
parent df6ce24f43
commit 09ec127470
12 changed files with 522 additions and 18 deletions

View File

@ -114,7 +114,20 @@ public class CallContext {
}
public static CallContext current() {
return s_currentContext.get();
CallContext context = s_currentContext.get();
// TODO other than async job and api dispatches, there are many system background running threads
// that do not setup CallContext at all, however, many places in code that are touched by these background tasks
// assume not-null CallContext. Following is a fix to address therefore caused NPE problems
//
// There are security implications with this. It assumes that all system background running threads are
// indeed have no problem in running under system context.
//
if (context == null) {
context = registerSystemCallContextOnceOnly();
}
return context;
}
/**

View File

@ -18,29 +18,31 @@
*/
package org.apache.cloudstack.framework.messagebus;
import org.apache.log4j.Logger;
public class MessageDetector implements MessageSubscriber {
private static final Logger s_logger = Logger.getLogger(MessageDetector.class);
private MessageBus _messageBus;
private String[] _subjects;
private volatile boolean _signalled = false;
public MessageDetector() {
_messageBus = null;
_subjects = null;
}
public boolean waitAny(long timeoutInMiliseconds) {
synchronized (this) {
if (_signalled)
return true;
public void waitAny(long timeoutInMiliseconds) {
if (timeoutInMiliseconds < 100) {
s_logger.warn("waitAny is passed with a too short time-out interval. " + timeoutInMiliseconds + "ms");
timeoutInMiliseconds = 100;
}
synchronized (this) {
try {
wait(timeoutInMiliseconds);
} catch (InterruptedException e) {
}
}
return _signalled;
}
public void open(MessageBus messageBus, String[] subjects) {
@ -69,9 +71,20 @@ public class MessageDetector implements MessageSubscriber {
@Override
public void onPublishMessage(String senderAddress, String subject, Object args) {
synchronized (this) {
_signalled = true;
notifyAll();
if (subjectMatched(subject)) {
synchronized (this) {
notifyAll();
}
}
}
private boolean subjectMatched(String subject) {
if (_subjects != null) {
for (String sub : _subjects) {
if (sub.equals(subject))
return true;
}
}
return false;
}
}

View File

@ -138,12 +138,7 @@ public class TestMessageBus extends TestCase {
try {
int count = 0;
while (count < 2) {
if (detector.waitAny(1000)) {
System.out.println("Detected signal on bus");
count++;
} else {
System.out.println("Waiting timed out");
}
detector.waitAny(1000);
}
} finally {
detector.close();

View File

@ -56,6 +56,21 @@
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-config</artifactId>
<version>${project.version}</version>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-events</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-engine-schema</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs;
/*
* This integration test requires real DB setup, it is not meant to run at per-build
* basis, it can only be opened in developer's run
*
*
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/AsyncJobManagerTestContext.xml")
public class AsyncJobManagerTest extends TestCase {
private static final Logger s_logger =
Logger.getLogger(AsyncJobManagerTest.class);
@Inject
AsyncJobManager _jobMgr;
@Inject
AsyncJobTestDashboard _testDashboard;
@Override
@Before
public void setUp() throws Exception {
try {
ComponentContext.initComponentsLifeCycle();
} catch (Exception ex) {
ex.printStackTrace();
s_logger.error(ex.getMessage());
}
}
@Override
@After
public void tearDown() throws Exception {
}
public void testWaitBehave() {
final Object me = this;
new Thread(new Runnable() {
@Override
public void run() {
s_logger.info("Sleeping...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
s_logger.info("wakeup");
synchronized (me) {
me.notifyAll();
}
}
}).start();
s_logger.info("First wait");
synchronized (me) {
try {
wait(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
s_logger.info("First wait done");
s_logger.info("Second wait");
synchronized (me) {
try {
wait(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
s_logger.info("Second wait done");
}
@Test
public void test() {
final int TOTAL_JOBS_PER_QUEUE = 5;
final int TOTAL_QUEUES = 100;
for (int i = 0; i < TOTAL_QUEUES; i++) {
for (int j = 0; j < TOTAL_JOBS_PER_QUEUE; j++) {
AsyncJobVO job = new AsyncJobVO();
job.setCmd("TestCmd");
job.setDispatcher("TestJobDispatcher");
job.setCmdInfo("TestCmd info");
_jobMgr.submitAsyncJob(job, "fakequeue", i);
s_logger.info("Job submitted. job " + job.getId() + ", queue: " + i);
}
}
while (true) {
if (_testDashboard.getCompletedJobCount() == TOTAL_JOBS_PER_QUEUE * TOTAL_QUEUES)
break;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
s_logger.info("Test done with " + _testDashboard.getCompletedJobCount() + " job executed");
}
}
*/

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.jobs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ScopedConfigStorage;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.config.dao.ConfigurationDaoImpl;
import org.apache.cloudstack.framework.config.impl.ConfigDepotImpl;
import com.cloud.storage.dao.StoragePoolDetailsDaoImpl;
@Configuration
public class AsyncJobManagerTestConfiguration {
@Bean
public ConfigDepot configDepot() {
return new ConfigDepotImpl();
}
@Bean
public ConfigurationDao configDao() {
return new ConfigurationDaoImpl();
}
@Bean
public ScopedConfigStorage scopedConfigStorage() {
return new StoragePoolDetailsDaoImpl();
}
@Bean
public AsyncJobTestDashboard testDashboard() {
return new AsyncJobTestDashboard();
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.jobs;
public class AsyncJobTestDashboard {
int _completedJobCount = 0;
int _concurrencyCount = 0;
public AsyncJobTestDashboard() {
}
public synchronized int getCompletedJobCount() {
return _completedJobCount;
}
public synchronized void jobCompleted() {
_completedJobCount++;
}
public synchronized int getConcurrencyCount() {
return _concurrencyCount;
}
public synchronized void increaseConcurrency() {
_concurrencyCount++;
}
public synchronized void decreaseConcurrency() {
_concurrencyCount--;
}
}

View File

@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs;
import java.util.Random;
import javax.inject.Inject;
import org.apache.log4j.Logger;
import org.apache.cloudstack.jobs.JobInfo.Status;
import com.cloud.utils.component.AdapterBase;
public class AsyncJobTestDispatcher extends AdapterBase implements AsyncJobDispatcher {
private static final Logger s_logger =
Logger.getLogger(AsyncJobTestDispatcher.class);
@Inject
private AsyncJobManager _asyncJobMgr;
@Inject
private AsyncJobTestDashboard _testDashboard;
Random _random = new Random();
public AsyncJobTestDispatcher() {
}
@Override
public void runJob(final AsyncJob job) {
_testDashboard.increaseConcurrency();
s_logger.info("Execute job " + job.getId() + ", current concurrency " + _testDashboard.getConcurrencyCount());
int interval = 3000;
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
}
_asyncJobMgr.completeAsyncJob(job.getId(), Status.SUCCEEDED, 0, null);
_testDashboard.decreaseConcurrency();
_testDashboard.jobCompleted();
}
}

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<import resource="commonContext.xml"/>
<import resource="classpath*:spring-framework-jobs-core-context.xml"/>
<bean id="AsyncJobManagerTestConfiguration"
class="org.apache.cloudstack.framework.jobs.AsyncJobManagerTestConfiguration" />
<bean id="TestJobDispatcher" class="org.apache.cloudstack.framework.jobs.AsyncJobTestDispatcher">
<property name="name" value="TestJobDispatcher" />
</bean>
</beans>

View File

@ -0,0 +1,37 @@
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
you under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:annotation-config />
<bean id="transactionContextBuilder" class="com.cloud.utils.db.TransactionContextBuilder" />
<bean id="instantiatePostProcessor" class="com.cloud.utils.component.ComponentInstantiationPostProcessor">
<property name="Interceptors">
<list>
<ref bean="transactionContextBuilder" />
</list>
</property>
</bean>
<bean id="eventBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" />
<bean id="componentContext" class="com.cloud.utils.component.ComponentContext"/>
</beans>

View File

@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
cluster.servlet.port=9090
# CloudStack database settings
db.cloud.username=cloud
db.cloud.password=cloud
db.root.password=
db.cloud.host=localhost
db.cloud.port=3306
db.cloud.name=cloud
# CloudStack database tuning parameters
db.cloud.maxActive=250
db.cloud.maxIdle=30
db.cloud.maxWait=10000
db.cloud.autoReconnect=true
db.cloud.validationQuery=SELECT 1
db.cloud.testOnBorrow=true
db.cloud.testWhileIdle=true
db.cloud.timeBetweenEvictionRunsMillis=40000
db.cloud.minEvictableIdleTimeMillis=240000
db.cloud.poolPreparedStatements=false
db.cloud.url.params=prepStmtCacheSize=517&cachePrepStmts=true&prepStmtCacheSqlLimit=4096
# usage database settings
db.usage.username=cloud
db.usage.password=cloud
db.usage.host=localhost
db.usage.port=3306
db.usage.name=cloud_usage
# usage database tuning parameters
db.usage.maxActive=100
db.usage.maxIdle=30
db.usage.maxWait=10000
db.usage.autoReconnect=true
# awsapi database settings
db.awsapi.name=cloudbridge
# Simulator database settings
db.simulator.username=cloud
db.simulator.password=cloud
db.simulator.host=localhost
db.simulator.port=3306
db.simulator.name=simulator
db.simulator.maxActive=250
db.simulator.maxIdle=30
db.simulator.maxWait=10000
db.simulator.autoReconnect=true

View File

@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.appender.stdout.threshold=DEBUG
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d %-5p [%c{3}] (%t:%x) %m%n
log4j.appender.rolling.file.threshold=DEBUG
log4j.appender.rolling.File_testDashboard=./logs/testclient.log
log4j.appender.rolling.DatePattern='.'yyy-MM-dd
log4j.appender.rolling.file.append=false
log4j.category.org.apache=DEBUG, rolling, stdout
#log4j.category.com.cloud.utils.db.Transaction=ALL
log4j.category.org.apache.cloudstack.network.contrail=ALL
log4j.category.com.cloud.network=ALL