Merge branch 'master' of ssh://git.cloud.com/var/lib/git/cloudstack-oss

This commit is contained in:
David Nalley 2011-06-06 17:53:22 -04:00
commit 5ca38bc43a
38 changed files with 915 additions and 629 deletions

View File

@ -22,13 +22,15 @@ import com.cloud.host.Status.Event;
public class TransferAgentCommand extends Command {
protected long agentId;
protected long futureOwner;
protected long currentOwner;
Event event;
protected TransferAgentCommand() {
}
public TransferAgentCommand(long agentId, long futureOwner, Event event) {
public TransferAgentCommand(long agentId, long currentOwner, long futureOwner, Event event) {
this.agentId = agentId;
this.currentOwner = currentOwner;
this.futureOwner = futureOwner;
this.event = event;
}
@ -45,6 +47,10 @@ public class TransferAgentCommand extends Command {
return event;
}
public long getCurrentOwner() {
return currentOwner;
}
@Override
public boolean executeInSequence() {
return false;

View File

@ -145,24 +145,27 @@ public class DeployVMCmd extends BaseAsyncCreateCmd {
}
public List<Long> getSecurityGroupIdList() {
if (securityGroupIdList != null && securityGroupIdList != null) {
if (securityGroupNameList != null && securityGroupIdList != null) {
throw new InvalidParameterValueException("securitygroupids parameter is mutually exclusive with securitygroupnames parameter");
} else if (securityGroupNameList == null && securityGroupIdList == null) {
throw new InvalidParameterValueException("securitygroupids or securitygroupnames must be specified");
}
//transform group names to ids here
if (securityGroupNameList != null) {
securityGroupIdList = new ArrayList<Long>();
List<Long> securityGroupIds = new ArrayList<Long>();
for (String groupName : securityGroupNameList) {
Long groupId = _responseGenerator.getSecurityGroupId(groupName, getEntityOwnerId());
if (groupId == null) {
throw new InvalidParameterValueException("Unable to find group by name " + groupName + " for account " + getEntityOwnerId());
} else {
securityGroupIdList.add(groupId);
securityGroupIds.add(groupId);
}
}
return securityGroupIds;
} else {
return securityGroupIdList;
}
return securityGroupIdList;
}
public Long getServiceOfferingId() {
@ -320,15 +323,15 @@ public class DeployVMCmd extends BaseAsyncCreateCmd {
if (getNetworkIds() != null) {
throw new InvalidParameterValueException("Can't specify network Ids in Basic zone");
} else {
vm = _userVmService.createBasicSecurityGroupVirtualMachine(zone, serviceOffering, template, securityGroupIdList, owner, name,
vm = _userVmService.createBasicSecurityGroupVirtualMachine(zone, serviceOffering, template, getSecurityGroupIdList(), owner, name,
displayName, diskOfferingId, size, group, getHypervisor(), userData, sshKeyPairName);
}
} else {
if (zone.isSecurityGroupEnabled()) {
vm = _userVmService.createAdvancedSecurityGroupVirtualMachine(zone, serviceOffering, template, getNetworkIds(), securityGroupIdList,
vm = _userVmService.createAdvancedSecurityGroupVirtualMachine(zone, serviceOffering, template, getNetworkIds(), getSecurityGroupIdList(),
owner, name, displayName, diskOfferingId, size, group, getHypervisor(), userData, sshKeyPairName);
} else {
if (securityGroupIdList != null && !securityGroupIdList.isEmpty()) {
if (getSecurityGroupIdList() != null && !getSecurityGroupIdList().isEmpty()) {
throw new InvalidParameterValueException("Can't create vm with security groups; security group feature is not enabled per zone");
}
vm = _userVmService.createAdvancedVirtualMachine(zone, serviceOffering, template, getNetworkIds(), owner, name, displayName,

View File

@ -196,4 +196,7 @@ public class EventTypes {
public static final String EVENT_ENABLE_STATIC_NAT = "STATICNAT.ENABLE";
public static final String EVENT_DISABLE_STATIC_NAT = "STATICNAT.DISABLE";
public static final String EVENT_ZONE_VLAN_ASSIGN = "ZONE.VLAN.ASSIGN";
public static final String EVENT_ZONE_VLAN_RELEASE = "ZONE.VLAN.RELEASE";
}

View File

@ -183,7 +183,7 @@ public enum Status {
s_fsm.addTransition(Status.Alert, Event.Ping, Status.Up);
s_fsm.addTransition(Status.Alert, Event.Remove, Status.Removed);
s_fsm.addTransition(Status.Alert, Event.ManagementServerDown, Status.Alert);
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceFailed, Status.Alert);
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceFailed, Status.Disconnected);
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceCompleted, Status.Connecting);
}

View File

@ -324,7 +324,7 @@ public class Request {
assert false : "More gson errors on " + buff.toString();
return "";
}
if (content.length() <= 4) {
if (content.length() <= (1 + _cmds.length * 3)) {
return null;
}
} else {
@ -368,7 +368,7 @@ public class Request {
final ByteBuffer buff = ByteBuffer.wrap(bytes);
final byte ver = buff.get();
final Version version = Version.get(ver);
if (version.ordinal() != Version.v1.ordinal()) {
if (version.ordinal() != Version.v1.ordinal() && version.ordinal() != Version.v3.ordinal()) {
throw new UnsupportedVersionException("This version is no longer supported: " + version.toString(), UnsupportedVersionException.IncompatibleVersion);
}
final byte reserved = buff.get(); // tossed away for now.
@ -379,7 +379,12 @@ public class Request {
final int size = buff.getInt();
final long mgmtId = buff.getLong();
final long agentId = buff.getLong();
final long via = buff.getLong();
long via;
if (version.ordinal() == Version.v1.ordinal()) {
via = buff.getLong();
} else {
via = agentId;
}
byte[] command = null;
int offset = 0;
@ -426,11 +431,11 @@ public class Request {
}
public static long getAgentId(final byte[] bytes) {
return NumbersUtil.bytesToLong(bytes, 28);
return NumbersUtil.bytesToLong(bytes, 24);
}
public static long getViaAgentId(final byte[] bytes) {
return NumbersUtil.bytesToLong(bytes, 24);
return NumbersUtil.bytesToLong(bytes, 32);
}
public static boolean fromServer(final byte[] bytes) {

View File

@ -53,7 +53,7 @@ public class RequestTest extends TestCase {
cmd2.addPortConfig("abc", "24", true, "eth0");
cmd2.addPortConfig("127.0.0.1", "44", false, "eth1");
Request sreq = new Request(2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true);
sreq.setSequence(1);
sreq.setSequence(892403717);
Logger logger = Logger.getLogger(GsonHelper.class);
Level level = logger.getLevel();
@ -75,16 +75,17 @@ public class RequestTest extends TestCase {
assert (!log.contains("password"));
logger.setLevel(Level.INFO);
sreq.log("Info", true, Level.INFO);
assert (log.contains(UpdateHostPasswordCommand.class.getSimpleName()));
assert (log.contains(SecStorageFirewallCfgCommand.class.getSimpleName()));
assert (!log.contains(GetHostStatsCommand.class.getSimpleName()));
assert (!log.contains("username"));
assert (!log.contains("password"));
log = sreq.log("Info", true, Level.INFO);
assert (log == null);
logger.setLevel(level);
byte[] bytes = sreq.getBytes();
assert Request.getSequence(bytes) == 892403717;
assert Request.getManagementServerId(bytes) == 3;
assert Request.getAgentId(bytes) == 2;
assert Request.getViaAgentId(bytes) == 2;
Request creq = null;
try {
creq = Request.parse(bytes);

View File

@ -329,9 +329,8 @@ public abstract class AgentAttache {
public void send(Request req, final Listener listener) throws AgentUnavailableException {
checkAvailability(req.getCommands());
long seq = getNextSequence();
req.setSequence(seq);
long seq = req.getSequence();
if (listener != null) {
registerListener(seq, listener);
} else if (s_logger.isDebugEnabled()) {
@ -376,9 +375,8 @@ public abstract class AgentAttache {
public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException {
SynchronousListener sl = new SynchronousListener(null);
long seq = getNextSequence();
req.setSequence(seq);
long seq = req.getSequence();
send(req, sl);
try {

View File

@ -804,6 +804,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());
Answer[] answers = agent.send(req, timeout);
notifyAnswersToMonitors(hostId, req.getSequence(), answers);
commands.setAnswers(answers);
@ -818,6 +819,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
try {
Request req = new Request(hostId, _nodeId, new CheckHealthCommand(), true);
req.setSequence(agent.getNextSequence());
Answer[] answers = agent.send(req, 50 * 1000);
if (answers != null && answers[0] != null) {
Status status = answers[0].getResult() ? Status.Up : Status.Down;
@ -863,6 +865,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
return -1;
}
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());
agent.send(req, listener);
return req.getSequence();
}
@ -1133,12 +1136,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
public void startDirectlyConnectedHosts() {
List<HostVO> hosts = _hostDao.findDirectlyConnectedHosts();
for (HostVO host : hosts) {
loadDirectlyConnectedHost(host);
loadDirectlyConnectedHost(host, false);
}
}
@SuppressWarnings("rawtypes")
protected void loadDirectlyConnectedHost(HostVO host) {
protected boolean loadDirectlyConnectedHost(HostVO host, boolean executeNow) {
String resourceName = host.getResource();
ServerResource resource = null;
try {
@ -1147,25 +1150,25 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
resource = (ServerResource) constructor.newInstance();
} catch (ClassNotFoundException e) {
s_logger.warn("Unable to find class " + host.getResource(), e);
return;
return false;
} catch (InstantiationException e) {
s_logger.warn("Unablet to instantiate class " + host.getResource(), e);
return;
return false;
} catch (IllegalAccessException e) {
s_logger.warn("Illegal access " + host.getResource(), e);
return;
return false;
} catch (SecurityException e) {
s_logger.warn("Security error on " + host.getResource(), e);
return;
return false;
} catch (NoSuchMethodException e) {
s_logger.warn("NoSuchMethodException error on " + host.getResource(), e);
return;
return false;
} catch (IllegalArgumentException e) {
s_logger.warn("IllegalArgumentException error on " + host.getResource(), e);
return;
return false;
} catch (InvocationTargetException e) {
s_logger.warn("InvocationTargetException error on " + host.getResource(), e);
return;
return false;
}
_hostDao.loadDetails(host);
@ -1201,14 +1204,25 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
} catch (ConfigurationException e) {
e.printStackTrace();
s_logger.warn("Unable to configure resource due to ", e);
return;
return false;
}
if (!resource.start()) {
s_logger.warn("Unable to start the resource");
return;
return false;
}
if (executeNow) {
AgentAttache attache = simulateStart(host.getId(), resource, host.getDetails(), false, null, null);
if (attache == null) {
return false;
} else {
return true;
}
} else {
_executor.execute(new SimulateStartTask(host.getId(), resource, host.getDetails(), null));
return true;
}
_executor.execute(new SimulateStartTask(host.getId(), resource, host.getDetails(), null));
}
@Override

View File

@ -10,6 +10,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
@ -126,6 +128,11 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
}
continue;
}
SSLEngine sslEngine = s_clusteredAgentMgr.getSSLEngine(peerName);
if (sslEngine == null) {
throw new AgentUnavailableException("Unable to get SSLEngine of peer " + peerName, _id);
}
try {
if (s_logger.isDebugEnabled()) {
@ -135,7 +142,7 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
SynchronousListener synchronous = (SynchronousListener)listener;
synchronous.setPeer(peerName);
}
Link.write(ch, req.toBytes());
Link.write(ch, req.toBytes(), sslEngine);
error = false;
return;
} catch (IOException e) {

View File

@ -15,6 +15,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -27,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
@ -83,12 +86,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public long _loadSize = 100;
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
@Inject
protected ClusterManager _clusterMgr = null;
protected HashMap<String, SocketChannel> _peers;
protected HashMap<String, SSLEngine> _sslEngines;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
@Inject
@ -106,6 +111,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
_peers = new HashMap<String, SocketChannel>(7);
_sslEngines = new HashMap<String, SSLEngine>(7);
_nodeId = _clusterMgr.getManagementNodeId();
ConfigurationDao configDao = ComponentLocator.getCurrentLocator().getDao(ConfigurationDao.class);
@ -190,7 +196,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
}
loadDirectlyConnectedHost(host);
loadDirectlyConnectedHost(host, false);
} catch (Throwable e) {
s_logger.debug(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to " + e.toString());
}
@ -406,6 +412,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public boolean routeToPeer(String peer, byte[] bytes) {
int i = 0;
SocketChannel ch = null;
SSLEngine sslEngine = null;
while (i++ < 5) {
ch = connectToPeer(peer, ch);
if (ch == null) {
@ -415,11 +422,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
return false;
}
sslEngine = getSSLEngine(peer);
if (sslEngine == null) {
logD(bytes, "Unable to get SSLEngine of peer: " + peer);
return false;
}
try {
if (s_logger.isDebugEnabled()) {
logD(bytes, "Routing to peer");
}
Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) });
Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine);
return true;
} catch (IOException e) {
try {
@ -434,6 +446,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public String findPeer(long hostId) {
return _clusterMgr.getPeerName(hostId);
}
public SSLEngine getSSLEngine(String peerName) {
return _sslEngines.get(peerName);
}
public void cancel(String peerName, long hostId, long sequence, String reason) {
CancelCommand cancel = new CancelCommand(sequence, reason);
@ -453,12 +469,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
_peers.remove(peerName);
_sslEngines.remove(peerName);
}
}
public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) {
synchronized (_peers) {
SocketChannel ch = _peers.get(peerName);
SSLEngine sslEngine = null;
if (prevCh != null) {
try {
prevCh.close();
@ -483,10 +501,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
ch.configureBlocking(true); // make sure we are working at blocking mode
ch.socket().setKeepAlive(true);
ch.socket().setSoTimeout(60 * 1000);
try {
SSLContext sslContext = Link.initSSLContext(true);
sslEngine = sslContext.createSSLEngine(ip, _port);
sslEngine.setUseClientMode(true);
Link.doHandshake(ch, sslEngine, true);
s_logger.info("SSL: Handshake done");
} catch (Exception e) {
throw new IOException("SSL: Fail to init SSL! " + e);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip);
}
_peers.put(peerName, ch);
_sslEngines.put(peerName, sslEngine);
} catch (IOException e) {
s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e);
return null;
@ -542,7 +571,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
_timer.cancel();
//cancel all transfer tasks
s_transferExecutor.shutdownNow();
cleanupTransferMap();
return super.stop();
}
@ -569,7 +602,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
final byte[] data = task.getData();
Version ver = Request.getVersion(data);
if (ver.ordinal() < Version.v3.ordinal()) {
if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) {
s_logger.warn("Wrong version for clustered agent request");
super.doTask(task);
return;
}
@ -685,12 +719,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
public boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException {
if (event == Event.RequestAgentRebalance) {
return setToWaitForRebalance(agentId);
} else if (event == Event.StartAgentRebalance) {
return rebalanceHost(agentId);
}
return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
} else if (event == Event.StartAgentRebalance) {
return rebalanceHost(agentId, currentOwnerId, futureOwnerId);
}
return true;
}
@ -737,9 +771,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
long hostId = host.getId();
s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
boolean result = true;
if (_hostTransferDao.findById(hostId) != null) {
s_logger.warn("Somebody else is already rebalancing host id: " + hostId);
continue;
}
HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
try {
Answer[] answer = sendRebalanceCommand(hostId, node.getMsid(), Event.RequestAgentRebalance);
Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
if (answer == null) {
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
result = false;
@ -748,8 +788,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
result = false;
} finally {
HostTransferMapVO updatedTransfer = _hostTransferDao.findById(transfer.getId());
if (!result && updatedTransfer.getState() == HostTransferState.TransferRequested) {
HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
}
@ -765,23 +805,22 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
private Answer[] sendRebalanceCommand(long agentId, long peer, Event event) {
TransferAgentCommand transfer = new TransferAgentCommand(agentId, peer, event);
private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) {
TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
Commands commands = new Commands(OnError.Stop);
commands.addCommand(transfer);
Command[] cmds = commands.toCommands();
String peerName = Long.toString(peer);
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
}
String peerName = Long.toString(peer);
Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true);
return answers;
} catch (Exception e) {
s_logger.warn("Caught exception while talking to " + peer, e);
s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
return null;
}
}
@ -791,45 +830,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public void run() {
try {
// TODO - change to trace level later on
if (s_logger.isDebugEnabled()) {
s_logger.debug("Clustered agent transfer scan check, management server id:" + _nodeId);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId);
}
if (_agentToTransferIds.size() > 0) {
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
for (Long hostId : _agentToTransferIds) {
AgentAttache attache = findAttache(hostId);
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
boolean result = false;
_agentToTransferIds.remove(hostId);
try {
_hostTransferDao.startAgentTransfer(hostId);
result = rebalanceHost(hostId);
} finally {
if (result) {
finishRebalance(hostId, Event.RebalanceCompleted);
} else {
finishRebalance(hostId, Event.RebalanceFailed);
}
}
} else {
// if we timed out waiting for the host to reconnect, remove host from rebalance list and delete from op_host_transfer DB
// no need to do anything with the real attache
Date cutTime = DateUtil.currentGMTTime();
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, failing rebalance for this host");
_agentToTransferIds.remove(hostId);
_hostTransferDao.completeAgentTransfer(hostId);
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
}
// if the thread:
// 1) timed out waiting for the host to reconnect
// 2) recipient management server is not active any more
// remove the host from re-balance list and delete from op_host_transfer DB
// no need to do anything with the real attache as we haven't modified it yet
Date cutTime = DateUtil.currentGMTTime();
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
failStartRebalance(hostId);
return;
}
HostTransferMapVO transferMap = _hostTransferDao.findByIdAndCurrentOwnerId(hostId, _nodeId);
if (transferMap == null) {
s_logger.debug("Can't transfer host id=" + hostId + "; record for the host no longer exists in op_host_transfer table");
failStartRebalance(hostId);
return;
}
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
failStartRebalance(hostId);
return;
}
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner());
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
}
}
} else {
// TODO - change to trace level later on
if (s_logger.isDebugEnabled()) {
s_logger.debug("Found no agents to be transfered by the management server " + _nodeId);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
}
}
@ -841,7 +887,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
private boolean setToWaitForRebalance(final long hostId) {
private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) {
s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
synchronized (_agentToTransferIds) {
return _agentToTransferIds.add(hostId);
@ -849,75 +895,63 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
private boolean rebalanceHost(final long hostId) {
HostTransferMapVO map = _hostTransferDao.findById(hostId);
HostVO host = _hostDao.findById(hostId);
protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{
boolean result = true;
if (map.getInitialOwner() == _nodeId) {
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)findAttache(hostId);
if (attache != null && !attache.getTransferMode()) {
attache.setTransferMode(true);
s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
_agents.put(hostId, attache);
if (host != null && host.getRemoved() == null) {
host.setManagementServerId(null);
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalancing);
_hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
}
try {
Answer[] answer = sendRebalanceCommand(hostId, map.getFutureOwner(), Event.StartAgentRebalance);
if (answer == null) {
s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process");
result = false;
}
} catch (Exception ex) {
s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process", ex);
if (currentOwnerId == _nodeId) {
if (!startRebalance(hostId)) {
s_logger.debug("Failed to start agent rebalancing");
failStartRebalance(hostId);
return false;
}
try {
Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
if (answer == null || !answer[0].getResult()) {
s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process");
result = false;
}
if (result) {
s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner());
}
} else {
s_logger.warn("Unable to find agent " + hostId + " on management server " + _nodeId);
} catch (Exception ex) {
s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex);
result = false;
}
} else if (map.getFutureOwner() == _nodeId) {
if (result) {
s_logger.debug("Got host id=" + hostId + " from management server " + futureOwnerId);
finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted);
} else {
finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
}
} else if (futureOwnerId == _nodeId) {
HostVO host = _hostDao.findById(hostId);
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") as a part of rebalance process");
}
//TODO - 1) no need to do vmfullSync/storageSetup on the agent side 2) Make sure that if connection fails, host goes from Rebalance state to Alert
loadDirectlyConnectedHost(host);
result = loadDirectlyConnectedHost(host, true);
} catch (Exception ex) {
s_logger.warn("Unable to load directly connected host " + host.getId() + " as a part of rebalance due to exception: ", ex);
result = false;
}
}
return result;
}
private boolean finishRebalance(final long hostId, Event event) {
HostTransferMapVO map = _hostTransferDao.findById(hostId);
if (map.getInitialOwner() != _nodeId) {
s_logger.warn("Why finish rebalance called not by initial host owner???");
return false;
}
protected void finishRebalance(final long hostId, long futureOwnerId, Event event) throws AgentUnavailableException{
boolean success = (event == Event.RebalanceCompleted) ? true : false;
if (s_logger.isDebugEnabled()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Finishing rebalancing for the agent " + hostId + " with result " + success);
}
AgentAttache attache = findAttache(hostId);
if (attache == null) {
s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
return true;
_hostTransferDao.completeAgentTransfer(hostId);
return;
} else if (success) {
s_logger.debug("Management server " + _nodeId + " is completing agent " + hostId + " rebalance");
//1) Get all the requests before removing transfer attache
@ -929,22 +963,87 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
getAttache(hostId);
//3) forward all the requests to the management server which owns the host now
if (!requests.isEmpty()) {
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + map.getFutureOwner());
for (Request request : requests) {
routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes());
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
for (Iterator<Request> iter = requests.iterator(); iter.hasNext();) {
Request req = iter.next();
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), req.getBytes());
if (!routeResult) {
logD(req.getBytes(), "Failed to route request to peer");
}
}
}
} catch (AgentUnavailableException ex) {
s_logger.warn("Not creating forward attache as agent is not available", ex);
//TODO - - have to handle the case when requests can't be forwarded due to lack of forward attache
s_logger.warn("Failed to finish host " + hostId + " rebalance: couldn't create forward attache as agent is not available", ex);
failRebalance(hostId);
}
} else {
((ClusteredDirectAgentAttache) attache).setTransferMode(false);
//TODO - have to handle the case when agent fails to rebalance 1) Either connect it back 2) Or disconnect it
failRebalance(hostId);
}
_hostTransferDao.completeAgentTransfer(hostId);
}
protected void failRebalance(final long hostId) throws AgentUnavailableException{
reconnect(hostId);
_hostTransferDao.completeAgentTransfer(hostId);
}
@DB
protected boolean startRebalance(final long hostId) {
HostVO host = _hostDao.findById(hostId);
if (host == null || host.getRemoved() != null) {
s_logger.warn("Unable to find host record, fail start rebalancing process");
return false;
}
synchronized (_agents) {
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
_agentToTransferIds.remove(hostId);
s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
attache.setTransferMode(true);
_agents.put(hostId, attache);
} else {
if (attache == null) {
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
} else {
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
}
return false;
}
}
Transaction txn = Transaction.currentTxn();
txn.start();
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalancing);
host.setManagementServerId(null);
_hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
_hostTransferDao.startAgentTransfer(hostId);
txn.commit();
return true;
}
protected void failStartRebalance(final long hostId) {
_agentToTransferIds.remove(hostId);
_hostTransferDao.completeAgentTransfer(hostId);
}
protected void cleanupTransferMap() {
List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(_nodeId);
for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) {
_hostTransferDao.remove(hostJoingingCluster.getId());
}
List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(_nodeId);
for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
_hostTransferDao.remove(hostLeavingCluster.getId());
}
}
}

View File

@ -44,7 +44,7 @@ public class DirectAgentAttache extends AgentAttache {
private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class);
ServerResource _resource;
static ScheduledExecutorService s_executor = new ScheduledThreadPoolExecutor(100, new NamedThreadFactory("DirectAgent"));
static ScheduledExecutorService s_executor = new ScheduledThreadPoolExecutor(500, new NamedThreadFactory("DirectAgent"));
List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
AgentManagerImpl _mgr;
long _seq = 0;

View File

@ -63,7 +63,7 @@ public interface ClusterManager extends Manager {
*/
public void broadcast(long agentId, Command[] cmds);
boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException;
boolean isAgentRebalanceEnabled();
}

View File

@ -928,9 +928,7 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
} catch (final InterruptedException e) {
} finally {
s_logger.debug("Agent rebalancing is completed, management server " + _mshostId + " is ready");
}
}
} catch (Throwable e) {
s_logger.error("Unexpected exception : ", e);
@ -1169,8 +1167,8 @@ public class ClusterManagerImpl implements ClusterManager {
}
@Override
public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, event);
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
@Override

View File

@ -213,7 +213,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
}
boolean result = false;
try {
result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent());
result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}

View File

@ -9,6 +9,6 @@ public interface ClusteredAgentRebalanceService {
void startRebalanceAgents();
boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException;
}

View File

@ -167,7 +167,7 @@ public class DummyClusterManagerImpl implements ClusterManager {
}
@Override
public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return false;
}

View File

@ -174,5 +174,10 @@ public class ManagementServerHostVO implements ManagementServerHost{
public void setAlertCount(int count) {
alertCount = count;
}
}
@Override
public String toString() {
return new StringBuilder("ManagementServer[").append("-").append(id).append("-").append(msid).append("-").append(state).append("]").toString();
}
}

View File

@ -27,7 +27,7 @@ import com.cloud.utils.db.GenericDao;
public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long> {
List<HostTransferMapVO> listHostsLeavingCluster(long clusterId);
List<HostTransferMapVO> listHostsLeavingCluster(long currentOwnerId);
List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId);
@ -40,4 +40,8 @@ public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long>
boolean isNotActive(long hostId, Date cutTime);
boolean startAgentTransfer(long hostId);
HostTransferMapVO findByIdAndFutureOwnerId(long id, long futureOwnerId);
HostTransferMapVO findByIdAndCurrentOwnerId(long id, long currentOwnerId);
}

View File

@ -63,10 +63,9 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
}
@Override
public List<HostTransferMapVO> listHostsLeavingCluster(long clusterId) {
public List<HostTransferMapVO> listHostsLeavingCluster(long currentOwnerId) {
SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
sc.setParameters("initialOwner", clusterId);
sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
sc.setParameters("initialOwner", currentOwnerId);
return listBy(sc);
}
@ -75,12 +74,10 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
public List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId) {
SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
sc.setParameters("futureOwner", futureOwnerId);
sc.setParameters("state", HostTransferState.TransferRequested);
return listBy(sc);
}
@Override
public HostTransferMapVO startAgentTransfering(long hostId, long initialOwner, long futureOwner) {
HostTransferMapVO transfer = new HostTransferMapVO(hostId, initialOwner, futureOwner);
@ -122,4 +119,24 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
return update(hostId, transfer);
}
@Override
public HostTransferMapVO findByIdAndFutureOwnerId(long id, long futureOwnerId) {
SearchCriteria<HostTransferMapVO> sc = AllFieldsSearch.create();
sc.setParameters("futureOwner", futureOwnerId);
sc.setParameters("id", id);
return findOneBy(sc);
}
@Override
public HostTransferMapVO findByIdAndCurrentOwnerId(long id, long currentOwnerId) {
SearchCriteria<HostTransferMapVO> sc = AllFieldsSearch.create();
sc.setParameters("initialOwner", currentOwnerId);
sc.setParameters("id", id);
return findOneBy(sc);
}
}

View File

@ -32,6 +32,9 @@ import com.cloud.dc.dao.DataCenterDao;
import com.cloud.dc.dao.VlanDao;
import com.cloud.deploy.DeployDestination;
import com.cloud.deploy.DeploymentPlan;
import com.cloud.event.EventTypes;
import com.cloud.event.EventUtils;
import com.cloud.event.EventVO;
import com.cloud.exception.InsufficientAddressCapacityException;
import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
import com.cloud.exception.InvalidParameterValueException;
@ -48,6 +51,7 @@ import com.cloud.network.Networks.TrafficType;
import com.cloud.network.dao.NetworkDao;
import com.cloud.offering.NetworkOffering;
import com.cloud.user.Account;
import com.cloud.user.UserContext;
import com.cloud.utils.component.AdapterBase;
import com.cloud.utils.component.Inject;
import com.cloud.utils.db.DB;
@ -155,6 +159,7 @@ public class GuestNetworkGuru extends AdapterBase implements NetworkGuru {
throw new InsufficientVirtualNetworkCapcityException("Unable to allocate vnet as a part of network " + network + " implement ", DataCenter.class, dcId);
}
implemented.setBroadcastUri(BroadcastDomainType.Vlan.toUri(vnet));
EventUtils.saveEvent(UserContext.current().getCallerUserId(), network.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_ASSIGN, "Assignbed Zone Vlan: "+vnet+ " Network Id: "+network.getId(), 0);
} else {
implemented.setBroadcastUri(network.getBroadcastUri());
}
@ -166,7 +171,6 @@ public class GuestNetworkGuru extends AdapterBase implements NetworkGuru {
if (network.getCidr() != null) {
implemented.setCidr(network.getCidr());
}
return implemented;
}
@ -261,6 +265,8 @@ public class GuestNetworkGuru extends AdapterBase implements NetworkGuru {
s_logger.debug("Releasing vnet for the network id=" + profile.getId());
if (profile.getBroadcastUri() != null) {
_dcDao.releaseVnet(profile.getBroadcastUri().getHost(), profile.getDataCenterId(), profile.getAccountId(), profile.getReservationId());
EventUtils.saveEvent(UserContext.current().getCallerUserId(), profile.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_RELEASE, "Released Zone Vlan: "
+profile.getBroadcastUri().getHost()+" for Network: "+profile.getId(), 0);
profile.setBroadcastUri(null);
}
}

View File

@ -69,7 +69,7 @@ public class SecurityGroupListener implements Listener {
@Override
public boolean isRecurring() {
return false;
return true;
}

View File

@ -4261,10 +4261,12 @@ public class ManagementServerImpl implements ManagementServer {
}
VMTemplateVO template = ApiDBUtils.findTemplateById(volume.getTemplateId());
boolean isExtractable = template != null && template.isExtractable() && template.getTemplateType() != Storage.TemplateType.SYSTEM;
if (!isExtractable && account != null && account.getType() != Account.ACCOUNT_TYPE_ADMIN) { // Global admins are allowed
// to extract
throw new PermissionDeniedException("The volume:" + volumeId + " is not allowed to be extracted");
if (volume.getVolumeType() != Volume.Type.DATADISK){ //Datadisk dont have any template dependence.
boolean isExtractable = template != null && template.isExtractable() && template.getTemplateType() != Storage.TemplateType.SYSTEM;
if (!isExtractable && account != null && account.getType() != Account.ACCOUNT_TYPE_ADMIN) { // Global admins are allowed
// to extract
throw new PermissionDeniedException("The volume:" + volumeId + " is not allowed to be extracted");
}
}
Upload.Mode extractMode;
@ -4274,19 +4276,7 @@ public class ManagementServerImpl implements ManagementServer {
extractMode = mode.equals(Upload.Mode.FTP_UPLOAD.toString()) ? Upload.Mode.FTP_UPLOAD : Upload.Mode.HTTP_DOWNLOAD;
}
if (account != null) {
if (!isAdmin(account.getType())) {
if (volume.getAccountId() != account.getId()) {
throw new PermissionDeniedException("Unable to find volume with ID: " + volumeId + " for account: " + account.getAccountName());
}
} else {
Account userAccount = _accountDao.findById(volume.getAccountId());
if ((userAccount == null) || !_domainDao.isChildDomain(account.getDomainId(), userAccount.getDomainId())) {
throw new PermissionDeniedException("Unable to extract volume:" + volumeId + " - permission denied.");
}
}
}
_accountMgr.checkAccess(account, volume);
// If mode is upload perform extra checks on url and also see if there is an ongoing upload on the same.
if (extractMode == Upload.Mode.FTP_UPLOAD) {
URI uri = new URI(url);

View File

@ -128,10 +128,18 @@ public class StatsCollector {
hostAndVmStatsInterval = NumbersUtil.parseLong(configs.get("vm.stats.interval"), 60000L);
storageStatsInterval = NumbersUtil.parseLong(configs.get("storage.stats.interval"), 60000L);
volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), -1L);
_executor.scheduleWithFixedDelay(new HostCollector(), 15000L, hostStatsInterval, TimeUnit.MILLISECONDS);
_executor.scheduleWithFixedDelay(new VmStatsCollector(), 15000L, hostAndVmStatsInterval, TimeUnit.MILLISECONDS);
_executor.scheduleWithFixedDelay(new StorageCollector(), 15000L, storageStatsInterval, TimeUnit.MILLISECONDS);
if (hostStatsInterval > 0) {
_executor.scheduleWithFixedDelay(new HostCollector(), 15000L, hostStatsInterval, TimeUnit.MILLISECONDS);
}
if (hostAndVmStatsInterval > 0) {
_executor.scheduleWithFixedDelay(new VmStatsCollector(), 15000L, hostAndVmStatsInterval, TimeUnit.MILLISECONDS);
}
if (storageStatsInterval > 0) {
_executor.scheduleWithFixedDelay(new StorageCollector(), 15000L, storageStatsInterval, TimeUnit.MILLISECONDS);
}
// -1 means we don't even start this thread to pick up any data.
if (volumeStatsInterval > 0) {
@ -199,7 +207,7 @@ public class StatsCollector {
vmIds.add(vm.getId());
}
try
try
{
HashMap<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
@ -250,7 +258,7 @@ public class StatsCollector {
class StorageCollector implements Runnable {
@Override
public void run() {
try {
try {
List<HostVO> hosts = _hostDao.listSecondaryStorageHosts();
ConcurrentHashMap<Long, StorageStats> storageStats = new ConcurrentHashMap<Long, StorageStats>();
for (HostVO host : hosts) {
@ -260,13 +268,13 @@ public class StatsCollector {
if (answer != null && answer.getResult()) {
storageStats.put(hostId, (StorageStats)answer);
//Seems like we have dynamically updated the sec. storage as prev. size and the current do not match
if (_storageStats.get(hostId)!=null &&
_storageStats.get(hostId).getCapacityBytes() != ((StorageStats)answer).getCapacityBytes()){
if (_storageStats.get(hostId)!=null &&
_storageStats.get(hostId).getCapacityBytes() != ((StorageStats)answer).getCapacityBytes()){
host.setTotalSize(((StorageStats)answer).getCapacityBytes());
_hostDao.update(hostId, host);
}
}
}
}
}
_storageStats = storageStats;
ConcurrentHashMap<Long, StorageStats> storagePoolStats = new ConcurrentHashMap<Long, StorageStats>();
@ -279,11 +287,11 @@ public class StatsCollector {
if (answer != null && answer.getResult()) {
storagePoolStats.put(pool.getId(), (StorageStats)answer);
// Seems like we have dynamically updated the pool size since the prev. size and the current do not match
// Seems like we have dynamically updated the pool size since the prev. size and the current do not match
if (_storagePoolStats.get(poolId)!= null &&
_storagePoolStats.get(poolId).getCapacityBytes() != ((StorageStats)answer).getCapacityBytes()){
pool.setCapacityBytes(((StorageStats)answer).getCapacityBytes());
_storagePoolDao.update(pool.getId(), pool);
_storagePoolStats.get(poolId).getCapacityBytes() != ((StorageStats)answer).getCapacityBytes()){
pool.setCapacityBytes(((StorageStats)answer).getCapacityBytes());
_storagePoolDao.update(pool.getId(), pool);
}
}
} catch (StorageUnavailableException e) {
@ -291,7 +299,7 @@ public class StatsCollector {
} catch (Exception e) {
s_logger.warn("Unable to get stats for " + pool);
}
}
}
_storagePoolStats = storagePoolStats;
} catch (Throwable t) {
s_logger.error("Error trying to retrieve storage stats", t);

View File

@ -80,6 +80,7 @@ public class RegisterCompleteServlet extends HttpServlet implements ServletConte
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
String registrationToken = req.getParameter("token");
String expires = req.getParameter("expires");
int statusCode = HttpServletResponse.SC_OK;
String responseMessage = null;
@ -90,29 +91,34 @@ public class RegisterCompleteServlet extends HttpServlet implements ServletConte
s_logger.info("Attempting to register user account with token = "+registrationToken);
User resourceAdminUser = _accountSvc.getActiveUserByRegistrationToken(registrationToken);
if (resourceAdminUser != null) {
if(!resourceAdminUser.isRegistered()){
_accountSvc.markUserRegistered(resourceAdminUser.getId());
if(resourceAdminUser.isRegistered()) {
statusCode = 503;
responseMessage = "{ \"registration_info\" : { \"errorcode\" : \"503\", \"errortext\" : \"Expired token = " + registrationToken + "\" } }";
} else {
if(expires != null && expires.toLowerCase().equals("true")){
_accountSvc.markUserRegistered(resourceAdminUser.getId());
}
Account resourceAdminAccount = _accountSvc.getActiveAccount(resourceAdminUser.getAccountId());
Account rsUserAccount = _accountSvc.getActiveAccount(resourceAdminAccount.getAccountName()+"-user", resourceAdminAccount.getDomainId());
List<UserVO> users = _userDao.listByAccount(rsUserAccount.getId());
User rsUser = users.get(0);
Configuration config = _configDao.findByName("endpointe.url");
StringBuffer sb = new StringBuffer();
sb.append("{ \"registration_info\" : { \"endpoint_url\" : \""+encodeParam(config.getValue())+"\", ");
sb.append("\"domain_id\" : \""+resourceAdminAccount.getDomainId()+"\", ");
sb.append("\"admin_account\" : \""+encodeParam(resourceAdminUser.getUsername())+"\", ");
sb.append("\"admin_account_api_key\" : \""+resourceAdminUser.getApiKey()+"\", ");
sb.append("\"admin_account_secret_key\" : \""+resourceAdminUser.getSecretKey()+"\", ");
sb.append("\"user_account\" : \""+encodeParam(rsUser.getUsername())+"\", ");
sb.append("\"user_account_api_key\" : \""+rsUser.getApiKey()+"\", ");
sb.append("\"user_account_secret_key\" : \""+rsUser.getSecretKey()+"\" ");
sb.append("} }");
responseMessage = sb.toString();
}
Account resourceAdminAccount = _accountSvc.getActiveAccount(resourceAdminUser.getAccountId());
Account rsUserAccount = _accountSvc.getActiveAccount(resourceAdminAccount.getAccountName()+"-user", resourceAdminAccount.getDomainId());
List<UserVO> users = _userDao.listByAccount(rsUserAccount.getId());
User rsUser = users.get(0);
Configuration config = _configDao.findByName("endpointe.url");
StringBuffer sb = new StringBuffer();
sb.append("{ \"registration_info\" : { \"endpoint_url\" : \""+encodeParam(config.getValue())+"\", ");
sb.append("\"domain_id\" : \""+resourceAdminAccount.getDomainId()+"\", ");
sb.append("\"admin_account\" : \""+encodeParam(resourceAdminUser.getUsername())+"\", ");
sb.append("\"admin_account_api_key\" : \""+resourceAdminUser.getApiKey()+"\", ");
sb.append("\"admin_account_secret_key\" : \""+resourceAdminUser.getSecretKey()+"\", ");
sb.append("\"user_account\" : \""+encodeParam(rsUser.getUsername())+"\", ");
sb.append("\"user_account_api_key\" : \""+rsUser.getApiKey()+"\", ");
sb.append("\"user_account_secret_key\" : \""+rsUser.getSecretKey()+"\" ");
sb.append("} }");
responseMessage = sb.toString();
} else {
statusCode = 503;
responseMessage = "{ \"registration_info\" : { \"errorcode\" : \"503\", \"errortext\" : \"Invalid token = " + registrationToken + "\" } }";

View File

@ -43,6 +43,7 @@ import com.cloud.upgrade.dao.Upgrade221to222;
import com.cloud.upgrade.dao.Upgrade222to224;
import com.cloud.upgrade.dao.Upgrade224to225;
import com.cloud.upgrade.dao.Upgrade225to226;
import com.cloud.upgrade.dao.Upgrade226to227;
import com.cloud.upgrade.dao.UpgradeSnapshot217to224;
import com.cloud.upgrade.dao.UpgradeSnapshot223to224;
import com.cloud.upgrade.dao.VersionDao;
@ -65,14 +66,15 @@ public class DatabaseUpgradeChecker implements SystemIntegrityChecker {
public DatabaseUpgradeChecker() {
_dao = ComponentLocator.inject(VersionDaoImpl.class);
_upgradeMap.put("2.1.7", new DbUpgrade[] { new Upgrade217to218(), new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.1.8", new DbUpgrade[] { new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade218to224DomainVlans(), new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.1.9", new DbUpgrade[] { new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade218to224DomainVlans(), new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.2.1", new DbUpgrade[] { new Upgrade221to222(), new UpgradeSnapshot223to224(), new Upgrade222to224(), new Upgrade224to225()});
_upgradeMap.put("2.2.2", new DbUpgrade[] { new Upgrade222to224(), new UpgradeSnapshot223to224(), new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.2.3", new DbUpgrade[] { new Upgrade222to224(), new UpgradeSnapshot223to224(), new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.2.4", new DbUpgrade[] { new Upgrade224to225(), new Upgrade225to226() });
_upgradeMap.put("2.2.5", new DbUpgrade[] { new Upgrade225to226()});
_upgradeMap.put("2.1.7", new DbUpgrade[] { new Upgrade217to218(), new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.1.8", new DbUpgrade[] { new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade218to224DomainVlans(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.1.9", new DbUpgrade[] { new Upgrade218to22(), new Upgrade221to222(), new UpgradeSnapshot217to224(), new Upgrade222to224(), new Upgrade218to224DomainVlans(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.2.1", new DbUpgrade[] { new Upgrade221to222(), new UpgradeSnapshot223to224(), new Upgrade222to224(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227()});
_upgradeMap.put("2.2.2", new DbUpgrade[] { new Upgrade222to224(), new UpgradeSnapshot223to224(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.2.3", new DbUpgrade[] { new Upgrade222to224(), new UpgradeSnapshot223to224(), new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.2.4", new DbUpgrade[] { new Upgrade224to225(), new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.2.5", new DbUpgrade[] { new Upgrade225to226(), new Upgrade226to227() });
_upgradeMap.put("2.2.6", new DbUpgrade[] { new Upgrade226to227()});
}
protected void runScript(Connection conn, File file) {

View File

@ -0,0 +1,59 @@
package com.cloud.upgrade.dao;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.apache.log4j.Logger;
import com.cloud.utils.exception.CloudRuntimeException;
public class DbUpgradeUtils {
final static Logger s_logger = Logger.getLogger(DbUpgradeUtils.class);
public static void dropKeysIfExist(Connection conn, String tableName, List<String> keys, boolean isForeignKey) {
for (String key : keys) {
try {
PreparedStatement pstmt = null;
if (isForeignKey) {
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP FOREIGN KEY " + key);
} else {
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP KEY " + key);
}
pstmt.executeUpdate();
s_logger.debug("Key " + key + " is dropped successfully from the table " + tableName);
pstmt.close();
} catch (SQLException e) {
// do nothing here
continue;
}
}
}
public static void dropTableColumnsIfExist(Connection conn, String tableName, List<String> columns) {
PreparedStatement pstmt = null;
try {
for (String column : columns) {
try {
pstmt = conn.prepareStatement("SELECT " + column + " FROM " + tableName);
pstmt.executeQuery();
} catch (SQLException e) {
// if there is an exception, it means that field doesn't exist, so do nothing here
s_logger.trace("Field " + column + " doesn't exist in " + tableName);
continue;
}
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP COLUMN " + column);
pstmt.executeUpdate();
s_logger.debug("Column " + column + " is dropped successfully from the table " + tableName);
pstmt.close();
}
} catch (SQLException e) {
s_logger.warn("Unable to drop columns using query " + pstmt + " due to exception", e);
throw new CloudRuntimeException("Unable to drop columns due to ", e);
}
}
}

View File

@ -209,32 +209,7 @@ public class Upgrade224to225 implements DbUpgrade {
s_logger.debug("Dropping columns that don't exist in 2.2.5 version of the DB...");
for (String tableName : tablesToModify.keySet()) {
dropTableColumnsIfExist(conn, tableName, tablesToModify.get(tableName));
}
}
private void dropTableColumnsIfExist(Connection conn, String tableName, List<String> columns) {
PreparedStatement pstmt = null;
try {
for (String column : columns) {
try {
pstmt = conn.prepareStatement("SELECT " + column + " FROM " + tableName);
pstmt.executeQuery();
} catch (SQLException e) {
// if there is an exception, it means that field doesn't exist, so do nothing here
s_logger.trace("Field " + column + " doesn't exist in " + tableName);
continue;
}
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP COLUMN " + column);
pstmt.executeUpdate();
s_logger.debug("Column " + column + " is dropped successfully from the table " + tableName);
pstmt.close();
}
} catch (SQLException e) {
s_logger.warn("Unable to drop columns using query " + pstmt + " due to exception", e);
throw new CloudRuntimeException("Unable to drop columns due to ", e);
DbUpgradeUtils.dropTableColumnsIfExist(conn, tableName, tablesToModify.get(tableName));
}
}
@ -303,31 +278,12 @@ public class Upgrade224to225 implements DbUpgrade {
// drop all foreign keys first
s_logger.debug("Dropping keys that don't exist in 2.2.5 version of the DB...");
for (String tableName : foreignKeys.keySet()) {
dropKeysIfExist(conn, tableName, foreignKeys.get(tableName), true);
DbUpgradeUtils.dropKeysIfExist(conn, tableName, foreignKeys.get(tableName), true);
}
// drop indexes now
for (String tableName : indexes.keySet()) {
dropKeysIfExist(conn, tableName, indexes.get(tableName), false);
}
}
private void dropKeysIfExist(Connection conn, String tableName, List<String> keys, boolean isForeignKey) {
for (String key : keys) {
try {
PreparedStatement pstmt = null;
if (isForeignKey) {
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP FOREIGN KEY " + key);
} else {
pstmt = conn.prepareStatement("ALTER TABLE " + tableName + " DROP KEY " + key);
}
pstmt.executeUpdate();
s_logger.debug("Key " + key + " is dropped successfully from the table " + tableName);
pstmt.close();
} catch (SQLException e) {
// do nothing here
continue;
}
DbUpgradeUtils.dropKeysIfExist(conn, tableName, indexes.get(tableName), false);
}
}

View File

@ -19,32 +19,42 @@ package com.cloud.upgrade.dao;
import java.io.File;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.storage.DiskOfferingVO;
import com.cloud.storage.dao.DiskOfferingDao;
import com.cloud.storage.dao.SnapshotDao;
import com.cloud.utils.component.Inject;
import org.apache.log4j.Logger;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.script.Script;
public class Upgrade225to226 implements DbUpgrade {
@Inject
protected SnapshotDao _snapshotDao;
@Inject
protected HostDao _hostDao;
@Inject
protected DataCenterDao _dcDao;
@Inject
protected DiskOfferingDao _diskOfferingDao;
final static Logger s_logger = Logger.getLogger(Upgrade225to226.class);
@Override
public File[] getPrepareScripts() {
String file = Script.findScript("", "db/schema-225to226.sql");
if (file == null) {
throw new CloudRuntimeException("Unable to find the upgrade script, schema-225to226.sql");
}
return new File[] { new File(file) };
}
@Override
public void performDataMigration(Connection conn) {
dropKeysIfExist(conn);
dropTableColumnsIfExist(conn);
}
@Override
public File[] getCleanupScripts() {
return null;
}
@Override
public String[] getUpgradableVersionRange() {
return new String[] {"2.2.5"};
return new String[] { "2.2.5", "2.2.5" };
}
@Override
@ -54,38 +64,46 @@ public class Upgrade225to226 implements DbUpgrade {
@Override
public boolean supportsRollingUpgrade() {
return true;
return false;
}
@Override
public File[] getPrepareScripts() {
String script = Script.findScript("", "db/schema-225to226.sql");
if (script == null) {
throw new CloudRuntimeException("Unable to find db/schema-224to225.sql");
}
return new File[] { new File(script) };
}
private void dropTableColumnsIfExist(Connection conn) {
HashMap<String, List<String>> tablesToModify = new HashMap<String, List<String>>();
@Override
public void performDataMigration(Connection conn) {
List<DataCenterVO> dcs = _dcDao.listAll();
for ( DataCenterVO dc : dcs ) {
HostVO host = _hostDao.findSecondaryStorageHost(dc.getId());
_snapshotDao.updateSnapshotSecHost(dc.getId(), host.getId());
}
List<DiskOfferingVO> offerings = _diskOfferingDao.listAll();
for ( DiskOfferingVO offering : offerings ) {
if( offering.getDiskSize() <= 2 * 1024 * 1024) { // the unit is MB
offering.setDiskSize(offering.getDiskSize() * 1024 * 1024);
_diskOfferingDao.update(offering.getId(), offering);
}
// domain router table
List<String> columns = new ArrayList<String>();
columns.add("account_id");
columns.add("domain_id");
tablesToModify.put("domain_router", columns);
s_logger.debug("Dropping columns that don't exist in 2.2.6 version of the DB...");
for (String tableName : tablesToModify.keySet()) {
DbUpgradeUtils.dropTableColumnsIfExist(conn, tableName, tablesToModify.get(tableName));
}
}
@Override
public File[] getCleanupScripts() {
return null;
private void dropKeysIfExist(Connection conn) {
HashMap<String, List<String>> foreignKeys = new HashMap<String, List<String>>();
HashMap<String, List<String>> indexes = new HashMap<String, List<String>>();
// domain router table
List<String> keys = new ArrayList<String>();
keys.add("fk_domain_router__account_id");
foreignKeys.put("domain_router", keys);
keys = new ArrayList<String>();
keys.add("i_domain_router__account_id");
indexes.put("domain_router", keys);
// drop all foreign keys first
s_logger.debug("Dropping keys that don't exist in 2.2.6 version of the DB...");
for (String tableName : foreignKeys.keySet()) {
DbUpgradeUtils.dropKeysIfExist(conn, tableName, foreignKeys.get(tableName), true);
}
// drop indexes now
for (String tableName : indexes.keySet()) {
DbUpgradeUtils.dropKeysIfExist(conn, tableName, indexes.get(tableName), false);
}
}
}

View File

@ -0,0 +1,91 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.upgrade.dao;
import java.io.File;
import java.sql.Connection;
import java.util.List;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.storage.DiskOfferingVO;
import com.cloud.storage.dao.DiskOfferingDao;
import com.cloud.storage.dao.SnapshotDao;
import com.cloud.utils.component.Inject;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.script.Script;
public class Upgrade226to227 implements DbUpgrade {
@Inject
protected SnapshotDao _snapshotDao;
@Inject
protected HostDao _hostDao;
@Inject
protected DataCenterDao _dcDao;
@Inject
protected DiskOfferingDao _diskOfferingDao;
@Override
public String[] getUpgradableVersionRange() {
return new String[] {"2.2.5"};
}
@Override
public String getUpgradedVersion() {
return "2.2.6";
}
@Override
public boolean supportsRollingUpgrade() {
return true;
}
@Override
public File[] getPrepareScripts() {
String script = Script.findScript("", "db/schema-226to227.sql");
if (script == null) {
throw new CloudRuntimeException("Unable to find db/schema-226to227.sql");
}
return new File[] { new File(script) };
}
@Override
public void performDataMigration(Connection conn) {
List<DataCenterVO> dcs = _dcDao.listAll();
for ( DataCenterVO dc : dcs ) {
HostVO host = _hostDao.findSecondaryStorageHost(dc.getId());
_snapshotDao.updateSnapshotSecHost(dc.getId(), host.getId());
}
List<DiskOfferingVO> offerings = _diskOfferingDao.listAll();
for ( DiskOfferingVO offering : offerings ) {
if( offering.getDiskSize() <= 2 * 1024 * 1024) { // the unit is MB
offering.setDiskSize(offering.getDiskSize() * 1024 * 1024);
_diskOfferingDao.update(offering.getId(), offering);
}
}
}
@Override
public File[] getCleanupScripts() {
return null;
}
}

View File

@ -1800,7 +1800,6 @@ public class AccountManagerImpl implements AccountManager, AccountService, Manag
public void markUserRegistered(long userId) {
UserVO userForUpdate = _userDao.createForUpdate();
userForUpdate.setRegistered(true);
userForUpdate.setRegistrationToken(null);
_userDao.update(Long.valueOf(userId), userForUpdate);
}
}

View File

@ -1198,7 +1198,7 @@ CREATE TABLE `cloud`.`storage_pool` (
`cluster_id` bigint unsigned COMMENT 'foreign key to cluster',
`available_bytes` bigint unsigned,
`capacity_bytes` bigint unsigned,
`host_address` char(40) NOT NULL COMMENT 'FQDN or IP of storage server',
`host_address` varchar(255) NOT NULL COMMENT 'FQDN or IP of storage server',
`path` varchar(255) NOT NULL COMMENT 'Filesystem path that is shared',
`created` datetime COMMENT 'date the pool created',
`removed` datetime COMMENT 'date removed if not null',
@ -1530,7 +1530,7 @@ CREATE TABLE `cloud`.`op_host_transfer` (
`state` varchar(32) NOT NULL COMMENT 'the transfer state of the host',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`),
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`) ON DELETE CASCADE,
CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`),
CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View File

@ -26,9 +26,6 @@ ALTER TABLE `cloud`.`user_vm` DROP COLUMN `service_offering_id`;
ALTER TABLE `cloud`.`user_vm` DROP COLUMN `account_id`;
ALTER TABLE `cloud`.`user_vm` DROP COLUMN `domain_id`;
ALTER TABLE `cloud`.`domain_router` DROP FOREIGN KEY `fk_domain_router__account_id`;
ALTER TABLE `cloud`.`domain_router` DROP INDEX `i_domain_router__account_id`;
#ALTER TABLE `cloud`.`secondary_storage_vm` DROP COLUMN `guid`;
#ALTER TABLE `cloud`.`vlan` ADD CONSTRAINT `fk_vlan__network_id` FOREIGN KEY `fk_vlan__network_id`(`network_id`) REFERENCES `networks`(`id`);

View File

@ -953,7 +953,7 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type,
INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type, hvm, bits, account_id, url, checksum, enable_password, display_text, format, guest_os_id, featured, cross_zones, hypervisor_type, extractable)
VALUES (7, 'centos53-x64', 'CentOS 5.3(64-bit) no GUI (vSphere)', 1, now(), 'BUILTIN', 0, 64, 1, 'http://download.cloud.com/releases/2.2.0/CentOS5.3-x86_64.ova', 'f6f881b7f2292948d8494db837fe0f47', 0, 'CentOS 5.3(64-bit) no GUI (vSphere)', 'OVA', 12, 1, 1, 'VMware', 1);
UPDATE vm_instance SET guest_os_id=15 where vm_template_id=1;
UPDATE vm_instance SET vm_template_id=(SELECT id FROM vm_template WHERE unique_name='routing-xenserver-2.2.4' AND removed IS NULL) where vm_template_id=1;
UPDATE vm_instance SET vm_template_id=(SELECT id FROM vm_template WHERE name='systemvm-xenserver-2.2.4' AND removed IS NULL) where vm_template_id=1;
ALTER TABLE `cloud`.`instance_group` ADD CONSTRAINT `fk_instance_group__account_id` FOREIGN KEY `fk_instance_group__account_id` (`account_id`) REFERENCES `account` (`id`);

View File

@ -1,114 +1,7 @@
--;
-- Schema upgrade from 2.2.5 to 2.2.6;
--;
ALTER TABLE `cloud`.`mshost` ADD COLUMN `runid` bigint NOT NULL DEFAULT 0 COMMENT 'run id, combined with msid to form a cluster session';
ALTER TABLE `cloud`.`mshost` ADD COLUMN `state` varchar(10) NOT NULL default 'Down';
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `limit_cpu_use` tinyint(1) NOT NULL DEFAULT 0 ;
ALTER TABLE `cloud`.`service_offering` ADD COLUMN `limit_cpu_use` tinyint(1) NOT NULL DEFAULT 0 ;
ALTER TABLE `cloud`.`storage_pool` MODIFY `host_address` varchar(255) NOT NULL;
DROP TABLE IF EXISTS `cloud`.`certificate`;
CREATE TABLE `cloud`.`keystore` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(64) NOT NULL COMMENT 'unique name for the certifiation',
`certificate` text NOT NULL COMMENT 'the actual certificate being stored in the db',
`key` text NOT NULL COMMENT 'private key associated wih the certificate',
`domain_suffix` varchar(256) NOT NULL COMMENT 'DNS domain suffix associated with the certificate',
PRIMARY KEY (`id`),
UNIQUE(name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`cmd_exec_log` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`host_id` bigint unsigned NOT NULL COMMENT 'host id of the system VM agent that command is sent to',
`instance_id` bigint unsigned NOT NULL COMMENT 'instance id of the system VM that command is executed on',
`command_name` varchar(255) NOT NULL COMMENT 'command name',
`weight` integer NOT NULL DEFAULT 1 COMMENT 'command weight in consideration of the load factor added to host that is executing the command',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
INDEX `i_cmd_exec_log__host_id`(`host_id`),
INDEX `i_cmd_exec_log__instance_id`(`instance_id`),
CONSTRAINT `fk_cmd_exec_log_ref__inst_id` FOREIGN KEY (`instance_id`) REFERENCES `vm_instance`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`network_tags` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`network_id` bigint unsigned NOT NULL COMMENT 'id of the network',
`tag` varchar(255) NOT NULL COMMENT 'tag',
PRIMARY KEY (`id`),
CONSTRAINT `fk_network_tags__network_id` FOREIGN KEY (`network_id`) REFERENCES `networks`(`id`),
UNIQUE KEY(`network_id`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `cloud`.`firewall_rules_cidrs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`firewall_rule_id` bigint(20) unsigned NOT NULL COMMENT 'firewall rule id',
`source_cidr` varchar(18) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `fk_firewall_cidrs_firewall_rules` (`firewall_rule_id`),
CONSTRAINT `fk_firewall_cidrs_firewall_rules` FOREIGN KEY (`firewall_rule_id`) REFERENCES `firewall_rules` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`secondary_storage_vm` ADD COLUMN `role` varchar(64) NOT NULL DEFAULT 'templateProcessor';
INSERT INTO `cloud`.`configuration` (category, instance, component, name, value, description) VALUES ('Network', 'DEFAULT', 'management-server', 'vm.network.throttling.rate', 200, 'Default data transfer rate in megabits per second allowed in user vm\'s default network.');
DELETE FROM `cloud`.`configuration` where name='guest.ip.network';
DELETE FROM `cloud`.`configuration` where name='guest.netmask';
ALTER TABLE `cloud`.`host_pod_ref` ADD COLUMN `removed` datetime COMMENT 'date removed if not null';
ALTER TABLE `cloud`.`host_pod_ref` MODIFY `name` varchar(255);
ALTER TABLE `cloud`.`security_group` DROP COLUMN `account_name`;
ALTER TABLE `cloud`.`security_ingress_rule` DROP COLUMN `allowed_security_group`;
ALTER TABLE `cloud`.`security_ingress_rule` DROP COLUMN `allowed_sec_grp_acct`;
ALTER TABLE `cloud`.`data_center` ADD COLUMN `zone_token` varchar(255);
ALTER TABLE `cloud`.`data_center` ADD INDEX `i_data_center__zone_token`(`zone_token`);
ALTER TABLE `cloud`.`vm_template` ADD COLUMN `source_template_id` bigint unsigned COMMENT 'Id of the original template, if this template is created from snapshot';
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__pod_id`(`pod_id`);
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__data_center_id`(`data_center_id`);
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__nic_id_reservation_id`(`nic_id`,`reservation_id`);
INSERT INTO `cloud`.`guest_os` (id, category_id, display_name) VALUES (139, 7, 'Other PV (32-bit)');
INSERT INTO `cloud`.`guest_os` (id, category_id, display_name) VALUES (140, 7, 'Other PV (64-bit)');
INSERT INTO `cloud`.`guest_os_hypervisor` (hypervisor_type, guest_os_name, guest_os_id) VALUES ('XenServer', 'Other PV (32-bit)', 139);
INSERT INTO `cloud`.`guest_os_hypervisor` (hypervisor_type, guest_os_name, guest_os_id) VALUES ('XenServer', 'Other PV (64-bit)', 140);
ALTER TABLE `cloud`.`network_offerings` ADD COLUMN `shared_source_nat_service` int(1) unsigned NOT NULL DEFAULT 0 COMMENT 'true if the network offering provides the shared source nat service';
CREATE TABLE `cloud`.`op_host_transfer` (
`id` bigint unsigned UNIQUE NOT NULL COMMENT 'Id of the host',
`initial_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered from',
`future_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered to',
`state` varchar(32) NOT NULL COMMENT 'the transfer state of the host',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`),
CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`),
CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `swift_id` bigint unsigned;
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `swift_name` varchar(255);
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `sechost_id` bigint unsigned;
CREATE TABLE `cloud`.`swift` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`hostname` varchar(255),
`account` varchar(255) COMMENT ' account in swift',
`username` varchar(255) COMMENT ' username in swift',
`token` varchar(255) COMMENT 'token for this user',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `vm_type` varchar(32) NOT NULL;
UPDATE vm_instance set vm_type=type;
--;
-- Schema upgrade from 2.2.5 to 2.2.6;
--;
ALTER TABLE `cloud`.`storage_pool` MODIFY `host_address` varchar(255) NOT NULL;

View File

@ -0,0 +1,114 @@
--;
-- Schema upgrade from 2.2.5 to 2.2.6;
--;
ALTER TABLE `cloud`.`mshost` ADD COLUMN `runid` bigint NOT NULL DEFAULT 0 COMMENT 'run id, combined with msid to form a cluster session';
ALTER TABLE `cloud`.`mshost` ADD COLUMN `state` varchar(10) NOT NULL default 'Down';
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `limit_cpu_use` tinyint(1) NOT NULL DEFAULT 0 ;
ALTER TABLE `cloud`.`service_offering` ADD COLUMN `limit_cpu_use` tinyint(1) NOT NULL DEFAULT 0 ;
ALTER TABLE `cloud`.`storage_pool` MODIFY `host_address` varchar(255) NOT NULL;
DROP TABLE IF EXISTS `cloud`.`certificate`;
CREATE TABLE `cloud`.`keystore` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(64) NOT NULL COMMENT 'unique name for the certifiation',
`certificate` text NOT NULL COMMENT 'the actual certificate being stored in the db',
`key` text NOT NULL COMMENT 'private key associated wih the certificate',
`domain_suffix` varchar(256) NOT NULL COMMENT 'DNS domain suffix associated with the certificate',
PRIMARY KEY (`id`),
UNIQUE(name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`cmd_exec_log` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`host_id` bigint unsigned NOT NULL COMMENT 'host id of the system VM agent that command is sent to',
`instance_id` bigint unsigned NOT NULL COMMENT 'instance id of the system VM that command is executed on',
`command_name` varchar(255) NOT NULL COMMENT 'command name',
`weight` integer NOT NULL DEFAULT 1 COMMENT 'command weight in consideration of the load factor added to host that is executing the command',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
INDEX `i_cmd_exec_log__host_id`(`host_id`),
INDEX `i_cmd_exec_log__instance_id`(`instance_id`),
CONSTRAINT `fk_cmd_exec_log_ref__inst_id` FOREIGN KEY (`instance_id`) REFERENCES `vm_instance`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`network_tags` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`network_id` bigint unsigned NOT NULL COMMENT 'id of the network',
`tag` varchar(255) NOT NULL COMMENT 'tag',
PRIMARY KEY (`id`),
CONSTRAINT `fk_network_tags__network_id` FOREIGN KEY (`network_id`) REFERENCES `networks`(`id`),
UNIQUE KEY(`network_id`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `cloud`.`firewall_rules_cidrs` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`firewall_rule_id` bigint(20) unsigned NOT NULL COMMENT 'firewall rule id',
`source_cidr` varchar(18) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `fk_firewall_cidrs_firewall_rules` (`firewall_rule_id`),
CONSTRAINT `fk_firewall_cidrs_firewall_rules` FOREIGN KEY (`firewall_rule_id`) REFERENCES `firewall_rules` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`secondary_storage_vm` ADD COLUMN `role` varchar(64) NOT NULL DEFAULT 'templateProcessor';
INSERT INTO `cloud`.`configuration` (category, instance, component, name, value, description) VALUES ('Network', 'DEFAULT', 'management-server', 'vm.network.throttling.rate', 200, 'Default data transfer rate in megabits per second allowed in user vm\'s default network.');
DELETE FROM `cloud`.`configuration` where name='guest.ip.network';
DELETE FROM `cloud`.`configuration` where name='guest.netmask';
ALTER TABLE `cloud`.`host_pod_ref` ADD COLUMN `removed` datetime COMMENT 'date removed if not null';
ALTER TABLE `cloud`.`host_pod_ref` MODIFY `name` varchar(255);
ALTER TABLE `cloud`.`security_group` DROP COLUMN `account_name`;
ALTER TABLE `cloud`.`security_ingress_rule` DROP COLUMN `allowed_security_group`;
ALTER TABLE `cloud`.`security_ingress_rule` DROP COLUMN `allowed_sec_grp_acct`;
ALTER TABLE `cloud`.`data_center` ADD COLUMN `zone_token` varchar(255);
ALTER TABLE `cloud`.`data_center` ADD INDEX `i_data_center__zone_token`(`zone_token`);
ALTER TABLE `cloud`.`vm_template` ADD COLUMN `source_template_id` bigint unsigned COMMENT 'Id of the original template, if this template is created from snapshot';
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__pod_id`(`pod_id`);
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__data_center_id`(`data_center_id`);
ALTER TABLE `cloud`.`op_dc_link_local_ip_address_alloc` ADD INDEX `i_op_dc_link_local_ip_address_alloc__nic_id_reservation_id`(`nic_id`,`reservation_id`);
INSERT INTO `cloud`.`guest_os` (id, category_id, display_name) VALUES (139, 7, 'Other PV (32-bit)');
INSERT INTO `cloud`.`guest_os` (id, category_id, display_name) VALUES (140, 7, 'Other PV (64-bit)');
INSERT INTO `cloud`.`guest_os_hypervisor` (hypervisor_type, guest_os_name, guest_os_id) VALUES ('XenServer', 'Other PV (32-bit)', 139);
INSERT INTO `cloud`.`guest_os_hypervisor` (hypervisor_type, guest_os_name, guest_os_id) VALUES ('XenServer', 'Other PV (64-bit)', 140);
ALTER TABLE `cloud`.`network_offerings` ADD COLUMN `shared_source_nat_service` int(1) unsigned NOT NULL DEFAULT 0 COMMENT 'true if the network offering provides the shared source nat service';
CREATE TABLE `cloud`.`op_host_transfer` (
`id` bigint unsigned UNIQUE NOT NULL COMMENT 'Id of the host',
`initial_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered from',
`future_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered to',
`state` varchar(32) NOT NULL COMMENT 'the transfer state of the host',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`) ON DELETE CASCADE,
CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`),
CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `swift_id` bigint unsigned;
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `swift_name` varchar(255);
ALTER TABLE `cloud`.`snapshots` ADD COLUMN `sechost_id` bigint unsigned;
CREATE TABLE `cloud`.`swift` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`hostname` varchar(255),
`account` varchar(255) COMMENT ' account in swift',
`username` varchar(255) COMMENT ' username in swift',
`token` varchar(255) COMMENT 'token for this user',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `vm_type` varchar(32) NOT NULL;
UPDATE vm_instance set vm_type=type;

View File

@ -18,6 +18,8 @@
package com.cloud.utils.nio;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -26,11 +28,16 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.KeyStore;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import org.apache.log4j.Logger;
@ -82,13 +89,14 @@ public class Link {
}
/**
* No user, so comment it out.
*
* Static methods for reading from a channel in case
* you need to add a client that doesn't require nio.
* @param ch channel to read from.
* @param bytebuffer to use.
* @return bytes read
* @throws IOException if not read to completion.
*/
public static byte[] read(SocketChannel ch, ByteBuffer buff) throws IOException {
synchronized(buff) {
buff.clear();
@ -121,7 +129,44 @@ public class Link {
return output.toByteArray();
}
}
*/
private static void doWrite(SocketChannel ch, ByteBuffer[] buffers, SSLEngine sslEngine) throws IOException {
ByteBuffer pkgBuf;
SSLSession sslSession = sslEngine.getSession();
SSLEngineResult engResult;
ByteBuffer headBuf = ByteBuffer.allocate(4);
pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
engResult = sslEngine.wrap(buffers, pkgBuf);
if (engResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
engResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
engResult.getStatus() != SSLEngineResult.Status.OK) {
throw new IOException("SSL: SSLEngine return bad result! " + engResult);
}
int dataRemaining = pkgBuf.position();
int headRemaining = 4;
pkgBuf.flip();
headBuf.putInt(dataRemaining);
headBuf.flip();
while (headRemaining > 0) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing Header " + headRemaining);
}
long count = ch.write(headBuf);
headRemaining -= count;
}
while (dataRemaining > 0) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing Data " + dataRemaining);
}
long count = ch.write(pkgBuf);
dataRemaining -= count;
}
}
/**
* write method to write to a socket. This method writes to completion so
@ -132,26 +177,10 @@ public class Link {
* @param buffers buffers to write.
* @throws IOException if unable to write to completion.
*/
public static void write(SocketChannel ch, ByteBuffer[] buffers) throws IOException {
public static void write(SocketChannel ch, ByteBuffer[] buffers, SSLEngine sslEngine) throws IOException {
synchronized(ch) {
int length = 0;
ByteBuffer[] buff = new ByteBuffer[buffers.length + 1];
for (int i = 0; i < buffers.length; i++) {
length += buffers[i].remaining();
buff[i + 1] = buffers[i];
}
buff[0] = ByteBuffer.allocate(4);
buff[0].putInt(length);
buff[0].flip();
long count = 0;
while (count < length + 4) {
long written = ch.write(buff);
if (written < 0) {
throw new IOException("Unable to write after " + count);
}
count += written;
}
}
doWrite(ch, buffers, sslEngine);
}
}
public byte[] read(SocketChannel ch) throws IOException {
@ -285,42 +314,10 @@ public class Link {
return true;
}
ByteBuffer pkgBuf;
SSLSession sslSession = _sslEngine.getSession();
SSLEngineResult engResult;
ByteBuffer headBuf = ByteBuffer.allocate(4);
ByteBuffer[] raw_data = new ByteBuffer[data.length - 1];
System.arraycopy(data, 1, raw_data, 0, data.length - 1);
pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
engResult = _sslEngine.wrap(raw_data, pkgBuf);
if (engResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
engResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
engResult.getStatus() != SSLEngineResult.Status.OK) {
throw new IOException("SSL: SSLEngine return bad result! " + engResult);
}
int dataRemaining = pkgBuf.position();
int headRemaining = 4;
pkgBuf.flip();
headBuf.putInt(dataRemaining);
headBuf.flip();
while (headRemaining > 0) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing Header " + headRemaining);
}
long count = ch.write(headBuf);
headRemaining -= count;
}
while (dataRemaining > 0) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing Data " + dataRemaining);
}
long count = ch.write(pkgBuf);
dataRemaining -= count;
}
doWrite(ch, raw_data, _sslEngine);
}
return false;
}
@ -343,4 +340,132 @@ public class Link {
}
_connection.scheduleTask(task);
}
public static SSLContext initSSLContext(boolean isClient) throws Exception {
SSLContext sslContext = null;
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
KeyStore ks = KeyStore.getInstance("JKS");
TrustManager[] tms;
if (!isClient) {
char[] passphrase = "vmops.com".toCharArray();
String keystorePath = "/etc/cloud/management/cloud.keystore";
if (new File(keystorePath).exists()) {
ks.load(new FileInputStream(keystorePath), passphrase);
} else {
s_logger.warn("SSL: Fail to find the generated keystore. Loading fail-safe one to continue.");
ks.load(NioConnection.class.getResourceAsStream("/cloud.keystore"), passphrase);
}
kmf.init(ks, passphrase);
tmf.init(ks);
tms = tmf.getTrustManagers();
} else {
ks.load(null, null);
kmf.init(ks, null);
tms = new TrustManager[1];
tms[0] = new TrustAllManager();
}
sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tms, null);
s_logger.info("SSL: SSLcontext has been initialized");
return sslContext;
}
public static void doHandshake(SocketChannel ch, SSLEngine sslEngine,
boolean isClient) throws IOException {
s_logger.info("SSL: begin Handshake, isClient: " + isClient);
SSLEngineResult engResult;
SSLSession sslSession = sslEngine.getSession();
HandshakeStatus hsStatus;
ByteBuffer in_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
ByteBuffer in_appBuf =
ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40);
ByteBuffer out_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
ByteBuffer out_appBuf =
ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40);
int count;
if (isClient) {
hsStatus = SSLEngineResult.HandshakeStatus.NEED_WRAP;
} else {
hsStatus = SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
}
while (hsStatus != SSLEngineResult.HandshakeStatus.FINISHED) {
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Handshake status " + hsStatus);
}
engResult = null;
if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
out_pkgBuf.clear();
out_appBuf.clear();
out_appBuf.put("Hello".getBytes());
engResult = sslEngine.wrap(out_appBuf, out_pkgBuf);
out_pkgBuf.flip();
int remain = out_pkgBuf.limit();
while (remain != 0) {
remain -= ch.write(out_pkgBuf);
if (remain < 0) {
throw new IOException("Too much bytes sent?");
}
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
in_appBuf.clear();
// One packet may contained multiply operation
if (in_pkgBuf.position() == 0 || !in_pkgBuf.hasRemaining()) {
in_pkgBuf.clear();
count = ch.read(in_pkgBuf);
if (count == -1) {
throw new IOException("Connection closed with -1 on reading size.");
}
in_pkgBuf.flip();
}
engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
ByteBuffer tmp_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
while (engResult.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
// We need more packets to complete this operation
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Buffer overflowed, getting more packets");
}
tmp_pkgBuf.clear();
count = ch.read(tmp_pkgBuf);
tmp_pkgBuf.flip();
in_pkgBuf.mark();
in_pkgBuf.position(in_pkgBuf.limit());
in_pkgBuf.limit(in_pkgBuf.limit() + tmp_pkgBuf.limit());
in_pkgBuf.put(tmp_pkgBuf);
in_pkgBuf.reset();
in_appBuf.clear();
engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) {
Runnable run;
while ((run = sslEngine.getDelegatedTask()) != null) {
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Running delegated task!");
}
run.run();
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
throw new IOException("NOT a handshaking!");
}
if (engResult != null && engResult.getStatus() != SSLEngineResult.Status.OK) {
throw new IOException("Fail to handshake! " + engResult.getStatus());
}
if (engResult != null)
hsStatus = engResult.getHandshakeStatus();
else
hsStatus = sslEngine.getHandshakeStatus();
}
}
}

View File

@ -71,11 +71,11 @@ public class NioClient extends NioConnection {
// Begin SSL handshake in BLOCKING mode
sch.configureBlocking(true);
SSLContext sslContext = initSSLContext(true);
SSLContext sslContext = Link.initSSLContext(true);
sslEngine = sslContext.createSSLEngine(_host, _port);
sslEngine.setUseClientMode(true);
doHandshake(sch, sslEngine, true);
Link.doHandshake(sch, sslEngine, true);
s_logger.info("SSL: Handshake done");
} catch (Exception e) {
throw new IOException("SSL: Fail to init SSL! " + e);

View File

@ -17,20 +17,16 @@
*/
package com.cloud.utils.nio;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -40,19 +36,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import org.apache.log4j.Logger;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.nio.TrustAllManager;
/**
* NioConnection abstracts the NIO socket operations. The Java implementation
@ -184,133 +173,6 @@ public abstract class NioConnection implements Runnable {
abstract void registerLink(InetSocketAddress saddr, Link link);
abstract void unregisterLink(InetSocketAddress saddr);
protected SSLContext initSSLContext(boolean isClient) throws Exception {
SSLContext sslContext = null;
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
KeyStore ks = KeyStore.getInstance("JKS");
TrustManager[] tms;
if (!isClient) {
char[] passphrase = "vmops.com".toCharArray();
String keystorePath = "/etc/cloud/management/cloud.keystore";
if (new File(keystorePath).exists()) {
ks.load(new FileInputStream(keystorePath), passphrase);
} else {
s_logger.warn("SSL: Fail to find the generated keystore. Loading fail-safe one to continue.");
ks.load(NioConnection.class.getResourceAsStream("/cloud.keystore"), passphrase);
}
kmf.init(ks, passphrase);
tmf.init(ks);
tms = tmf.getTrustManagers();
} else {
ks.load(null, null);
kmf.init(ks, null);
tms = new TrustManager[1];
tms[0] = new TrustAllManager();
}
sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tms, null);
s_logger.info("SSL: SSLcontext has been initialized");
return sslContext;
}
protected void doHandshake(SocketChannel ch, SSLEngine sslEngine,
boolean isClient) throws IOException {
s_logger.info("SSL: begin Handshake, isClient: " + isClient);
SSLEngineResult engResult;
SSLSession sslSession = sslEngine.getSession();
HandshakeStatus hsStatus;
ByteBuffer in_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
ByteBuffer in_appBuf =
ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40);
ByteBuffer out_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
ByteBuffer out_appBuf =
ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40);
int count;
if (isClient) {
hsStatus = SSLEngineResult.HandshakeStatus.NEED_WRAP;
} else {
hsStatus = SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
}
while (hsStatus != SSLEngineResult.HandshakeStatus.FINISHED) {
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Handshake status " + hsStatus);
}
engResult = null;
if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
out_pkgBuf.clear();
out_appBuf.clear();
out_appBuf.put("Hello".getBytes());
engResult = sslEngine.wrap(out_appBuf, out_pkgBuf);
out_pkgBuf.flip();
int remain = out_pkgBuf.limit();
while (remain != 0) {
remain -= ch.write(out_pkgBuf);
if (remain < 0) {
throw new IOException("Too much bytes sent?");
}
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
in_appBuf.clear();
// One packet may contained multiply operation
if (in_pkgBuf.position() == 0 || !in_pkgBuf.hasRemaining()) {
in_pkgBuf.clear();
count = ch.read(in_pkgBuf);
if (count == -1) {
throw new IOException("Connection closed with -1 on reading size.");
}
in_pkgBuf.flip();
}
engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
ByteBuffer tmp_pkgBuf =
ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
while (engResult.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
// We need more packets to complete this operation
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Buffer overflowed, getting more packets");
}
tmp_pkgBuf.clear();
count = ch.read(tmp_pkgBuf);
tmp_pkgBuf.flip();
in_pkgBuf.mark();
in_pkgBuf.position(in_pkgBuf.limit());
in_pkgBuf.limit(in_pkgBuf.limit() + tmp_pkgBuf.limit());
in_pkgBuf.put(tmp_pkgBuf);
in_pkgBuf.reset();
in_appBuf.clear();
engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) {
Runnable run;
while ((run = sslEngine.getDelegatedTask()) != null) {
if (s_logger.isTraceEnabled()) {
s_logger.info("SSL: Running delegated task!");
}
run.run();
}
} else if (hsStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
throw new IOException("NOT a handshaking!");
}
if (engResult != null && engResult.getStatus() != SSLEngineResult.Status.OK) {
throw new IOException("Fail to handshake! " + engResult.getStatus());
}
if (engResult != null)
hsStatus = engResult.getHandshakeStatus();
else
hsStatus = sslEngine.getHandshakeStatus();
}
}
protected void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
@ -327,12 +189,12 @@ public abstract class NioConnection implements Runnable {
SSLEngine sslEngine = null;
try {
SSLContext sslContext = initSSLContext(false);
SSLContext sslContext = Link.initSSLContext(false);
sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(false);
doHandshake(socketChannel, sslEngine, false);
Link.doHandshake(socketChannel, sslEngine, false);
s_logger.info("SSL: Handshake done");
} catch (Exception e) {
throw new IOException("SSL: Fail to init SSL! " + e);