V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
yusheng88
V2EX  ›  程序员

线程安全问题,求大佬解惑

  •  1
     
  •   yusheng88 · 2022-12-14 23:07:53 +08:00 · 2153 次点击
    这是一个创建于 736 天前的主题,其中的信息可能已经有所发展或是发生改变。

    背景

    我想实现一个合并请求工具类,思路:

    1. 把请求放入请求队列,阻塞当前线程 t1 [ Locksupport.park ]
    2. 线程 t2 收集一个批次的请求,提交到线程池
    3. 线程 tn 执行批量处理,逐个设置返回值,然后逐个唤醒阻塞线程[t1...] [ Locksupport.unpark(t1)]

    遇到的问题

    测试时发现,打印每个线程的返回值时,偶尔会遇到返回值为 null 。

    源码

    传递的请求封装对象:

    class BizTask {
        private Thread thread;
        private Object param;
        private Object response;
    
        public Object getParam() {
            return param;
        }
    
        public void setParam(Object param) {
            this.param = param;
        }
    
        public Object getResponse() {
            return response;
        }
    
        public void setResponse(Object response) {
            this.response = response;
        }
    
        public Thread getThread() {
            return thread;
        }
    
        public void setThread(Thread thread) {
            this.thread = thread;
        }
    }
    

    工具类:

    public class BatchHelper {
    
        private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100000), new ThreadPoolExecutor.CallerRunsPolicy());
        private final BlockingQueue<BizTask> requestQueue = new LinkedBlockingQueue<>();
        private final Consumer<List<BizTask>> function;
    
        public BatchHelper(Consumer<List<BizTask>> function) {
            this.function = function;
            threadPool.execute(this::autoDispatch);
        }
    
        public Object take(Object param) {
            BizTask task = new BizTask();
            task.setParam(param);
            task.setThread(Thread.currentThread());
            // synchronized (task) {
            try {
                requestQueue.put(task);
            } catch (InterruptedException ignored) {
            }
            // 阻塞
            LockSupport.park();
            // }
            return task.getResponse();
        }
    
        public void autoDispatch() {
            while (true) {
                try {
                    BizTask t1 = requestQueue.take();
                    List<BizTask> tasks = new ArrayList<>(128);
                    tasks.add(t1);
                    BizTask t2 = requestQueue.poll();
                    if (t2 == null) {
                        dispatch(tasks);
                        continue;
                    }
                    int sum = 1;
                    while (t2 != null) {
                        tasks.add(t2);
                        sum = (sum + 1) & 127;
                        if (sum == 0) {
                            dispatch(tasks);
                            tasks = new ArrayList<>(128);
                        }
                        t2 = requestQueue.poll();
                    }
                    if (sum > 0) {
                        dispatch(tasks);
                    }
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                }
            }
        }
    
        private void dispatch(List<BizTask> list) {
            threadPool.execute(() -> {
                // 批量处理,设置返回值
                function.accept(list);
                for (BizTask task : list) {
                    // synchronized (task) {
                    Thread lock = task.getThread();
                    // 唤醒
                    LockSupport.unpark(lock);
                    // }
                }
            });
        }
    }
    

    测试案例:

    public class TestDemo {
        public static void main(String[] args) {
            BatchHelper batchHelper = new BatchHelper((tasks) -> {
                for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
                    BizTask task = tasks.get(i);
                    // System.out.println("requestId = " + requestId);
                    task.setResponse(i);
                }
            });
    
            // 模拟请求线程
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 20, 20L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    Object take = batchHelper.take(finalI + "");
                    if (take == null) {
                        System.out.println("take = " + take);
                    }
                });
            }
    
    
        }
    }
    
    10 条回复    2022-12-21 15:14:15 +08:00
    yusheng88
        1
    yusheng88  
    OP
       2022-12-14 23:11:37 +08:00
    为什么会出现返回值为 null?
    TylerYY
        2
    TylerYY  
       2022-12-15 09:04:40 +08:00
    是不是可见性问题呢?线程 tn 设置返回值后,upark 阻塞的线程,唤醒的线程不一定能立即看到设置的返回值?
    给 BizTask 的 response 加一个 volatile 修饰试下
    yusheng88
        3
    yusheng88  
    OP
       2022-12-15 09:07:29 +08:00 via Android
    @TylerYY 这个尝试过了,仍然会出现 take=null
    senninha
        4
    senninha  
       2022-12-15 11:21:09 +08:00
    可能的原因:
    LockSupport 的 permit 提前被设置了,这时候调用 park 会直接返回,resp 肯定就是 null 了,也就是 park 与 unpark 调用不对称?难道是 LinkedBlockingQueue 有问题?
    看文档还有这三种情况 park 会直接返回:
    Some other thread invokes unpark with the current thread as the target; or
    Some other thread interrupts the current thread; or
    The call spuriously (that is, for no reason) returns.
    yusheng88
        5
    yusheng88  
    OP
       2022-12-15 14:16:58 +08:00 via Android
    @senninha 这就是我觉得奇怪的地方,设置值在 unpark 前,获取 take 前会阻塞,无法理解为什么会出现 take=null 。我尝试过打印执行次数,次数是正确的
    oldshensheep
        6
    oldshensheep  
       2022-12-15 14:57:40 +08:00   ❤️ 1
    应该是因为这个 Spurious wakeup
    类似的问题
    https://stackoverflow.com/questions/67118821/futuretask-get-method-may-distable-locksupport-park
    https://stackoverflow.com/questions/1050592/do-spurious-wakeups-in-java-actually-happen
    我简化了楼主的代码逻辑

    if (task.data == null) {
    System.out.println(task);
    System.out.println(task);
    }
    里面的 56-58 行代码输出有时是这样的
    Task{id=3678, data='null', thread=Thread[pool-1-thread-3,5,main]}
    Task{id=3678, data='3678 FINISHED.', thread=Thread[pool-1-thread-3,5,main]}
    应该是提取被唤醒了……
    https://gist.github.com/oldshensheep/034044093ce9608ee3d02d7629c2bf81
    GloryJie
        7
    GloryJie  
       2022-12-15 15:13:33 +08:00
    在执行 setResponse 之前打印时间 A ,take == null 的时候时间 B 。得出 B < A 的,还没执行前,线程就被唤醒唤醒了。感觉是楼上说的 spuriously 的原因
    senninha
        8
    senninha  
       2022-12-15 15:36:52 +08:00
    @yusheng88 楼下有说到 spuriously (that is, for no reason) returns 。看 unlock 的说明也是需要 re-check condition 的。
    Callers should re-check the conditions which caused the thread to park in the first place.
    yusheng88
        9
    yusheng88  
    OP
       2022-12-15 15:41:25 +08:00 via Android
    @oldshensheep 感谢大佬,就是这个原因了,看注释,没理解 spuriously 调用是啥 0.0
    strayerxx
        10
    strayerxx  
       2022-12-21 15:14:15 +08:00
    我在想 LinkedBlockingQueue 底层好像也是使用 park 和 unpark 而且也都是对当前线程操作,会不会是相互之间产生了影响
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1313 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 23:46 · PVG 07:46 · LAX 15:46 · JFK 18:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.