J

JAVA同行者

V1

2022/11/08阅读:36主题:默认主题

parallelStream并发流线程安全问题

起因

公司项目中有数据获取任务,由于返回数据的类型是字符串,需要进行转换,变为我们定义的类型id,在准备我们定义的类型Map时,先找到所有类型数据,然后使用了并行流进行遍历,将name存储为key,id存储为value,生成转换map。测试阶段一切正常,但是当项目上线一段时间(某天夜里)后,突然出现定时任务中的报表部分执行失败。

出错位置:类型转换map在报错的那几次任务中没有存储所有的类型键值对,简单来讲,就是丢了一些,导致没有找到转换类型的数据直接入了字符串到数据库,导致报表统计的update语句执行报错,报表数据丢失。

技术原因:parallelStream并行流并不是线程安全的,并发时存在数据丢失的情况,在往普通的collection中add数据时会出现抢占资源的线程问题。导致数据随机缺失

代码复现

public class TestParallelStream {
 public static void main(String[] args) {
  List<String> alist = new ArrayList<String>(Arrays.asList("1","2","3","4","5","6","7"));
  for(int i=0;i<10000;i++) {
   Map<String, String> result = new HashMap<String, String>();
   alist.parallelStream().forEach(item->{
    result.put(item, item);
   });
   System.out.println("i="+i+",map大小:"+result.size());
  }
 }
}

返回结果

......
i=5678,map大小:7
i=5679,map大小:7
i=5680,map大小:6      //出现map只有6个元素的情况了
i=5681,map大小:7
......

==结论==:java8的并行流不保证线程安全,要保证线程安全需要加一个其他流程

解决

  1. 给collection上锁

    //对要存储元素的map要求线程安全
    Map<String, String> result = new HashMap<String, String>();
    修改为
    Map<String, String> result = Collections.synchronizedMap(new HashMap());
       
  2. 使用标准for循环或增强for循环 代码就不展示了

  3. 使用串行流stream

    alist.stream().forEach(item->{
     result.put(item, item);
    });
  4. 使用java8中的收集器(使用parallelStream后使用collect )

alist.parallelStream().collect(Collectors.toList()).forEach(item->{
 result.put(item, item);
});

Collectors提供了toSet、toList、toConcurrentMap等方法

分类:

后端

标签:

后端

作者介绍

J
JAVA同行者
V1