diff --git a/core/src/com/cloud/user/UserStatsLogVO.java b/core/src/com/cloud/user/UserStatsLogVO.java new file mode 100644 index 00000000000..2112c7fa1b8 --- /dev/null +++ b/core/src/com/cloud/user/UserStatsLogVO.java @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +package com.cloud.user; + +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + +@Entity +@Table(name="op_user_stats_log") +public class UserStatsLogVO { + @Id + @Column(name="user_stats_id") + private long userStatsId; + + @Column(name="net_bytes_received") + private long netBytesReceived; + + @Column(name="net_bytes_sent") + private long netBytesSent; + + @Column(name="current_bytes_received") + private long currentBytesReceived; + + @Column(name="current_bytes_sent") + private long currentBytesSent; + + @Column(name="agg_bytes_received") + private long aggBytesReceived; + + @Column(name="agg_bytes_sent") + private long aggBytesSent; + + @Column(name="updated") + @Temporal(value=TemporalType.TIMESTAMP) + private Date updatedTime; + + public UserStatsLogVO(){ + } + + public UserStatsLogVO(long userStatsId, long netBytesReceived, long netBytesSent, long currentBytesReceived, long currentBytesSent, + long aggBytesReceived, long aggBytesSent, Date updatedTime) { + this.userStatsId = userStatsId; + this.netBytesReceived = netBytesReceived; + this.netBytesSent = netBytesSent; + this.currentBytesReceived = currentBytesReceived; + this.currentBytesSent = currentBytesSent; + this.aggBytesReceived = aggBytesReceived; + this.aggBytesSent = aggBytesSent; + this.updatedTime = updatedTime; + } + + public Long getUserStatsId() { + return userStatsId; + } + + public long getCurrentBytesReceived() { + return currentBytesReceived; + } + + public void setCurrentBytesReceived(long currentBytesReceived) { + this.currentBytesReceived = currentBytesReceived; + } + + public long getCurrentBytesSent() { + return currentBytesSent; + } + + public void setCurrentBytesSent(long currentBytesSent) { + this.currentBytesSent = currentBytesSent; + } + + public long getNetBytesReceived() { + return netBytesReceived; + } + + public long getNetBytesSent() { + return netBytesSent; + } + + public void setNetBytesReceived(long netBytesReceived) { + this.netBytesReceived = netBytesReceived; + } + + public void setNetBytesSent(long netBytesSent) { + this.netBytesSent = netBytesSent; + } + + public long getAggBytesReceived() { + return aggBytesReceived; + } + + public void setAggBytesReceived(long aggBytesReceived) { + this.aggBytesReceived = aggBytesReceived; + } + + public long getAggBytesSent() { + return aggBytesSent; + } + + public void setAggBytesSent(long aggBytesSent) { + this.aggBytesSent = aggBytesSent; + } + + public Date getUpdatedTime() { + return updatedTime; + } + + public void setUpdatedTime(Date updatedTime) { + this.updatedTime = updatedTime; + } + +} diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java index 8b5ac4f949c..c14c385ee57 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java @@ -24,6 +24,7 @@ import java.util.List; import com.cloud.cluster.ManagementServerHost; import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDao; public interface ManagementServerHostDao extends GenericDao { @@ -44,4 +45,6 @@ public interface ManagementServerHostDao extends GenericDao listBy(ManagementServerHost.State...states); public List listOrphanMsids(); + + ManagementServerHostVO findOneInUpState(Filter filter); } diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index abc7c0687f9..4add7166f88 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -36,6 +36,7 @@ import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; import com.cloud.utils.DateUtil; import com.cloud.utils.db.DB; +import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; @@ -258,4 +259,18 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase sc = StateSearch.create(); + + sc.setParameters("state", ManagementServerHost.State.Up); + + List mshosts = listBy(sc, filter); + if(mshosts != null && mshosts.size() > 0){ + return mshosts.get(0); + } + return null; + } + } diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java index 223153eed9b..32f1bd5998f 100755 --- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java +++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java @@ -173,6 +173,7 @@ import com.cloud.user.dao.SSHKeyPairDaoImpl; import com.cloud.user.dao.UserAccountDaoImpl; import com.cloud.user.dao.UserDaoImpl; import com.cloud.user.dao.UserStatisticsDaoImpl; +import com.cloud.user.dao.UserStatsLogDaoImpl; import com.cloud.utils.component.Adapter; import com.cloud.utils.component.ComponentLibrary; import com.cloud.utils.component.ComponentLibraryBase; @@ -226,6 +227,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com info.addParameter("cache.size", "5000"); info.addParameter("cache.time.to.live", "300"); addDao("UserStatisticsDao", UserStatisticsDaoImpl.class); + addDao("UserStatsLogDao", UserStatsLogDaoImpl.class); addDao("FirewallRulesDao", FirewallRulesDaoImpl.class); addDao("LoadBalancerDao", LoadBalancerDaoImpl.class); addDao("NetworkRuleConfigDao", NetworkRuleConfigDaoImpl.class); diff --git a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java index e1ca0873862..f862798f435 100755 --- a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java +++ b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java @@ -78,7 +78,9 @@ import com.cloud.alert.AlertManager; import com.cloud.api.commands.UpgradeRouterCmd; import com.cloud.async.AsyncJobManager; import com.cloud.capacity.dao.CapacityDao; +import com.cloud.cluster.ManagementServerHostVO; import com.cloud.cluster.ManagementServerNode; +import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationManager; import com.cloud.configuration.ZoneConfig; @@ -183,10 +185,12 @@ import com.cloud.user.AccountService; import com.cloud.user.User; import com.cloud.user.UserContext; import com.cloud.user.UserStatisticsVO; +import com.cloud.user.UserStatsLogVO; import com.cloud.user.UserVO; import com.cloud.user.dao.AccountDao; import com.cloud.user.dao.UserDao; import com.cloud.user.dao.UserStatisticsDao; +import com.cloud.user.dao.UserStatsLogDao; import com.cloud.uservm.UserVm; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; @@ -196,6 +200,8 @@ import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.Inject; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; +import com.cloud.utils.db.Filter; +import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.net.MacAddress; @@ -269,6 +275,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian @Inject CapacityDao _capacityDao = null; @Inject + UserStatsLogDao _userStatsLogDao = null; + @Inject AgentManager _agentMgr; @Inject StorageManager _storageMgr; @@ -332,7 +340,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian PhysicalNetworkServiceProviderDao _physicalProviderDao; @Inject VirtualRouterProviderDao _vrProviderDao; - + @Inject + ManagementServerHostDao _msHostDao; + int _routerRamSize; int _routerCpuMHz; int _retry = 2; @@ -350,7 +360,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian private int _usageAggregationRange = 1440; private String _usageTimeZone = "GMT"; private final long mgmtSrvrId = MacAddress.getMacAddress().toLong(); - + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds + ScheduledExecutorService _executor; ScheduledExecutorService _checkExecutor; ScheduledExecutorService _networkStatsUpdateExecutor; @@ -689,39 +700,39 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian public boolean start() { if (_routerStatsInterval > 0){ _executor.scheduleAtFixedRate(new NetworkUsageTask(), _routerStatsInterval, _routerStatsInterval, TimeUnit.SECONDS); - - //Schedule Network stats update task - TimeZone usageTimezone = TimeZone.getTimeZone(_usageTimeZone); - Calendar cal = Calendar.getInstance(usageTimezone); - cal.setTime(new Date()); - long endDate = 0; - int HOURLY_TIME = 60; - final int DAILY_TIME = 60 * 24; - if (_usageAggregationRange == DAILY_TIME) { - cal.roll(Calendar.DAY_OF_YEAR, false); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - cal.roll(Calendar.DAY_OF_YEAR, true); - cal.add(Calendar.MILLISECOND, -1); - endDate = cal.getTime().getTime(); - } else if (_usageAggregationRange == HOURLY_TIME) { - cal.roll(Calendar.HOUR_OF_DAY, false); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - cal.roll(Calendar.HOUR_OF_DAY, true); - cal.add(Calendar.MILLISECOND, -1); - endDate = cal.getTime().getTime(); - } else { - endDate = cal.getTime().getTime(); - } - - _networkStatsUpdateExecutor.scheduleAtFixedRate(new NetworkStatsUpdateTask(), (endDate - System.currentTimeMillis()), (_usageAggregationRange * 60 * 1000), TimeUnit.MILLISECONDS); }else{ s_logger.debug("router.stats.interval - " + _routerStatsInterval+ " so not scheduling the router stats thread"); } + + //Schedule Network stats update task + TimeZone usageTimezone = TimeZone.getTimeZone(_usageTimeZone); + Calendar cal = Calendar.getInstance(usageTimezone); + cal.setTime(new Date()); + long endDate = 0; + int HOURLY_TIME = 60; + final int DAILY_TIME = 60 * 24; + if (_usageAggregationRange == DAILY_TIME) { + cal.roll(Calendar.DAY_OF_YEAR, false); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + cal.roll(Calendar.DAY_OF_YEAR, true); + cal.add(Calendar.MILLISECOND, -1); + endDate = cal.getTime().getTime(); + } else if (_usageAggregationRange == HOURLY_TIME) { + cal.roll(Calendar.HOUR_OF_DAY, false); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + cal.roll(Calendar.HOUR_OF_DAY, true); + cal.add(Calendar.MILLISECOND, -1); + endDate = cal.getTime().getTime(); + } else { + endDate = cal.getTime().getTime(); + } + + _networkStatsUpdateExecutor.scheduleAtFixedRate(new NetworkStatsUpdateTask(), (endDate - System.currentTimeMillis()), (_usageAggregationRange * 60 * 1000), TimeUnit.MILLISECONDS); if (_routerCheckInterval > 0) { _checkExecutor.scheduleAtFixedRate(new CheckRouterTask(), _routerCheckInterval, _routerCheckInterval, TimeUnit.SECONDS); @@ -875,15 +886,49 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian @Override public void run() { - try{ - if(_statsDao.updateAggStats()){ - s_logger.debug("Succussfully updated aggregate network stats"); - } else { - s_logger.debug("Failed to update aggregate network stats"); + GlobalLock scanLock = GlobalLock.getInternLock("network.stats"); + try { + if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { + //Check for ownership + //msHost in UP state with min id should run the job + ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L)); + if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){ + s_logger.debug("Skipping aggregate network stats update"); + scanLock.unlock(); + return; + } + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + txn.start(); + //get all stats with delta > 0 + List updatedStats = _statsDao.listUpdatedStats(); + Date updatedTime = new Date(); + for(UserStatisticsVO stat : updatedStats){ + //update agg bytes + stat.setAggBytesReceived(stat.getCurrentBytesReceived() + stat.getNetBytesReceived()); + stat.setAggBytesSent(stat.getCurrentBytesSent() + stat.getNetBytesSent()); + _userStatsDao.update(stat.getId(), stat); + //insert into op_user_stats_log + UserStatsLogVO statsLog = new UserStatsLogVO(stat.getId(), stat.getNetBytesReceived(), stat.getNetBytesSent(), stat.getCurrentBytesReceived(), + stat.getCurrentBytesSent(), stat.getAggBytesReceived(), stat.getAggBytesSent(), updatedTime); + _userStatsLogDao.persist(statsLog); + } + s_logger.debug("Successfully updated aggregate network stats"); + txn.commit(); + } catch (Exception e){ + txn.rollback(); + s_logger.debug("Failed to update aggregate network stats", e); + } finally { + scanLock.unlock(); + txn.close(); + } } } catch (Exception e){ - s_logger.debug("Failed to update aggregate network stats", e); + s_logger.debug("Exception while trying to acquire network stats lock", e); + } finally { + scanLock.releaseRef(); } + } } diff --git a/server/src/com/cloud/user/dao/UserStatisticsDao.java b/server/src/com/cloud/user/dao/UserStatisticsDao.java index a400f736fd3..c01109c1350 100644 --- a/server/src/com/cloud/user/dao/UserStatisticsDao.java +++ b/server/src/com/cloud/user/dao/UserStatisticsDao.java @@ -33,5 +33,5 @@ public interface UserStatisticsDao extends GenericDao { List listActiveAndRecentlyDeleted(Date minRemovedDate, int startIndex, int limit); - boolean updateAggStats(); + List listUpdatedStats(); } diff --git a/server/src/com/cloud/user/dao/UserStatisticsDaoImpl.java b/server/src/com/cloud/user/dao/UserStatisticsDaoImpl.java index 298be81468e..83a4dfa199c 100644 --- a/server/src/com/cloud/user/dao/UserStatisticsDaoImpl.java +++ b/server/src/com/cloud/user/dao/UserStatisticsDaoImpl.java @@ -43,9 +43,11 @@ public class UserStatisticsDaoImpl extends GenericDaoBase= ?) " + "ORDER BY us.id"; - private static final String UPDATE_AGG_STATS = "UPDATE user_statistics set agg_bytes_received = net_bytes_received + current_bytes_received , agg_bytes_sent = net_bytes_sent + current_bytes_sent"; + private static final String UPDATED_STATS_SEARCH = "SELECT id, current_bytes_received, current_bytes_sent, net_bytes_received, net_bytes_sent, agg_bytes_received, agg_bytes_sent from user_statistics " + + "where (agg_bytes_received < net_bytes_received + current_bytes_received) OR (agg_bytes_sent < net_bytes_sent + current_bytes_sent)"; private final SearchBuilder AllFieldsSearch; - private final SearchBuilder AccountSearch; + private final SearchBuilder AccountSearch; + public UserStatisticsDaoImpl() { AccountSearch = createSearchBuilder(); @@ -113,18 +115,23 @@ public class UserStatisticsDaoImpl extends GenericDaoBase listUpdatedStats() { + List userStats = new ArrayList(); + + Transaction txn = Transaction.currentTxn(); try { - String sql = UPDATE_AGG_STATS; PreparedStatement pstmt = null; - pstmt = txn.prepareAutoCloseStatement(sql); - return pstmt.executeUpdate() > 0; + pstmt = txn.prepareAutoCloseStatement(UPDATED_STATS_SEARCH); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + userStats.add(toEntityBean(rs, false)); + } } catch (Exception ex) { - s_logger.error("error updating agg user stats", ex); + s_logger.error("error lisitng updated user stats", ex); } - return false; - } + return userStats; + } + } diff --git a/server/src/com/cloud/user/dao/UserStatsLogDao.java b/server/src/com/cloud/user/dao/UserStatsLogDao.java new file mode 100644 index 00000000000..f728e2c911c --- /dev/null +++ b/server/src/com/cloud/user/dao/UserStatsLogDao.java @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +package com.cloud.user.dao; + +import com.cloud.user.UserStatsLogVO; +import com.cloud.utils.db.GenericDao; + +public interface UserStatsLogDao extends GenericDao { +} diff --git a/server/src/com/cloud/user/dao/UserStatsLogDaoImpl.java b/server/src/com/cloud/user/dao/UserStatsLogDaoImpl.java new file mode 100644 index 00000000000..69b1bcb842c --- /dev/null +++ b/server/src/com/cloud/user/dao/UserStatsLogDaoImpl.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +package com.cloud.user.dao; + +import javax.ejb.Local; + +import com.cloud.user.UserStatsLogVO; +import com.cloud.utils.db.GenericDaoBase; + +@Local(value={UserStatsLogDao.class}) +public class UserStatsLogDaoImpl extends GenericDaoBase implements UserStatsLogDao { + public UserStatsLogDaoImpl(){ + } +} diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index f0713158e2b..8c2194fefa2 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -2013,4 +2013,17 @@ CREATE TABLE `cloud`.`virtual_router_providers` ( CONSTRAINT `uc_virtual_router_providers__uuid` UNIQUE (`uuid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; +CREATE TABLE `cloud`.`op_user_stats_log` ( + `user_stats_id` bigint unsigned NOT NULL, + `net_bytes_received` bigint unsigned NOT NULL default '0', + `net_bytes_sent` bigint unsigned NOT NULL default '0', + `current_bytes_received` bigint unsigned NOT NULL default '0', + `current_bytes_sent` bigint unsigned NOT NULL default '0', + `agg_bytes_received` bigint unsigned NOT NULL default '0', + `agg_bytes_sent` bigint unsigned NOT NULL default '0', + `updated` datetime COMMENT 'stats update timestamp', + UNIQUE KEY (`user_stats_id`, `updated`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + + SET foreign_key_checks = 1; diff --git a/setup/db/db/schema-2214to30.sql b/setup/db/db/schema-2214to30.sql index 5c8b22a68ac..6ea206197c5 100755 --- a/setup/db/db/schema-2214to30.sql +++ b/setup/db/db/schema-2214to30.sql @@ -485,3 +485,14 @@ ALTER TABLE `cloud`.`networks` ADD COLUMN `restart_required` int(1) unsigned NOT DELETE FROM `cloud`.`configuration` where name='cmd.wait'; UPDATE `cloud`.`configuration` set value='true' where name='firewall.rule.ui.enabled'; +CREATE TABLE `cloud`.`op_user_stats_log` ( + `user_stats_id` bigint unsigned NOT NULL, + `net_bytes_received` bigint unsigned NOT NULL default '0', + `net_bytes_sent` bigint unsigned NOT NULL default '0', + `current_bytes_received` bigint unsigned NOT NULL default '0', + `current_bytes_sent` bigint unsigned NOT NULL default '0', + `agg_bytes_received` bigint unsigned NOT NULL default '0', + `agg_bytes_sent` bigint unsigned NOT NULL default '0', + `updated` datetime COMMENT 'stats update timestamp', + UNIQUE KEY (`user_stats_id`, `updated`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8;