V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
v2410117
V2EX  ›  Java

一个 Java 的问题,HTTP 与 Websocket

  •  1
     
  •   v2410117 · 2019-12-25 16:14:22 +08:00 · 4497 次点击
    这是一个创建于 1843 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在的情况是这样的,Java 接收 HTTP 请求,在这个 HTTP 请求中会使用到 Websocket 去请求其他数据,怎么在这个 HTTP 中同步等待 Websocket 的业务返回?代码如下,主要是 Websocket 收到消息是走 OnMessage,我没想出好办法做这个处理,目前的处理办法就是在接到 HTTP 请求的时候,往数据库里加一条记录,然后一直使用这个记录 ID 循环查询数据库,当 Websocket 返回数据了,就将数据写入数据库并改变那条记录的状态,HTTP 请求里就直接相当于在数据库里查询到 Websocket 的业务返回,虽然这样做我是可以暂时完成功能,但好像感觉不太靠谱,求大神指点一下

    public class TestController {
        @Autowired
        private MyWebSocket myWebSocket;
    
        @RequestMapping("/test")
        public Object test(){
           myWebSocket.sendText("ceshi");
           return xxx;
        }
    
    }
    
    
    
    @ServerEndpoint(port = 9898)
    @Component
    public class MyWebSocket {
        
    
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());
    
        @OnOpen
        public void onOpen(Session session, HttpHeaders headers, ParameterMap parameterMap) throws IOException {
            System.out.println("new connection");
            sessions.add(session);
        }
    
        @OnMessage
        public void onMessage(Session session, String message) {
            System.out.println(message);
            if ("heart".equals(message)) {
                session.sendText("heart");
            }else {
                session.sendText(message);
            }
        }
    
         public void sendText(String msg) {
                 for (Session session: sessions) {
                     session.sendText(msg);
                 }
          }
    }
    
    19 条回复    2019-12-25 19:41:47 +08:00
    v2410117
        1
    v2410117  
    OP
       2019-12-25 16:38:06 +08:00
    求大佬来指点下思路呀! V2 大神们,出来了,别潜水了!
    muzuiget
        2
    muzuiget  
       2019-12-25 16:45:51 +08:00
    你那个数据库用法,其实就是当锁用了,关键词“Java Lock”,搜索下。
    v2410117
        3
    v2410117  
    OP
       2019-12-25 16:48:16 +08:00
    @muzuiget 好的,感谢,我先搜索一下
    672795574
        4
    672795574  
       2019-12-25 16:48:53 +08:00
    用 CompletableFuture 试试?
    gaius
        5
    gaius  
       2019-12-25 16:55:12 +08:00
    不用 websocket 用服务调用,或者把 http 的业务流程也改成异步的
    gy123
        6
    gy123  
       2019-12-25 17:07:19 +08:00
    你写这个应该只是演示吧,因为你 http 收到请求后,发送给的是所有 session,然后需求是等待 onmessage 返回结果后响应 http;实际中应该是通过 http 请求给相应 session 发送吧?目前想到的方案是这样:
    一.你要把 http 跟 websocket 全用 netty 来写可能灵活性好一些;
    二.通过传递响应对象和锁工具:
    (1) http 接收到请求后,拿到当前请求 sessionid 或者你根据业务自己生成的随机数,然后分别存储 key 为随机数,value 为 HttpServletResponse 和类似 CountDownLatch 对象存储到两个全局 map 中;并且将该随机数发送给 websocket 接收者;
    后面用 CountDownLatch 等工具 hold 住线程;
    (2)onmessage 接受到消息后,通过随机数取出 map 中 HttpServletResponse 对象和 CountDownLatch,进行返回数据和释放锁;
    buruoyanyang
        7
    buruoyanyang  
       2019-12-25 17:10:26 +08:00
    做过差不多的,http 请求过来,通过 MQ 调用其他模块等返回。
    用 Future,然后做线程等待,等 MQ 回消息后,在唤醒等待线程即可
    gy123
        8
    gy123  
       2019-12-25 17:22:00 +08:00
    @buruoyanyang 具体说说,请教下?目前看你描述是在整个 http 请求同步代码中,只是使用 future 做异步请求 mq 拿到结果,如何能应用到他这个案例?
    buruoyanyang
        9
    buruoyanyang  
       2019-12-25 17:32:48 +08:00
    @gy123 http 请求->注册消息通知器->添加等待线程->提交到线程池,得到 Future->同步发送消息->收到 MQ 返回->唤醒等待线程->future.get()
    大概是这么个流程,太久了也不太记得了
    http 请求会在 future.get()被阻塞
    他这个就是把 MQ 换成 WebSocket,WebSocket 返回后唤醒等待线程
    pws22
        10
    pws22  
       2019-12-25 17:39:53 +08:00
    @buruoyanyang 你这块 Future 是获取 mq 的返回状态码,还是说获取消费端的结果,如果是获取结果,是 future 去拉结果?
    跟题主的问题可能有点出路吧
    gy123
        11
    gy123  
       2019-12-25 17:41:47 +08:00
    @buruoyanyang future.get()这个会阻塞没错,但是 callback 在 onmessage 里怎么拿到并做返回?这里有点疑问
    pws22
        12
    pws22  
       2019-12-25 17:51:59 +08:00
    要不直接内存是做判断 , http.sendMsg(Callback callback,long timeout),callback 里面传入你当前的 id,然后在内存中(map)去循环判断有没有该 id,当 socket 返回信息的时候,在内存(map)里面存入这个 id,其实跟你存数据库道理一样,最好就是不用 socket 那块,直接改成 http,都做到同步,或者是你在提供一个接口,是获取状态数据的,前台去长轮询获取最终结果
    buruoyanyang
        13
    buruoyanyang  
       2019-12-25 17:58:14 +08:00
    @gy123 所有被 wait 的消息通知器都会被丢到一个 key 为 id,value 为消息通知器的 map 里面,onMessage 通过消息 id 从 map 里面得到消息通知器,然后 notify,可能这个消息通知器的名字有点误导人
    mizzle
        14
    mizzle  
       2019-12-25 17:59:39 +08:00 via Android
    这个场景用,涉及多线程交互。直观方法是 exchanger,或者用通知队列。
    当年看的《如何造火箭》,还是能派上用场的。
    buruoyanyang
        15
    buruoyanyang  
       2019-12-25 18:05:43 +08:00
    @pws22 Future 获取的是 mq 的返回状态码,不获取结果,结果直接丢到消息通知器里面
    zsy979
        16
    zsy979  
       2019-12-25 18:24:46 +08:00
    是不是我没看懂题 Thread.join 不🉑️吗
    ```
    @RequestMapping("/test")
    public Object test(){
    Thread t = new Thread(new sendText());
    t.run();
    t.join();
    //t 执行完执行。。
    return xxx;
    }
    ```
    0NF09LJPS51k57uH
        17
    0NF09LJPS51k57uH  
       2019-12-25 19:09:37 +08:00
    用发布订阅去做
    http 业务逻辑中创建并监听这个 key 的 delete event
    在 websocket 处理逻辑中删除这个 key。
    araaaa
        18
    araaaa  
       2019-12-25 19:11:11 +08:00 via iPhone
    reactor 的 mono.zip
    Malthael
        19
    Malthael  
       2019-12-25 19:41:47 +08:00
    DeferredResult 用过吗?我现在的项目 http 接口和 tcp 异步转同步用的就是这个。http 接口请求后发送 tcp 请求,然后等待客户端回复之后接口返回,还有超时设置。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2725 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 11:30 · PVG 19:30 · LAX 03:30 · JFK 06:30
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.