mirror of https://github.com/apache/cloudstack.git
make topic configurable, provide default serializers
Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
parent
04b30e0e66
commit
dd77fdf6c6
|
|
@ -47,7 +47,10 @@ import com.cloud.utils.PropertiesUtil;
|
|||
@Local(value = EventBus.class)
|
||||
public class KafkaEventBus extends ManagerBase implements EventBus {
|
||||
|
||||
private final String _topic = "cloudstack";
|
||||
public static final String DEFAULT_TOPIC = "cloudstack";
|
||||
public static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
|
||||
|
||||
private String _topic = null;
|
||||
private Producer<String,String> _producer;
|
||||
private static final Logger s_logger = Logger.getLogger(KafkaEventBus.class);
|
||||
|
||||
|
|
@ -58,7 +61,23 @@ public class KafkaEventBus extends ManagerBase implements EventBus {
|
|||
|
||||
try {
|
||||
final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties"));
|
||||
|
||||
props.load(is);
|
||||
|
||||
_topic = (String)props.remove("topic");
|
||||
if (_topic == null) {
|
||||
_topic = DEFAULT_TOPIC;
|
||||
}
|
||||
|
||||
if (!props.containsKey("key.serializer")) {
|
||||
props.put("key.serializer", DEFAULT_SERIALIZER);
|
||||
}
|
||||
|
||||
if (!props.containsKey("value.serializer")) {
|
||||
props.put("value.serializer", DEFAULT_SERIALIZER);
|
||||
}
|
||||
|
||||
|
||||
is.close();
|
||||
} catch (Exception e) {
|
||||
throw new ConfigurationException("Could not read kafka properties");
|
||||
|
|
|
|||
Loading…
Reference in New Issue