diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java
index 7305d14ba9f..bf4818b37ca 100644
--- a/api/src/org/apache/cloudstack/context/CallContext.java
+++ b/api/src/org/apache/cloudstack/context/CallContext.java
@@ -114,7 +114,20 @@ public class CallContext {
}
public static CallContext current() {
- return s_currentContext.get();
+ CallContext context = s_currentContext.get();
+
+ // TODO other than async job and api dispatches, there are many system background running threads
+ // that do not setup CallContext at all, however, many places in code that are touched by these background tasks
+ // assume not-null CallContext. Following is a fix to address therefore caused NPE problems
+ //
+ // There are security implications with this. It assumes that all system background running threads are
+ // indeed have no problem in running under system context.
+ //
+ if (context == null) {
+ context = registerSystemCallContextOnceOnly();
+ }
+
+ return context;
}
/**
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
index fae9bf31209..1dcd6bd2682 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
@@ -18,29 +18,31 @@
*/
package org.apache.cloudstack.framework.messagebus;
+import org.apache.log4j.Logger;
+
public class MessageDetector implements MessageSubscriber {
+ private static final Logger s_logger = Logger.getLogger(MessageDetector.class);
private MessageBus _messageBus;
private String[] _subjects;
- private volatile boolean _signalled = false;
-
public MessageDetector() {
_messageBus = null;
_subjects = null;
}
- public boolean waitAny(long timeoutInMiliseconds) {
- synchronized (this) {
- if (_signalled)
- return true;
+ public void waitAny(long timeoutInMiliseconds) {
+ if (timeoutInMiliseconds < 100) {
+ s_logger.warn("waitAny is passed with a too short time-out interval. " + timeoutInMiliseconds + "ms");
+ timeoutInMiliseconds = 100;
+ }
+ synchronized (this) {
try {
wait(timeoutInMiliseconds);
} catch (InterruptedException e) {
}
}
- return _signalled;
}
public void open(MessageBus messageBus, String[] subjects) {
@@ -69,9 +71,20 @@ public class MessageDetector implements MessageSubscriber {
@Override
public void onPublishMessage(String senderAddress, String subject, Object args) {
- synchronized (this) {
- _signalled = true;
- notifyAll();
+ if (subjectMatched(subject)) {
+ synchronized (this) {
+ notifyAll();
+ }
}
}
+
+ private boolean subjectMatched(String subject) {
+ if (_subjects != null) {
+ for (String sub : _subjects) {
+ if (sub.equals(subject))
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
index c81d8aa994a..cfa552ca2b5 100644
--- a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -138,12 +138,7 @@ public class TestMessageBus extends TestCase {
try {
int count = 0;
while (count < 2) {
- if (detector.waitAny(1000)) {
- System.out.println("Detected signal on bus");
- count++;
- } else {
- System.out.println("Waiting timed out");
- }
+ detector.waitAny(1000);
}
} finally {
detector.close();
diff --git a/framework/jobs/pom.xml b/framework/jobs/pom.xml
index ecde92bd62a..1cec864c2a7 100644
--- a/framework/jobs/pom.xml
+++ b/framework/jobs/pom.xml
@@ -56,6 +56,21 @@
org.apache.cloudstack
cloud-framework-config
${project.version}
-
+
+
+ org.apache.cloudstack
+ cloud-framework-events
+ ${project.version}
+
+
+ org.apache.cloudstack
+ cloud-engine-schema
+ ${project.version}
+
+
+ commons-io
+ commons-io
+ test
+
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java
new file mode 100644
index 00000000000..62a8d81ced2
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTest.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+/*
+ * This integration test requires real DB setup, it is not meant to run at per-build
+ * basis, it can only be opened in developer's run
+ *
+ *
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = "classpath:/AsyncJobManagerTestContext.xml")
+public class AsyncJobManagerTest extends TestCase {
+ private static final Logger s_logger =
+ Logger.getLogger(AsyncJobManagerTest.class);
+
+ @Inject
+ AsyncJobManager _jobMgr;
+
+ @Inject
+ AsyncJobTestDashboard _testDashboard;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ try {
+ ComponentContext.initComponentsLifeCycle();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ s_logger.error(ex.getMessage());
+ }
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public void testWaitBehave() {
+
+ final Object me = this;
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ s_logger.info("Sleeping...");
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+
+ s_logger.info("wakeup");
+ synchronized (me) {
+ me.notifyAll();
+ }
+ }
+
+ }).start();
+
+ s_logger.info("First wait");
+ synchronized (me) {
+ try {
+ wait(5000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ s_logger.info("First wait done");
+
+ s_logger.info("Second wait");
+ synchronized (me) {
+ try {
+ wait(5000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ s_logger.info("Second wait done");
+ }
+
+ @Test
+ public void test() {
+ final int TOTAL_JOBS_PER_QUEUE = 5;
+ final int TOTAL_QUEUES = 100;
+
+ for (int i = 0; i < TOTAL_QUEUES; i++) {
+ for (int j = 0; j < TOTAL_JOBS_PER_QUEUE; j++) {
+ AsyncJobVO job = new AsyncJobVO();
+ job.setCmd("TestCmd");
+ job.setDispatcher("TestJobDispatcher");
+ job.setCmdInfo("TestCmd info");
+
+ _jobMgr.submitAsyncJob(job, "fakequeue", i);
+
+ s_logger.info("Job submitted. job " + job.getId() + ", queue: " + i);
+ }
+ }
+
+ while (true) {
+ if (_testDashboard.getCompletedJobCount() == TOTAL_JOBS_PER_QUEUE * TOTAL_QUEUES)
+ break;
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ s_logger.info("Test done with " + _testDashboard.getCompletedJobCount() + " job executed");
+ }
+}
+
+*/
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java
new file mode 100644
index 00000000000..a70913cd983
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobManagerTestConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.jobs;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import org.apache.cloudstack.framework.config.ConfigDepot;
+import org.apache.cloudstack.framework.config.ScopedConfigStorage;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDaoImpl;
+import org.apache.cloudstack.framework.config.impl.ConfigDepotImpl;
+
+import com.cloud.storage.dao.StoragePoolDetailsDaoImpl;
+
+@Configuration
+public class AsyncJobManagerTestConfiguration {
+
+ @Bean
+ public ConfigDepot configDepot() {
+ return new ConfigDepotImpl();
+ }
+
+ @Bean
+ public ConfigurationDao configDao() {
+ return new ConfigurationDaoImpl();
+ }
+
+ @Bean
+ public ScopedConfigStorage scopedConfigStorage() {
+ return new StoragePoolDetailsDaoImpl();
+ }
+
+ @Bean
+ public AsyncJobTestDashboard testDashboard() {
+ return new AsyncJobTestDashboard();
+ }
+}
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java
new file mode 100644
index 00000000000..728138d2b4f
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDashboard.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.jobs;
+
+public class AsyncJobTestDashboard {
+ int _completedJobCount = 0;
+ int _concurrencyCount = 0;
+
+ public AsyncJobTestDashboard() {
+ }
+
+ public synchronized int getCompletedJobCount() {
+ return _completedJobCount;
+ }
+
+ public synchronized void jobCompleted() {
+ _completedJobCount++;
+ }
+
+ public synchronized int getConcurrencyCount() {
+ return _concurrencyCount;
+ }
+
+ public synchronized void increaseConcurrency() {
+ _concurrencyCount++;
+ }
+
+ public synchronized void decreaseConcurrency() {
+ _concurrencyCount--;
+ }
+}
diff --git a/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java
new file mode 100644
index 00000000000..34351a6c934
--- /dev/null
+++ b/framework/jobs/test/org/apache/cloudstack/framework/jobs/AsyncJobTestDispatcher.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.Random;
+
+import javax.inject.Inject;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.jobs.JobInfo.Status;
+
+import com.cloud.utils.component.AdapterBase;
+
+public class AsyncJobTestDispatcher extends AdapterBase implements AsyncJobDispatcher {
+ private static final Logger s_logger =
+ Logger.getLogger(AsyncJobTestDispatcher.class);
+
+ @Inject
+ private AsyncJobManager _asyncJobMgr;
+
+ @Inject
+ private AsyncJobTestDashboard _testDashboard;
+
+ Random _random = new Random();
+
+ public AsyncJobTestDispatcher() {
+ }
+
+ @Override
+ public void runJob(final AsyncJob job) {
+ _testDashboard.increaseConcurrency();
+
+ s_logger.info("Execute job " + job.getId() + ", current concurrency " + _testDashboard.getConcurrencyCount());
+
+ int interval = 3000;
+
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ }
+
+ _asyncJobMgr.completeAsyncJob(job.getId(), Status.SUCCEEDED, 0, null);
+
+ _testDashboard.decreaseConcurrency();
+ _testDashboard.jobCompleted();
+ }
+}
diff --git a/framework/jobs/test/resources/AsyncJobManagerTestContext.xml b/framework/jobs/test/resources/AsyncJobManagerTestContext.xml
new file mode 100644
index 00000000000..fd5db304c25
--- /dev/null
+++ b/framework/jobs/test/resources/AsyncJobManagerTestContext.xml
@@ -0,0 +1,38 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/framework/jobs/test/resources/commonContext.xml b/framework/jobs/test/resources/commonContext.xml
new file mode 100644
index 00000000000..6c3ca75b45d
--- /dev/null
+++ b/framework/jobs/test/resources/commonContext.xml
@@ -0,0 +1,37 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/framework/jobs/test/resources/db.properties b/framework/jobs/test/resources/db.properties
new file mode 100644
index 00000000000..e07d80c112d
--- /dev/null
+++ b/framework/jobs/test/resources/db.properties
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cluster.servlet.port=9090
+
+# CloudStack database settings
+db.cloud.username=cloud
+db.cloud.password=cloud
+db.root.password=
+db.cloud.host=localhost
+db.cloud.port=3306
+db.cloud.name=cloud
+
+# CloudStack database tuning parameters
+db.cloud.maxActive=250
+db.cloud.maxIdle=30
+db.cloud.maxWait=10000
+db.cloud.autoReconnect=true
+db.cloud.validationQuery=SELECT 1
+db.cloud.testOnBorrow=true
+db.cloud.testWhileIdle=true
+db.cloud.timeBetweenEvictionRunsMillis=40000
+db.cloud.minEvictableIdleTimeMillis=240000
+db.cloud.poolPreparedStatements=false
+db.cloud.url.params=prepStmtCacheSize=517&cachePrepStmts=true&prepStmtCacheSqlLimit=4096
+
+# usage database settings
+db.usage.username=cloud
+db.usage.password=cloud
+db.usage.host=localhost
+db.usage.port=3306
+db.usage.name=cloud_usage
+
+# usage database tuning parameters
+db.usage.maxActive=100
+db.usage.maxIdle=30
+db.usage.maxWait=10000
+db.usage.autoReconnect=true
+
+# awsapi database settings
+db.awsapi.name=cloudbridge
+
+# Simulator database settings
+db.simulator.username=cloud
+db.simulator.password=cloud
+db.simulator.host=localhost
+db.simulator.port=3306
+db.simulator.name=simulator
+db.simulator.maxActive=250
+db.simulator.maxIdle=30
+db.simulator.maxWait=10000
+db.simulator.autoReconnect=true
diff --git a/framework/jobs/test/resources/log4j.properties b/framework/jobs/test/resources/log4j.properties
new file mode 100644
index 00000000000..1119ecbba07
--- /dev/null
+++ b/framework/jobs/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
+log4j.appender.stdout.threshold=DEBUG
+log4j.rootLogger=INFO, rolling
+log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+log4j.appender.rolling.layout.ConversionPattern=%d %-5p [%c{3}] (%t:%x) %m%n
+log4j.appender.rolling.file.threshold=DEBUG
+log4j.appender.rolling.File_testDashboard=./logs/testclient.log
+log4j.appender.rolling.DatePattern='.'yyy-MM-dd
+log4j.appender.rolling.file.append=false
+log4j.category.org.apache=DEBUG, rolling, stdout
+#log4j.category.com.cloud.utils.db.Transaction=ALL
+log4j.category.org.apache.cloudstack.network.contrail=ALL
+log4j.category.com.cloud.network=ALL
+