parent
f07adec658
commit
4e70fb4ffd
@ -0,0 +1,31 @@
|
|||||||
|
package com.luojia_channel.modules.interact.controller;
|
||||||
|
|
||||||
|
import com.luojia_channel.common.domain.Result;
|
||||||
|
import com.luojia_channel.common.domain.UserDTO;
|
||||||
|
import com.luojia_channel.modules.interact.service.FollowService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/follow")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class FollowController {
|
||||||
|
|
||||||
|
private final FollowService followService;
|
||||||
|
@PutMapping("/{id}/{isFollow}")
|
||||||
|
public Result<Void> follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow){
|
||||||
|
followService.follow(followUserId, isFollow);
|
||||||
|
return Result.success();
|
||||||
|
}
|
||||||
|
@GetMapping("/or/not/{id}")
|
||||||
|
public Result<Boolean> isFollow(@PathVariable("id") Long followUserId){
|
||||||
|
return Result.success(followService.isFollow(followUserId));
|
||||||
|
}
|
||||||
|
@GetMapping("/common/{id}")
|
||||||
|
public Result<List<UserDTO>> followCommons(@PathVariable("id") Long id){
|
||||||
|
return Result.success(followService.followCommons(id));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
package com.luojia_channel.modules.interact.mapper;
|
||||||
|
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.luojia_channel.modules.interact.entity.Follow;
|
||||||
|
import org.apache.ibatis.annotations.Delete;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Select;
|
||||||
|
|
||||||
|
|
||||||
|
@Mapper
|
||||||
|
public interface FollowMapper extends BaseMapper<Follow> {
|
||||||
|
@Delete("delete from follow where user_id = #{userId} and follow_user_id = #{followUserId}")
|
||||||
|
boolean delete(Long userId, Long followUserId);
|
||||||
|
|
||||||
|
@Select("select count(*) from follow where user_id = #{userId} and follow_user_id = #{followUserId}")
|
||||||
|
Integer queryCount(Long userId, Long followUserId);
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
package com.luojia_channel.modules.interact.service;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.luojia_channel.common.domain.UserDTO;
|
||||||
|
import com.luojia_channel.modules.interact.entity.Follow;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
public interface FollowService extends IService<Follow> {
|
||||||
|
|
||||||
|
void follow(Long followUserId, Boolean isFollow);
|
||||||
|
|
||||||
|
boolean isFollow(Long followUserId);
|
||||||
|
|
||||||
|
List<UserDTO> followCommons(Long id);
|
||||||
|
}
|
@ -1,17 +1,27 @@
|
|||||||
package com.luojia_channel.modules.message.config;
|
package com.luojia_channel.modules.message.config;
|
||||||
|
|
||||||
|
import jakarta.servlet.ServletContext;
|
||||||
|
import org.springframework.boot.web.servlet.ServletContextInitializer;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||||
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
|
import org.springframework.web.util.WebAppRootListener;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableWebSocket
|
@EnableWebSocket
|
||||||
public class WebSocketConfig implements WebSocketConfigurer {
|
public class WebSocketConfig implements ServletContextInitializer {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServerEndpointExporter serverEndpointExporter() {
|
||||||
|
return new ServerEndpointExporter();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
public void onStartup(ServletContext servletContext) {
|
||||||
registry.addHandler(new MyWebSocketHandler(), "/ws")
|
servletContext.addListener(WebAppRootListener.class);
|
||||||
.setAllowedOrigins("*"); // 允许跨域
|
servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize", (1024 * 200) + "");
|
||||||
|
servletContext.setInitParameter("org.apache.tomcat.websocket.binaryBufferSize", (1024 * 200) + "");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,20 @@
|
|||||||
|
package com.luojia_channel.modules.message.entity;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.*;
|
||||||
|
import lombok.Data;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息实体类
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@TableName("message")
|
||||||
|
public class MessageDO {
|
||||||
|
@TableId(type = IdType.AUTO)
|
||||||
|
private Long id;
|
||||||
|
private Integer messageType; // 0-私聊, 1-系统消息
|
||||||
|
private String content;
|
||||||
|
private Long senderId;
|
||||||
|
private Long receiverId;
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
package com.luojia_channel.modules.message.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.luojia_channel.modules.message.entity.MessageDO;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
|
||||||
|
@Mapper
|
||||||
|
public interface MessageMapper extends BaseMapper<MessageDO> {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,36 @@
|
|||||||
|
package com.luojia_channel.modules.message.mq.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
|
import com.luojia_channel.modules.message.dto.MessageRequest;
|
||||||
|
import com.luojia_channel.modules.message.mq.domain.NotificationMessage;
|
||||||
|
import com.luojia_channel.modules.message.server.WebSocketServer;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class NotificationListener {
|
||||||
|
public static final String EXCHANGE_NAME = "notify.exchange";
|
||||||
|
public static final String QUEUE_NAME = "notify.queue";
|
||||||
|
public static final String ROUTING_KEY = "notify.routing.key";
|
||||||
|
|
||||||
|
private final WebSocketServer webSocketServer;
|
||||||
|
@RabbitListener(bindings = @QueueBinding(
|
||||||
|
value = @Queue(name = QUEUE_NAME),
|
||||||
|
exchange = @Exchange(name = EXCHANGE_NAME, type = "direct"),
|
||||||
|
key = ROUTING_KEY
|
||||||
|
))
|
||||||
|
|
||||||
|
public void handleNotification(NotificationMessage message) {
|
||||||
|
MessageRequest request = BeanUtil.copyProperties(message, MessageRequest.class);
|
||||||
|
if (message.getMessageType() == 0) {
|
||||||
|
webSocketServer.sendPrivateMessage(message.getSenderId(), request);
|
||||||
|
} else {
|
||||||
|
webSocketServer.sendSystemNotification(request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
package com.luojia_channel.modules.message.mq.domain;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class NotificationMessage {
|
||||||
|
private Long senderId;
|
||||||
|
private Long receiverId;
|
||||||
|
private String content;
|
||||||
|
private String senderName; // 用户名
|
||||||
|
private String senderAvatar; // 用户头像
|
||||||
|
private Integer messageType; // 0-私信 1-系统
|
||||||
|
}
|
@ -0,0 +1,141 @@
|
|||||||
|
package com.luojia_channel.modules.message.server;
|
||||||
|
|
||||||
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.luojia_channel.modules.message.dto.MessageRequest;
|
||||||
|
import com.luojia_channel.modules.message.dto.MessageResponse;
|
||||||
|
import com.luojia_channel.modules.message.entity.MessageDO;
|
||||||
|
import com.luojia_channel.modules.message.mapper.MessageMapper;
|
||||||
|
import jakarta.websocket.*;
|
||||||
|
import jakarta.websocket.server.PathParam;
|
||||||
|
import jakarta.websocket.server.ServerEndpoint;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ServerEndpoint(value = "/connect/{userId}")
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class WebSocketServer {
|
||||||
|
// 存储在线用户会话 <userId, Session>
|
||||||
|
private final static Map<String, Session> CLIENTS = new ConcurrentHashMap<>();
|
||||||
|
private final MessageMapper messageMapper;
|
||||||
|
|
||||||
|
@OnOpen
|
||||||
|
public void onOpen(@PathParam("userId") String userId,
|
||||||
|
Session session) {
|
||||||
|
// 将新连接加入客户端列表
|
||||||
|
CLIENTS.put(userId, session);
|
||||||
|
log.info("用户 [{}] 已连接,当前在线人数:{}", userId, CLIENTS.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnClose
|
||||||
|
public void onClose(@PathParam("userId") String userId,
|
||||||
|
Session session) {
|
||||||
|
// 移除断开连接的用户
|
||||||
|
CLIENTS.remove(userId);
|
||||||
|
log.info("用户 [{}] 已断开,当前在线人数:{}", userId, CLIENTS.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnError
|
||||||
|
public void onError(@PathParam("userId") String userId,
|
||||||
|
Session session, Throwable e) {
|
||||||
|
log.error("用户 [{}] 发生错误: {}", userId, e.getMessage(), e);
|
||||||
|
try {
|
||||||
|
session.close();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
log.error("关闭会话失败: {}", ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnMessage
|
||||||
|
public void onMessage(@PathParam("userId") String senderId,
|
||||||
|
String message, Session session) {
|
||||||
|
try {
|
||||||
|
// 解析客户端发送的 JSON 消息
|
||||||
|
MessageRequest request = JSON.parseObject(message, MessageRequest.class);
|
||||||
|
|
||||||
|
switch (request.getMessageType()) {
|
||||||
|
case 0:
|
||||||
|
sendPrivateMessage(Long.parseLong(senderId), request);
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
sendSystemNotification(request);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.warn("未知消息类型: {}", request.getMessageType());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("消息处理失败: {}", e.getMessage());
|
||||||
|
sendErrorResponse(session, "消息处理失败,请稍后重试");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送一对一私信
|
||||||
|
public void sendPrivateMessage(Long senderId, MessageRequest request) {
|
||||||
|
Long receiverId = request.getReceiverId();
|
||||||
|
Session receiverSession = CLIENTS.get(receiverId.toString());
|
||||||
|
// 构建私信响应
|
||||||
|
MessageResponse response = MessageResponse.builder()
|
||||||
|
.messageType(0)
|
||||||
|
.content(request.getContent())
|
||||||
|
.senderId(senderId)
|
||||||
|
.senderName(request.getSenderName())
|
||||||
|
.senderAvatar(request.getSenderAvatar())
|
||||||
|
.receiverId(receiverId)
|
||||||
|
.createTime(LocalDateTime.now())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 发送给接收方
|
||||||
|
if (receiverSession != null && receiverSession.isOpen()) {
|
||||||
|
sendMessage(receiverSession, JSON.toJSONString(response));
|
||||||
|
} else {
|
||||||
|
log.warn("接收方 [{}] 不在线,消息无法即时送达", receiverId);
|
||||||
|
}
|
||||||
|
MessageDO message = BeanUtil.copyProperties(response, MessageDO.class);
|
||||||
|
messageMapper.insert(message);
|
||||||
|
sendMessage(CLIENTS.get(senderId.toString()), JSON.toJSONString(response));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送系统通知
|
||||||
|
public void sendSystemNotification(MessageRequest request) {
|
||||||
|
MessageResponse response = MessageResponse.builder()
|
||||||
|
.messageType(1)
|
||||||
|
.content(request.getContent())
|
||||||
|
.createTime(LocalDateTime.now())
|
||||||
|
.build();
|
||||||
|
// 广播给所有在线用户
|
||||||
|
for (Session session : CLIENTS.values()) {
|
||||||
|
sendMessage(session, JSON.toJSONString(response));
|
||||||
|
}
|
||||||
|
MessageDO message = BeanUtil.copyProperties(response, MessageDO.class);
|
||||||
|
messageMapper.insert(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 安全消息
|
||||||
|
private void sendMessage(Session session, String message) {
|
||||||
|
try {
|
||||||
|
if (session != null && session.isOpen()) {
|
||||||
|
session.getBasicRemote().sendText(message);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("发送消息失败: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送错误响应
|
||||||
|
private void sendErrorResponse(Session session, String errorMessage) {
|
||||||
|
MessageResponse errorResponse = MessageResponse.builder()
|
||||||
|
.messageType(-1) // 错误消息类型
|
||||||
|
.content(errorMessage)
|
||||||
|
.createTime(LocalDateTime.now())
|
||||||
|
.build();
|
||||||
|
sendMessage(session, JSON.toJSONString(errorResponse));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
package com.luojia_channel.modules.post.mq.producer;
|
||||||
|
|
||||||
|
import com.luojia_channel.modules.message.mq.AbstractSendProduceTemplate;
|
||||||
|
import com.luojia_channel.modules.message.mq.BaseSendExtendDTO;
|
||||||
|
import com.luojia_channel.modules.message.mq.domain.NotificationMessage;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class NotificationProducer extends AbstractSendProduceTemplate<NotificationMessage> {
|
||||||
|
|
||||||
|
public NotificationProducer(@Autowired RabbitTemplate rabbitTemplate){
|
||||||
|
super(rabbitTemplate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BaseSendExtendDTO buildBaseSendParam(NotificationMessage messageSendEvent) {
|
||||||
|
return BaseSendExtendDTO.builder()
|
||||||
|
.eventName("NotificationMessageEvent")
|
||||||
|
.exchange("notify.exchange")
|
||||||
|
.routingKey("notify.routing.key")
|
||||||
|
.keys(messageSendEvent.getSenderId().toString())
|
||||||
|
.delay(null)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in new issue