diff --git a/usage/.classpath b/usage/.classpath
new file mode 100644
index 00000000000..cfb92788e37
--- /dev/null
+++ b/usage/.classpath
diff --git a/usage/.project b/usage/.project
new file mode 100644
index 00000000000..9870136a749
--- /dev/null
+++ b/usage/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>usage</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/usage/conf/log4j-cloud_usage.xml.in b/usage/conf/log4j-cloud_usage.xml.in
new file mode 100644
index 00000000000..02f5cf588a0
--- /dev/null
+++ b/usage/conf/log4j-cloud_usage.xml.in
diff --git a/usage/conf/usage-components.xml.in b/usage/conf/usage-components.xml.in
new file mode 100644
index 00000000000..bda902fe88f
--- /dev/null
+++ b/usage/conf/usage-components.xml.in
@@ -0,0 +1 @@
+<!-- usage server component definitions; recoverable parameter values: 50, -1, DAILY -->
diff --git a/usage/distro/centos/SYSCONFDIR/rc.d/init.d/cloud-usage.in b/usage/distro/centos/SYSCONFDIR/rc.d/init.d/cloud-usage.in
new file mode 100755
index 00000000000..168a8b64b06
--- /dev/null
+++ b/usage/distro/centos/SYSCONFDIR/rc.d/init.d/cloud-usage.in
@@ -0,0 +1,82 @@
+#!/bin/bash
+
+# chkconfig: 35 99 10
+# description: CloudStack Usage Monitor
+
+# WARNING: if this script is changed, then all other initscripts MUST BE changed to match it as well
+
+. /etc/rc.d/init.d/functions
+
+whatami=cloud-usage
+
+# set environment variables
+
+SHORTNAME="$whatami"
+PIDFILE=@PIDDIR@/"$whatami".pid
+LOCKFILE=@LOCKDIR@/"$SHORTNAME"
+LOGFILE=@USAGELOG@
+PROGNAME="CloudStack Usage Monitor"
+USER=@MSUSER@
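+# The @TOKEN@ placeholders above are replaced with concrete paths and values
+# when this .in template is expanded at build/packaging time.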
+
+unset OPTIONS
+[ -r @SYSCONFDIR@/sysconfig/"$SHORTNAME" ] && source @SYSCONFDIR@/sysconfig/"$SHORTNAME"
+DAEMONIZE=@BINDIR@/@PACKAGE@-daemonize
+PROG=@LIBEXECDIR@/usage-runner
+
+start() {
+ echo -n $"Starting $PROGNAME: "
+ if hostname --fqdn >/dev/null 2>&1 ; then
+ daemon --check=$SHORTNAME --pidfile=${PIDFILE} "$DAEMONIZE" \
+ -n "$SHORTNAME" -p "$PIDFILE" -l "$LOGFILE" -u "$USER" "$PROG" $OPTIONS
+ RETVAL=$?
+ echo
+ else
+ failure
+ echo
+        echo "The host name does not resolve properly to an IP address. Cannot start $PROGNAME." >&2
+ RETVAL=9
+ fi
+ [ $RETVAL = 0 ] && touch ${LOCKFILE}
+ return $RETVAL
+}
+
+stop() {
+ echo -n $"Stopping $PROGNAME: "
+ killproc -p ${PIDFILE} $SHORTNAME # -d 10 $SHORTNAME
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && rm -f ${LOCKFILE} ${PIDFILE}
+}
+
+
+# See how we were called.
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ status -p ${PIDFILE} $SHORTNAME
+ RETVAL=$?
+ ;;
+ restart)
+ stop
+ sleep 3
+ start
+ ;;
+ condrestart)
+ if status -p ${PIDFILE} $SHORTNAME >&/dev/null; then
+ stop
+ sleep 3
+ start
+ fi
+ ;;
+ *)
+ echo $"Usage: $whatami {start|stop|restart|condrestart|status|help}"
+ RETVAL=3
+esac
+
+exit $RETVAL
+
diff --git a/usage/distro/fedora/SYSCONFDIR/rc.d/init.d/cloud-usage.in b/usage/distro/fedora/SYSCONFDIR/rc.d/init.d/cloud-usage.in
new file mode 100755
index 00000000000..168a8b64b06
--- /dev/null
+++ b/usage/distro/fedora/SYSCONFDIR/rc.d/init.d/cloud-usage.in
@@ -0,0 +1,82 @@
+#!/bin/bash
+
+# chkconfig: 35 99 10
+# description: CloudStack Usage Monitor
+
+# WARNING: if this script is changed, then all other initscripts MUST BE changed to match it as well
+
+. /etc/rc.d/init.d/functions
+
+whatami=cloud-usage
+
+# set environment variables
+
+SHORTNAME="$whatami"
+PIDFILE=@PIDDIR@/"$whatami".pid
+LOCKFILE=@LOCKDIR@/"$SHORTNAME"
+LOGFILE=@USAGELOG@
+PROGNAME="CloudStack Usage Monitor"
+USER=@MSUSER@
+
+unset OPTIONS
+[ -r @SYSCONFDIR@/sysconfig/"$SHORTNAME" ] && source @SYSCONFDIR@/sysconfig/"$SHORTNAME"
+DAEMONIZE=@BINDIR@/@PACKAGE@-daemonize
+PROG=@LIBEXECDIR@/usage-runner
+
+start() {
+ echo -n $"Starting $PROGNAME: "
+ if hostname --fqdn >/dev/null 2>&1 ; then
+ daemon --check=$SHORTNAME --pidfile=${PIDFILE} "$DAEMONIZE" \
+ -n "$SHORTNAME" -p "$PIDFILE" -l "$LOGFILE" -u "$USER" "$PROG" $OPTIONS
+ RETVAL=$?
+ echo
+ else
+ failure
+ echo
+        echo "The host name does not resolve properly to an IP address. Cannot start $PROGNAME." >&2
+ RETVAL=9
+ fi
+ [ $RETVAL = 0 ] && touch ${LOCKFILE}
+ return $RETVAL
+}
+
+stop() {
+ echo -n $"Stopping $PROGNAME: "
+ killproc -p ${PIDFILE} $SHORTNAME # -d 10 $SHORTNAME
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && rm -f ${LOCKFILE} ${PIDFILE}
+}
+
+
+# See how we were called.
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ status -p ${PIDFILE} $SHORTNAME
+ RETVAL=$?
+ ;;
+ restart)
+ stop
+ sleep 3
+ start
+ ;;
+ condrestart)
+ if status -p ${PIDFILE} $SHORTNAME >&/dev/null; then
+ stop
+ sleep 3
+ start
+ fi
+ ;;
+ *)
+ echo $"Usage: $whatami {start|stop|restart|condrestart|status|help}"
+ RETVAL=3
+esac
+
+exit $RETVAL
+
diff --git a/usage/distro/rhel/SYSCONFDIR/rc.d/init.d/cloud-usage.in b/usage/distro/rhel/SYSCONFDIR/rc.d/init.d/cloud-usage.in
new file mode 100644
index 00000000000..168a8b64b06
--- /dev/null
+++ b/usage/distro/rhel/SYSCONFDIR/rc.d/init.d/cloud-usage.in
@@ -0,0 +1,82 @@
+#!/bin/bash
+
+# chkconfig: 35 99 10
+# description: CloudStack Usage Monitor
+
+# WARNING: if this script is changed, then all other initscripts MUST BE changed to match it as well
+
+. /etc/rc.d/init.d/functions
+
+whatami=cloud-usage
+
+# set environment variables
+
+SHORTNAME="$whatami"
+PIDFILE=@PIDDIR@/"$whatami".pid
+LOCKFILE=@LOCKDIR@/"$SHORTNAME"
+LOGFILE=@USAGELOG@
+PROGNAME="CloudStack Usage Monitor"
+USER=@MSUSER@
+
+unset OPTIONS
+[ -r @SYSCONFDIR@/sysconfig/"$SHORTNAME" ] && source @SYSCONFDIR@/sysconfig/"$SHORTNAME"
+DAEMONIZE=@BINDIR@/@PACKAGE@-daemonize
+PROG=@LIBEXECDIR@/usage-runner
+
+start() {
+ echo -n $"Starting $PROGNAME: "
+ if hostname --fqdn >/dev/null 2>&1 ; then
+ daemon --check=$SHORTNAME --pidfile=${PIDFILE} "$DAEMONIZE" \
+ -n "$SHORTNAME" -p "$PIDFILE" -l "$LOGFILE" -u "$USER" "$PROG" $OPTIONS
+ RETVAL=$?
+ echo
+ else
+ failure
+ echo
+        echo "The host name does not resolve properly to an IP address. Cannot start $PROGNAME." >&2
+ RETVAL=9
+ fi
+ [ $RETVAL = 0 ] && touch ${LOCKFILE}
+ return $RETVAL
+}
+
+stop() {
+ echo -n $"Stopping $PROGNAME: "
+ killproc -p ${PIDFILE} $SHORTNAME # -d 10 $SHORTNAME
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && rm -f ${LOCKFILE} ${PIDFILE}
+}
+
+
+# See how we were called.
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ status -p ${PIDFILE} $SHORTNAME
+ RETVAL=$?
+ ;;
+ restart)
+ stop
+ sleep 3
+ start
+ ;;
+ condrestart)
+ if status -p ${PIDFILE} $SHORTNAME >&/dev/null; then
+ stop
+ sleep 3
+ start
+ fi
+ ;;
+ *)
+ echo $"Usage: $whatami {start|stop|restart|condrestart|status|help}"
+ RETVAL=3
+esac
+
+exit $RETVAL
+
diff --git a/usage/distro/ubuntu/SYSCONFDIR/init.d/cloud-usage.in b/usage/distro/ubuntu/SYSCONFDIR/init.d/cloud-usage.in
new file mode 100755
index 00000000000..e59fc2acecb
--- /dev/null
+++ b/usage/distro/ubuntu/SYSCONFDIR/init.d/cloud-usage.in
@@ -0,0 +1,96 @@
+#!/bin/bash
+
+# chkconfig: 35 99 10
+# description: CloudStack Usage Monitor
+
+# WARNING: if this script is changed, then all other initscripts MUST BE changed to match it as well
+
+. /lib/lsb/init-functions
+. /etc/default/rcS
+
+whatami=cloud-usage
+
+# set environment variables
+
+SHORTNAME="$whatami"
+PIDFILE=@PIDDIR@/"$whatami".pid
+LOCKFILE=@LOCKDIR@/"$SHORTNAME"
+LOGFILE=@USAGELOG@
+PROGNAME="CloudStack Usage Monitor"
+USER=@MSUSER@
+
+unset OPTIONS
+[ -r @SYSCONFDIR@/default/"$SHORTNAME" ] && source @SYSCONFDIR@/default/"$SHORTNAME"
+DAEMONIZE=@BINDIR@/@PACKAGE@-daemonize
+PROG=@LIBEXECDIR@/usage-runner
+
+start() {
+ log_daemon_msg $"Starting $PROGNAME" "$SHORTNAME"
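+    # kill -0 delivers no signal; it only checks that the recorded PID still
+    # refers to a live process we are allowed to signal.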
+ if [ -s "$PIDFILE" ] && kill -0 $(cat "$PIDFILE") >/dev/null 2>&1; then
+ log_progress_msg "apparently already running"
+ log_end_msg 0
+ exit 0
+ fi
+ if hostname --fqdn >/dev/null 2>&1 ; then
+ true
+ else
+ log_failure_msg "The host name does not resolve properly to an IP address. Cannot start $PROGNAME"
+ log_end_msg 1
+ exit 1
+ fi
+
+    start-stop-daemon --start --quiet \
+        --pidfile "$PIDFILE" \
+        --exec "$DAEMONIZE" -- -n "$SHORTNAME" -p "$PIDFILE" -l "$LOGFILE" -u "$USER" "$PROG" $OPTIONS
+    RETVAL=$?
+    if [ $RETVAL -eq 0 ]; then
+ rc=0
+ sleep 1
+ if ! kill -0 $(cat "$PIDFILE") >/dev/null 2>&1; then
+ log_failure_msg "$PROG failed to start"
+ rc=1
+ fi
+ else
+ rc=1
+ fi
+
+ if [ $rc -eq 0 ]; then
+ log_end_msg 0
+ else
+ log_end_msg 1
+ rm -f "$PIDFILE"
+ fi
+}
+
+stop() {
+    log_daemon_msg $"Stopping $PROGNAME" "$SHORTNAME"
+ start-stop-daemon --stop --quiet --oknodo --pidfile "$PIDFILE"
+ log_end_msg $?
+ rm -f "$PIDFILE"
+}
+
+
+# See how we were called.
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ status_of_proc -p "$PIDFILE" "$PROG" "$SHORTNAME"
+ RETVAL=$?
+ ;;
+ restart)
+ stop
+ sleep 3
+ start
+ ;;
+ *)
+ echo $"Usage: $whatami {start|stop|restart|status|help}"
+ RETVAL=3
+esac
+
+exit $RETVAL
+
diff --git a/usage/libexec/usage-runner.in b/usage/libexec/usage-runner.in
new file mode 100755
index 00000000000..3eb948383b7
--- /dev/null
+++ b/usage/libexec/usage-runner.in
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+SYSTEMJARS="@SYSTEMJARS@"
+SCP=$(build-classpath $SYSTEMJARS) ; if [ $? != 0 ] ; then SCP="@SYSTEMCLASSPATH@" ; fi
+DCP="@DEPSCLASSPATH@"
+ACP="@USAGECLASSPATH@"
+export CLASSPATH=$SCP:$DCP:$ACP:@USAGESYSCONFDIR@
+for jarfile in "@PREMIUMJAVADIR@"/* ; do
+ if [ ! -e "$jarfile" ] ; then continue ; fi
+ CLASSPATH=$jarfile:$CLASSPATH
+done
+for plugin in "@PLUGINJAVADIR@"/* ; do
+ if [ ! -e "$plugin" ] ; then continue ; fi
+ CLASSPATH=$plugin:$CLASSPATH
+done
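+# Premium and plugin jars are prepended one by one, so they take precedence
+# over the system/dependency classpath assembled above.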
+export CLASSPATH
+
+set -e
+echo Current directory is "$PWD"
+echo CLASSPATH to run the usage server: "$CLASSPATH"
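+# exec replaces this shell with the JVM (keeping the same PID, so -Dpid=$$
+# records the usage server's own PID); heap dumps on OOM go to @USAGELOGDIR@.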
+exec java -cp "$CLASSPATH" -Dpid=$$ -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=@USAGELOGDIR@ "$@" com.cloud.usage.UsageServer
diff --git a/usage/scripts/usageserver.sh b/usage/scripts/usageserver.sh
new file mode 100755
index 00000000000..ddc9fee5d37
--- /dev/null
+++ b/usage/scripts/usageserver.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+
+# Copyright (C) 2011 Cloud.com, Inc. All rights reserved.
+
+# run the usage server
+
+PATHSEP=':'
+if [[ $OSTYPE == "cygwin" ]]; then
+ PATHSEP=';'
+    export CATALINA_HOME=$(cygpath -m "$CATALINA_HOME")
+fi
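+# Windows JVMs expect ';' between classpath entries; Unix JVMs use ':'.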
+
+CP=./$PATHSEP$CATALINA_HOME/webapps/client/WEB-INF/lib/vmops-server.jar
+CP=${CP}$PATHSEP$CATALINA_HOME/webapps/client/WEB-INF/lib/vmops-server-extras.jar
+CP=${CP}$PATHSEP$CATALINA_HOME/webapps/client/WEB-INF/lib/vmops-utils.jar
+CP=${CP}$PATHSEP$CATALINA_HOME/webapps/client/WEB-INF/lib/vmops-core.jar
+CP=${CP}$PATHSEP$CATALINA_HOME/webapps/client/WEB-INF/lib/vmops-usage.jar
+CP=${CP}$PATHSEP$CATALINA_HOME/conf
+
+for file in $CATALINA_HOME/lib/*.jar; do
+ CP=${CP}$PATHSEP$file
+done
+
+#echo CP is $CP
+DEBUG_OPTS=
+#DEBUG_OPTS=-Xrunjdwp:transport=dt_socket,address=$1,server=y,suspend=n
+
+java -cp "$CP" $DEBUG_OPTS -Dcatalina.home="${CATALINA_HOME}" -Dpid=$$ com.vmops.usage.UsageServer "$@"
diff --git a/usage/src/com/cloud/usage/UsageAlertManagerImpl.java b/usage/src/com/cloud/usage/UsageAlertManagerImpl.java
new file mode 100644
index 00000000000..0fe8dc83e83
--- /dev/null
+++ b/usage/src/com/cloud/usage/UsageAlertManagerImpl.java
@@ -0,0 +1,260 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ */
+
+package com.cloud.usage;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.ejb.Local;
+import javax.mail.Authenticator;
+import javax.mail.MessagingException;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.URLName;
+import javax.mail.Message.RecipientType;
+import javax.mail.internet.InternetAddress;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.alert.AlertManager;
+import com.cloud.alert.AlertVO;
+import com.cloud.alert.dao.AlertDao;
+import com.cloud.configuration.dao.ConfigurationDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.component.ComponentLocator;
+import com.sun.mail.smtp.SMTPMessage;
+import com.sun.mail.smtp.SMTPSSLTransport;
+import com.sun.mail.smtp.SMTPTransport;
+
+@Local(value={AlertManager.class})
+public class UsageAlertManagerImpl implements AlertManager {
+ private static final Logger s_logger = Logger.getLogger(UsageAlertManagerImpl.class.getName());
+
+ private String _name = null;
+ private EmailAlert _emailAlert;
+ private AlertDao _alertDao;
+
+ @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ _name = name;
+
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+ ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+ if (configDao == null) {
+ s_logger.error("Unable to get the configuration dao.");
+ return false;
+ }
+
+        Map<String, String> configs = configDao.getConfiguration("management-server", params);
+
+ // set up the email system for alerts
+ String emailAddressList = configs.get("alert.email.addresses");
+ String[] emailAddresses = null;
+ if (emailAddressList != null) {
+ emailAddresses = emailAddressList.split(",");
+ }
+
+ String smtpHost = configs.get("alert.smtp.host");
+ int smtpPort = NumbersUtil.parseInt(configs.get("alert.smtp.port"), 25);
+ String useAuthStr = configs.get("alert.smtp.useAuth");
+ boolean useAuth = ((useAuthStr == null) ? false : Boolean.parseBoolean(useAuthStr));
+ String smtpUsername = configs.get("alert.smtp.username");
+ String smtpPassword = configs.get("alert.smtp.password");
+ String emailSender = configs.get("alert.email.sender");
+ String smtpDebugStr = configs.get("alert.smtp.debug");
+ boolean smtpDebug = false;
+ if (smtpDebugStr != null) {
+ smtpDebug = Boolean.parseBoolean(smtpDebugStr);
+ }
+
+ _emailAlert = new EmailAlert(emailAddresses, smtpHost, smtpPort, useAuth, smtpUsername, smtpPassword, emailSender, smtpDebug);
+
+ _alertDao = locator.getDao(AlertDao.class);
+ if (_alertDao == null) {
+ s_logger.error("Unable to get the alert dao.");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ @Override
+ public boolean start() {
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public void clearAlert(short alertType, long dataCenterId, long podId) {
+ try {
+ if (_emailAlert != null) {
+ _emailAlert.clearAlert(alertType, dataCenterId, podId);
+ }
+ } catch (Exception ex) {
+ s_logger.error("Problem clearing email alert", ex);
+ }
+ }
+
+ @Override
+ public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String body) {
+ // TODO: queue up these messages and send them as one set of issues once a certain number of issues is reached? If that's the case,
+ // shouldn't we have a type/severity as part of the API so that severe errors get sent right away?
+ try {
+ if (_emailAlert != null) {
+ _emailAlert.sendAlert(alertType, dataCenterId, podId, subject, body);
+ }
+ } catch (Exception ex) {
+ s_logger.error("Problem sending email alert", ex);
+ }
+ }
+
+
+ class EmailAlert {
+ private Session _smtpSession;
+ private InternetAddress[] _recipientList;
+ private final String _smtpHost;
+ private int _smtpPort = -1;
+ private boolean _smtpUseAuth = false;
+ private final String _smtpUsername;
+ private final String _smtpPassword;
+ private final String _emailSender;
+
+ public EmailAlert(String[] recipientList, String smtpHost, int smtpPort, boolean smtpUseAuth, final String smtpUsername, final String smtpPassword, String emailSender, boolean smtpDebug) {
+ if (recipientList != null) {
+ _recipientList = new InternetAddress[recipientList.length];
+ for (int i = 0; i < recipientList.length; i++) {
+ try {
+ _recipientList[i] = new InternetAddress(recipientList[i], recipientList[i]);
+ } catch (Exception ex) {
+ s_logger.error("Exception creating address for: " + recipientList[i], ex);
+ }
+ }
+ }
+
+ _smtpHost = smtpHost;
+ _smtpPort = smtpPort;
+ _smtpUseAuth = smtpUseAuth;
+ _smtpUsername = smtpUsername;
+ _smtpPassword = smtpPassword;
+ _emailSender = emailSender;
+
+ if (_smtpHost != null) {
+ Properties smtpProps = new Properties();
+ smtpProps.put("mail.smtp.host", smtpHost);
+ smtpProps.put("mail.smtp.port", smtpPort);
+ smtpProps.put("mail.smtp.auth", ""+smtpUseAuth);
+ if (smtpUsername != null) {
+ smtpProps.put("mail.smtp.user", smtpUsername);
+ }
+
+ smtpProps.put("mail.smtps.host", smtpHost);
+ smtpProps.put("mail.smtps.port", smtpPort);
+ smtpProps.put("mail.smtps.auth", ""+smtpUseAuth);
+ if (smtpUsername != null) {
+ smtpProps.put("mail.smtps.user", smtpUsername);
+ }
+
+ if ((smtpUsername != null) && (smtpPassword != null)) {
+ _smtpSession = Session.getInstance(smtpProps, new Authenticator() {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(smtpUsername, smtpPassword);
+ }
+ });
+ } else {
+ _smtpSession = Session.getInstance(smtpProps);
+ }
+ _smtpSession.setDebug(smtpDebug);
+ } else {
+ _smtpSession = null;
+ }
+ }
+
+        // TODO: make sure this handles both SSL transport (when useAuth is true) and regular SMTP transport
+ public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String content) throws MessagingException, UnsupportedEncodingException {
+ AlertVO alert = null;
+
+ if ((alertType != AlertManager.ALERT_TYPE_HOST) &&
+ (alertType != AlertManager.ALERT_TYPE_USERVM) &&
+ (alertType != AlertManager.ALERT_TYPE_DOMAIN_ROUTER) &&
+ (alertType != AlertManager.ALERT_TYPE_CONSOLE_PROXY) &&
+ (alertType != AlertManager.ALERT_TYPE_STORAGE_MISC) &&
+ (alertType != AlertManager.ALERT_TYPE_MANAGMENT_NODE)) {
+ alert = _alertDao.getLastAlert(alertType, dataCenterId, podId);
+ }
+
+ if (alert == null) {
+ // set up a new alert
+ AlertVO newAlert = new AlertVO();
+ newAlert.setType(alertType);
+ newAlert.setSubject(subject);
+ newAlert.setPodId(podId);
+ newAlert.setDataCenterId(dataCenterId);
+ newAlert.setSentCount(1); // initialize sent count to 1 since we are now sending an alert
+ newAlert.setLastSent(new Date());
+ _alertDao.persist(newAlert);
+ } else {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Have already sent: " + alert.getSentCount() + " emails for alert type '" + alertType + "' -- skipping send email");
+ }
+ return;
+ }
+
+ if (_smtpSession != null) {
+ SMTPMessage msg = new SMTPMessage(_smtpSession);
+ msg.setSender(new InternetAddress(_emailSender, _emailSender));
+ msg.setFrom(new InternetAddress(_emailSender, _emailSender));
+ for (InternetAddress address : _recipientList) {
+ msg.addRecipient(RecipientType.TO, address);
+ }
+ msg.setSubject(subject);
+ msg.setSentDate(new Date());
+ msg.setContent(content, "text/plain");
+ msg.saveChanges();
+
+ SMTPTransport smtpTrans = null;
+ if (_smtpUseAuth) {
+ smtpTrans = new SMTPSSLTransport(_smtpSession, new URLName("smtp", _smtpHost, _smtpPort, null, _smtpUsername, _smtpPassword));
+ } else {
+ smtpTrans = new SMTPTransport(_smtpSession, new URLName("smtp", _smtpHost, _smtpPort, null, _smtpUsername, _smtpPassword));
+ }
+ smtpTrans.connect();
+ smtpTrans.sendMessage(msg, msg.getAllRecipients());
+ smtpTrans.close();
+ }
+ }
+
+ public void clearAlert(short alertType, long dataCenterId, Long podId) {
+ if (alertType != -1) {
+ AlertVO alert = _alertDao.getLastAlert(alertType, dataCenterId, podId);
+ if (alert != null) {
+ AlertVO updatedAlert = _alertDao.createForUpdate();
+ updatedAlert.setResolved(new Date());
+ _alertDao.update(alert.getId(), updatedAlert);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void recalculateCapacity() {
+ // TODO Auto-generated method stub
+
+ }
+}
diff --git a/usage/src/com/cloud/usage/UsageManager.java b/usage/src/com/cloud/usage/UsageManager.java
new file mode 100644
index 00000000000..42b583975ef
--- /dev/null
+++ b/usage/src/com/cloud/usage/UsageManager.java
@@ -0,0 +1,15 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ */
+
+package com.cloud.usage;
+
+import com.cloud.usage.UsageJobVO;
+import com.cloud.utils.component.Manager;
+
+public interface UsageManager extends Manager {
+ public void scheduleParse();
+ public void parse(UsageJobVO job, long startDateMillis, long endDateMillis);
+}
diff --git a/usage/src/com/cloud/usage/UsageManagerImpl.java b/usage/src/com/cloud/usage/UsageManagerImpl.java
new file mode 100644
index 00000000000..dc50596cd87
--- /dev/null
+++ b/usage/src/com/cloud/usage/UsageManagerImpl.java
@@ -0,0 +1,1409 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ */
+
+package com.cloud.usage;
+
+import java.net.InetAddress;
+import java.sql.SQLException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.alert.AlertManager;
+import com.cloud.configuration.dao.ConfigurationDao;
+import com.cloud.event.EventTypes;
+import com.cloud.event.UsageEventVO;
+import com.cloud.event.dao.UsageEventDao;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageIPAddressDao;
+import com.cloud.usage.dao.UsageJobDao;
+import com.cloud.usage.dao.UsageLoadBalancerPolicyDao;
+import com.cloud.usage.dao.UsageNetworkDao;
+import com.cloud.usage.dao.UsageNetworkOfferingDao;
+import com.cloud.usage.dao.UsagePortForwardingRuleDao;
+import com.cloud.usage.dao.UsageStorageDao;
+import com.cloud.usage.dao.UsageVMInstanceDao;
+import com.cloud.usage.dao.UsageVolumeDao;
+import com.cloud.usage.parser.IPAddressUsageParser;
+import com.cloud.usage.parser.LoadBalancerUsageParser;
+import com.cloud.usage.parser.NetworkOfferingUsageParser;
+import com.cloud.usage.parser.NetworkUsageParser;
+import com.cloud.usage.parser.PortForwardingUsageParser;
+import com.cloud.usage.parser.StorageUsageParser;
+import com.cloud.usage.parser.VMInstanceUsageParser;
+import com.cloud.usage.parser.VolumeUsageParser;
+import com.cloud.user.Account;
+import com.cloud.user.AccountVO;
+import com.cloud.user.UserStatisticsVO;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.user.dao.UserStatisticsDao;
+import com.cloud.utils.component.ComponentLocator;
+import com.cloud.utils.component.Inject;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+@Local(value={UsageManager.class})
+public class UsageManagerImpl implements UsageManager, Runnable {
+ public static final Logger s_logger = Logger.getLogger(UsageManagerImpl.class.getName());
+
+ protected static final String DAILY = "DAILY";
+ protected static final String WEEKLY = "WEEKLY";
+ protected static final String MONTHLY = "MONTHLY";
+
+ private static final int HOURLY_TIME = 60;
+ private static final int DAILY_TIME = 60 * 24;
+ private static final int THREE_DAYS_IN_MINUTES = 60 * 24 * 3;
+ private static final int USAGE_AGGREGATION_RANGE_MIN = 10;
+
+ private final ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private final AccountDao m_accountDao = _locator.getDao(AccountDao.class);
+ private final UserStatisticsDao m_userStatsDao = _locator.getDao(UserStatisticsDao.class);
+ private final UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private final UsageVMInstanceDao m_usageInstanceDao = _locator.getDao(UsageVMInstanceDao.class);
+ private final UsageIPAddressDao m_usageIPAddressDao = _locator.getDao(UsageIPAddressDao.class);
+ private final UsageNetworkDao m_usageNetworkDao = _locator.getDao(UsageNetworkDao.class);
+ private final UsageVolumeDao m_usageVolumeDao = _locator.getDao(UsageVolumeDao.class);
+ private final UsageStorageDao m_usageStorageDao = _locator.getDao(UsageStorageDao.class);
+ private final UsageLoadBalancerPolicyDao m_usageLoadBalancerPolicyDao = _locator.getDao(UsageLoadBalancerPolicyDao.class);
+ private final UsagePortForwardingRuleDao m_usagePortForwardingRuleDao = _locator.getDao(UsagePortForwardingRuleDao.class);
+ private final UsageNetworkOfferingDao m_usageNetworkOfferingDao = _locator.getDao(UsageNetworkOfferingDao.class);
+ private final UsageJobDao m_usageJobDao = _locator.getDao(UsageJobDao.class);
+ @Inject protected AlertManager _alertMgr;
+ @Inject protected UsageEventDao _usageEventDao;
+
+ private String m_version = null;
+ private String m_name = null;
+ private final Calendar m_jobExecTime = Calendar.getInstance();
+ private int m_aggregationDuration = 0;
+ private int m_sanityCheckInterval = 0;
+ String m_hostname = null;
+ int m_pid = 0;
+ TimeZone m_usageTimezone = null;
+ private final GlobalLock m_heartbeatLock = GlobalLock.getInternLock("usage.job.heartbeat.check");
+
+ private final ScheduledExecutorService m_executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job"));
+ private final ScheduledExecutorService m_heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB"));
+ private final ScheduledExecutorService m_sanityExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Sanity"));
+    private Future<?> m_scheduledFuture = null;
+    private Future<?> m_heartbeat = null;
+    private Future<?> m_sanity = null;
+
+ protected UsageManagerImpl() {
+ }
+
+    private void mergeConfigs(Map<String, String> dbParams, Map<String, Object> xmlParams) {
+        for (Map.Entry<String, Object> param : xmlParams.entrySet()) {
+ dbParams.put(param.getKey(), (String)param.getValue());
+ }
+ }
+
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ final String run = "usage.vmops.pid";
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Checking to see if " + run + " exists.");
+ }
+
+        final Class<?> c = UsageServer.class;
+ m_version = c.getPackage().getImplementationVersion();
+ if (m_version == null) {
+ throw new CloudRuntimeException("Unable to find the implementation version of this usage server");
+ }
+
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Implementation Version is " + m_version);
+ }
+
+ m_name = name;
+
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+ ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+ if (configDao == null) {
+ s_logger.error("Unable to get the configuration dao.");
+ return false;
+ }
+
+        Map<String, String> configs = configDao.getConfiguration(params);
+
+ if (params != null) {
+ mergeConfigs(configs, params);
+ }
+
+ String execTime = configs.get("usage.stats.job.exec.time");
+ String aggregationRange = configs.get("usage.stats.job.aggregation.range");
+ String execTimeZone = configs.get("usage.execution.timezone");
+ String sanityCheckInterval = configs.get("usage.sanity.check.interval");
+ if(sanityCheckInterval != null){
+ m_sanityCheckInterval = Integer.parseInt(sanityCheckInterval);
+ }
+
+ m_usageTimezone = TimeZone.getTimeZone("GMT");
+
+ try {
+ if ((execTime == null) || (aggregationRange == null)) {
+ s_logger.error("missing configuration values for usage job, usage.stats.job.exec.time = " + execTime + ", usage.stats.job.aggregation.range = " + aggregationRange);
+ throw new ConfigurationException("Missing configuration values for usage job, usage.stats.job.exec.time = " + execTime + ", usage.stats.job.aggregation.range = " + aggregationRange);
+ }
+ String[] execTimeSegments = execTime.split(":");
+ if (execTimeSegments.length != 2) {
+ s_logger.error("Unable to parse usage.stats.job.exec.time");
+ throw new ConfigurationException("Unable to parse usage.stats.job.exec.time '" + execTime + "'");
+ }
+ int hourOfDay = Integer.parseInt(execTimeSegments[0]);
+ int minutes = Integer.parseInt(execTimeSegments[1]);
+ m_jobExecTime.setTime(new Date());
+
+ m_jobExecTime.set(Calendar.HOUR_OF_DAY, hourOfDay);
+ m_jobExecTime.set(Calendar.MINUTE, minutes);
+ m_jobExecTime.set(Calendar.SECOND, 0);
+ m_jobExecTime.set(Calendar.MILLISECOND, 0);
+ if(execTimeZone != null){
+ m_jobExecTime.setTimeZone(TimeZone.getTimeZone(execTimeZone));
+ }
+
+ // if the hour to execute the job has already passed, roll the day forward to the next day
+ Date execDate = m_jobExecTime.getTime();
+ if (execDate.before(new Date())) {
+ m_jobExecTime.roll(Calendar.DAY_OF_YEAR, true);
+ }
+
+ s_logger.debug("Execution Time: "+execDate.toString());
+ Date currentDate = new Date(System.currentTimeMillis());
+ s_logger.debug("Current Time: "+currentDate.toString());
+
+ m_aggregationDuration = Integer.parseInt(aggregationRange);
+ if (m_aggregationDuration < USAGE_AGGREGATION_RANGE_MIN) {
+                s_logger.warn("Usage stats job aggregation range is too small, using the minimum value of " + USAGE_AGGREGATION_RANGE_MIN);
+ m_aggregationDuration = USAGE_AGGREGATION_RANGE_MIN;
+ }
+ m_hostname = InetAddress.getLocalHost().getHostName() + "/" + InetAddress.getLocalHost().getHostAddress();
+ } catch (NumberFormatException ex) {
+ throw new ConfigurationException("Unable to parse usage.stats.job.exec.time '" + execTime + "' or usage.stats.job.aggregation.range '" + aggregationRange + "', please check configuration values");
+ } catch (Exception e) {
+            s_logger.error("Unhandled exception configuring UsageManager", e);
+ throw new ConfigurationException("Unhandled exception configuring UsageManager " + e.toString());
+ }
+ m_pid = Integer.parseInt(System.getProperty("pid"));
+ return true;
+ }
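+    // Illustration with hypothetical values: usage.stats.job.exec.time=00:15 and
+    // usage.stats.job.aggregation.range=1440 schedule a run every 24 hours at
+    // 00:15 (rolled forward a day if 00:15 has already passed at startup).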
+
+ public String getName() {
+ return m_name;
+ }
+
+ public boolean start() {
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Starting Usage Manager");
+ }
+
+ // use the configured exec time and aggregation duration for scheduling the job
+ m_scheduledFuture = m_executor.scheduleAtFixedRate(this, m_jobExecTime.getTimeInMillis() - System.currentTimeMillis(), m_aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS);
+
+ m_heartbeat = m_heartbeatExecutor.scheduleAtFixedRate(new Heartbeat(), /* start in 15 seconds...*/15*1000, /* check database every minute*/60*1000, TimeUnit.MILLISECONDS);
+
+ if(m_sanityCheckInterval > 0){
+ m_sanity = m_sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, m_sanityCheckInterval, TimeUnit.DAYS);
+ }
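+        // The Heartbeat task refreshes this server's claim on the usage job in
+        // the cloud_usage database; if this process dies, another usage server
+        // can observe the stale heartbeat and take the job over.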
+
+ Transaction usageTxn = Transaction.open(Transaction.USAGE_DB);
+ try {
+ if(m_heartbeatLock.lock(3)) { // 3 second timeout
+ try {
+ UsageJobVO job = m_usageJobDao.getLastJob();
+ if (job == null) {
+ m_usageJobDao.createNewJob(m_hostname, m_pid, UsageJobVO.JOB_TYPE_RECURRING);
+ }
+ } finally {
+ m_heartbeatLock.unlock();
+ }
+ } else {
+ if(s_logger.isTraceEnabled())
+ s_logger.trace("Heartbeat lock is in use by others, returning true as someone else will take over the job if required");
+ }
+ } finally {
+ usageTxn.close();
+ }
+
+ return true;
+ }
+
+ public boolean stop() {
+ m_heartbeat.cancel(true);
+ m_scheduledFuture.cancel(true);
+        if (m_sanity != null) { // only scheduled when usage.sanity.check.interval > 0
+            m_sanity.cancel(true);
+        }
+ return true;
+ }
+
+ public void run() {
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("starting usage job...");
+ }
+
+ // how about we update the job exec time when the job starts???
+ long execTime = m_jobExecTime.getTimeInMillis();
+ long now = System.currentTimeMillis() + 2000; // 2 second buffer since jobs can run a little early (though usually just by milliseconds)
+
+ if (execTime < now) {
+ // if exec time is in the past, calculate the next time the job will execute...if this is a one-off job that is a result
+ // of scheduleParse() then don't update the next exec time...
+ m_jobExecTime.add(Calendar.MINUTE, m_aggregationDuration);
+ }
+
+ UsageJobVO job = m_usageJobDao.isOwner(m_hostname, m_pid);
+ if (job != null) {
+ // FIXME: we really need to do a better job of not missing any events...so we should some how
+ // keep track of the last time usage was run, then go from there...
+ // For executing the job, we treat hourly and daily as special time ranges, using the previous full hour or the previous
+ // full day. Otherwise we just subtract off the aggregation range from the current time and use that as start date with
+ // current time as end date.
+ Calendar cal = Calendar.getInstance(m_usageTimezone);
+ cal.setTime(new Date());
+ long startDate = 0;
+ long endDate = 0;
+ if (m_aggregationDuration == DAILY_TIME) {
+ cal.roll(Calendar.DAY_OF_YEAR, false);
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ startDate = cal.getTime().getTime();
+
+ cal.roll(Calendar.DAY_OF_YEAR, true);
+ cal.add(Calendar.MILLISECOND, -1);
+ endDate = cal.getTime().getTime();
+ } else if (m_aggregationDuration == HOURLY_TIME) {
+ cal.roll(Calendar.HOUR_OF_DAY, false);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ startDate = cal.getTime().getTime();
+
+ cal.roll(Calendar.HOUR_OF_DAY, true);
+ cal.add(Calendar.MILLISECOND, -1);
+ endDate = cal.getTime().getTime();
+ } else {
+ endDate = cal.getTime().getTime(); // current time
+ cal.add(Calendar.MINUTE, -1*m_aggregationDuration);
+ startDate = cal.getTime().getTime();
+ }
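+            // Example (hypothetical): with a daily aggregation range, a run at
+            // 00:15 parses records for [yesterday 00:00:00.000, yesterday 23:59:59.999].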
+
+ parse(job, startDate, endDate);
+ } else {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Not owner of usage job, skipping...");
+ }
+ }
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("usage job complete");
+ }
+ }
+
+ public void scheduleParse() {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Scheduling Usage job...");
+ }
+ m_executor.schedule(this, 0, TimeUnit.MILLISECONDS);
+ }
+
+ public void parse(UsageJobVO job, long startDateMillis, long endDateMillis) {
+ // TODO: Shouldn't we also allow parsing by the type of usage?
+
+ boolean success = false;
+ long timeStart = System.currentTimeMillis();
+ long deleteOldStatsTimeMillis = 0L;
+ try {
+ if ((endDateMillis == 0) || (endDateMillis > timeStart)) {
+ endDateMillis = timeStart;
+ }
+
+ long lastSuccess = m_usageJobDao.getLastJobSuccessDateMillis();
+ if (lastSuccess != 0) {
+ startDateMillis = lastSuccess+1; // 1 millisecond after
+ }
+
+ if (startDateMillis >= endDateMillis) {
+ if (s_logger.isInfoEnabled()) {
+                    s_logger.info("not parsing usage records since start time millis (" + startDateMillis + ") is on or after end time millis (" + endDateMillis + ")");
+ }
+
+ Transaction jobUpdateTxn = Transaction.open(Transaction.USAGE_DB);
+ try {
+ jobUpdateTxn.start();
+ // everything seemed to work...set endDate as the last success date
+ m_usageJobDao.updateJobSuccess(job.getId(), startDateMillis, endDateMillis, System.currentTimeMillis() - timeStart, success);
+
+ // create a new job if this is a recurring job
+ if (job.getJobType() == UsageJobVO.JOB_TYPE_RECURRING) {
+ m_usageJobDao.createNewJob(m_hostname, m_pid, UsageJobVO.JOB_TYPE_RECURRING);
+ }
+ jobUpdateTxn.commit();
+ } finally {
+ jobUpdateTxn.close();
+ }
+
+ return;
+ }
+ deleteOldStatsTimeMillis = startDateMillis;
+
+ Date startDate = new Date(startDateMillis);
+ Date endDate = new Date(endDateMillis);
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Parsing usage records between " + startDate + " and " + endDate);
+ }
+
+            List<AccountVO> accounts = null;
+            List<UserStatisticsVO> userStats = null;
+            Map<String, UsageNetworkVO> networkStats = null;
+ Transaction userTxn = Transaction.open(Transaction.CLOUD_DB);
+ try {
+ Long limit = Long.valueOf(500);
+ Long offset = Long.valueOf(0);
+ Long lastAccountId = m_usageDao.getLastAccountId();
+ if (lastAccountId == null) {
+ lastAccountId = Long.valueOf(0);
+ }
+
+ do {
+ Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
+
+ accounts = m_accountDao.findActiveAccounts(lastAccountId, filter);
+
+ if ((accounts != null) && !accounts.isEmpty()) {
+ // now update the accounts in the cloud_usage db
+ m_usageDao.updateAccounts(accounts);
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((accounts != null) && !accounts.isEmpty());
+
+ // reset offset
+ offset = Long.valueOf(0);
+
+ do {
+ Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
+
+ accounts = m_accountDao.findRecentlyDeletedAccounts(lastAccountId, startDate, filter);
+
+ if ((accounts != null) && !accounts.isEmpty()) {
+ // now update the accounts in the cloud_usage db
+ m_usageDao.updateAccounts(accounts);
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((accounts != null) && !accounts.isEmpty());
+
+ // reset offset
+ offset = Long.valueOf(0);
+
+ do {
+ Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
+
+ accounts = m_accountDao.findNewAccounts(lastAccountId, filter);
+
+ if ((accounts != null) && !accounts.isEmpty()) {
+ // now copy the accounts to cloud_usage db
+ m_usageDao.saveAccounts(accounts);
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((accounts != null) && !accounts.isEmpty());
+
+ // reset offset
+ offset = Long.valueOf(0);
+
+ // get all the user stats to create usage records for the network usage
+ Long lastUserStatsId = m_usageDao.getLastUserStatsId();
+ if (lastUserStatsId == null) {
+ lastUserStatsId = Long.valueOf(0);
+ }
+
+                SearchCriteria<UserStatisticsVO> sc2 = m_userStatsDao.createSearchCriteria();
+ sc2.addAnd("id", SearchCriteria.Op.LTEQ, lastUserStatsId);
+ do {
+ Filter filter = new Filter(UserStatisticsVO.class, "id", true, offset, limit);
+
+ userStats = m_userStatsDao.search(sc2, filter);
+
+ if ((userStats != null) && !userStats.isEmpty()) {
+ // now copy the accounts to cloud_usage db
+ m_usageDao.updateUserStats(userStats);
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((userStats != null) && !userStats.isEmpty());
+
+ // reset offset
+ offset = Long.valueOf(0);
+
+ sc2 = m_userStatsDao.createSearchCriteria();
+ sc2.addAnd("id", SearchCriteria.Op.GT, lastUserStatsId);
+ do {
+ Filter filter = new Filter(UserStatisticsVO.class, "id", true, offset, limit);
+
+ userStats = m_userStatsDao.search(sc2, filter);
+
+ if ((userStats != null) && !userStats.isEmpty()) {
+ // now copy the accounts to cloud_usage db
+ m_usageDao.saveUserStats(userStats);
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((userStats != null) && !userStats.isEmpty());
+ } finally {
+ userTxn.close();
+ }
+
+ // TODO: Fetch a maximum number of events and process them before moving on to the next range of events
+
+ // - get a list of the latest events
+ // - insert the latest events into the usage.events table
+            List<UsageEventVO> events = _usageEventDao.getRecentEvents(new Date(endDateMillis));
+
+
+ Transaction usageTxn = Transaction.open(Transaction.USAGE_DB);
+ try {
+ usageTxn.start();
+
+ // make sure start date is before all of our un-processed events (the events are ordered oldest
+ // to newest, so just test against the first event)
+ if ((events != null) && (events.size() > 0)) {
+ Date oldestEventDate = events.get(0).getCreateDate();
+ if (oldestEventDate.getTime() < startDateMillis) {
+ startDateMillis = oldestEventDate.getTime();
+ startDate = new Date(startDateMillis);
+ }
+
+ // - loop over the list of events and create entries in the helper tables
+ // - create the usage records using the parse methods below
+ for (UsageEventVO event : events) {
+ event.setProcessed(true);
+ _usageEventDao.update(event.getId(), event);
+ createHelperRecord(event);
+ }
+ }
+
+ // TODO: Fetch a maximum number of user stats and process them before moving on to the next range of user stats
+
+ // get user stats in order to compute network usage
+ networkStats = m_usageNetworkDao.getRecentNetworkStats();
+
+ Calendar recentlyDeletedCal = Calendar.getInstance(m_usageTimezone);
+ recentlyDeletedCal.setTimeInMillis(startDateMillis);
+ recentlyDeletedCal.add(Calendar.MINUTE, -1*THREE_DAYS_IN_MINUTES);
+ Date recentlyDeletedDate = recentlyDeletedCal.getTime();
+
+ // Keep track of user stats for an account, across all of its public IPs
+                Map<String, UserStatisticsVO> aggregatedStats = new HashMap<String, UserStatisticsVO>();
+ int startIndex = 0;
+ do {
+ userStats = m_userStatsDao.listActiveAndRecentlyDeleted(recentlyDeletedDate, startIndex, 500);
+
+ if (userStats != null) {
+ for (UserStatisticsVO userStat : userStats) {
+ if(userStat.getDeviceId() != null){
+ String hostKey = userStat.getDataCenterId() + "-" + userStat.getAccountId()+"-Host-" + userStat.getDeviceId();
+ UserStatisticsVO hostAggregatedStat = aggregatedStats.get(hostKey);
+ if (hostAggregatedStat == null) {
+ hostAggregatedStat = new UserStatisticsVO(userStat.getAccountId(), userStat.getDataCenterId(), userStat.getPublicIpAddress(),
+ userStat.getDeviceId(), userStat.getDeviceType(), userStat.getNetworkId());
+ }
+
+ hostAggregatedStat.setNetBytesSent(hostAggregatedStat.getNetBytesSent() + userStat.getNetBytesSent());
+ hostAggregatedStat.setNetBytesReceived(hostAggregatedStat.getNetBytesReceived() + userStat.getNetBytesReceived());
+ hostAggregatedStat.setCurrentBytesSent(hostAggregatedStat.getCurrentBytesSent() + userStat.getCurrentBytesSent());
+ hostAggregatedStat.setCurrentBytesReceived(hostAggregatedStat.getCurrentBytesReceived() + userStat.getCurrentBytesReceived());
+ aggregatedStats.put(hostKey, hostAggregatedStat);
+ }
+ }
+ }
+ startIndex += 500;
+ } while ((userStats != null) && !userStats.isEmpty());
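+                // aggregatedStats now holds one UserStatisticsVO per
+                // zone/account/device key, with traffic summed across all of the
+                // account's public IPs on that device.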
+
+ // loop over the user stats, create delta entries in the usage_network helper table
+ int numAcctsProcessed = 0;
+ for (String key : aggregatedStats.keySet()) {
+ UsageNetworkVO currentNetworkStats = null;
+ if (networkStats != null) {
+ currentNetworkStats = networkStats.get(key);
+ }
+
+ createNetworkHelperEntry(aggregatedStats.get(key), currentNetworkStats, endDateMillis);
+ numAcctsProcessed++;
+ }
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("created network stats helper entries for " + numAcctsProcessed + " accts");
+ }
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting old network stats helper entries older than " + deleteOldStatsTimeMillis);
+ }
+ m_usageNetworkDao.deleteOldStats(deleteOldStatsTimeMillis);
+
+ // commit the helper records, then start a new transaction
+ usageTxn.commit();
+ usageTxn.start();
+
+ boolean parsed = false;
+ numAcctsProcessed = 0;
+
+ Date currentStartDate = startDate;
+ Date currentEndDate = endDate;
+ Date tempDate = endDate;
+
+ Calendar aggregateCal = Calendar.getInstance(m_usageTimezone);
+
+ while ((tempDate.after(startDate)) && ((tempDate.getTime() - startDate.getTime()) > 60000)){
+ currentEndDate = tempDate;
+ aggregateCal.setTime(tempDate);
+ aggregateCal.add(Calendar.MINUTE, -m_aggregationDuration);
+ tempDate = aggregateCal.getTime();
+ }
+
+                while (!currentEndDate.after(endDate) || ((currentEndDate.getTime() - endDate.getTime()) < 60000)) {
+ Long offset = Long.valueOf(0);
+ Long limit = Long.valueOf(500);
+
+ do {
+ Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
+ accounts = m_accountDao.listAll(filter);
+ if ((accounts != null) && !accounts.isEmpty()) {
+ for (AccountVO account : accounts) {
+ parsed = parseHelperTables(account, currentStartDate, currentEndDate);
+ numAcctsProcessed++;
+ }
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((accounts != null) && !accounts.isEmpty());
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("processed VM/Network Usage for " + numAcctsProcessed + " ACTIVE accts");
+ }
+ numAcctsProcessed = 0;
+
+ // reset offset
+ offset = Long.valueOf(0);
+
+ do {
+ Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
+
+ accounts = m_accountDao.findRecentlyDeletedAccounts(null, recentlyDeletedDate, filter);
+
+ if ((accounts != null) && !accounts.isEmpty()) {
+ for (AccountVO account : accounts) {
+ parsed = parseHelperTables(account, currentStartDate, currentEndDate);
+                            List<Long> publicTemplates = m_usageDao.listPublicTemplatesByAccount(account.getId());
+                            for (Long templateId : publicTemplates) {
+                                // mark public templates owned by deleted accounts as deleted
+                                List<UsageStorageVO> storageVOs = m_usageStorageDao.listById(account.getId(), templateId, StorageTypes.TEMPLATE);
+                                if (storageVOs.size() > 1) {
+                                    s_logger.warn("More than one usage entry for storage: " + templateId + " assigned to account: " + account.getId() + "; marking them all as deleted...");
+ }
+ for (UsageStorageVO storageVO : storageVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting template: " + storageVO.getId() + " from account: " + storageVO.getAccountId());
+ }
+ storageVO.setDeleted(account.getRemoved());
+ m_usageStorageDao.update(storageVO);
+ }
+ }
+ numAcctsProcessed++;
+ }
+ }
+ offset = new Long(offset.longValue() + limit.longValue());
+ } while ((accounts != null) && !accounts.isEmpty());
+
+ currentStartDate = new Date(currentEndDate.getTime() + 1);
+ aggregateCal.setTime(currentEndDate);
+ aggregateCal.add(Calendar.MINUTE, m_aggregationDuration);
+ currentEndDate = aggregateCal.getTime();
+ }
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("processed Usage for " + numAcctsProcessed + " RECENTLY DELETED accts");
+ }
+
+ // FIXME: we don't break the above loop if something fails to parse, so it gets reset every account,
+ // do we want to break out of processing accounts and rollback if there are errors?
+ if (!parsed) {
+ usageTxn.rollback();
+ } else {
+ success = true;
+ }
+ } catch (Exception ex) {
+ s_logger.error("Exception in usage manager", ex);
+ usageTxn.rollback();
+ } finally {
+ // everything seemed to work...set endDate as the last success date
+ m_usageJobDao.updateJobSuccess(job.getId(), startDateMillis, endDateMillis, System.currentTimeMillis() - timeStart, success);
+
+ // create a new job if this is a recurring job
+ if (job.getJobType() == UsageJobVO.JOB_TYPE_RECURRING) {
+ m_usageJobDao.createNewJob(m_hostname, m_pid, UsageJobVO.JOB_TYPE_RECURRING);
+ }
+ usageTxn.commit();
+ usageTxn.close();
+
+ // switch back to CLOUD_DB
+ Transaction swap = Transaction.open(Transaction.CLOUD_DB);
+ if(!success){
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_USAGE_SERVER_RESULT, 0, new Long(0), "Usage job failed. Job id: "+job.getId(), "Usage job failed. Job id: "+job.getId());
+ } else {
+ _alertMgr.clearAlert(AlertManager.ALERT_TYPE_USAGE_SERVER_RESULT, 0, 0);
+ }
+ swap.close();
+
+ }
+ } catch (Exception e) {
+ s_logger.error("Usage Manager error", e);
+ }
+ }
+
+ private boolean parseHelperTables(AccountVO account, Date currentStartDate, Date currentEndDate){
+ boolean parsed = false;
+
+ parsed = VMInstanceUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("vm usage instances successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = NetworkUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("network usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = VolumeUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("volume usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = StorageUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("storage usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = LoadBalancerUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("load balancer usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = PortForwardingUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("port forwarding usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = NetworkOfferingUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("network offering usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ parsed = IPAddressUsageParser.parse(account, currentStartDate, currentEndDate);
+ if (s_logger.isDebugEnabled()) {
+ if (!parsed) {
+ s_logger.debug("IPAddress usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+ }
+ }
+
+ return parsed;
+ }
+
+ private void createHelperRecord(UsageEventVO event) {
+ String eventType = event.getType();
+ if (isVMEvent(eventType)) {
+ createVMHelperEvent(event);
+ } else if (isIPEvent(eventType)) {
+ createIPHelperEvent(event);
+ } else if (isVolumeEvent(eventType)) {
+ createVolumeHelperEvent(event);
+ } else if (isTemplateEvent(eventType)) {
+ createTemplateHelperEvent(event);
+ } else if (isISOEvent(eventType)) {
+ createISOHelperEvent(event);
+ } else if (isSnapshotEvent(eventType)) {
+ createSnapshotHelperEvent(event);
+ } else if (isLoadBalancerEvent(eventType)) {
+ createLoadBalancerHelperEvent(event);
+ } else if (isPortForwardingEvent(eventType)) {
+ createPortForwardingHelperEvent(event);
+ } else if (isNetworkOfferingEvent(eventType)) {
+ createNetworkOfferingEvent(event);
+ }
+ }
+
+ private boolean isVMEvent(String eventType) {
+ if (eventType == null) return false;
+ return eventType.startsWith("VM.");
+ }
+
+ private boolean isIPEvent(String eventType) {
+ if (eventType == null) return false;
+ return eventType.startsWith("NET.IP");
+ }
+
+ private boolean isVolumeEvent(String eventType) {
+ if (eventType == null) return false;
+ return (eventType.equals(EventTypes.EVENT_VOLUME_CREATE) ||
+ eventType.equals(EventTypes.EVENT_VOLUME_DELETE));
+ }
+
+ private boolean isTemplateEvent(String eventType) {
+ if (eventType == null) return false;
+ return (eventType.equals(EventTypes.EVENT_TEMPLATE_CREATE) ||
+ eventType.equals(EventTypes.EVENT_TEMPLATE_COPY) ||
+ eventType.equals(EventTypes.EVENT_TEMPLATE_DELETE));
+ }
+
+ private boolean isISOEvent(String eventType) {
+ if (eventType == null) return false;
+ return (eventType.equals(EventTypes.EVENT_ISO_CREATE) ||
+ eventType.equals(EventTypes.EVENT_ISO_COPY) ||
+ eventType.equals(EventTypes.EVENT_ISO_DELETE));
+ }
+
+ private boolean isSnapshotEvent(String eventType) {
+ if (eventType == null) return false;
+ return (eventType.equals(EventTypes.EVENT_SNAPSHOT_CREATE) ||
+ eventType.equals(EventTypes.EVENT_SNAPSHOT_DELETE));
+ }
+
+ private boolean isLoadBalancerEvent(String eventType) {
+ if (eventType == null) return false;
+ return eventType.startsWith("LB.");
+ }
+
+ private boolean isPortForwardingEvent(String eventType) {
+ if (eventType == null) return false;
+ return eventType.startsWith("NET.RULE");
+ }
+
+ private boolean isNetworkOfferingEvent(String eventType) {
+ if (eventType == null) return false;
+ return (eventType.equals(EventTypes.EVENT_NETWORK_OFFERING_CREATE) ||
+ eventType.equals(EventTypes.EVENT_NETWORK_OFFERING_DELETE) ||
+ eventType.equals(EventTypes.EVENT_NETWORK_OFFERING_ASSIGN) ||
+ eventType.equals(EventTypes.EVENT_NETWORK_OFFERING_REMOVE));
+ }
+
+ private void createVMHelperEvent(UsageEventVO event) {
+
+ // One record for handling VM.START and VM.STOP
+ // One record for handling VM.CREATE and VM.DESTROY
+ // VM events have the parameter "id="
+ long vmId = event.getResourceId();
+        Long soId = event.getOfferingId(); // service offering id
+ long zoneId = event.getZoneId();
+ String vmName = event.getResourceName();
+
+ if (EventTypes.EVENT_VM_START.equals(event.getType())) {
+ // create a new usage_vm_instance row for this VM
+ try {
+
+                SearchCriteria<UsageVMInstanceVO> sc = m_usageInstanceDao.createSearchCriteria();
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, Long.valueOf(vmId));
+ sc.addAnd("endDate", SearchCriteria.Op.NULL);
+ sc.addAnd("usageType", SearchCriteria.Op.EQ, UsageTypes.RUNNING_VM);
+                List<UsageVMInstanceVO> usageInstances = m_usageInstanceDao.search(sc, null);
+ if (usageInstances != null) {
+ if (usageInstances.size() > 0) {
+ s_logger.error("found entries for a vm running with id: " + vmId + ", which are not stopped. Ending them all...");
+ for (UsageVMInstanceVO usageInstance : usageInstances) {
+ usageInstance.setEndDate(event.getCreateDate());
+ m_usageInstanceDao.update(usageInstance);
+ }
+ }
+ }
+
+ sc = m_usageInstanceDao.createSearchCriteria();
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, Long.valueOf(vmId));
+ sc.addAnd("endDate", SearchCriteria.Op.NULL);
+ sc.addAnd("usageType", SearchCriteria.Op.EQ, UsageTypes.ALLOCATED_VM);
+ usageInstances = m_usageInstanceDao.search(sc, null);
+ if (usageInstances == null || (usageInstances.size() == 0)) {
+ s_logger.error("Cannot find allocated vm entry for a vm running with id: " + vmId);
+ }
+
+ Long templateId = event.getTemplateId();
+ String hypervisorType = event.getResourceType();
+
+ // add this VM to the usage helper table
+ UsageVMInstanceVO usageInstanceNew = new UsageVMInstanceVO(UsageTypes.RUNNING_VM, zoneId, event.getAccountId(), vmId, vmName,
+ soId, templateId, hypervisorType, event.getCreateDate(), null);
+ m_usageInstanceDao.persist(usageInstanceNew);
+ } catch (Exception ex) {
+ s_logger.error("Error saving usage instance for vm: " + vmId, ex);
+ }
+ } else if (EventTypes.EVENT_VM_STOP.equals(event.getType())) {
+ // find the latest usage_vm_instance row, update the stop date (should be null) to the event date
+ // FIXME: search criteria needs to have some kind of type information so we distinguish between START/STOP and CREATE/DESTROY
+            SearchCriteria<UsageVMInstanceVO> sc = m_usageInstanceDao.createSearchCriteria();
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, Long.valueOf(vmId));
+ sc.addAnd("endDate", SearchCriteria.Op.NULL);
+ sc.addAnd("usageType", SearchCriteria.Op.EQ, UsageTypes.RUNNING_VM);
+            List<UsageVMInstanceVO> usageInstances = m_usageInstanceDao.search(sc, null);
+ if (usageInstances != null) {
+ if (usageInstances.size() > 1) {
+ s_logger.warn("found multiple entries for a vm running with id: " + vmId + ", ending them all...");
+ }
+ for (UsageVMInstanceVO usageInstance : usageInstances) {
+ usageInstance.setEndDate(event.getCreateDate());
+ // TODO: UsageVMInstanceVO should have an ID field and we should do updates through that field since we are really
+ // updating one row at a time here
+ m_usageInstanceDao.update(usageInstance);
+ }
+ }
+ } else if (EventTypes.EVENT_VM_CREATE.equals(event.getType())) {
+ try {
+ Long templateId = event.getTemplateId();
+ String hypervisorType = event.getResourceType();
+ // add this VM to the usage helper table
+ UsageVMInstanceVO usageInstanceNew = new UsageVMInstanceVO(UsageTypes.ALLOCATED_VM, zoneId, event.getAccountId(), vmId, vmName,
+ soId, templateId, hypervisorType, event.getCreateDate(), null);
+ m_usageInstanceDao.persist(usageInstanceNew);
+ } catch (Exception ex) {
+ s_logger.error("Error saving usage instance for vm: " + vmId, ex);
+ }
+ } else if (EventTypes.EVENT_VM_DESTROY.equals(event.getType())) {
+            SearchCriteria<UsageVMInstanceVO> sc = m_usageInstanceDao.createSearchCriteria();
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, Long.valueOf(vmId));
+ sc.addAnd("endDate", SearchCriteria.Op.NULL);
+ sc.addAnd("usageType", SearchCriteria.Op.EQ, UsageTypes.ALLOCATED_VM);
+            List<UsageVMInstanceVO> usageInstances = m_usageInstanceDao.search(sc, null);
+ if (usageInstances != null) {
+ if (usageInstances.size() > 1) {
+                    s_logger.warn("found multiple entries for a vm allocated with id: " + vmId + ", destroying them all...");
+ }
+ for (UsageVMInstanceVO usageInstance : usageInstances) {
+ usageInstance.setEndDate(event.getCreateDate());
+ m_usageInstanceDao.update(usageInstance);
+ }
+ }
+ } else if (EventTypes.EVENT_VM_UPGRADE.equals(event.getType())) {
+            SearchCriteria<UsageVMInstanceVO> sc = m_usageInstanceDao.createSearchCriteria();
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, Long.valueOf(vmId));
+ sc.addAnd("endDate", SearchCriteria.Op.NULL);
+ sc.addAnd("usageType", SearchCriteria.Op.EQ, UsageTypes.ALLOCATED_VM);
+            List<UsageVMInstanceVO> usageInstances = m_usageInstanceDao.search(sc, null);
+ if (usageInstances != null) {
+ if (usageInstances.size() > 1) {
+ s_logger.warn("found multiple entries for a vm allocated with id: " + vmId + ", updating end_date for all of them...");
+ }
+ for (UsageVMInstanceVO usageInstance : usageInstances) {
+ usageInstance.setEndDate(event.getCreateDate());
+ m_usageInstanceDao.update(usageInstance);
+ }
+ }
+
+ Long templateId = event.getTemplateId();
+ String hypervisorType = event.getResourceType();
+ // add this VM to the usage helper table
+ UsageVMInstanceVO usageInstanceNew = new UsageVMInstanceVO(UsageTypes.ALLOCATED_VM, zoneId, event.getAccountId(), vmId, vmName,
+ soId, templateId, hypervisorType, event.getCreateDate(), null);
+ m_usageInstanceDao.persist(usageInstanceNew);
+ }
+ }
+
+ private void createNetworkHelperEntry(UserStatisticsVO userStat, UsageNetworkVO usageNetworkStats, long timestamp) {
+ long currentAccountedBytesSent = 0L;
+ long currentAccountedBytesReceived = 0L;
+ if (usageNetworkStats != null) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("getting current accounted bytes for... accountId: " + usageNetworkStats.getAccountId() + " in zone: " + userStat.getDataCenterId() + "; cbr: " + usageNetworkStats.getCurrentBytesReceived() +
+ "; cbs: " + usageNetworkStats.getCurrentBytesSent() + "; nbr: " + usageNetworkStats.getNetBytesReceived() + "; nbs: " + usageNetworkStats.getNetBytesSent());
+ }
+ currentAccountedBytesSent = (usageNetworkStats.getCurrentBytesSent() + usageNetworkStats.getNetBytesSent());
+ currentAccountedBytesReceived = (usageNetworkStats.getCurrentBytesReceived() + usageNetworkStats.getNetBytesReceived());
+ }
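+        // Usage for this interval is the delta between what the user stats currently report
+        // (current counters plus any rolled-over "net" totals) and what was already accounted
+        // for in the previous network helper entry.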
+ long bytesSent = (userStat.getCurrentBytesSent() + userStat.getNetBytesSent()) - currentAccountedBytesSent;
+ long bytesReceived = (userStat.getCurrentBytesReceived() + userStat.getNetBytesReceived()) - currentAccountedBytesReceived;
+
+ if (bytesSent < 0) {
+ s_logger.warn("Calculated negative value for bytes sent: " + bytesSent + ", user stats say: " + (userStat.getCurrentBytesSent() + userStat.getNetBytesSent()) + ", previous network usage was: " + currentAccountedBytesSent);
+ bytesSent = 0;
+ }
+ if (bytesReceived < 0) {
+ s_logger.warn("Calculated negative value for bytes received: " + bytesReceived + ", user stats say: " + (userStat.getCurrentBytesReceived() + userStat.getNetBytesReceived()) + ", previous network usage was: " + currentAccountedBytesReceived);
+ bytesReceived = 0;
+ }
+
+ long hostId = 0;
+
+ if(userStat.getDeviceId() != null){
+ hostId = userStat.getDeviceId();
+ }
+
+ UsageNetworkVO usageNetworkVO = new UsageNetworkVO(userStat.getAccountId(), userStat.getDataCenterId(), hostId, userStat.getDeviceType(), userStat.getNetworkId(), bytesSent, bytesReceived,
+ userStat.getNetBytesReceived(), userStat.getNetBytesSent(),
+ userStat.getCurrentBytesReceived(), userStat.getCurrentBytesSent(), timestamp);
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("creating networkHelperEntry... accountId: " + userStat.getAccountId() + " in zone: " + userStat.getDataCenterId() + "; cbr: " + userStat.getCurrentBytesReceived() + "; cbs: " + userStat.getCurrentBytesSent() +
+ "; nbr: " + userStat.getNetBytesReceived() + "; nbs: " + userStat.getNetBytesSent() + "; curABS: " + currentAccountedBytesSent + "; curABR: " + currentAccountedBytesReceived + "; ubs: " + bytesSent + "; ubr: " + bytesReceived);
+ }
+ m_usageNetworkDao.persist(usageNetworkVO);
+ }
+
+ private void createIPHelperEvent(UsageEventVO event) {
+
+ String ipAddress = event.getResourceName();
+
+ if (EventTypes.EVENT_NET_IP_ASSIGN.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("assigning ip address: " + ipAddress + " to account: " + event.getAccountId());
+ }
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ long zoneId = event.getZoneId();
+ long id = event.getResourceId();
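+            // the event's size field is overloaded here: a value of 1 marks the address as source NAT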
+ long sourceNat = event.getSize();
+            boolean isSourceNat = (sourceNat == 1);
+ UsageIPAddressVO ipAddressVO = new UsageIPAddressVO(id, event.getAccountId(), acct.getDomainId(), zoneId, ipAddress, isSourceNat, event.getCreateDate(), null);
+ m_usageIPAddressDao.persist(ipAddressVO);
+ } else if (EventTypes.EVENT_NET_IP_RELEASE.equals(event.getType())) {
+            SearchCriteria<UsageIPAddressVO> sc = m_usageIPAddressDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("address", SearchCriteria.Op.EQ, ipAddress);
+ sc.addAnd("released", SearchCriteria.Op.NULL);
+            List<UsageIPAddressVO> ipAddressVOs = m_usageIPAddressDao.search(sc, null);
+ if (ipAddressVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for ip address: " + ipAddress + " assigned to account: " + event.getAccountId() + "; marking them all as released...");
+ }
+ for (UsageIPAddressVO ipAddressVO : ipAddressVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("releasing ip address: " + ipAddressVO.getAddress() + " from account: " + ipAddressVO.getAccountId());
+ }
+ ipAddressVO.setReleased(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageIPAddressDao.update(ipAddressVO);
+ }
+ }
+ }
+
+ private void createVolumeHelperEvent(UsageEventVO event) {
+
+ Long doId = -1L;
+ long zoneId = -1L;
+ Long templateId = -1L;
+ long size = -1L;
+
+ long volId = event.getResourceId();
+ if (EventTypes.EVENT_VOLUME_CREATE.equals(event.getType())) {
+ doId = event.getOfferingId();
+ zoneId = event.getZoneId();
+ templateId = event.getTemplateId();
+ size = event.getSize();
+ }
+
+ if (EventTypes.EVENT_VOLUME_CREATE.equals(event.getType())) {
+            SearchCriteria<UsageVolumeVO> sc = m_usageVolumeDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("id", SearchCriteria.Op.EQ, volId);
+ sc.addAnd("deleted", SearchCriteria.Op.NULL);
+            List<UsageVolumeVO> volumesVOs = m_usageVolumeDao.search(sc, null);
+ if (volumesVOs.size() > 0) {
+ //This is a safeguard to avoid double counting of volumes.
+ s_logger.error("Found duplicate usage entry for volume: " + volId + " assigned to account: " + event.getAccountId() + "; marking as deleted...");
+ }
+ for (UsageVolumeVO volumesVO : volumesVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting volume: " + volumesVO.getId() + " from account: " + volumesVO.getAccountId());
+ }
+ volumesVO.setDeleted(event.getCreateDate());
+ m_usageVolumeDao.update(volumesVO);
+ }
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("create volume with id : " + volId + " for account: " + event.getAccountId());
+ }
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsageVolumeVO volumeVO = new UsageVolumeVO(volId, zoneId, event.getAccountId(), acct.getDomainId(), doId, templateId, size, event.getCreateDate(), null);
+ m_usageVolumeDao.persist(volumeVO);
+ } else if (EventTypes.EVENT_VOLUME_DELETE.equals(event.getType())) {
+            SearchCriteria<UsageVolumeVO> sc = m_usageVolumeDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("id", SearchCriteria.Op.EQ, volId);
+ sc.addAnd("deleted", SearchCriteria.Op.NULL);
+            List<UsageVolumeVO> volumesVOs = m_usageVolumeDao.search(sc, null);
+ if (volumesVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for volume: " + volId + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageVolumeVO volumesVO : volumesVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting volume: " + volumesVO.getId() + " from account: " + volumesVO.getAccountId());
+ }
+ volumesVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageVolumeDao.update(volumesVO);
+ }
+ }
+ }
+
+ private void createTemplateHelperEvent(UsageEventVO event) {
+
+ long templateId = -1L;
+ long zoneId = -1L;
+ long templateSize = -1L;
+
+ templateId = event.getResourceId();
+ zoneId = event.getZoneId();
+ if (EventTypes.EVENT_TEMPLATE_CREATE.equals(event.getType()) || EventTypes.EVENT_TEMPLATE_COPY.equals(event.getType())) {
+ templateSize = event.getSize();
+ if(templateSize < 1){
+ s_logger.error("Incorrect size for template with Id "+templateId);
+ return;
+ }
+ if(zoneId == -1L){
+ s_logger.error("Incorrect zoneId for template with Id "+templateId);
+ return;
+ }
+ }
+
+ if (EventTypes.EVENT_TEMPLATE_CREATE.equals(event.getType()) || EventTypes.EVENT_TEMPLATE_COPY.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("create template with id : " + templateId + " for account: " + event.getAccountId());
+ }
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsageStorageVO storageVO = new UsageStorageVO(templateId, zoneId, event.getAccountId(), acct.getDomainId(), StorageTypes.TEMPLATE, event.getTemplateId(),
+ templateSize, event.getCreateDate(), null);
+ m_usageStorageDao.persist(storageVO);
+ } else if (EventTypes.EVENT_TEMPLATE_DELETE.equals(event.getType())) {
+            List<UsageStorageVO> storageVOs;
+ if(zoneId != -1L){
+ storageVOs = m_usageStorageDao.listByIdAndZone(event.getAccountId(), templateId, StorageTypes.TEMPLATE, zoneId);
+ } else {
+ storageVOs = m_usageStorageDao.listById(event.getAccountId(), templateId, StorageTypes.TEMPLATE);
+ }
+ if (storageVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for storage: " + templateId + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageStorageVO storageVO : storageVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting template: " + storageVO.getId() + " from account: " + storageVO.getAccountId());
+ }
+ storageVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageStorageDao.update(storageVO);
+ }
+ }
+ }
+
+ private void createISOHelperEvent(UsageEventVO event) {
+ long isoSize = -1L;
+
+ long isoId = event.getResourceId();
+ long zoneId = event.getZoneId();
+ if (EventTypes.EVENT_ISO_CREATE.equals(event.getType()) || EventTypes.EVENT_ISO_COPY.equals(event.getType())) {
+ isoSize = event.getSize();
+ }
+
+ if (EventTypes.EVENT_ISO_CREATE.equals(event.getType()) || EventTypes.EVENT_ISO_COPY.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("create iso with id : " + isoId + " for account: " + event.getAccountId());
+ }
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsageStorageVO storageVO = new UsageStorageVO( isoId, zoneId, event.getAccountId(), acct.getDomainId(), StorageTypes.ISO, null,
+ isoSize, event.getCreateDate(), null);
+ m_usageStorageDao.persist(storageVO);
+ } else if (EventTypes.EVENT_ISO_DELETE.equals(event.getType())) {
+            List<UsageStorageVO> storageVOs;
+ if(zoneId != -1L){
+ storageVOs = m_usageStorageDao.listByIdAndZone(event.getAccountId(), isoId, StorageTypes.ISO, zoneId);
+ } else {
+ storageVOs = m_usageStorageDao.listById(event.getAccountId(), isoId, StorageTypes.ISO);
+ }
+
+ if (storageVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for storage: " + isoId + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageStorageVO storageVO : storageVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting iso: " + storageVO.getId() + " from account: " + storageVO.getAccountId());
+ }
+ storageVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageStorageDao.update(storageVO);
+ }
+ }
+ }
+
+ private void createSnapshotHelperEvent(UsageEventVO event) {
+ long snapSize = -1L;
+ long zoneId = -1L;
+
+ long snapId = event.getResourceId();
+ if (EventTypes.EVENT_SNAPSHOT_CREATE.equals(event.getType())) {
+ snapSize = event.getSize();
+ zoneId = event.getZoneId();
+ }
+
+ if (EventTypes.EVENT_SNAPSHOT_CREATE.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("create snapshot with id : " + snapId + " for account: " + event.getAccountId());
+ }
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsageStorageVO storageVO = new UsageStorageVO( snapId, zoneId, event.getAccountId(), acct.getDomainId(), StorageTypes.SNAPSHOT, null,
+ snapSize, event.getCreateDate(), null);
+ m_usageStorageDao.persist(storageVO);
+ } else if (EventTypes.EVENT_SNAPSHOT_DELETE.equals(event.getType())) {
+            List<UsageStorageVO> storageVOs = m_usageStorageDao.listById(event.getAccountId(), snapId, StorageTypes.SNAPSHOT);
+ if (storageVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for storage: " + snapId + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageStorageVO storageVO : storageVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting snapshot: " + storageVO.getId() + " from account: " + storageVO.getAccountId());
+ }
+ storageVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageStorageDao.update(storageVO);
+ }
+ }
+ }
+
+ private void createLoadBalancerHelperEvent(UsageEventVO event) {
+
+ long zoneId = -1L;
+
+ long id = event.getResourceId();
+
+ if (EventTypes.EVENT_LOAD_BALANCER_CREATE.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating load balancer : " + id + " for account: " + event.getAccountId());
+ }
+ zoneId = event.getZoneId();
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsageLoadBalancerPolicyVO lbVO = new UsageLoadBalancerPolicyVO(id, zoneId, event.getAccountId(), acct.getDomainId(),
+ event.getCreateDate(), null);
+ m_usageLoadBalancerPolicyDao.persist(lbVO);
+ } else if (EventTypes.EVENT_LOAD_BALANCER_DELETE.equals(event.getType())) {
+            SearchCriteria<UsageLoadBalancerPolicyVO> sc = m_usageLoadBalancerPolicyDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("id", SearchCriteria.Op.EQ, id);
+ sc.addAnd("deleted", SearchCriteria.Op.NULL);
+            List<UsageLoadBalancerPolicyVO> lbVOs = m_usageLoadBalancerPolicyDao.search(sc, null);
+ if (lbVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for load balancer policy: " + id + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageLoadBalancerPolicyVO lbVO : lbVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting load balancer policy: " + lbVO.getId() + " from account: " + lbVO.getAccountId());
+ }
+ lbVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageLoadBalancerPolicyDao.update(lbVO);
+ }
+ }
+ }
+
+ private void createPortForwardingHelperEvent(UsageEventVO event) {
+
+ long zoneId = -1L;
+
+ long id = event.getResourceId();
+
+ if (EventTypes.EVENT_NET_RULE_ADD.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating port forwarding rule : " + id + " for account: " + event.getAccountId());
+ }
+ zoneId = event.getZoneId();
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
+ UsagePortForwardingRuleVO pfVO = new UsagePortForwardingRuleVO(id, zoneId, event.getAccountId(), acct.getDomainId(),
+ event.getCreateDate(), null);
+ m_usagePortForwardingRuleDao.persist(pfVO);
+ } else if (EventTypes.EVENT_NET_RULE_DELETE.equals(event.getType())) {
+            SearchCriteria<UsagePortForwardingRuleVO> sc = m_usagePortForwardingRuleDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("id", SearchCriteria.Op.EQ, id);
+ sc.addAnd("deleted", SearchCriteria.Op.NULL);
+            List<UsagePortForwardingRuleVO> pfVOs = m_usagePortForwardingRuleDao.search(sc, null);
+ if (pfVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for port forwarding rule: " + id + " assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsagePortForwardingRuleVO pfVO : pfVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting port forwarding rule: " + pfVO.getId() + " from account: " + pfVO.getAccountId());
+ }
+ pfVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usagePortForwardingRuleDao.update(pfVO);
+ }
+ }
+ }
+
+ private void createNetworkOfferingEvent(UsageEventVO event) {
+
+ long zoneId = -1L;
+
+ long vmId = event.getResourceId();
+ long networkOfferingId = event.getOfferingId();
+
+ if (EventTypes.EVENT_NETWORK_OFFERING_CREATE.equals(event.getType()) || EventTypes.EVENT_NETWORK_OFFERING_ASSIGN.equals(event.getType())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating networking offering: "+ networkOfferingId +" for Vm: " + vmId + " for account: " + event.getAccountId());
+ }
+ zoneId = event.getZoneId();
+ Account acct = m_accountDao.findByIdIncludingRemoved(event.getAccountId());
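+            // as with IP assign events, the size field is overloaded: 1 means this is the default NIC's offering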
+            boolean isDefault = (event.getSize() == 1);
+ UsageNetworkOfferingVO networkOffering = new UsageNetworkOfferingVO(zoneId, event.getAccountId(), acct.getDomainId(), vmId, networkOfferingId, isDefault, event.getCreateDate(), null);
+ m_usageNetworkOfferingDao.persist(networkOffering);
+ } else if (EventTypes.EVENT_NETWORK_OFFERING_DELETE.equals(event.getType()) || EventTypes.EVENT_NETWORK_OFFERING_REMOVE.equals(event.getType())) {
+            SearchCriteria<UsageNetworkOfferingVO> sc = m_usageNetworkOfferingDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, event.getAccountId());
+ sc.addAnd("vmInstanceId", SearchCriteria.Op.EQ, vmId);
+ sc.addAnd("networkOfferingId", SearchCriteria.Op.EQ, networkOfferingId);
+ sc.addAnd("deleted", SearchCriteria.Op.NULL);
+            List<UsageNetworkOfferingVO> noVOs = m_usageNetworkOfferingDao.search(sc, null);
+ if (noVOs.size() > 1) {
+ s_logger.warn("More that one usage entry for networking offering: "+ networkOfferingId +" for Vm: " + vmId+" assigned to account: " + event.getAccountId() + "; marking them all as deleted...");
+ }
+ for (UsageNetworkOfferingVO noVO : noVOs) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("deleting network offering: " + noVO.getNetworkOfferingId() + " from Vm: " + noVO.getVmInstanceId());
+ }
+ noVO.setDeleted(event.getCreateDate()); // there really shouldn't be more than one
+ m_usageNetworkOfferingDao.update(noVO);
+ }
+ }
+ }
+
+
+ private class Heartbeat implements Runnable {
+ public void run() {
+ Transaction usageTxn = Transaction.open(Transaction.USAGE_DB);
+ try {
+ if(!m_heartbeatLock.lock(3)) { // 3 second timeout
+ if(s_logger.isTraceEnabled())
+ s_logger.trace("Heartbeat lock is in use by others, returning true as someone else will take over the job if required");
+ return;
+ }
+
+ try {
+ // check for one-off jobs
+ UsageJobVO nextJob = m_usageJobDao.getNextImmediateJob();
+ if (nextJob != null) {
+ if (m_hostname.equals(nextJob.getHost()) && (m_pid == nextJob.getPid().intValue())) {
+ updateJob(nextJob.getId(), null, null, null, UsageJobVO.JOB_SCHEDULED);
+ scheduleParse();
+ }
+ }
+
+ Long jobId = m_usageJobDao.checkHeartbeat(m_hostname, m_pid, m_aggregationDuration);
+ if (jobId != null) {
+ // if I'm taking over the job...see how long it's been since the last job, and if it's more than the
+ // aggregation range...do a one off job to catch up. However, only do this if we are more than half
+ // the aggregation range away from executing the next job
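+                    // e.g. with a 60 minute aggregation range: if the last successful job finished more than
+                    // 60 minutes ago and the next scheduled run is still more than 30 minutes away, schedule
+                    // an immediate catch-up job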
+ long now = System.currentTimeMillis();
+ long timeToJob = m_jobExecTime.getTimeInMillis() - now;
+ long timeSinceJob = 0;
+ long aggregationDurationMillis = m_aggregationDuration * 60 * 1000;
+ long lastSuccess = m_usageJobDao.getLastJobSuccessDateMillis();
+ if (lastSuccess > 0) {
+ timeSinceJob = now - lastSuccess;
+ }
+
+ if ((timeSinceJob > 0) && (timeSinceJob > aggregationDurationMillis)) {
+ if (timeToJob > (aggregationDurationMillis/2)) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob + " ms until next job, scheduling an immediate job to catch up (aggregation duration is " + m_aggregationDuration + " minutes)");
+ }
+ scheduleParse();
+ }
+ }
+
+ boolean changeOwner = updateJob(jobId, m_hostname, Integer.valueOf(m_pid), new Date(), UsageJobVO.JOB_NOT_SCHEDULED);
+ if (changeOwner) {
+ deleteOneOffJobs(m_hostname, m_pid);
+ }
+ }
+ } finally {
+ m_heartbeatLock.unlock();
+ }
+ } catch (Exception ex) {
+ s_logger.error("error in heartbeat", ex);
+ } finally {
+ usageTxn.close();
+ }
+ }
+
+ @DB
+ protected boolean updateJob(Long jobId, String hostname, Integer pid, Date heartbeat, int scheduled) {
+ boolean changeOwner = false;
+ Transaction txn = Transaction.currentTxn();
+ try {
+ txn.start();
+
+ // take over the job, setting our hostname/pid/heartbeat time
+ UsageJobVO job = m_usageJobDao.lockRow(jobId, Boolean.TRUE);
+ if (!job.getHost().equals(hostname) || !job.getPid().equals(pid)) {
+ changeOwner = true;
+ }
+
+ UsageJobVO jobForUpdate = m_usageJobDao.createForUpdate();
+ if (hostname != null) {
+ jobForUpdate.setHost(hostname);
+ }
+ if (pid != null) {
+ jobForUpdate.setPid(pid);
+ }
+ if (heartbeat != null) {
+ jobForUpdate.setHeartbeat(heartbeat);
+ }
+ jobForUpdate.setScheduled(scheduled);
+ m_usageJobDao.update(job.getId(), jobForUpdate);
+
+ txn.commit();
+ } catch (Exception dbEx) {
+ txn.rollback();
+ s_logger.error("error updating usage job", dbEx);
+ }
+ return changeOwner;
+ }
+
+ @DB
+ protected void deleteOneOffJobs(String hostname, int pid) {
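+            // expunge any unscheduled one-off jobs still owned by a different host/pid pair,
+            // since this process has just taken over the heartbeat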
+            SearchCriteria<UsageJobVO> sc = m_usageJobDao.createSearchCriteria();
+            SearchCriteria<UsageJobVO> ssc = m_usageJobDao.createSearchCriteria();
+ ssc.addOr("host", SearchCriteria.Op.NEQ, hostname);
+ ssc.addOr("pid", SearchCriteria.Op.NEQ, pid);
+ sc.addAnd("host", SearchCriteria.Op.SC, ssc);
+ sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
+ sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
+ sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
+ m_usageJobDao.expunge(sc);
+ }
+ }
+
+ private class SanityCheck implements Runnable {
+ public void run() {
+ UsageSanityChecker usc = new UsageSanityChecker();
+ try {
+ String errors = usc.runSanityCheck();
+ if(errors.length() > 0){
+ _alertMgr.sendAlert(AlertManager.ALERT_TYPE_USAGE_SANITY_RESULT, 0, new Long(0), "Usage Sanity Check failed", errors);
+ } else {
+ _alertMgr.clearAlert(AlertManager.ALERT_TYPE_USAGE_SANITY_RESULT, 0, 0);
+ }
+ } catch (SQLException e) {
+ s_logger.error("Error in sanity check", e);
+ }
+ }
+ }
+}
diff --git a/usage/src/com/cloud/usage/UsageSanityChecker.java b/usage/src/com/cloud/usage/UsageSanityChecker.java
new file mode 100644
index 00000000000..ef47de6465c
--- /dev/null
+++ b/usage/src/com/cloud/usage/UsageSanityChecker.java
@@ -0,0 +1,217 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ *
+ */
+
+package com.cloud.usage;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import com.cloud.utils.db.Transaction;
+
+public class UsageSanityChecker {
+
+ private StringBuffer errors;
+ private String lastCheckId = "";
+ private final String lastCheckFile = "/usr/local/libexec/sanity-check-last-id";
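+    // lastCheckId is accumulated as a SQL fragment (e.g. " and cu.id > X and cu.id <= Y") that each
+    // check appends to its query, so only records added since the last successful run are re-verified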
+
+ private boolean checkMaxUsage(Connection conn) throws SQLException{
+
+ PreparedStatement pstmt = conn.prepareStatement("SELECT value FROM `cloud`.`configuration` where name = 'usage.stats.job.aggregation.range'");
+ ResultSet rs = pstmt.executeQuery();
+
+ int aggregationRange = 1440;
+ if(rs.next()){
+ aggregationRange = rs.getInt(1);
+ } else {
+ System.out.println("Failed to retrieve aggregation range. Using default : "+aggregationRange);
+ }
+
+ int aggregationHours = aggregationRange / 60;
+
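+        // raw_usage is expressed in hours, so no single record (outside the byte-count
+        // usage types 4 and 5, which are excluded below) should span more than one aggregation window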
+ /*
+ * Check for usage records with raw_usage > aggregationHours
+ */
+ pstmt = conn.prepareStatement("SELECT count(*) FROM `cloud_usage`.`cloud_usage` cu where usage_type not in (4,5) and raw_usage > "+aggregationHours+lastCheckId);
+ rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" usage records with raw_usage > "+aggregationHours);
+ errors.append("\n");
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkVmUsage(Connection conn) throws SQLException{
+ boolean success = true;
+ /*
+ * Check for Vm usage records which are created after the vm is destroyed
+ */
+ PreparedStatement pstmt = conn.prepareStatement("select count(*) from cloud_usage.cloud_usage cu inner join cloud.vm_instance vm where vm.type = 'User' " +
+ "and cu.usage_type in (1 , 2) and cu.usage_id = vm.id and cu.start_date > vm.removed"+lastCheckId);
+ ResultSet rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" Vm usage records which are created after Vm is destroyed");
+ errors.append("\n");
+ success = false;
+ }
+
+ /*
+ * Check for Vms which have multiple running vm records in helper table
+ */
+ pstmt = conn.prepareStatement("select sum(cnt) from (select count(*) as cnt from cloud_usage.usage_vm_instance where usage_type =1 " +
+ "and end_date is null group by vm_instance_id having count(vm_instance_id) > 1) c ;");
+ rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" duplicate running Vm entries in vm usage helper table");
+ errors.append("\n");
+ success = false;
+ }
+
+ /*
+ * Check for Vms which have multiple allocated vm records in helper table
+ */
+ pstmt = conn.prepareStatement("select sum(cnt) from (select count(*) as cnt from cloud_usage.usage_vm_instance where usage_type =2 " +
+ "and end_date is null group by vm_instance_id having count(vm_instance_id) > 1) c ;");
+ rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" duplicate allocated Vm entries in vm usage helper table");
+ errors.append("\n");
+ success = false;
+ }
+
+ /*
+ * Check for Vms which have running vm entry without allocated vm entry in helper table
+ */
+ pstmt = conn.prepareStatement("select count(vm_instance_id) from cloud_usage.usage_vm_instance o where o.end_date is null and o.usage_type=1 and not exists " +
+ "(select 1 from cloud_usage.usage_vm_instance i where i.vm_instance_id=o.vm_instance_id and usage_type=2 and i.end_date is null)");
+ rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" running Vm entries without corresponding allocated entries in vm usage helper table");
+ errors.append("\n");
+ success = false;
+ }
+ return success;
+ }
+
+ private boolean checkVolumeUsage(Connection conn) throws SQLException{
+ boolean success = true;
+ /*
+ * Check for Volume usage records which are created after the volume is removed
+ */
+ PreparedStatement pstmt = conn.prepareStatement("select count(*) from cloud_usage.cloud_usage cu inner join cloud.volumes v " +
+ "where cu.usage_type = 6 and cu.usage_id = v.id and cu.start_date > v.removed"+lastCheckId);
+ ResultSet rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" volume usage records which are created after volume is removed");
+ errors.append("\n");
+ success = false;
+ }
+
+ /*
+ * Check for duplicate records in volume usage helper table
+ */
+ pstmt = conn.prepareStatement("select sum(cnt) from (select count(*) as cnt from cloud_usage.usage_volume " +
+ "where deleted is null group by id having count(id) > 1) c;");
+ rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" duplicate records is volume usage helper table");
+ errors.append("\n");
+ success = false;
+ }
+ return success;
+ }
+
+ private boolean checkTemplateISOUsage(Connection conn) throws SQLException{
+ /*
+ * Check for Template/ISO usage records which are created after it is removed
+ */
+ PreparedStatement pstmt = conn.prepareStatement("select count(*) from cloud_usage.cloud_usage cu inner join cloud.template_zone_ref tzr " +
+ "where cu.usage_id = tzr.template_id and cu.zone_id = tzr.zone_id and cu.usage_type in (7,8) and cu.start_date > tzr.removed"+lastCheckId);
+ ResultSet rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" template/ISO usage records which are created after it is removed");
+ errors.append("\n");
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkSnapshotUsage(Connection conn) throws SQLException{
+ /*
+ * Check for snapshot usage records which are created after snapshot is removed
+ */
+ PreparedStatement pstmt = conn.prepareStatement("select count(*) from cloud_usage.cloud_usage cu inner join cloud.snapshots s " +
+ "where cu.usage_id = s.id and cu.usage_type = 9 and cu.start_date > s.removed"+lastCheckId);
+ ResultSet rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ errors.append("Error: Found "+rs.getInt(1)+" snapshot usage records which are created after snapshot is removed");
+ errors.append("\n");
+ return false;
+ }
+ return true;
+ }
+
+ public String runSanityCheck() throws SQLException{
+ try {
+            BufferedReader reader = new BufferedReader(new FileReader(lastCheckFile));
+            String lastIdText = reader.readLine();
+            if (lastIdText != null) {
+                int lastId = Integer.parseInt(lastIdText);
+                if (lastId > 0) {
+                    lastCheckId = " and cu.id > " + lastId;
+                }
+            }
+            reader.close();
+ } catch (Exception e) {
+ // Error while reading last check id
+ }
+
+ Connection conn = Transaction.getStandaloneConnection();
+ int maxId = 0;
+ PreparedStatement pstmt = conn.prepareStatement("select max(id) from cloud_usage.cloud_usage");
+ ResultSet rs = pstmt.executeQuery();
+ if(rs.next() && (rs.getInt(1) > 0)){
+ maxId = rs.getInt(1);
+ lastCheckId += " and cu.id <= "+maxId;
+ }
+ errors = new StringBuffer();
+ checkMaxUsage(conn);
+ checkVmUsage(conn);
+ checkVolumeUsage(conn);
+ checkTemplateISOUsage(conn);
+ checkSnapshotUsage(conn);
+ FileWriter fstream;
+ try {
+ fstream = new FileWriter(lastCheckFile);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write(""+maxId);
+ out.close();
+ } catch (IOException e) {
+ // Error while writing last check id
+ }
+ return errors.toString();
+ }
+
+    public static void main(String[] args) {
+ UsageSanityChecker usc = new UsageSanityChecker();
+ String sanityErrors;
+ try {
+ sanityErrors = usc.runSanityCheck();
+ if(sanityErrors.length() > 0){
+                System.out.println(sanityErrors);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/usage/src/com/cloud/usage/UsageServer.java b/usage/src/com/cloud/usage/UsageServer.java
new file mode 100644
index 00000000000..83876d89959
--- /dev/null
+++ b/usage/src/com/cloud/usage/UsageServer.java
@@ -0,0 +1,30 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ *
+ */
+
+package com.cloud.usage;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.component.ComponentLocator;
+
+public class UsageServer {
+ private static final Logger s_logger = Logger.getLogger(UsageServer.class.getName());
+ public static final String Name = "usage-server";
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ // TODO: do we need to communicate with mgmt server?
+ final ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
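+        // resolving the UsageManager from usage-components.xml is what bootstraps the server;
+        // main() itself has nothing further to do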
+ UsageManager mgr = _locator.getManager(UsageManager.class);
+ if (mgr != null) {
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("UsageServer ready...");
+ }
+ }
+ }
+}
diff --git a/usage/src/com/cloud/usage/parser/IPAddressUsageParser.java b/usage/src/com/cloud/usage/parser/IPAddressUsageParser.java
new file mode 100644
index 00000000000..892117a7b97
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/IPAddressUsageParser.java
@@ -0,0 +1,165 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageIPAddressVO;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageIPAddressDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class IPAddressUsageParser {
+ public static final Logger s_logger = Logger.getLogger(IPAddressUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageIPAddressDao m_usageIPAddressDao = _locator.getDao(UsageIPAddressDao.class);
+
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing IP Address usage for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+ // - query usage_ip_address table with the following criteria:
+ // - look for an entry for accountId with start date in the given range
+ // - look for an entry for accountId with end date in the given range
+    // - look for an entry for accountId with end date null (currently assigned IP)
+ // - look for an entry for accountId with start date before given range *and* end date after given range
+        List<UsageIPAddressVO> usageIPAddress = m_usageIPAddressDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate);
+
+ if(usageIPAddress.isEmpty()){
+ s_logger.debug("No IP Address usage for this period");
+ return true;
+ }
+
+ // This map has both the running time *and* the usage amount.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+
+        Map<String, IpInfo> IPMap = new HashMap<String, IpInfo>();
+
+ // loop through all the usage IPs, create a usage record for each
+ for (UsageIPAddressVO usageIp : usageIPAddress) {
+ long IpId = usageIp.getId();
+
+ String key = ""+IpId;
+
+ // store the info in the IP map
+ IPMap.put(key, new IpInfo(usageIp.getZoneId(), IpId, usageIp.getAddress(), usageIp.isSourceNat()));
+
+ Date IpAssignDate = usageIp.getAssigned();
+ Date IpReleaseDeleteDate = usageIp.getReleased();
+
+ if ((IpReleaseDeleteDate == null) || IpReleaseDeleteDate.after(endDate)) {
+ IpReleaseDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the IP was assigned before the range started
+ if (IpAssignDate.before(startDate)) {
+ IpAssignDate = startDate;
+ }
+
+ long currentDuration = (IpReleaseDeleteDate.getTime() - IpAssignDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+ updateIpUsageData(usageMap, key, usageIp.getId(), currentDuration);
+ }
+
+ for (String ipIdKey : usageMap.keySet()) {
+            Pair<Long, Long> ipTimeInfo = usageMap.get(ipIdKey);
+ long useTime = ipTimeInfo.second().longValue();
+
+            // Only create a usage record if the running time is greater than zero.
+ if (useTime > 0L) {
+ IpInfo info = IPMap.get(ipIdKey);
+ createUsageRecord(info.getZoneId(), useTime, startDate, endDate, account, info.getIpId(), info.getIPAddress(), info.isSourceNat());
+ }
+ }
+
+ return true;
+ }
+
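+    // accumulate total assigned time (in milliseconds) per IP, across possibly multiple assign/release intervals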
+    private static void updateIpUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long ipId, long duration) {
+        Pair<Long, Long> ipUsageInfo = usageDataMap.get(key);
+        if (ipUsageInfo == null) {
+            ipUsageInfo = new Pair<Long, Long>(new Long(ipId), new Long(duration));
+        } else {
+            Long runningTime = ipUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            ipUsageInfo = new Pair<Long, Long>(ipUsageInfo.first(), runningTime);
+ }
+ usageDataMap.put(key, ipUsageInfo);
+ }
+
+ private static void createUsageRecord(long zoneId, long runningTime, Date startDate, Date endDate, AccountVO account, long IpId, String IPAddress, boolean isSourceNat) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total usage time " + runningTime + "ms");
+ }
+
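+        // convert the accumulated assignment time from milliseconds to hours, the granularity usage records are stored in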
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating IP usage record with id: " + IpId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ String usageDesc = "IPAddress: "+IPAddress;
+
+ // Create the usage record
+
+ UsageVO usageRecord = new UsageVO(zoneId, account.getAccountId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs",
+ UsageTypes.IP_ADDRESS, new Double(usage), null, null, null, null, IpId, startDate, endDate, (isSourceNat?"SourceNat":""));
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class IpInfo {
+ private long zoneId;
+ private long IpId;
+ private String IPAddress;
+ private boolean isSourceNat;
+
+ public IpInfo(long zoneId,long IpId, String IPAddress, boolean isSourceNat) {
+ this.zoneId = zoneId;
+ this.IpId = IpId;
+ this.IPAddress = IPAddress;
+ this.isSourceNat = isSourceNat;
+ }
+
+ public long getZoneId() {
+ return zoneId;
+ }
+
+ public long getIpId() {
+ return IpId;
+ }
+
+ public String getIPAddress() {
+ return IPAddress;
+ }
+
+ public boolean isSourceNat() {
+ return isSourceNat;
+ }
+ }
+
+}
diff --git a/usage/src/com/cloud/usage/parser/LoadBalancerUsageParser.java b/usage/src/com/cloud/usage/parser/LoadBalancerUsageParser.java
new file mode 100644
index 00000000000..504c999f046
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/LoadBalancerUsageParser.java
@@ -0,0 +1,148 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageLoadBalancerPolicyVO;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageLoadBalancerPolicyDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class LoadBalancerUsageParser {
+ public static final Logger s_logger = Logger.getLogger(LoadBalancerUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageLoadBalancerPolicyDao m_usageLoadBalancerPolicyDao = _locator.getDao(UsageLoadBalancerPolicyDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all LoadBalancerPolicy usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+    // - query the load balancer policy usage table with the following criteria:
+ // - look for an entry for accountId with start date in the given range
+ // - look for an entry for accountId with end date in the given range
+    // - look for an entry for accountId with end date null (currently active policy)
+ // - look for an entry for accountId with start date before given range *and* end date after given range
+        List<UsageLoadBalancerPolicyVO> usageLBs = m_usageLoadBalancerPolicyDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate, false, 0);
+
+ if(usageLBs.isEmpty()){
+ s_logger.debug("No load balancer usage events for this period");
+ return true;
+ }
+
+ // This map has both the running time *and* the usage amount.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+        Map<String, LBInfo> lbMap = new HashMap<String, LBInfo>();
+
+ // loop through all the load balancer policies, create a usage record for each
+ for (UsageLoadBalancerPolicyVO usageLB : usageLBs) {
+ long lbId = usageLB.getId();
+ String key = ""+lbId;
+
+ lbMap.put(key, new LBInfo(lbId, usageLB.getZoneId()));
+
+ Date lbCreateDate = usageLB.getCreated();
+ Date lbDeleteDate = usageLB.getDeleted();
+
+ if ((lbDeleteDate == null) || lbDeleteDate.after(endDate)) {
+ lbDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the policy was created before the range started
+ if (lbCreateDate.before(startDate)) {
+ lbCreateDate = startDate;
+ }
+
+ long currentDuration = (lbDeleteDate.getTime() - lbCreateDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+
+ updateLBUsageData(usageMap, key, usageLB.getId(), currentDuration);
+ }
+
+ for (String lbIdKey : usageMap.keySet()) {
+            Pair<Long, Long> lbTimeInfo = usageMap.get(lbIdKey);
+            long useTime = lbTimeInfo.second().longValue();
+
+            // Only create a usage record if the running time is greater than zero.
+ if (useTime > 0L) {
+ LBInfo info = lbMap.get(lbIdKey);
+ createUsageRecord(UsageTypes.LOAD_BALANCER_POLICY, useTime, startDate, endDate, account, info.getId(), info.getZoneId() );
+ }
+ }
+
+ return true;
+ }
+
+    private static void updateLBUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long lbId, long duration) {
+        Pair<Long, Long> lbUsageInfo = usageDataMap.get(key);
+        if (lbUsageInfo == null) {
+            lbUsageInfo = new Pair<Long, Long>(new Long(lbId), new Long(duration));
+        } else {
+            Long runningTime = lbUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            lbUsageInfo = new Pair<Long, Long>(lbUsageInfo.first(), runningTime);
+ }
+ usageDataMap.put(key, lbUsageInfo);
+ }
+
+ private static void createUsageRecord(int type, long runningTime, Date startDate, Date endDate, AccountVO account, long lbId, long zoneId) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating Volume usage record for load balancer: " + lbId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ // Create the usage record
+ String usageDesc = "Load Balancing Policy: "+lbId+" usage time";
+
+ //ToDo: get zone id
+ UsageVO usageRecord = new UsageVO(zoneId, account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", type,
+ new Double(usage), null, null, null, null, lbId, null, startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class LBInfo {
+ private long id;
+ private long zoneId;
+
+ public LBInfo(long id, long zoneId) {
+ this.id = id;
+ this.zoneId = zoneId;
+ }
+ public long getZoneId() {
+ return zoneId;
+ }
+ public long getId() {
+ return id;
+ }
+ }
+
+}
diff --git a/usage/src/com/cloud/usage/parser/NetworkOfferingUsageParser.java b/usage/src/com/cloud/usage/parser/NetworkOfferingUsageParser.java
new file mode 100644
index 00000000000..0128c0b0501
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/NetworkOfferingUsageParser.java
@@ -0,0 +1,160 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageNetworkOfferingVO;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageNetworkOfferingDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class NetworkOfferingUsageParser {
+ public static final Logger s_logger = Logger.getLogger(NetworkOfferingUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageNetworkOfferingDao m_usageNetworkOfferingDao = _locator.getDao(UsageNetworkOfferingDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all NetworkOffering usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+    // - query the network offering usage table with the following criteria:
+ // - look for an entry for accountId with start date in the given range
+ // - look for an entry for accountId with end date in the given range
+    // - look for an entry for accountId with end date null (currently assigned offering)
+ // - look for an entry for accountId with start date before given range *and* end date after given range
+        List<UsageNetworkOfferingVO> usageNOs = m_usageNetworkOfferingDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate, false, 0);
+
+ if(usageNOs.isEmpty()){
+ s_logger.debug("No NetworkOffering usage events for this period");
+ return true;
+ }
+
+ // This map has both the running time *and* the usage amount.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+        Map<String, NOInfo> noMap = new HashMap<String, NOInfo>();
+
+ // loop through all the network offerings, create a usage record for each
+ for (UsageNetworkOfferingVO usageNO : usageNOs) {
+ long vmId = usageNO.getVmInstanceId();
+ long noId = usageNO.getNetworkOfferingId();
+ String key = ""+vmId+"NO"+noId;
+
+ noMap.put(key, new NOInfo(vmId, usageNO.getZoneId(), noId, usageNO.isDefault()));
+
+ Date noCreateDate = usageNO.getCreated();
+ Date noDeleteDate = usageNO.getDeleted();
+
+ if ((noDeleteDate == null) || noDeleteDate.after(endDate)) {
+ noDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the offering was assigned before the range started
+ if (noCreateDate.before(startDate)) {
+ noCreateDate = startDate;
+ }
+
+ long currentDuration = (noDeleteDate.getTime() - noCreateDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+
+ updateNOUsageData(usageMap, key, usageNO.getVmInstanceId(), currentDuration);
+ }
+
+ for (String noIdKey : usageMap.keySet()) {
+            Pair<Long, Long> noTimeInfo = usageMap.get(noIdKey);
+            long useTime = noTimeInfo.second().longValue();
+
+            // Only create a usage record if the running time is greater than zero.
+ if (useTime > 0L) {
+ NOInfo info = noMap.get(noIdKey);
+ createUsageRecord(UsageTypes.NETWORK_OFFERING, useTime, startDate, endDate, account, info.getVmId(), info.getNOId(), info.getZoneId(), info.isDefault());
+ }
+ }
+
+ return true;
+ }
+
+    private static void updateNOUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long vmId, long duration) {
+        Pair<Long, Long> noUsageInfo = usageDataMap.get(key);
+        if (noUsageInfo == null) {
+            noUsageInfo = new Pair<Long, Long>(new Long(vmId), new Long(duration));
+        } else {
+            Long runningTime = noUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            noUsageInfo = new Pair<Long, Long>(noUsageInfo.first(), runningTime);
+ }
+ usageDataMap.put(key, noUsageInfo);
+ }
+
+ private static void createUsageRecord(int type, long runningTime, Date startDate, Date endDate, AccountVO account, long vmId, long noId, long zoneId, boolean isDefault) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating network offering:" + noId + " usage record for Vm : " + vmId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ // Create the usage record
+ String usageDesc = "Network offering:" + noId + " for Vm : " + vmId + " usage time";
+
+ long defaultNic = (isDefault) ? 1 : 0;
+ UsageVO usageRecord = new UsageVO(zoneId, account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", type,
+ new Double(usage), vmId, null, noId, null, defaultNic, null, startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class NOInfo {
+ private long vmId;
+ private long zoneId;
+ private long noId;
+ private boolean isDefault;
+
+ public NOInfo(long vmId, long zoneId, long noId, boolean isDefault) {
+ this.vmId = vmId;
+ this.zoneId = zoneId;
+ this.noId = noId;
+ this.isDefault = isDefault;
+ }
+ public long getZoneId() {
+ return zoneId;
+ }
+ public long getVmId() {
+ return vmId;
+ }
+ public long getNOId() {
+ return noId;
+ }
+
+ public boolean isDefault(){
+ return isDefault;
+ }
+ }
+
+}
diff --git a/usage/src/com/cloud/usage/parser/NetworkUsageParser.java b/usage/src/com/cloud/usage/parser/NetworkUsageParser.java
new file mode 100644
index 00000000000..24b7e0145c2
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/NetworkUsageParser.java
@@ -0,0 +1,154 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageNetworkVO;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageNetworkDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+import com.cloud.utils.db.SearchCriteria;
+
+public class NetworkUsageParser {
+    public static final Logger s_logger = Logger.getLogger(NetworkUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageNetworkDao m_usageNetworkDao = _locator.getDao(UsageNetworkDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all Network usage events for account: " + account.getId());
+ }
+
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+ // - query usage_network table for all entries for userId with
+ // event_date in the given range
+        SearchCriteria<UsageNetworkVO> sc = m_usageNetworkDao.createSearchCriteria();
+ sc.addAnd("accountId", SearchCriteria.Op.EQ, account.getId());
+ sc.addAnd("eventTimeMillis", SearchCriteria.Op.BETWEEN, startDate.getTime(), endDate.getTime());
+        List<UsageNetworkVO> usageNetworkVOs = m_usageNetworkDao.search(sc, null);
+
+        Map<String, NetworkInfo> networkUsageByZone = new HashMap<String, NetworkInfo>();
+
+ // Calculate the total bytes since last parsing
+ for (UsageNetworkVO usageNetwork : usageNetworkVOs) {
+ long zoneId = usageNetwork.getZoneId();
+ String key = ""+zoneId;
+ if(usageNetwork.getHostId() != 0){
+ key += "-Host"+usageNetwork.getHostId();
+ }
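+            // totals are keyed per zone and, when a device id was recorded on the stats row, per host,
+            // so per-device traffic is not merged into a single zone-wide record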
+ NetworkInfo networkInfo = networkUsageByZone.get(key);
+
+ long bytesSent = usageNetwork.getBytesSent();
+ long bytesReceived = usageNetwork.getBytesReceived();
+ if (networkInfo != null) {
+ bytesSent += networkInfo.getBytesSent();
+ bytesReceived += networkInfo.getBytesRcvd();
+ }
+
+ networkUsageByZone.put(key, new NetworkInfo(zoneId, usageNetwork.getHostId(), usageNetwork.getHostType(), usageNetwork.getNetworkId(), bytesSent, bytesReceived));
+ }
+
+ for (String key : networkUsageByZone.keySet()) {
+ NetworkInfo networkInfo = networkUsageByZone.get(key);
+ long totalBytesSent = networkInfo.getBytesSent();
+ long totalBytesReceived = networkInfo.getBytesRcvd();
+
+ if ((totalBytesSent > 0L) || (totalBytesReceived > 0L)) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating usage record, total bytes sent:" + totalBytesSent + ", total bytes received: " + totalBytesReceived + " for account: "
+ + account.getId() + " in availability zone " + networkInfo.getZoneId() + ", start: " + startDate + ", end: " + endDate);
+ }
+
+ Long hostId = null;
+
+ // Create the usage record for bytes sent
+ String usageDesc = "network bytes sent";
+ if(networkInfo.getHostId() != 0){
+ hostId = networkInfo.getHostId();
+ usageDesc += " for Host: "+networkInfo.getHostId();
+ }
+ UsageVO usageRecord = new UsageVO(networkInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, totalBytesSent + " bytes sent",
+ UsageTypes.NETWORK_BYTES_SENT, new Double(totalBytesSent), hostId, networkInfo.getHostType(), networkInfo.getNetworkId(), startDate, endDate);
+ m_usageDao.persist(usageRecord);
+
+ // Create the usage record for bytes received
+ usageDesc = "network bytes received";
+ if(networkInfo.getHostId() != 0){
+ usageDesc += " for Host: "+networkInfo.getHostId();
+ }
+ usageRecord = new UsageVO(networkInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, totalBytesReceived + " bytes received",
+ UsageTypes.NETWORK_BYTES_RECEIVED, new Double(totalBytesReceived), hostId, networkInfo.getHostType(), networkInfo.getNetworkId(), startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ } else {
+ // Don't charge anything if there were zero bytes processed
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("No usage record (0 bytes used) generated for account: " + account.getId());
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private static class NetworkInfo {
+ private long zoneId;
+ private long hostId;
+ private String hostType;
+ private Long networkId;
+ private long bytesSent;
+ private long bytesRcvd;
+
+ public NetworkInfo(long zoneId, long hostId, String hostType, Long networkId, long bytesSent, long bytesRcvd) {
+ this.zoneId = zoneId;
+ this.hostId = hostId;
+ this.hostType = hostType;
+ this.networkId = networkId;
+ this.bytesSent = bytesSent;
+ this.bytesRcvd = bytesRcvd;
+ }
+
+ public long getZoneId() {
+ return zoneId;
+ }
+
+ public long getHostId() {
+ return hostId;
+ }
+
+ public Long getNetworkId() {
+ return networkId;
+ }
+
+ public long getBytesSent() {
+ return bytesSent;
+ }
+
+ public long getBytesRcvd() {
+ return bytesRcvd;
+ }
+
+ public String getHostType(){
+ return hostType;
+ }
+
+ }
+}
diff --git a/usage/src/com/cloud/usage/parser/PortForwardingUsageParser.java b/usage/src/com/cloud/usage/parser/PortForwardingUsageParser.java
new file mode 100644
index 00000000000..c098786e474
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/PortForwardingUsageParser.java
@@ -0,0 +1,148 @@
+/**
+ *  Copyright (C) 2011 Citrix Systems, Inc.  All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsagePortForwardingRuleVO;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsagePortForwardingRuleDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class PortForwardingUsageParser {
+ public static final Logger s_logger = Logger.getLogger(PortForwardingUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsagePortForwardingRuleDao m_usagePFRuleDao = _locator.getDao(UsagePortForwardingRuleDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all PortForwardingRule usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+    // - query the port forwarding rule usage table with the following criteria:
+ // - look for an entry for accountId with start date in the given range
+ // - look for an entry for accountId with end date in the given range
+    // - look for an entry for accountId with end date null (currently active rule)
+ // - look for an entry for accountId with start date before given range *and* end date after given range
+        List<UsagePortForwardingRuleVO> usagePFs = m_usagePFRuleDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate, false, 0);
+
+ if(usagePFs.isEmpty()){
+ s_logger.debug("No port forwarding usage events for this period");
+ return true;
+ }
+
+ // This map has both the running time *and* the usage amount.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+        Map<String, PFInfo> pfMap = new HashMap<String, PFInfo>();
+
+        // loop through all the port forwarding rules, create a usage record for each
+ for (UsagePortForwardingRuleVO usagePF : usagePFs) {
+ long pfId = usagePF.getId();
+ String key = ""+pfId;
+
+ pfMap.put(key, new PFInfo(pfId, usagePF.getZoneId()));
+
+ Date pfCreateDate = usagePF.getCreated();
+ Date pfDeleteDate = usagePF.getDeleted();
+
+ if ((pfDeleteDate == null) || pfDeleteDate.after(endDate)) {
+ pfDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the rule predates the range
+ if (pfCreateDate.before(startDate)) {
+ pfCreateDate = startDate;
+ }
+
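+            // Illustrative arithmetic (hypothetical values): a rule that existed for
+            // 30 minutes contributes (1,800,000 - 0) + 1 = 1,800,001 ms here.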
+ long currentDuration = (pfDeleteDate.getTime() - pfCreateDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+ updatePFUsageData(usageMap, key, usagePF.getId(), currentDuration);
+ }
+
+        for (String pfIdKey : usageMap.keySet()) {
+            Pair<Long, Long> pfTimeInfo = usageMap.get(pfIdKey);
+            long useTime = pfTimeInfo.second().longValue();
+
+            // Only create a usage record if the usage time is greater than zero.
+            if (useTime > 0L) {
+                PFInfo info = pfMap.get(pfIdKey);
+                createUsageRecord(UsageTypes.PORT_FORWARDING_RULE, useTime, startDate, endDate, account, info.getId(), info.getZoneId());
+            }
+ }
+
+ return true;
+ }
+
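+    // Accumulates total active time per rule key: the first interval seeds the
+    // (ruleId, duration) pair; later intervals for the same rule add to the total.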
+    private static void updatePFUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long pfId, long duration) {
+        Pair<Long, Long> pfUsageInfo = usageDataMap.get(key);
+        if (pfUsageInfo == null) {
+            pfUsageInfo = new Pair<Long, Long>(new Long(pfId), new Long(duration));
+        } else {
+            Long runningTime = pfUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            pfUsageInfo = new Pair<Long, Long>(pfUsageInfo.first(), runningTime);
+        }
+        usageDataMap.put(key, pfUsageInfo);
+    }
+
+ private static void createUsageRecord(int type, long runningTime, Date startDate, Date endDate, AccountVO account, long pfId, long zoneId) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
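+        // Worked example (hypothetical): 5,400,000 ms / 1000 / 60 / 60 = 1.5 hours,
+        // which the formatter below renders as "1.5".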
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating usage record for port forwarding rule: " + pfId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+        // Create the usage record
+        String usageDesc = "Port Forwarding Rule: " + pfId + " usage time";
+
+        UsageVO usageRecord = new UsageVO(zoneId, account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", type,
+                new Double(usage), null, null, null, null, pfId, null, startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class PFInfo {
+ private long id;
+ private long zoneId;
+
+ public PFInfo(long id, long zoneId) {
+ this.id = id;
+ this.zoneId = zoneId;
+ }
+ public long getZoneId() {
+ return zoneId;
+ }
+ public long getId() {
+ return id;
+ }
+ }
+
+}
diff --git a/usage/src/com/cloud/usage/parser/StorageUsageParser.java b/usage/src/com/cloud/usage/parser/StorageUsageParser.java
new file mode 100644
index 00000000000..0d4c7f5e99f
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/StorageUsageParser.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.StorageTypes;
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageStorageVO;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageStorageDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class StorageUsageParser {
+ public static final Logger s_logger = Logger.getLogger(StorageUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageStorageDao m_usageStorageDao = _locator.getDao(UsageStorageDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all Storage usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+        // - query the usage_storage table with the following criteria:
+        //  - look for an entry for accountId with start date in the given range
+        //  - look for an entry for accountId with end date in the given range
+        //  - look for an entry for accountId with end date null (storage still allocated)
+        //  - look for an entry for accountId with start date before the given range *and* end date after the given range
+        List<UsageStorageVO> usageUsageStorages = m_usageStorageDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate, false, 0);
+
+        if (usageUsageStorages.isEmpty()) {
+            s_logger.debug("No Storage usage events for this period");
+            return true;
+        }
+
+        // This map tracks the storage id *and* its total usage time.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+
+        Map<String, StorageInfo> storageMap = new HashMap<String, StorageInfo>();
+
+ // loop through all the usage volumes, create a usage record for each
+ for (UsageStorageVO usageStorage : usageUsageStorages) {
+ long storageId = usageStorage.getId();
+ int storage_type = usageStorage.getStorageType();
+ long size = usageStorage.getSize();
+ long zoneId = usageStorage.getZoneId();
+ Long sourceId = usageStorage.getSourceId();
+
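+            // e.g. storageId=7, zoneId=2, storage_type=1 (hypothetical values) yields
+            // the key "7Z2T1", billing the same object separately per zone and role.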
+ String key = ""+storageId+"Z"+zoneId+"T"+storage_type;
+
+ // store the info in the storage map
+ storageMap.put(key, new StorageInfo(zoneId, storageId, storage_type, sourceId, size));
+
+ Date storageCreateDate = usageStorage.getCreated();
+ Date storageDeleteDate = usageStorage.getDeleted();
+
+ if ((storageDeleteDate == null) || storageDeleteDate.after(endDate)) {
+ storageDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the storage predates the range
+ if (storageCreateDate.before(startDate)) {
+ storageCreateDate = startDate;
+ }
+
+ long currentDuration = (storageDeleteDate.getTime() - storageCreateDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+ updateStorageUsageData(usageMap, key, usageStorage.getId(), currentDuration);
+ }
+
+        for (String storageIdKey : usageMap.keySet()) {
+            Pair<Long, Long> storageTimeInfo = usageMap.get(storageIdKey);
+            long useTime = storageTimeInfo.second().longValue();
+
+            // Only create a usage record if the usage time is greater than zero.
+            if (useTime > 0L) {
+ StorageInfo info = storageMap.get(storageIdKey);
+ createUsageRecord(info.getZoneId(), info.getStorageType(), useTime, startDate, endDate, account, info.getStorageId(), info.getSourceId(), info.getSize());
+ }
+ }
+
+ return true;
+ }
+
+    private static void updateStorageUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long storageId, long duration) {
+        Pair<Long, Long> storageUsageInfo = usageDataMap.get(key);
+        if (storageUsageInfo == null) {
+            storageUsageInfo = new Pair<Long, Long>(new Long(storageId), new Long(duration));
+        } else {
+            Long runningTime = storageUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            storageUsageInfo = new Pair<Long, Long>(storageUsageInfo.first(), runningTime);
+        }
+        usageDataMap.put(key, storageUsageInfo);
+    }
+
+ private static void createUsageRecord(long zoneId, int type, long runningTime, Date startDate, Date endDate, AccountVO account, long storageId, Long sourceId, long size) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating Storage usage record for type: "+ type + " with id: " + storageId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ String usageDesc = "";
+ Long tmplSourceId = null;
+
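+        // Map the storage type to its usage type; only TEMPLATE entries carry
+        // the originating source id (tmplSourceId) through to the usage record.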
+ int usage_type = 0;
+ switch(type){
+ case StorageTypes.TEMPLATE:
+ usage_type = UsageTypes.TEMPLATE;
+ usageDesc += "Template ";
+ tmplSourceId = sourceId;
+ break;
+ case StorageTypes.ISO:
+ usage_type = UsageTypes.ISO;
+ usageDesc += "ISO ";
+ break;
+ case StorageTypes.SNAPSHOT:
+ usage_type = UsageTypes.SNAPSHOT;
+ usageDesc += "Snapshot ";
+ break;
+ }
+        // Create the usage record
+        usageDesc += "Id:" + storageId + " Size:" + size;
+
+ UsageVO usageRecord = new UsageVO(zoneId, account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", usage_type,
+ new Double(usage), null, null, null, tmplSourceId, storageId, size, startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class StorageInfo {
+ private long zoneId;
+ private long storageId;
+ private int storageType;
+ private Long sourceId;
+ private long size;
+
+ public StorageInfo(long zoneId, long storageId, int storageType, Long sourceId, long size) {
+ this.zoneId = zoneId;
+ this.storageId = storageId;
+ this.storageType = storageType;
+ this.sourceId = sourceId;
+ this.size = size;
+ }
+
+ public long getZoneId() {
+ return zoneId;
+ }
+
+ public long getStorageId() {
+ return storageId;
+ }
+
+ public int getStorageType() {
+ return storageType;
+ }
+
+        // Long (not long): sourceId may be null for non-template storage.
+        public Long getSourceId() {
+            return sourceId;
+        }
+
+ public long getSize() {
+ return size;
+ }
+ }
+}
diff --git a/usage/src/com/cloud/usage/parser/UsageParser.java b/usage/src/com/cloud/usage/parser/UsageParser.java
new file mode 100644
index 00000000000..2802390f63f
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/UsageParser.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+public abstract class UsageParser implements Runnable {
+ public static final Logger s_logger = Logger.getLogger(UsageParser.class.getName());
+
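+    // Runnable hook: parse(null) asks the concrete parser to aggregate up to
+    // "now"; any failure is logged and swallowed so one bad parse does not
+    // kill the calling thread.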
+ public void run() {
+ try {
+ parse(null);
+ } catch (Exception e) {
+ s_logger.warn("Error while parsing usage events", e);
+ }
+ }
+
+ public abstract void parse(Date endDate);
+}
diff --git a/usage/src/com/cloud/usage/parser/VMInstanceUsageParser.java b/usage/src/com/cloud/usage/parser/VMInstanceUsageParser.java
new file mode 100644
index 00000000000..2f79f287bad
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/VMInstanceUsageParser.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVMInstanceVO;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageVMInstanceDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class VMInstanceUsageParser {
+ public static final Logger s_logger = Logger.getLogger(VMInstanceUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageVMInstanceDao m_usageInstanceDao = _locator.getDao(UsageVMInstanceDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all VMInstance usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+ // - query usage_vm_instance table with the following criteria:
+ // - look for an entry for accountId with start date in the given range
+ // - look for an entry for accountId with end date in the given range
+        //  - look for an entry for accountId with end date null (a currently running vm)
+ // - look for an entry for accountId with start date before given range *and* end date after given range
+        List<UsageVMInstanceVO> usageInstances = m_usageInstanceDao.getUsageRecords(account.getId(), startDate, endDate);
+        // ToDo: add domainId as a criterion when fetching usage records
+
+        // These maps track the vm name *and* the time accumulated in each state.
+        Map<String, Pair<String, Long>> usageVMUptimeMap = new HashMap<String, Pair<String, Long>>();
+        Map<String, Pair<String, Long>> allocatedVMMap = new HashMap<String, Pair<String, Long>>();
+
+        Map<String, VMInfo> vmServiceOfferingMap = new HashMap<String, VMInfo>();
+
+ // loop through all the usage instances, create a usage record for each
+ for (UsageVMInstanceVO usageInstance : usageInstances) {
+ long vmId = usageInstance.getVmInstanceId();
+ long soId = usageInstance.getSerivceOfferingId();
+ long zoneId = usageInstance.getZoneId();
+ long tId = usageInstance.getTemplateId();
+ int usageType = usageInstance.getUsageType();
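+            // e.g. vmId=12, soId=5 (hypothetical ids) produces keys like "12-5-<usageType>",
+            // keeping RUNNING_VM and ALLOCATED_VM time in separate buckets.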
+ String key = vmId + "-" + soId + "-" + usageType;
+
+ // store the info in the service offering map
+ vmServiceOfferingMap.put(key, new VMInfo(vmId, zoneId, soId, tId, usageInstance.getHypervisorType()));
+
+ Date vmStartDate = usageInstance.getStartDate();
+ Date vmEndDate = usageInstance.getEndDate();
+
+ if ((vmEndDate == null) || vmEndDate.after(endDate)) {
+ vmEndDate = endDate;
+ }
+
+ // clip the start date to the beginning of our aggregation range if the vm has been running for a while
+ if (vmStartDate.before(startDate)) {
+ vmStartDate = startDate;
+ }
+
+ long currentDuration = (vmEndDate.getTime() - vmStartDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
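+            // Route the interval into the matching bucket: allocated time and
+            // running time are billed as distinct usage types.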
+ switch (usageType) {
+ case UsageTypes.ALLOCATED_VM:
+ updateVmUsageData(allocatedVMMap, key, usageInstance.getVmName(), currentDuration);
+ break;
+ case UsageTypes.RUNNING_VM:
+ updateVmUsageData(usageVMUptimeMap, key, usageInstance.getVmName(), currentDuration);
+ break;
+ }
+ }
+
+ for (String vmIdKey : usageVMUptimeMap.keySet()) {
+            Pair<String, Long> vmUptimeInfo = usageVMUptimeMap.get(vmIdKey);
+            long runningTime = vmUptimeInfo.second().longValue();
+
+            // Only create a usage record if the running time is greater than zero.
+ if (runningTime > 0L) {
+ VMInfo info = vmServiceOfferingMap.get(vmIdKey);
+ createUsageRecord(UsageTypes.RUNNING_VM, runningTime, startDate, endDate, account, info.getVirtualMachineId(), vmUptimeInfo.first(), info.getZoneId(),
+ info.getServiceOfferingId(), info.getTemplateId(), info.getHypervisorType());
+ }
+ }
+
+ for (String vmIdKey : allocatedVMMap.keySet()) {
+            Pair<String, Long> vmAllocInfo = allocatedVMMap.get(vmIdKey);
+            long allocatedTime = vmAllocInfo.second().longValue();
+
+            // Only create a usage record if the allocated time is greater than zero.
+ if (allocatedTime > 0L) {
+ VMInfo info = vmServiceOfferingMap.get(vmIdKey);
+ createUsageRecord(UsageTypes.ALLOCATED_VM, allocatedTime, startDate, endDate, account, info.getVirtualMachineId(), vmAllocInfo.first(), info.getZoneId(),
+ info.getServiceOfferingId(), info.getTemplateId(), info.getHypervisorType());
+ }
+ }
+
+ return true;
+ }
+
+    private static void updateVmUsageData(Map<String, Pair<String, Long>> usageDataMap, String key, String vmName, long duration) {
+        Pair<String, Long> vmUsageInfo = usageDataMap.get(key);
+        if (vmUsageInfo == null) {
+            vmUsageInfo = new Pair<String, Long>(vmName, new Long(duration));
+        } else {
+            Long runningTime = vmUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            vmUsageInfo = new Pair<String, Long>(vmUsageInfo.first(), runningTime);
+        }
+        usageDataMap.put(key, vmUsageInfo);
+    }
+
+ private static void createUsageRecord(int type, long runningTime, Date startDate, Date endDate, AccountVO account, long vmId, String vmName, long zoneId, long serviceOfferingId, long templateId, String hypervisorType) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating VM usage record for vm: " + vmName + ", type: " + type + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ // Create the usage record
+ String usageDesc = vmName;
+ if (type == UsageTypes.ALLOCATED_VM) {
+ usageDesc += " allocated";
+ } else {
+ usageDesc += " running time";
+ }
+ usageDesc += " (ServiceOffering: " + serviceOfferingId + ") (Template: " + templateId + ")";
+ UsageVO usageRecord = new UsageVO(Long.valueOf(zoneId), account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", type,
+ new Double(usage), Long.valueOf(vmId), vmName, Long.valueOf(serviceOfferingId), Long.valueOf(templateId), Long.valueOf(vmId), startDate, endDate, hypervisorType);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class VMInfo {
+ private long virtualMachineId;
+ private long zoneId;
+ private long serviceOfferingId;
+ private long templateId;
+ private String hypervisorType;
+
+ public VMInfo(long vmId, long zId, long soId, long tId, String hypervisorType) {
+ virtualMachineId = vmId;
+ zoneId = zId;
+ serviceOfferingId = soId;
+ templateId = tId;
+ this.hypervisorType = hypervisorType;
+ }
+
+ public long getZoneId() {
+ return zoneId;
+ }
+ public long getVirtualMachineId() {
+ return virtualMachineId;
+ }
+ public long getServiceOfferingId() {
+ return serviceOfferingId;
+ }
+ public long getTemplateId() {
+ return templateId;
+ }
+        public String getHypervisorType() {
+            return hypervisorType;
+        }
+ }
+}
diff --git a/usage/src/com/cloud/usage/parser/VolumeUsageParser.java b/usage/src/com/cloud/usage/parser/VolumeUsageParser.java
new file mode 100644
index 00000000000..f805eb74f89
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/VolumeUsageParser.java
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved
+ *
+ */
+
+package com.cloud.usage.parser;
+
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.usage.UsageServer;
+import com.cloud.usage.UsageTypes;
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.UsageVolumeVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageVolumeDao;
+import com.cloud.user.AccountVO;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentLocator;
+
+public class VolumeUsageParser {
+ public static final Logger s_logger = Logger.getLogger(VolumeUsageParser.class.getName());
+
+ private static ComponentLocator _locator = ComponentLocator.getLocator(UsageServer.Name, "usage-components.xml", "log4j-cloud_usage");
+ private static UsageDao m_usageDao = _locator.getDao(UsageDao.class);
+ private static UsageVolumeDao m_usageVolumeDao = _locator.getDao(UsageVolumeDao.class);
+
+ public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Parsing all Volume usage events for account: " + account.getId());
+ }
+ if ((endDate == null) || endDate.after(new Date())) {
+ endDate = new Date();
+ }
+
+        // - query the usage_volume table with the following criteria:
+        //  - look for an entry for accountId with start date in the given range
+        //  - look for an entry for accountId with end date in the given range
+        //  - look for an entry for accountId with end date null (a volume still allocated)
+        //  - look for an entry for accountId with start date before the given range *and* end date after the given range
+        List<UsageVolumeVO> usageUsageVols = m_usageVolumeDao.getUsageRecords(account.getId(), account.getDomainId(), startDate, endDate, false, 0);
+
+        if (usageUsageVols.isEmpty()) {
+            s_logger.debug("No volume usage events for this period");
+            return true;
+        }
+
+        // This map tracks the volume id *and* its total usage time.
+        Map<String, Pair<Long, Long>> usageMap = new HashMap<String, Pair<Long, Long>>();
+
+        Map<String, VolInfo> diskOfferingMap = new HashMap<String, VolInfo>();
+
+ // loop through all the usage volumes, create a usage record for each
+ for (UsageVolumeVO usageVol : usageUsageVols) {
+ long volId = usageVol.getId();
+ Long doId = usageVol.getDiskOfferingId();
+ long zoneId = usageVol.getZoneId();
+ Long templateId = usageVol.getTemplateId();
+ long size = usageVol.getSize();
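+            // The key is the volume id alone, so multiple create/delete intervals
+            // for the same volume in this range accumulate into a single record.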
+ String key = ""+volId;
+
+ diskOfferingMap.put(key, new VolInfo(volId, zoneId, doId, templateId, size));
+
+ Date volCreateDate = usageVol.getCreated();
+ Date volDeleteDate = usageVol.getDeleted();
+
+ if ((volDeleteDate == null) || volDeleteDate.after(endDate)) {
+ volDeleteDate = endDate;
+ }
+
+            // clip the start date to the beginning of our aggregation range if the volume predates the range
+ if (volCreateDate.before(startDate)) {
+ volCreateDate = startDate;
+ }
+
+ long currentDuration = (volDeleteDate.getTime() - volCreateDate.getTime()) + 1; // make sure this is an inclusive check for milliseconds (i.e. use n - m + 1 to find total number of millis to charge)
+
+ updateVolUsageData(usageMap, key, usageVol.getId(), currentDuration);
+ }
+
+        for (String volIdKey : usageMap.keySet()) {
+            Pair<Long, Long> volTimeInfo = usageMap.get(volIdKey);
+            long useTime = volTimeInfo.second().longValue();
+
+            // Only create a usage record if the usage time is greater than zero.
+            if (useTime > 0L) {
+ VolInfo info = diskOfferingMap.get(volIdKey);
+ createUsageRecord(UsageTypes.VOLUME, useTime, startDate, endDate, account, info.getVolumeId(), info.getZoneId(), info.getDiskOfferingId(), info.getTemplateId(), info.getSize());
+ }
+ }
+
+ return true;
+ }
+
+    private static void updateVolUsageData(Map<String, Pair<Long, Long>> usageDataMap, String key, long volId, long duration) {
+        Pair<Long, Long> volUsageInfo = usageDataMap.get(key);
+        if (volUsageInfo == null) {
+            volUsageInfo = new Pair<Long, Long>(new Long(volId), new Long(duration));
+        } else {
+            Long runningTime = volUsageInfo.second();
+            runningTime = new Long(runningTime.longValue() + duration);
+            volUsageInfo = new Pair<Long, Long>(volUsageInfo.first(), runningTime);
+        }
+        usageDataMap.put(key, volUsageInfo);
+    }
+
+ private static void createUsageRecord(int type, long runningTime, Date startDate, Date endDate, AccountVO account, long volId, long zoneId, Long doId, Long templateId, long size) {
+ // Our smallest increment is hourly for now
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Total running time " + runningTime + "ms");
+ }
+
+ float usage = runningTime / 1000f / 60f / 60f;
+
+ DecimalFormat dFormat = new DecimalFormat("#.######");
+ String usageDisplay = dFormat.format(usage);
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Creating Volume usage record for vol: " + volId + ", usage: " + usageDisplay + ", startDate: " + startDate + ", endDate: " + endDate + ", for account: " + account.getId());
+ }
+
+ // Create the usage record
+        String usageDesc = "Volume Id: " + volId + " usage time";
+
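+        // Prefer the template id in the description when present; otherwise fall
+        // back to the disk offering id, if any.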
+        if (templateId != null) {
+            usageDesc += " (Template: " + templateId + ")";
+        } else if (doId != null) {
+            usageDesc += " (DiskOffering: " + doId + ")";
+        }
+
+ UsageVO usageRecord = new UsageVO(zoneId, account.getId(), account.getDomainId(), usageDesc, usageDisplay + " Hrs", type,
+ new Double(usage), null, null, doId, templateId, volId, size, startDate, endDate);
+ m_usageDao.persist(usageRecord);
+ }
+
+ private static class VolInfo {
+ private long volId;
+ private long zoneId;
+ private Long diskOfferingId;
+ private Long templateId;
+ private long size;
+
+ public VolInfo(long volId, long zoneId, Long diskOfferingId, Long templateId, long size) {
+ this.volId = volId;
+ this.zoneId = zoneId;
+ this.diskOfferingId = diskOfferingId;
+ this.templateId = templateId;
+ this.size = size;
+ }
+ public long getZoneId() {
+ return zoneId;
+ }
+ public long getVolumeId() {
+ return volId;
+ }
+ public Long getDiskOfferingId() {
+ return diskOfferingId;
+ }
+ public Long getTemplateId() {
+ return templateId;
+ }
+ public long getSize() {
+ return size;
+ }
+ }
+}