Commit 2972b6e4 authored by Tom Käsler's avatar Tom Käsler

Add room event publisher for comment service

parent 22376c70
Pipeline #27602 failed with stages
in 4 minutes and 2 seconds
......@@ -208,6 +208,20 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-ldap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<exclusions>
<exclusion>
<artifactId>http-client</artifactId>
<groupId>com.rabbitmq</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......
......@@ -32,6 +32,8 @@ public class AppInitializer extends AbstractAnnotationConfigDispatcherServletIni
PersistenceConfig.class,
SecurityConfig.class,
WebSocketConfig.class,
RabbitConfig.class,
TaskExecutorConfig.class,
};
}
......
package de.thm.arsnova.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import java.util.Arrays;
import java.util.List;
import static org.springframework.amqp.core.BindingBuilder.bind;
@Configuration
public class RabbitConfig {
@Bean
@Autowired
public ConnectionFactory connectionFactory(
@TaskExecutorConfig.RabbitConnectionExecutor TaskExecutor executor
) {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost", 5672);
connectionFactory.setUsername("arsnova");
connectionFactory.setPassword("arsnova");
connectionFactory.setExecutor(executor);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
@Autowired
public List<Declarable> fanoutBindings(RabbitAdmin rabbitAdmin) {
Queue fanoutQueue1 = new Queue("comment.service.room.created", true);
FanoutExchange fanoutExchange = new FanoutExchange("service.room.created.exchange");
rabbitAdmin.declareQueue(fanoutQueue1);
rabbitAdmin.declareExchange(fanoutExchange);
Binding b = BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
rabbitAdmin.declareBinding(b);
return Arrays.asList(
fanoutQueue1,
fanoutExchange,
bind(fanoutQueue1).to(fanoutExchange));
}
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
return factory;
}
}
package de.thm.arsnova.config;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Configuration
public class TaskExecutorConfig {
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
@Qualifier
public @interface RabbitConnectionExecutor {}
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
@Qualifier
public @interface RabbitListenerExecutor {}
/**
* "The executor’s thread pool should be unbounded, or set appropriately for the expected utilization (usually, at least one thread per connection). If multiple channels are created on each connection then the pool size will affect the concurrency, so a variable (or simple cached) thread pool executor would be most suitable."
*
* Reference:
* http://docs.spring.io/spring-amqp/reference/htmlsingle/#connections
*/
@Bean
@Autowired
@RabbitConnectionExecutor
public TaskExecutor rabbitConnectionExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("RabbitConnection");
executor.afterPropertiesSet();
return executor;
}
/**
* Listeners would use a SimpleAsyncTaskExecutor by default (creates a new thread for each task).
*
* Reference:
* http://docs.spring.io/spring-amqp/reference/htmlsingle/#_threading_and_asynchronous_consumers
*/
@Bean
@Autowired
@RabbitListenerExecutor
public TaskExecutor rabbitListenerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("RabbitListener");
executor.afterPropertiesSet();
return executor;
}
}
package de.thm.arsnova.event;
import de.thm.arsnova.model.Room;
import de.thm.arsnova.websocket.message.backend.RoomCreated;
import de.thm.arsnova.websocket.message.backend.RoomCreatedPayload;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
*
* EventToTopicPublisher listens to application events, converts them to messages and sends them to topics.
* Use the 'backend.' prefix for internal events that should not be sent to frontend clients.
* Restrict messages to what is needed in other services as long as the arsnova-backend is owner of the functionality.
*
*/
@Component
public class EventToTopicPublisher {
final private RabbitTemplate messagingTemplate;
@Autowired
public EventToTopicPublisher(RabbitTemplate t) {
messagingTemplate = t;
}
@EventListener
public void publishRoomStateEvent(final AfterCreationEvent<Room> event) {
final Room room = event.getEntity();
final RoomCreatedPayload payload = new RoomCreatedPayload(room);
final RoomCreated message = new RoomCreated(payload);
messagingTemplate.convertAndSend(
"service.room.created.exchange",
"",
message
);
}
}
......@@ -20,6 +20,8 @@ package de.thm.arsnova.event;
import de.thm.arsnova.model.Content;
import de.thm.arsnova.model.Entity;
import de.thm.arsnova.model.Room;
import de.thm.arsnova.websocket.message.backend.RoomCreated;
import de.thm.arsnova.websocket.message.backend.RoomCreatedPayload;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.event.EventListener;
......
......@@ -3,7 +3,7 @@ package de.thm.arsnova.websocket.message;
public class WebSocketMessage<P extends WebSocketPayload> {
private String type;
private P payload;
protected P payload;
public WebSocketMessage(String type) {
this.type = type;
......
package de.thm.arsnova.websocket.message.backend;
import de.thm.arsnova.websocket.message.WebSocketMessage;
public class RoomCreated extends WebSocketMessage<RoomCreatedPayload> {
public RoomCreated() {
super(RoomCreated.class.getSimpleName());
}
public RoomCreated(RoomCreatedPayload p) {
super(RoomCreated.class.getSimpleName());
payload = p;
}
@Override
public String toString() {
return "RoomCreated{" +
"payload=" + payload +
'}';
}
}
package de.thm.arsnova.websocket.message.backend;
import de.thm.arsnova.model.Room;
import de.thm.arsnova.websocket.message.WebSocketPayload;
import java.util.Objects;
import java.util.Set;
public class RoomCreatedPayload implements WebSocketPayload {
private String id;
private String ownerId;
private Set<Room.Moderator> moderators;
public RoomCreatedPayload() {
}
public RoomCreatedPayload(String id, String ownerId, Set<Room.Moderator> moderators) {
this.id = id;
this.ownerId = ownerId;
this.moderators = moderators;
}
public RoomCreatedPayload(Room room) {
this.id = room.getId();
this.ownerId = room.getOwnerId();
this.moderators = room.getModerators();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOwnerId() {
return ownerId;
}
public void setOwnerId(String ownerId) {
this.ownerId = ownerId;
}
public Set<Room.Moderator> getModerators() {
return moderators;
}
public void setModerators(Set<Room.Moderator> moderators) {
this.moderators = moderators;
}
@Override
public String toString() {
return "RoomCreatedPayload{" +
"id='" + id + '\'' +
", ownerId='" + ownerId + '\'' +
", moderators=" + moderators +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RoomCreatedPayload that = (RoomCreatedPayload) o;
return Objects.equals(id, that.id) &&
Objects.equals(ownerId, that.ownerId) &&
Objects.equals(moderators, that.moderators);
}
@Override
public int hashCode() {
return Objects.hash(id, ownerId, moderators);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment