...
 
Commits (6)
......@@ -235,6 +235,10 @@
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator-annotation-processor</artifactId>
......@@ -244,6 +248,16 @@
<groupId>org.mortbay.jasper</groupId>
<artifactId>apache-el</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<exclusions>
<exclusion>
<artifactId>http-client</artifactId>
<groupId>com.rabbitmq</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......
......@@ -32,6 +32,8 @@ public class AppInitializer extends AbstractAnnotationConfigDispatcherServletIni
PersistenceConfig.class,
SecurityConfig.class,
WebSocketConfig.class,
RabbitConfig.class,
TaskExecutorConfig.class,
};
}
......
package de.thm.arsnova.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import java.util.Arrays;
import java.util.List;
import static org.springframework.amqp.core.BindingBuilder.bind;
@Configuration
public class RabbitConfig {
private final static String roomAfterCreationEventExchangeName = "backend.room.aftercreationevent";
private final static String roomAfterDeletionEventExchangeName = "backend.room.afterdeletionevent";
private final static String roomAfterFullUpdateEventExchangeName = "backend.room.afterfullupdateevent";
private final static String roomAfterPatchEventExchangeName = "backend.room.afterpatchevent";
private final static String roomAfterUpdateEventExchangeName = "backend.room.afterupdateevent";
@Bean
@Autowired
public ConnectionFactory connectionFactory(
@TaskExecutorConfig.RabbitConnectionExecutor TaskExecutor executor
) {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost", 5672);
connectionFactory.setUsername("arsnova");
connectionFactory.setPassword("arsnova");
connectionFactory.setExecutor(executor);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
@Autowired
public List<Declarable> declareExchanges(RabbitAdmin rabbitAdmin) {
FanoutExchange roomAfterCreationEventExchange = new FanoutExchange(roomAfterCreationEventExchangeName);
FanoutExchange roomAfterDeletionEventExchange = new FanoutExchange(roomAfterDeletionEventExchangeName);
FanoutExchange roomAfterFullUpdateEventExchange = new FanoutExchange(roomAfterFullUpdateEventExchangeName);
FanoutExchange roomAfterPatchEventExchange = new FanoutExchange(roomAfterPatchEventExchangeName);
FanoutExchange roomAfterUpdateEventExchange = new FanoutExchange(roomAfterUpdateEventExchangeName);
rabbitAdmin.declareExchange(roomAfterCreationEventExchange);
rabbitAdmin.declareExchange(roomAfterDeletionEventExchange);
rabbitAdmin.declareExchange(roomAfterFullUpdateEventExchange);
rabbitAdmin.declareExchange(roomAfterPatchEventExchange);
rabbitAdmin.declareExchange(roomAfterUpdateEventExchange);
return Arrays.asList(
roomAfterCreationEventExchange,
roomAfterDeletionEventExchange,
roomAfterFullUpdateEventExchange,
roomAfterPatchEventExchange,
roomAfterUpdateEventExchange
);
}
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
return factory;
}
}
package de.thm.arsnova.config;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Configuration
public class TaskExecutorConfig {
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
@Qualifier
public @interface RabbitConnectionExecutor {}
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
@Qualifier
public @interface RabbitListenerExecutor {}
/**
* "The executor’s thread pool should be unbounded, or set appropriately for the expected utilization (usually, at least one thread per connection). If multiple channels are created on each connection then the pool size will affect the concurrency, so a variable (or simple cached) thread pool executor would be most suitable."
*
* Reference:
* http://docs.spring.io/spring-amqp/reference/htmlsingle/#connections
*/
@Bean
@Autowired
@RabbitConnectionExecutor
public TaskExecutor rabbitConnectionExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("RabbitConnection");
executor.afterPropertiesSet();
return executor;
}
/**
* Listeners would use a SimpleAsyncTaskExecutor by default (creates a new thread for each task).
*
* Reference:
* http://docs.spring.io/spring-amqp/reference/htmlsingle/#_threading_and_asynchronous_consumers
*/
@Bean
@Autowired
@RabbitListenerExecutor
public TaskExecutor rabbitListenerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("RabbitListener");
executor.afterPropertiesSet();
return executor;
}
}
......@@ -24,4 +24,9 @@ public class AfterCreationEvent<E extends Entity> extends CrudEvent<E> {
public AfterCreationEvent(final Object source, final E entity) {
super(source, entity);
}
@Override
public String toString() {
return "AfterCreationEvent{}";
}
}
package de.thm.arsnova.event;
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonIgnoreType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import de.thm.arsnova.model.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*
* EventToTopicPublisher listens to application events, converts them to messages and sends them to topics.
* Use the 'backend.' prefix for internal events that should not be sent to frontend clients.
* Restrict messages to what is needed in other services as long as the arsnova-backend is owner of the functionality.
*
*/
@Component
public class EventToTopicPublisher {
@JsonFilter("ampqFilter")
public class AmqpFilter {
}
@JsonIgnoreType
public class MyMixInForIgnoreType {}
private static final Logger logger = LoggerFactory.getLogger(EventToTopicPublisher.class);
private class MessagePublishingConfig {
public String entityType;
public String applicationEventType;
public Set<String> attributes = new HashSet<>();
}
final private RabbitTemplate messagingTemplate;
private List<MessagePublishingConfig> config = new ArrayList<>();
@Autowired
public EventToTopicPublisher(RabbitTemplate t) {
messagingTemplate = t;
MessagePublishingConfig c = new MessagePublishingConfig();
c.entityType = "Room";
c.applicationEventType = "AfterCreationEvent";
c.attributes.add("id");
c.attributes.add("ownerId");
config.add(c);
}
@EventListener
public <T extends CrudEvent, E extends Entity> void publishRoomStateEvent(final T event) {
logger.trace("Inspecting CRUD Event: {}", event);
String eventType = event.getClass().getSimpleName();
String entityType = event.getEntity().getClass().getSimpleName();
for (MessagePublishingConfig c : this.config) {
if (c.applicationEventType.equals(eventType)) {
String exchangeName = "backend." + entityType.toLowerCase() + "." + eventType.toLowerCase();
ObjectMapper mapper = new ObjectMapper();
mapper.addMixIn(Object.class, AmqpFilter.class);
mapper.addMixIn(org.springframework.core.ResolvableType.class, MyMixInForIgnoreType.class);
SimpleFilterProvider filterProvider = new SimpleFilterProvider();
filterProvider.addFilter("ampqFilter", SimpleBeanPropertyFilter.serializeAllExcept(c.attributes));
mapper.setFilterProvider(filterProvider);
try {
String t = mapper.writeValueAsString(event);
logger.error("#################");
logger.error(t);
logger.error("#################");
} catch (JsonProcessingException e) {
System.out.println("adsfjklajskdlf lkasdjfkl asdjökfj aölsdj fklas döf");
logger.error(e.getMessage());
}
}
}
}
/* @EventListener
public void publishRoomStateEvent(final AfterCreationEvent<Room> event) {
final Room room = event.getEntity();
final RoomCreatedPayload payload = new RoomCreatedPayload(room);
final RoomCreated message = new RoomCreated(payload);
messagingTemplate.convertAndSend(
"service.room.created.exchange",
"",
message
);
}*/
}
package de.thm.arsnova.model.serialization;
import com.fasterxml.jackson.annotation.JsonFilter;
/*@JsonFilter("ampqFilter")
public class AmqpFilter {
}*/
......@@ -3,7 +3,7 @@ package de.thm.arsnova.websocket.message;
public class WebSocketMessage<P extends WebSocketPayload> {
private String type;
private P payload;
protected P payload;
public WebSocketMessage(final String type) {
this.type = type;
......
package de.thm.arsnova.websocket.message.backend;
import de.thm.arsnova.websocket.message.WebSocketMessage;
public class RoomCreated extends WebSocketMessage<RoomCreatedPayload> {
public RoomCreated() {
super(RoomCreated.class.getSimpleName());
}
public RoomCreated(RoomCreatedPayload p) {
super(RoomCreated.class.getSimpleName());
payload = p;
}
@Override
public String toString() {
return "RoomCreated{" +
"payload=" + payload +
'}';
}
}
package de.thm.arsnova.websocket.message.backend;
import de.thm.arsnova.model.Room;
import de.thm.arsnova.websocket.message.WebSocketPayload;
import java.util.Objects;
import java.util.Set;
public class RoomCreatedPayload implements WebSocketPayload {
private String id;
private String ownerId;
private Set<Room.Moderator> moderators;
public RoomCreatedPayload() {
}
public RoomCreatedPayload(String id, String ownerId, Set<Room.Moderator> moderators) {
this.id = id;
this.ownerId = ownerId;
this.moderators = moderators;
}
public RoomCreatedPayload(Room room) {
this.id = room.getId();
this.ownerId = room.getOwnerId();
this.moderators = room.getModerators();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOwnerId() {
return ownerId;
}
public void setOwnerId(String ownerId) {
this.ownerId = ownerId;
}
public Set<Room.Moderator> getModerators() {
return moderators;
}
public void setModerators(Set<Room.Moderator> moderators) {
this.moderators = moderators;
}
@Override
public String toString() {
return "RoomCreatedPayload{" +
"id='" + id + '\'' +
", ownerId='" + ownerId + '\'' +
", moderators=" + moderators +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RoomCreatedPayload that = (RoomCreatedPayload) o;
return Objects.equals(id, that.id) &&
Objects.equals(ownerId, that.ownerId) &&
Objects.equals(moderators, that.moderators);
}
@Override
public int hashCode() {
return Objects.hash(id, ownerId, moderators);
}
}