diff --git a/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java index d959a5e6699..20a0e4c9178 100644 --- a/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java +++ b/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java @@ -47,7 +47,10 @@ import com.cloud.utils.PropertiesUtil; @Local(value = EventBus.class) public class KafkaEventBus extends ManagerBase implements EventBus { - private final String _topic = "cloudstack"; + public static final String DEFAULT_TOPIC = "cloudstack"; + public static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private String _topic = null; private Producer _producer; private static final Logger s_logger = Logger.getLogger(KafkaEventBus.class); @@ -58,7 +61,23 @@ public class KafkaEventBus extends ManagerBase implements EventBus { try { final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties")); + props.load(is); + + _topic = (String)props.remove("topic"); + if (_topic == null) { + _topic = DEFAULT_TOPIC; + } + + if (!props.containsKey("key.serializer")) { + props.put("key.serializer", DEFAULT_SERIALIZER); + } + + if (!props.containsKey("value.serializer")) { + props.put("value.serializer", DEFAULT_SERIALIZER); + } + + is.close(); } catch (Exception e) { throw new ConfigurationException("Could not read kafka properties");