Skip to content

Commit

Permalink
Refactor the WebSocket system
Browse files Browse the repository at this point in the history
Simpler, more consistent design.

Uses several queues (private messages, room messages and broadcasts).

Sent messages are always echoed back. This allows real time updates when using multiple clients at the same time.

Enable SockJS if some client wants to use it.
  • Loading branch information
zapek committed Dec 11, 2024
1 parent b6699d0 commit 0b02548
Show file tree
Hide file tree
Showing 24 changed files with 539 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,116 @@

package io.xeres.app.api.controller.chat;

import io.xeres.app.service.MessageService;
import io.xeres.app.xrs.service.chat.ChatRsService;
import io.xeres.common.id.LocationId;
import io.xeres.common.message.MessageType;
import io.xeres.common.message.chat.ChatMessage;
import io.xeres.common.message.chat.ChatRoomMessage;
import jakarta.validation.Valid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;

import java.util.Objects;

import static io.xeres.common.message.MessageHeaders.DESTINATION_ID;
import static io.xeres.common.message.MessageHeaders.MESSAGE_TYPE;
import static io.xeres.common.rest.PathConfig.CHAT_PATH;
import static io.xeres.common.message.MessagePath.*;

/**
* This controller receives WebSocket messages sent to /app, which means they're produced by the app user.
* <p>
* <img src="doc-files/websocket.svg" alt="WebSocket diagram">
*/
@Controller
@MessageMapping(CHAT_ROOT)
public class ChatMessageController
{
private static final Logger log = LoggerFactory.getLogger(ChatMessageController.class);

private final ChatRsService chatRsService;
private final MessageService messageService;

public ChatMessageController(ChatRsService chatRsService)
public ChatMessageController(ChatRsService chatRsService, MessageService messageService)
{
this.chatRsService = chatRsService;
this.messageService = messageService;
}

@MessageMapping(CHAT_PATH)
public void processMessageFromClient(SimpMessageHeaderAccessor accessor, @Payload @Valid ChatMessage message)
@MessageMapping(CHAT_PRIVATE_DESTINATION)
public void processPrivateChatMessageFromProducer(@Header(DESTINATION_ID) String destinationId, @Header(MESSAGE_TYPE) MessageType messageType, @Payload @Valid ChatMessage chatMessage)
{
var destinationId = accessor.getFirstNativeHeader(DESTINATION_ID);
var messageType = MessageType.valueOf(accessor.getFirstNativeHeader(MESSAGE_TYPE));

switch (messageType)
{
case CHAT_PRIVATE_MESSAGE -> {
log.debug("Received websocket message, sending to peer location: {}, content {}", destinationId, message);
chatRsService.sendPrivateMessage(LocationId.fromString(destinationId), message.getContent());
case CHAT_PRIVATE_MESSAGE ->
{
log.debug("Received websocket message, sending to peer location: {}, content {}", destinationId, chatMessage);
var locationId = LocationId.fromString(destinationId);
chatRsService.sendPrivateMessage(locationId, chatMessage.getContent());
chatMessage.setOwn(true);
messageService.sendToConsumers(BROKER_PREFIX + CHAT_ROOT + CHAT_PRIVATE_DESTINATION, messageType, locationId, chatMessage);
}
case CHAT_ROOM_MESSAGE ->
case CHAT_TYPING_NOTIFICATION ->
{
log.debug("Sending to room: {}, content {}", destinationId, message);
log.debug("Sending chat typing notification...");
Objects.requireNonNull(destinationId);
chatRsService.sendChatRoomMessage(Long.parseLong(destinationId), message.getContent());
chatRsService.sendPrivateTypingNotification(LocationId.fromString(destinationId));
}
case CHAT_BROADCAST_MESSAGE ->
case CHAT_AVATAR ->
{
log.debug("Sending broadcast message");
chatRsService.sendBroadcastMessage(message.getContent());
log.debug("Requesting avatar...");
Objects.requireNonNull(destinationId);
chatRsService.sendAvatarRequest(LocationId.fromString(destinationId));
}
case CHAT_TYPING_NOTIFICATION ->
default -> throw new IllegalStateException("Unexpected value: " + messageType);
}
}

@MessageMapping(CHAT_ROOM_DESTINATION)
public void processChatRoomMessageFromProducer(@Header(DESTINATION_ID) String destinationId, @Header(MESSAGE_TYPE) MessageType messageType, @Payload @Valid ChatRoomMessage chatRoomMessage)
{
switch (messageType)
{
case CHAT_ROOM_MESSAGE ->
{
log.debug("Sending chat typing notification...");
log.debug("Sending to room: {}, content {}", destinationId, chatRoomMessage);
Objects.requireNonNull(destinationId);
chatRsService.sendPrivateTypingNotification(LocationId.fromString(destinationId));
var chatRoomId = Long.parseLong(destinationId);
chatRsService.sendChatRoomMessage(chatRoomId, chatRoomMessage.getContent());
messageService.sendToConsumers(BROKER_PREFIX + CHAT_ROOT + CHAT_ROOM_DESTINATION, messageType, chatRoomId, chatRoomMessage);
}
case CHAT_ROOM_TYPING_NOTIFICATION ->
{
log.debug("Sending chat room typing notification...");
Objects.requireNonNull(destinationId);
chatRsService.sendChatRoomTypingNotification(Long.parseLong(destinationId));
}
case CHAT_AVATAR ->
default -> throw new IllegalStateException("Unexpected value: " + messageType);
}
}

@MessageMapping(CHAT_BROADCAST_DESTINATION)
public void processBroadcastMessageFromProducer(@Header(DESTINATION_ID) String destinationId, @Header(MESSAGE_TYPE) MessageType messageType, @Payload @Valid ChatMessage chatMessage)
{
switch (messageType)
{
case CHAT_BROADCAST_MESSAGE ->
{
log.debug("Requesting avatar...");
Objects.requireNonNull(destinationId);
chatRsService.sendAvatarRequest(LocationId.fromString(destinationId));
log.debug("Sending broadcast message");
chatRsService.sendBroadcastMessage(chatMessage.getContent());
}
default -> log.error("Couldn't figure out which message to send");
default -> throw new IllegalStateException("Unexpected value: " + messageType);
}
}

@MessageExceptionHandler
@SendToUser("/queue/errors") // XXX: how can we use this? Well, it works... just have to subscribe to it
@SendToUser(DIRECT_PREFIX + "/errors") // XXX: how can we use this? Well, it works... just have to subscribe to it
public String handleException(Throwable e)
{
log.debug("Got exception: {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
@startuml
'https://plantuml.com/component-diagram


package "UI Client" {
component producer #Cyan [
Producer
/app/chat/private
]
component consumer #Cyan [
Consumer
/topic/chat/private
]
}

package "App Server" {
[MessageHandler] #Green
}

cloud "RS Network" {
[Friend]
}

[producer] --> [MessageHandler] : /app
[MessageHandler] --> [consumer] : /topic
[MessageHandler] <-> [Friend]

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;

import static io.xeres.common.message.MessagePath.APP_PREFIX;
import static io.xeres.common.message.MessagePath.BROKER_PREFIX;
import static io.xeres.common.message.MessagingConfiguration.MAXIMUM_MESSAGE_SIZE;
import static io.xeres.common.rest.PathConfig.CHAT_PATH;

/**
* Configuration of the WebSocket. This is used for anything that requires a persistent connection from
Expand All @@ -49,14 +50,14 @@ public class WebSocketMessageBrokerConfiguration implements WebSocketMessageBrok
public void registerStompEndpoints(StompEndpointRegistry registry)
{
registry.addEndpoint("/ws");
//registry.addEndpoint("/ws").withSockJS(); apparently you can *add* that one too
registry.addEndpoint("/ws").withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
registry.setApplicationDestinationPrefixes("/app"); // this is for @Controller annotated endpoints
registry.enableSimpleBroker(CHAT_PATH); // this is for the broker (subscriptions, ...)
registry.enableSimpleBroker(BROKER_PREFIX); // this is for the broker (subscriptions, ...)
registry.setApplicationDestinationPrefixes(APP_PREFIX); // this is for @Controller annotated endpoints using @MessageMapping and such
}

@EventListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,33 @@
import io.xeres.app.xrs.item.Item;
import io.xeres.app.xrs.serialization.SerializationFlags;
import io.xeres.app.xrs.service.RsService;
import io.xeres.common.id.Identifier;
import io.xeres.common.location.Availability;
import io.xeres.common.message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Component;

import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

import static io.xeres.app.net.peer.PeerAttribute.PEER_CONNECTION;
import static io.xeres.common.message.MessageHeaders.buildMessageHeaders;

@Component
public class PeerConnectionManager
{
private static final Logger log = LoggerFactory.getLogger(PeerConnectionManager.class);

private final SimpMessageSendingOperations messagingTemplate;
private final StatusNotificationService statusNotificationService;
private final AvailabilityNotificationService availabilityNotificationService;
private final ApplicationEventPublisher publisher;

private final Map<Long, PeerConnection> peers = new ConcurrentHashMap<>();

public PeerConnectionManager(SimpMessageSendingOperations messagingTemplate, StatusNotificationService statusNotificationService, AvailabilityNotificationService availabilityNotificationService, ApplicationEventPublisher publisher)
public PeerConnectionManager(StatusNotificationService statusNotificationService, AvailabilityNotificationService availabilityNotificationService, ApplicationEventPublisher publisher)
{
this.messagingTemplate = messagingTemplate;
this.statusNotificationService = statusNotificationService;
this.availabilityNotificationService = availabilityNotificationService;
this.publisher = publisher;
Expand Down Expand Up @@ -186,30 +179,6 @@ public int getNumberOfPeers()
return peers.size();
}

public void sendToClientSubscriptions(String path, MessageType type, Object payload)
{
var headers = buildMessageHeaders(type);
sendToClientSubscriptions(path, headers, payload);
}

public void sendToClientSubscriptions(String path, MessageType type, long destination, Object payload)
{
var headers = buildMessageHeaders(type, String.valueOf(destination));
sendToClientSubscriptions(path, headers, payload);
}

public void sendToClientSubscriptions(String path, MessageType type, Identifier destination, Object payload)
{
var headers = buildMessageHeaders(type, destination.toString());
sendToClientSubscriptions(path, headers, payload);
}

public void sendToClientSubscriptions(String path, Map<String, Object> headers, Object payload)
{
Objects.requireNonNull(payload, "Payload *must* be an object that can be serialized to JSON");
messagingTemplate.convertAndSend(path, payload, headers);
}

private void updateCurrentUsersCount()
{
statusNotificationService.setCurrentUsersCount(peers.size());
Expand Down
Loading

0 comments on commit 0b02548

Please sign in to comment.