websocket 整合

This commit is contained in:
张良(004796)
2023-12-29 18:17:27 +08:00
parent 3888da346c
commit 0ee961e550
46 changed files with 366 additions and 223 deletions

View File

@@ -1,4 +0,0 @@
package com.ruoyi.cai.ws.bean;
public class CallerRoom {
}

View File

@@ -7,7 +7,7 @@ import lombok.Data;
public class FdCtxData {
private String sessionKey;
private String roomId;
private Long roomId;
private Long userId;
private Integer userType;
private Long tarUserId;

View File

@@ -10,7 +10,7 @@ public class Room {
private UserData callUserData;
private UserData receiverUserData;
public String getRoomId(){
public Long getRoomId(){
return roomData.getRoomId();
}
@@ -26,4 +26,20 @@ public class Room {
return RoomStatusEnums.STATUS_AGREE.getCode().equals(roomData.getStatus());
}
/**
* 房间是否已经结算
* @return
*/
public boolean isSettle() {
return roomData.getSettleTime() != null && roomData.getSettleTime() > 0;
}
/**
* 是否已经释放房间资源
* @return
*/
public boolean isReleaseRes(){
return roomData.isReleaseRes();
}
}

View File

@@ -6,14 +6,18 @@ import java.math.BigDecimal;
@Data
public class RoomData {
private String roomId;
private Long roomId;
private Long callPrice;
private String skillName;
private Integer status;
private BigDecimal videoDivide;
private Integer payCoin = 0;
private Integer payIncome = 0;
private Long payCoin = 0L;
private Long payIncome = 0L;
private Long hangUpTime;
private Long settleTime;
private Long beginTime;
private boolean releaseRes = false;
}

View File

@@ -7,7 +7,7 @@ import java.math.BigDecimal;
@Data
public class UserData {
private Long id;
private String roomId;
private Long roomId;
private int userType;
private String nickname;
private String userCode;
@@ -18,4 +18,6 @@ public class UserData {
private Long unionUserId;
private BigDecimal unionUserRate;
private Boolean unionIsGet;
private Long connectTime;
}

View File

@@ -1,10 +1,13 @@
package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -17,10 +20,10 @@ public class CallerRoomCache {
return String.format(RedisConstant.CALLER_ROOM_DATA,fromUserId);
}
public String getRoomId(Long fromUserId, Long toUserId){
public Long getRoomId(Long fromUserId, Long toUserId){
String key = getKey(fromUserId);
Object roomId = redisTemplate.opsForHash().get(key, toUserId);
return roomId == null ? null : String.valueOf(roomId);
return roomId == null ? null : Long.valueOf(roomId.toString());
}
public void addRoom(Long fromUid, Long toUid, Long roomId) {
@@ -38,4 +41,17 @@ public class CallerRoomCache {
String key = getKey(fromUid);
redisTemplate.delete(key);
}
private final static String DEL_ROOM_LUA = "return KEYS[1]\n" +
"local r = tonumber(redis.call('hget',KEYS[1],ARGV[1]))\n" +
"if r == tonumber(ARGV[2]) then\n" +
" return redis.call('hdel',KEYS[1],ARGV[1])\n" +
"end\n" +
"return 0";
public boolean delRoom(Long receiverId, Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(DEL_ROOM_LUA,Boolean.class);
Boolean execute = redisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), receiverId,roomId);
return BooleanUtils.isTrue(execute);
}
}

View File

@@ -14,7 +14,11 @@ public class OnlineDataCache {
return RedisConstant.ONLINE_ROOM_DATA;
}
public void add(String roomId){
redisTemplate.opsForSet().add(getKey(),roomId);
public void add(Long roomId){
redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId));
}
public void remove(Long roomId) {
redisTemplate.opsForSet().remove(getKey(),roomId);
}
}

View File

@@ -2,7 +2,7 @@ package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.websocket.holder.WebSocketSessionHolder;
import com.ruoyi.cai.ws.holder.WebSocketSessionHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@@ -17,17 +17,17 @@ public class RoomCtxCache {
@Autowired
private StringRedisTemplate redisTemplate;
public String getKey(String roomId){
public String getKey(Long roomId){
return String.format(RedisConstant.FDCTX_ROOM_DATA,roomId);
}
public void addFd(String sessionKey,String roomId,Integer userType){
public void addFd(String sessionKey,Long roomId,Integer userType){
String key = getKey(roomId);
redisTemplate.opsForHash().putIfAbsent(key,sessionKey,userType);
redisTemplate.expire(key,7, TimeUnit.DAYS);
}
public List<String> getSessionKeysByRoomId(String roomId){
public List<String> getSessionKeysByRoomId(Long roomId){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
List<String> res = new ArrayList<>();
@@ -37,7 +37,7 @@ public class RoomCtxCache {
return res;
}
public String getSessionKeyByRoomIdAndUserType(String roomId,Integer userType){
public String getSessionKeyByRoomIdAndUserType(Long roomId,Integer userType){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
for (Map.Entry<Object, Object> entry : entries.entrySet()) {
@@ -50,15 +50,15 @@ public class RoomCtxCache {
return null;
}
public String getSessionKeyReceiverByRoomId(String roomId){
public String getSessionKeyReceiverByRoomId(Long roomId){
return getSessionKeyByRoomIdAndUserType(roomId, UserDataConstant.TYPE_RECEIVER);
}
public String getSessionKeyCallerByRoomId(String roomId){
public String getSessionKeyCallerByRoomId(Long roomId){
return getSessionKeyByRoomIdAndUserType(roomId, UserDataConstant.TYPE_CALLER);
}
public void del(String roomId) {
public void del(Long roomId) {
String key = getKey(roomId);
redisTemplate.delete(key);
}

View File

@@ -27,19 +27,16 @@ public class RoomDataCache {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public String getKey(String roomId){
public String getKey(Long roomId){
return String.format(RedisConstant.ROOM_DATA,roomId);
}
public RoomData getByRoomId(String roomId){
public RoomData getByRoomId(Long roomId){
Map<Object, Object> map = stringRedisTemplate.opsForHash().entries(getKey(roomId));
if(map.get("roomId") == null){
return null;
}
RoomData roomData = new RoomData();
roomData.setRoomId(map.get("roomId").toString());
roomData.setCallPrice(Long.valueOf(map.get("callPrice").toString()));
return roomData;
return JSON.parseObject(JSON.toJSONString(map),RoomData.class);
}
public void init(RoomData roomData) {
@@ -47,6 +44,16 @@ public class RoomDataCache {
stringRedisTemplate.opsForHash().putAll(getKey(roomData.getRoomId()),map);
}
public void hMSet(Long roomId,Map<String,Object> data) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().putAll(key,data);
}
public void hMSet(Long roomId,String mKey,Object mData) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().put(key,mKey,mData);
}
private final static String HANG_UP = "local hangupTime = tonumber(redis.call('hGet', KEYS[1], 'hangupTime'))\n" +
"if hangupTime > 0 then\n" +
@@ -54,14 +61,14 @@ public class RoomDataCache {
"end\n" +
"return redis.call('hMSet', KEYS[1], 'status', 8, 'hangupTime', ARGV[1])";
public boolean hangUp(String roomId) {
public boolean hangUp(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime);
return BooleanUtils.isTrue(execute);
}
public void setStatus(String roomId, RoomStatusEnums status) {
public void setStatus(Long roomId, RoomStatusEnums status) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().put(key,"status",status.getCode());
}
@@ -72,7 +79,7 @@ public class RoomDataCache {
"end\n" +
"return redis.call('hmset', KEYS[1], 'status', 3)";
public boolean setStatusReceiverConnection(String roomId) {
public boolean setStatusReceiverConnection(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_RECEIVER_CONNECTION,Boolean.class);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)));
return BooleanUtils.isTrue(execute);
@@ -86,7 +93,7 @@ public class RoomDataCache {
"end\n" +
"return redis.call('hmset', KEYS[1], 'status', 7, 'beginTime', ARGV[1])";
public boolean setStatusAgree(String roomId) {
public boolean setStatusAgree(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime);

View File

@@ -4,7 +4,6 @@ import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.cai.ws.util.MapGetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@@ -16,37 +15,25 @@ public class UserDataCache {
@Autowired
private StringRedisTemplate redisTemplate;
public String getKey(String roomId,int type){
public String getKey(Long roomId,int type){
return String.format(RedisConstant.USER_ROOM_DATA,roomId,type== UserDataConstant.TYPE_CALLER?"caller":"receiver");
}
public UserData getCallerUserDataByRoom(String roomId){
public UserData getCallerUserDataByRoom(Long roomId){
return getUserDataByRoom(roomId,UserDataConstant.TYPE_CALLER);
}
public UserData getReceiverUserDataByRoom(String roomId){
public UserData getReceiverUserDataByRoom(Long roomId){
return getUserDataByRoom(roomId,UserDataConstant.TYPE_RECEIVER);
}
public UserData getUserDataByRoom(String roomId,int type){
public UserData getUserDataByRoom(Long roomId,int type){
String key = getKey(roomId, type);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
if(entries.get("roomId") == null){
return null;
}
UserData userData = new UserData();
userData.setId(MapGetUtil.getLong(entries.get("id")));
userData.setRoomId(MapGetUtil.getString(entries.get("roomId")));
userData.setUserType(MapGetUtil.getInt(entries.get("userType")));
userData.setNickname(MapGetUtil.getString(entries.get("nickname")));
userData.setUserCode(MapGetUtil.getString(entries.get("userCode")));
userData.setInviterId(MapGetUtil.getLong(entries.get("inviterId")));
userData.setInviterRate(MapGetUtil.getBigDecimal(entries.get("inviterRate")));
userData.setInviterIsGet(MapGetUtil.getBoolean(entries.get("inviterIsGet")));
userData.setUnionUserId(MapGetUtil.getLong(entries.get("unionUserId")));
userData.setUnionUserRate(MapGetUtil.getBigDecimal(entries.get("unionUserRate")));
userData.setUnionIsGet(MapGetUtil.getBoolean(entries.get("unionIsGet")));
return userData;
return JSON.parseObject(JSON.toJSONString(entries),UserData.class);
}
public void init(UserData userData,int type){
@@ -64,7 +51,7 @@ public class UserDataCache {
init(callerUserData,UserDataConstant.TYPE_RECEIVER);
}
public void hMSet(String roomId,Integer userType,Map<String,Object> data) {
public void hMSet(Long roomId,Integer userType,Map<String,Object> data) {
String key = getKey(roomId, userType);
redisTemplate.opsForHash().putAll(key,data);
}

View File

@@ -0,0 +1,42 @@
package com.ruoyi.cai.ws.config;
import cn.hutool.core.util.StrUtil;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* WebSocket 配置
*
* @author zendwang
*/
@Configuration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfig {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
WebSocketHandler webSocketHandler,
WebSocketProperties webSocketProperties) {
if (StrUtil.isBlank(webSocketProperties.getPath())) {
webSocketProperties.setPath("/ws");
}
if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) {
webSocketProperties.setAllowedOrigins("*");
}
return registry -> registry
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(webSocketProperties.getAllowedOrigins());
}
}

View File

@@ -0,0 +1,26 @@
package com.ruoyi.cai.ws.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* WebSocket 配置项
*
* @author zendwang
*/
@ConfigurationProperties("websocket")
@Data
public class WebSocketProperties {
private Boolean enabled;
/**
* 路径
*/
private String path;
/**
* 设置访问源地址
*/
private String allowedOrigins;
}

View File

@@ -0,0 +1,32 @@
package com.ruoyi.cai.ws.constant;
/**
* websocket的常量配置
*
* @author zendwang
*/
public interface WebSocketConstants {
/**
* websocketSession中的参数的key
*/
String TOKEN = "token";
String ROOM_ID = "roomId";
String USER_ID = "userId";
/**
* 订阅的频道
*/
String WEB_SOCKET_TOPIC = "global:websocket";
/**
* 前端心跳检查的命令
*/
String PING = "ping";
/**
* 服务端心跳恢复的字符串
*/
String PONG = "pong";
}

View File

@@ -0,0 +1,27 @@
package com.ruoyi.cai.ws.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 消息的dto
*
* @author zendwang
*/
@Data
public class WebSocketMessageDto implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 需要推送到的session key 列表
*/
private List<Long> sessionKeys;
/**
* 需要发送的消息
*/
private String message;
}

View File

@@ -0,0 +1,109 @@
package com.ruoyi.cai.ws.dto;
import com.ruoyi.common.constant.HttpStatus;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 响应信息主体
*
* @author Lion Li
*/
@Data
@NoArgsConstructor
public class WsR<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 成功
*/
public static final int SUCCESS = 200;
/**
* 失败
*/
public static final int FAIL = 500;
private int code;
private String method;
private String msg;
private T data;
public static <T> WsR<T> ok() {
return restResult(null, SUCCESS, "操作成功");
}
public static <T> WsR<T> ok(T data) {
return restResult(data, SUCCESS, "操作成功");
}
public static <T> WsR<T> ok(String msg) {
return restResult(null, SUCCESS, msg);
}
public static <T> WsR<T> ok(String msg, T data) {
return restResult(data, SUCCESS, msg);
}
public static <T> WsR<T> fail() {
return restResult(null, FAIL, "操作失败");
}
public static <T> WsR<T> fail(String msg) {
return restResult(null, FAIL, msg);
}
public static <T> WsR<T> fail(T data) {
return restResult(data, FAIL, "操作失败");
}
public static <T> WsR<T> fail(String msg, T data) {
return restResult(data, FAIL, msg);
}
public static <T> WsR<T> fail(int code, String msg) {
return restResult(null, code, msg);
}
/**
* 返回警告消息
*
* @param msg 返回内容
* @return 警告消息
*/
public static <T> WsR<T> warn(String msg) {
return restResult(null, HttpStatus.WARN, msg);
}
/**
* 返回警告消息
*
* @param msg 返回内容
* @param data 数据对象
* @return 警告消息
*/
public static <T> WsR<T> warn(String msg, T data) {
return restResult(data, HttpStatus.WARN, msg);
}
private static <T> WsR<T> restResult(T data, int code, String msg) {
WsR<T> r = new WsR<>();
r.setCode(code);
r.setData(data);
r.setMsg(msg);
return r;
}
public static <T> Boolean isError(WsR<T> ret) {
return !isSuccess(ret);
}
public static <T> Boolean isSuccess(WsR<T> ret) {
return WsR.SUCCESS == ret.getCode();
}
}

View File

@@ -0,0 +1,130 @@
package com.ruoyi.cai.ws.dto;
import com.alibaba.fastjson.JSON;
import com.ruoyi.cai.domain.CaiGift;
import java.util.HashMap;
import java.util.Map;
public class WsRMsgGen {
public static WsR response(Long roomId){
Map<String,Object> map = new HashMap<>();
map.put("roomid",roomId);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("response");
ok.setMsg("连线成功");
return ok;
}
public static WsR startVideo(Long roomId,Long duration){
Map<String,Object> map = new HashMap<>();
map.put("roomid",roomId);
map.put("duration",duration);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("startVideo");
ok.setMsg("通话成功!");
return ok;
}
public static WsR sysNotice(String content){
Map<String,Object> map = new HashMap<>();
map.put("type",1);
map.put("content",content);
map.put("linkType",0);
map.put("linkUrl",null);
map.put("fromUid",null);
map.put("toUid",null);
map.put("cancalltime",null);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("notice");
ok.setMsg("提示!");
return ok;
}
public static WsR updateTip() {
Map<String,Object> map = new HashMap<>();
map.put("tip","等待对方响应...");
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("updatetip");
ok.setMsg("提示!");
return ok;
}
public static WsR hangup(String message, Long roomId, Integer hangUpType) {
Map<String,Object> map = new HashMap<>();
map.put("roomid","roomId");
map.put("type","hangUpType");
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("hangup");
ok.setMsg(message);
return ok;
}
public static WsR canCallTime(long time) {
Map<String,Object> map = new HashMap<>();
map.put("tip",time);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("cancalltime");
ok.setMsg("提示!");
return ok;
}
public static WsR rechargeNotice(String content) {
Map<String,Object> map = new HashMap<>();
map.put("type",1);
map.put("content",content);
map.put("linkType",2);
map.put("linkUrl","rechargeCoin");
map.put("fromUid",null);
map.put("toUid",null);
map.put("cancalltime",null);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("notice");
ok.setMsg("提示!");
return ok;
}
public static WsR gift(CaiGift gift, Long callerId, Long receiverId) {
Map<String,Object> content = new HashMap<>();
content.put("giftid",gift.getId());
content.put("giftname",gift.getName());
content.put("gifticon",gift.getImg());
content.put("giftsvga",gift.getSvga());
Map<String,Object> map = new HashMap<>();
map.put("type",3);
map.put("content", JSON.toJSONString(content));
map.put("linkType",null);
map.put("linkUrl",null);
map.put("fromUid",callerId);
map.put("toUid",receiverId);
map.put("cancalltime",null);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("notice");
ok.setMsg("提示!");
return ok;
}
public static WsR heartbeat() {
Map<String,Object> content = new HashMap<>();
WsR<Map<String, Object>> ok = WsR.ok(content);
ok.setMethod("heartbeat");
ok.setMsg("检测成功!");
return ok;
}
public static WsR chatData(String txt, Long from, Long to) {
Map<String,Object> map = new HashMap<>();
map.put("type",2);
map.put("content", txt);
map.put("linkType",null);
map.put("linkUrl",null);
map.put("fromUid",from);
map.put("toUid",to);
map.put("cancalltime",null);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("notice");
ok.setMsg("提示!");
return ok;
}
}

View File

@@ -0,0 +1,40 @@
package com.ruoyi.cai.ws.handler;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public abstract class AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomCtxCache roomCtxCache;
protected void sendToCurrent(FdCtxData fdCtxData, WsR r){
RoomWebSocketUtil.sendSendMessage(fdCtxData.getSessionKey(), r);
}
protected void sendToTar(FdCtxData fdCtxData, WsR r) {
String sessionKey = roomCtxCache.getSessionKeyByRoomIdAndUserType(fdCtxData.getRoomId(), fdCtxData.getTarUserType());
RoomWebSocketUtil.sendSendMessage(sessionKey, r);
}
protected void sendToReceiver(Long roomId, WsR r){
String receiverSessionKey = roomCtxCache.getSessionKeyReceiverByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(receiverSessionKey, r);
}
protected void sendToAll(Long roomId, WsR ... r ){
List<String> sessionKeys = roomCtxCache.getSessionKeysByRoomId(roomId);
for (WsR wsR : r) {
RoomWebSocketUtil.sendSendMessage(sessionKeys, wsR);
}
}
}

View File

@@ -0,0 +1,10 @@
package com.ruoyi.cai.ws.handler;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
public interface IMessageHandler {
void processOn(Room room, FdCtxData fdCtxData, JSONObject map);
}

View File

@@ -0,0 +1,61 @@
package com.ruoyi.cai.ws.handler;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.FdCtxDataCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.MapGetUtil;
import com.ruoyi.cai.ws.util.WsExceptionUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
/**
* ws消息处理器统一入口
*/
@Component
public class MessageHandleApplication {
@Autowired
private Map<String,IMessageHandler> map;
@Autowired
private RoomService roomService;
@Autowired
private FdCtxDataCache fdCtxDataCache;
public void processOn(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
JSONObject jsonObject = JSON.parseObject(payload);
Object method = jsonObject.get("method");
if(method == null){
return;
}
Map<String, Object> attributes = session.getAttributes();
Long roomId = MapGetUtil.getLong(attributes.get("roomId"));
String sessionKey = MapGetUtil.getString(attributes.get("token"));
Room room = roomService.load(roomId);
if(room == null){
WsExceptionUtil.throwException("房间不可用", sessionKey,HangUpEnums.OTHER, roomId);
return;
}
CheckConnectionDTO checkConnect = roomService.checkConnect(room);
if(checkConnect != null){
WsExceptionUtil.throwException(sessionKey,checkConnect.getMessage(),checkConnect.getHangUpEnums(),roomId);
return;
}
IMessageHandler handler = map.get(String.valueOf(method));
if(handler == null){
return;
}
FdCtxData fdCtxData = fdCtxDataCache.getByRoomId(sessionKey);
handler.processOn(room,fdCtxData, jsonObject);
}
}

View File

@@ -0,0 +1,106 @@
package com.ruoyi.cai.ws.handler;
import com.ruoyi.cai.ws.constant.WebSocketConstants;
import com.ruoyi.cai.ws.holder.WebSocketSessionHolder;
import com.ruoyi.cai.ws.processon.OpenLogic;
import com.ruoyi.cai.ws.util.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.Map;
/**
* WebSocketHandler 实现类
*
* @author zendwang
*/
@Slf4j
@Component
public class RoomWebSocketHandler extends AbstractWebSocketHandler {
@Autowired
private OpenLogic openLogic;
@Autowired
private MessageHandleApplication messageHandleApplication;
/**
* 连接成功后
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
Map<String, Object> attributes = session.getAttributes();
if(attributes.get("token") != null){
WebSocketSessionHolder.addSession(attributes.get("token").toString(), session);
}
openLogic.processOn(session);
log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId());
}
/**
* 处理发送来的文本消息
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String token = String.valueOf(session.getAttributes().get(WebSocketConstants.TOKEN));
messageHandleApplication.processOn(session,message);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
/**
* 心跳监测的回复
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
WebSocketUtils.sendPongMessage(session);
}
/**
* 连接出错时
*
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
/**
* 连接关闭后
*
* @param session
* @param status
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String token = String.valueOf(session.getAttributes().get("token"));
WebSocketSessionHolder.removeSession(token);
log.info("[disconnect] sessionId: {},token:{}", session.getId(), token);
}
/**
* 是否支持分片消息
*
* @return
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}

View File

@@ -0,0 +1,47 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.CaiUserCall;
import com.ruoyi.cai.service.CaiUserCallService;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 被叫同意接听处理
*/
@Component("agree")
public class AgreeMessageHandle extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomDataCache roomDataCache;
@Autowired
private CaiUserCallService userCallService;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
if(!fdCtxData.isReceiver()){
return;
}
boolean agree = roomDataCache.setStatusAgree(room.getRoomId());
if(!agree){
return;
}
// 通知可进行接通
userCallService.update(Wrappers.lambdaUpdate(CaiUserCall.class)
.eq(CaiUserCall::getId,room.getRoomId())
.set(CaiUserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode())
.set(CaiUserCall::getBeginTime, LocalDateTime.now()));
String message = "提示禁止任何涉黄、任何微信QQ引导到其它平台行为";
sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message));
}
}

View File

@@ -0,0 +1,25 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 获取可通话时长
*/
@Component("cancalltime")
public class CanCallTimeMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomService roomService;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
Long time = roomService.canCallTime(room);
sendToCurrent(fdCtxData,WsRMsgGen.canCallTime(time));
}
}

View File

@@ -0,0 +1,55 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.CaiUserCall;
import com.ruoyi.cai.service.CaiUserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.trd.Yunxin;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 主叫方取消通话
*/
@Component("cancel")
public class CancelMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private CaiUserCallService userCallService;
@Autowired
private RoomService roomService;
@Autowired
private Yunxin yunxin;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
if(!fdCtxData.isCaller()){
return;
}
if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
Long roomId = room.getRoomId();
sendToCurrent(fdCtxData,WsRMsgGen.hangup("通话已取消",roomId, HangUpEnums.CANCEL.getCode()));
sendToReceiver(roomId,WsRMsgGen.hangup("对方已取消",roomId, HangUpEnums.CANCEL.getCode()));
roomService.closeAllFd(roomId);
// IM
Long receiverId = room.getReceiverUserData().getId();
Long callerId = room.getCallUserData().getId();
ImDataRes imDataRes = ImMsgGen.callNotice(1, callerId, receiverId, 0);
yunxin.sendTo(receiverId,imDataRes,callerId);
// 更新房间状态
userCallService.update(Wrappers.lambdaUpdate(CaiUserCall.class)
.eq(CaiUserCall::getId,roomId)
.set(CaiUserCall::getStatus,RoomStatusEnums.STATUS_CALLER_CANCEL.getCode()));
}
}

View File

@@ -0,0 +1,60 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.domain.CaiAccount;
import com.ruoyi.cai.domain.CaiGift;
import com.ruoyi.cai.dto.app.query.GiveGiftRes;
import com.ruoyi.cai.service.CaiAccountService;
import com.ruoyi.cai.service.CaiGiftService;
import com.ruoyi.cai.service.CaiUserGiftService;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("gift")
public class GiftMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private CaiGiftService giftService;
@Autowired
private CaiAccountService accountService;
@Autowired
private CaiUserGiftService userGiftService;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
Long giftId = map.getLong("giftId");
if(giftId == null){
return;
}
Long giftCount = map.getLongValue("giftCount",1L);
CaiGift gift = giftService.getById(giftId);
if(gift == null){
return;
}
Long giftAmount = gift.getPrice() * giftCount;
CaiAccount account = accountService.getByUserId(fdCtxData.getUserId());
Long userAccount = account.getIncomeCoin() + account.getCoin();
if(userAccount < giftAmount){
sendToCurrent(fdCtxData,WsRMsgGen.rechargeNotice("余额不足,点此充值"));
return;
}
if(fdCtxData.isCaller() && (userAccount - giftAmount) < room.getRoomData().getCallPrice()){
sendToCurrent(fdCtxData,WsRMsgGen.rechargeNotice("赠送后通话时间不足1分钟点此充值"));
return;
}
GiveGiftRes giveGiftRes = new GiveGiftRes();
giveGiftRes.setType(3);
giveGiftRes.setToUserId(fdCtxData.getTarUserId());
giveGiftRes.setGiftId(giftId);
giveGiftRes.setGiftCount(giftCount);
boolean b = userGiftService.giveGift(giveGiftRes);
if(!b){
sendToCurrent(fdCtxData,WsRMsgGen.sysNotice("赠送失败,请重试"));
return;
}
sendToAll(room.getRoomId(),WsRMsgGen.gift(gift,fdCtxData.getUserId(),fdCtxData.getTarUserId()));
}
}

View File

@@ -0,0 +1,48 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.service.RoomService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 挂断处理
*/
@Component("hangup")
public class HangupMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomService roomService;
@Autowired
private RoomDataCache roomDataCache;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
if(room.getRoomId() == null){
return;
}
// 经测试app端挂断时可能会把旧的房间id传上来所以需要判断id与fd上下文的一致性
if(!room.getRoomId().equals(fdCtxData.getRoomId())){
return;
}
boolean b = roomDataCache.hangUp(room.getRoomId());
if(!b){
return;
}
// 触发结算 TODO
Integer type = fdCtxData.isCaller() ? HangUpEnums.FROM.getCode() : HangUpEnums.TO.getCode();
sendToCurrent(fdCtxData,WsRMsgGen.hangup("您已挂断",room.getRoomId(), type));
sendToTar(fdCtxData,WsRMsgGen.hangup("对方已挂断",room.getRoomId(), type));
roomService.closeAllFd(room.getRoomId());
}
}

View File

@@ -0,0 +1,34 @@
package com.ruoyi.cai.ws.handler.message;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.UserDataCache;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 心跳处理
*/
@Component("heartbeat")
public class HeartbeatMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private UserDataCache userDataCache;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
if(!room.isCanCall()){
return;
}
Map<String,Object> update = new HashMap<>();
update.put("heartTime", DateUtil.currentSeconds());
userDataCache.hMSet(room.getRoomId(),fdCtxData.getUserType(),update);
sendToCurrent(fdCtxData, WsRMsgGen.heartbeat());
}
}

View File

@@ -0,0 +1,25 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
/**
* 聊天消息处理
*/
@Component("message")
public class MessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
String txt = map.getString("txt");
if(StringUtils.isEmpty(txt) || !room.isOnline()){
return;
}
sendToAll(room.getRoomId(),WsRMsgGen.chatData(txt,fdCtxData.getUserId(),fdCtxData.getTarUserId()));
}
}

View File

@@ -0,0 +1,59 @@
package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.CaiUserCall;
import com.ruoyi.cai.service.CaiUserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.trd.Yunxin;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 被叫方拒绝通话
*/
@Component("cancalltime")
public class RefuseMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomDataCache roomDataCache;
@Autowired
private RoomService roomService;
@Autowired
private CaiUserCallService userCallService;
@Autowired
private Yunxin yunxin;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
if(!fdCtxData.isReceiver() || !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
boolean b = roomDataCache.hangUp(room.getRoomId());
if(!b){
return;
}
sendToCurrent(fdCtxData, WsRMsgGen.hangup("已拒绝",room.getRoomId(), HangUpEnums.REFUSE.getCode()));
sendToTar(fdCtxData,WsRMsgGen.hangup("对方已拒绝",room.getRoomId(), HangUpEnums.REFUSE.getCode()));
roomService.closeAllFd(room.getRoomId());
//发送IM通知
Long receiverId = room.getReceiverUserData().getId();
Long callerId = room.getCallUserData().getId();
ImDataRes imDataRes = ImMsgGen.callNotice(1, callerId, receiverId, 0);
yunxin.sendTo(receiverId,imDataRes,callerId);
// 更新房间状态
userCallService.update(Wrappers.lambdaUpdate(CaiUserCall.class)
.eq(CaiUserCall::getId,room.getRoomId())
.set(CaiUserCall::getStatus,RoomStatusEnums.STATUS_REFUSE.getCode()));
}
}

View File

@@ -0,0 +1,42 @@
package com.ruoyi.cai.ws.holder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocketSession 用于保存当前所有在线的会话信息
*
* @author zendwang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionHolder {
private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
public static void addSession(String sessionKey, WebSocketSession session) {
USER_SESSION_MAP.put(sessionKey, session);
}
public static void removeSession(String sessionKey) {
if (USER_SESSION_MAP.containsKey(sessionKey)) {
USER_SESSION_MAP.remove(sessionKey);
}
}
public static WebSocketSession getSessions(String sessionKey) {
return USER_SESSION_MAP.get(sessionKey);
}
public static Set<String> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
public static Boolean existSession(String sessionKey) {
return USER_SESSION_MAP.containsKey(sessionKey);
}
}

View File

@@ -0,0 +1,47 @@
package com.ruoyi.cai.ws.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* WebSocket握手请求的拦截器
*
* @author zendwang
*/
@Slf4j
@Component
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param attributes attributes
* @return 是否握手成功
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
return true;
}
/**
* 握手后
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

View File

@@ -16,7 +16,6 @@ import com.ruoyi.cai.ws.cache.UserDataCache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.common.exception.ServiceException;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,8 +43,8 @@ public class WebSocketManager {
private RedissonClient redissonClient;
public Room checkOnlineRoom(Long fromUserId,Long toUserId){
String roomId = callerRoomCache.getRoomId(fromUserId, toUserId);
if(StringUtils.isEmpty(roomId)){
Long roomId = callerRoomCache.getRoomId(fromUserId, toUserId);
if(roomId == null){
return null;
}
Room room = roomService.load(roomId);
@@ -55,7 +54,7 @@ public class WebSocketManager {
return room;
}
public String createRoom(String roomId) {
public Long createRoom(Long roomId) {
CaiUserCall userCall = userCallService.getById(roomId);
if(userCall == null){
throw new ServiceException("无效房间");
@@ -77,7 +76,7 @@ public class WebSocketManager {
return roomId;
}
public String initRoom(CaiUserCall call){
public Long initRoom(CaiUserCall call){
call = userCallService.getById(call.getId());
CaiUser callUser = userService.getById(call.getFromUid());
if(callUser == null){
@@ -103,7 +102,7 @@ public class WebSocketManager {
roomService.delCallRoom(call.getFromUid());
// 初始化
RoomData roomData = new RoomData();
roomData.setRoomId(call.getId()+"");
roomData.setRoomId(call.getId());
roomData.setCallPrice(call.getCallPrice());
roomData.setSkillName(call.getSkillName());
roomData.setStatus(call.getStatus());
@@ -111,13 +110,13 @@ public class WebSocketManager {
roomDataCache.init(roomData);
UserData callerUserData = new UserData();
callerUserData.setId(call.getFromUid());
callerUserData.setRoomId(call.getId()+"");
callerUserData.setRoomId(call.getId());
callerUserData.setNickname(callUser.getNickname());
callerUserData.setUserCode(callUser.getUsercode());
userDataCache.initCaller(callerUserData);
UserData receiveUserData = new UserData();
receiveUserData.setId(call.getToUid());
receiveUserData.setRoomId(call.getId()+"");
receiveUserData.setRoomId(call.getId());
receiveUserData.setNickname(receiverUser.getNickname());
receiveUserData.setUserCode(receiverUser.getUsercode());
receiveUserData.setInviterId(call.getReceiverInviteUserId());
@@ -128,7 +127,7 @@ public class WebSocketManager {
receiveUserData.setUnionIsGet(call.getReceiverUnionGet());
userDataCache.initReceiver(callerUserData);
callerRoomCache.addRoom(call.getFromUid(),call.getToUid(),call.getId());
return call.getId()+"";
return call.getId();
}
}

View File

@@ -11,12 +11,12 @@ import com.ruoyi.cai.ws.cache.*;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.MapGetUtil;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import com.ruoyi.cai.ws.util.WsExceptionUtil;
import com.ruoyi.websocket.dto.WsRMsgGen;
import com.ruoyi.websocket.handler.IOpenLogic;
import com.ruoyi.websocket.util.RoomWebSocketUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
@Component
public class OpenLogic implements IOpenLogic {
public class OpenLogic {
@Autowired
private RoomService roomService;
@Autowired
@@ -47,17 +47,16 @@ public class OpenLogic implements IOpenLogic {
@Autowired
private Agora agora;
@Override
public void processOn(WebSocketSession session) {
Map<String, Object> map = session.getAttributes();
String token = map.get("token").toString();
String roomId = map.get("roomId").toString();
Long userId = Long.valueOf(map.get("userId").toString());
String token = MapGetUtil.getString(map.get("token"));
Long roomId = MapGetUtil.getLong(map.get("roomId"));
Long userId = MapGetUtil.getLong(map.get("userId"));
// 校验token
process(token,roomId,userId);
}
public void process(String sessionKey,String roomId,Long userId){
public void process(String sessionKey,Long roomId,Long userId){
Room room = roomService.load(roomId);
if(room == null || (room.getCallUserData().getId().equals(userId) && room.getReceiverUserData().getId().equals(userId))){
WsExceptionUtil.throwException(sessionKey,"房间不可用", HangUpEnums.OTHER,roomId);
@@ -117,7 +116,7 @@ public class OpenLogic implements IOpenLogic {
return;
}
Long callTime = roomService.getCallTime(room);
RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.startVideo(room.getRoomId(), callTime));
RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.startVideo(room.getRoomId(), callTime));
RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("重连成功,房间已通话(转换成时分秒) "));
String sessionKeyReceiver = roomCtxCache.getSessionKeyReceiverByRoomId(room.getRoomId());
if(StringUtils.isNotEmpty(sessionKeyReceiver)){

View File

@@ -2,14 +2,19 @@ package com.ruoyi.cai.ws.service;
import cn.hutool.core.date.DateUtil;
import com.ruoyi.cai.domain.CaiAccount;
import com.ruoyi.cai.executor.ExecutorConstant;
import com.ruoyi.cai.service.CaiAccountService;
import com.ruoyi.cai.service.CaiAnchorService;
import com.ruoyi.cai.service.CaiUserService;
import com.ruoyi.cai.trd.Agora;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.*;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.websocket.util.WebSocketUtils;
import com.ruoyi.cai.ws.util.MapGetUtil;
import com.ruoyi.cai.ws.util.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -33,8 +38,14 @@ public class RoomService {
private FdCtxDataCache fdCtxDataCache;
@Autowired
private RoomCtxCache roomCtxCache;
@Autowired
private CaiUserService userService;
@Autowired
private CaiAnchorService anchorService;
@Autowired
private Agora agora;
public Room load(String roomId){
public Room load(Long roomId){
Room room = new Room();
RoomData roomData = roomDataCache.getByRoomId(roomId);
if(roomData == null){
@@ -57,7 +68,7 @@ public class RoomService {
public void closeAllRoom(Long fromUid){
Map<Object, Object> all = callerRoomCache.getAll(fromUid);
for (Map.Entry<Object, Object> entry : all.entrySet()) {
String roomId = String.valueOf(entry.getValue());
Long roomId = MapGetUtil.getLong(entry.getValue());
Room room = load(roomId);
if(room == null){
continue;
@@ -74,7 +85,7 @@ public class RoomService {
}
}
public void closeAllFd(String roomId){
public void closeAllFd(Long roomId){
List<String> sessionKeysByRoomId = roomCtxCache.getSessionKeysByRoomId(roomId);
for (String sessionKey : sessionKeysByRoomId) {
WebSocketUtils.close(sessionKey);
@@ -86,7 +97,7 @@ public class RoomService {
public void delCallRoom(Long fromUid) {
callerRoomCache.del(fromUid);
}
private static Map<Integer, CheckConnectionDTO> STATUS_TO_HANG_UP = new HashMap<>();
private static final Map<Integer, CheckConnectionDTO> STATUS_TO_HANG_UP = new HashMap<>();
static {
STATUS_TO_HANG_UP.put(RoomStatusEnums.STATUS_CALLER_CANCEL.getCode(),new CheckConnectionDTO(HangUpEnums.CANCEL,"通话已取消"));
STATUS_TO_HANG_UP.put(RoomStatusEnums.STATUS_CONNECT_CANCEL.getCode(),new CheckConnectionDTO(HangUpEnums.TIMEOUT,"通话未接听"));
@@ -127,7 +138,7 @@ public class RoomService {
return 0L;
}
RoomData roomData = room.getRoomData();
int blockAmount = roomData.getPayCoin() + roomData.getPayIncome();
long blockAmount = roomData.getPayCoin() + roomData.getPayIncome();
long totalAmount = account.getTotalCoin()+account.getIncomeCoin() + blockAmount;
long totalSecond = (totalAmount / roomData.getCallPrice()) / 60;
long useTime = 0;
@@ -140,4 +151,40 @@ public class RoomService {
return 0L;
}
}
/**
* 释放房间资源
*/
public void releaseRes(Long roomId) {
Room room = this.load(roomId);
if(room == null){
return;
}
try {
UserData receiverUserData = room.getReceiverUserData();
Long receiverId = receiverUserData.getId();
Long callerId = room.getCallUserData().getId();
// 呼叫方释放资源
callerRoomCache.delRoom(receiverId,roomId);
userService.updateVideoStatus(callerId,0);
// 接收方释放资源(已连接的情况下)
if(receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){
userService.updateVideoStatus(receiverId,0);
anchorService.updateVideoStatus(receiverId,0);
}
// 声网踢人
if(room.getRoomData().getBeginTime() != null && room.getRoomData().getBeginTime() > 0){
ExecutorConstant.ROOM_EXECUTOR.execute(() -> {
agora.closeChannel(roomId);
});
}
// 修改释放状态
Map<String,Object> map = new HashMap<>();
map.put("releaseRes",1);
roomDataCache.hMSet(roomId,map);
}catch (Exception e){
log.error("释放房间资源失败!房间号:{}",roomId,e);
}
}
}

View File

@@ -0,0 +1,125 @@
package com.ruoyi.cai.ws.service;
import cn.hutool.core.date.DateUtil;
import com.ruoyi.cai.domain.CaiAccount;
import com.ruoyi.cai.service.CaiAccountService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.trd.Yunxin;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class SettleService {
@Autowired
private RoomService roomService;
@Autowired
private RoomDataCache roomDataCache;
@Autowired
private CaiAccountService accountService;
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private Yunxin yunxin;
public void processOn(Room room){
try {
deal(room);
}finally {
}
}
private void deal(Room room){
if(room.isCanCall() || room.isSettle()){
return;
}
if(!room.isReleaseRes()){
roomService.releaseRes(Long.valueOf(room.getRoomId()));
return;
}
// 结算
if(!settle(room)){
return;
}
// 移除在线房间状态
onlineDataCache.remove(room.getRoomId());
// 更新支付金额信息
Map<String,Object> map = new HashMap<>();
map.put("payCoin", ""); // 实际支付的金额
map.put("payIncome", "");
map.put("settleTime", DateUtil.currentSeconds());
roomDataCache.hMSet(Long.valueOf(room.getRoomId()),map);
// 主叫方通话市场通知
Long callTime = roomService.getCallTime(room);
Long receiverUserId = room.getReceiverUserData().getId();
Long callUserId = room.getCallUserData().getId();
if(callTime > 0){
ImDataRes imDataRes = ImMsgGen.callNotice(4, receiverUserId, callUserId, callTime);
yunxin.sendTo(room.getCallUserData().getId(),imDataRes,room.getReceiverUserData().getId());
}
// 收入通知
if(room != null){ // TODO修改数据
}
// 邀请人收入通知
// 排行榜通知
}
/**
* 结算
*/
private boolean settle(Room room){
Long roomId = room.getRoomId();
// 未通话,无需结算
if(room.getRoomData().getBeginTime() == null){
roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds());
return false;
}
// 开始结算
// 呼叫方扣费
this.computeCallerPay(room);
// 结束结算
return true;
}
private void computeCallerPay(Room room) {
RoomData roomData = room.getRoomData();
UserData callUserData = room.getCallUserData();
Long callPrice = roomData.getCallPrice();
Long callTime = roomService.getCallTime(room);
// 本次支付金额
Long totalAmount = callPrice * (callTime / 60);
// 补差价
Long diff = totalAmount - roomData.getPayCoin() + roomData.getPayIncome();
CaiAccount callAccount = accountService.getByUserId(callUserData.getId());
Long userAmount = callAccount.getCoin() + callAccount.getIncomeCoin();
if(diff > 0){
// 账户上面有余额
if(userAmount > 0){
diff = (userAmount > diff) ? diff : userAmount;
log.info("roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff);
// TODO 对账户进行统一扣费
}else{
log.info("房间需要补差价,但用户余额不足 roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff);
}
}else{
// 退钱逻辑
diff = Math.abs(diff);
// TODO 对账户进行统一退钱逻辑
}
}
}

View File

@@ -0,0 +1,19 @@
package com.ruoyi.cai.ws.util;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.dto.WsR;
import java.util.List;
public class RoomWebSocketUtil {
public static void sendSendMessage(String sessionKey, WsR r){
WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
}
public static void sendSendMessage(List<String> sessionKey, WsR r){
for (String s : sessionKey) {
WebSocketUtils.sendMessage(s, JSON.toJSONString(r));
}
}
}

View File

@@ -0,0 +1,67 @@
package com.ruoyi.cai.ws.util;
import com.ruoyi.cai.ws.holder.WebSocketSessionHolder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
/**
* 工具类
*
* @author zendwang
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
/**
* 发送消息
*
* @param sessionKey session主键 一般为用户id
* @param message 消息文本
*/
public static void sendMessage(String sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, message);
}
public static boolean close(String sessionKey) {
WebSocketSession sessions = WebSocketSessionHolder.getSessions(sessionKey);
if(sessions != null){
try {
sessions.close();
return true;
} catch (IOException e) {
log.error("关闭ws失败sessionKey={}",sessionKey,e);
}
}
return false;
}
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
}
}
}
}

View File

@@ -1,15 +1,12 @@
package com.ruoyi.cai.ws.util;
import com.esotericsoftware.minlog.Log;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.websocket.dto.WsRMsgGen;
import com.ruoyi.websocket.util.RoomWebSocketUtil;
import com.ruoyi.websocket.util.WebSocketUtils;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WsExceptionUtil {
public static void throwException(String sessionKey, String message, HangUpEnums hangUpType, String roomId){
public static void throwException(String sessionKey, String message, HangUpEnums hangUpType, Long roomId){
if(hangUpType == null){
hangUpType = HangUpEnums.OTHER;
}