您可以重新平衡大小未知的不平衡分离器吗? - java

我想使用Stream并行处理未知数目的异类远程存储的JSON文件集(文件数事先未知)。文件的大小差异很大,从每个文件1个JSON记录到其他文件中的100,000个记录。在这种情况下,JSON记录表示在文件中表示为一行的自包含JSON对象。

我真的很想为此使用Streams,所以我实现了Spliterator

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

我遇到的问题是,虽然Stream一开始很漂亮地实现了并行化,但最终最大的文件仍留在单个线程中进行处理。我认为,近端原因已得到充分证明:分离器“不平衡”。

更具体地说,似乎在trySplit生命周期中的某个点之后未调用Stream.forEach方法,因此很少执行在trySplit末尾分配小批量的额外逻辑。

请注意,从trySplit返回的所有拆分器如何共享相同的paths迭代器。我认为这是在所有拆分器之间平衡工作的非常聪明的方法,但还不足以实现完全并行。

我希望并行处理首先在文件之间进行,然后在仍然拆分几个大文件时,我希望在剩余文件的大块之间进行并行处理。这就是else末尾的trySplit块的意图。

解决这个问题是否有简单/简单/规范的方法?

参考方案

不管基础文件的大小如何,您的trySplit应该输出相等大小的拆分。您应该将所有文件视为一个单元,并每次用相同数量的JSON对象填充ArrayList支持的分隔符。对象的数量应使得处理一次拆分所需的时间介于1到10毫秒之间:小于1毫秒,并且您开始接近将批处理交给工作线程的成本,高于此成本,并且由于以下原因而开始冒着CPU负载不均匀的风险过于粗粒度的任务。

分隔符没有义务报告大小估计,并且您已经正确执行了此操作:您的估计值为Long.MAX_VALUE,这是一个特殊值,表示“无界”。但是,如果您有多个带有单个JSON对象的文件,导致批量大小为1,则这将以两种方式损害您的性能:打开,读取,关闭文件的开销可能会成为瓶颈,并且,如果设法逃脱这就是说,与处理一项的成本相比,线程移交的成本可能很高,这再次造成了瓶颈。

五年前,我正在解决类似的问题,您可以看一下my solution。

Java中的“ <<”运算符 - java

最喜欢的语句来自Java的Character类:(1 << Character.PARAGRAPH_SEPARATOR)) >> type PARAGRAPH_SEPARATOR是字节,type是整数。这句话中的操作员,他们做什么?如何以及在哪里可以使用这些运算符?这是oracles java.lang.Character文档。该类中…

java:继承 - java

有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…

JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java

我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…

Java-如何将此字符串转换为日期? - java

我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…

展平地图中的列表列表 - java

我有订单流(来源是订单列表)。每个订单都有一个客户和一个OrderLine列表。我要实现的目标是在一张简单的列表中以客户为关键的地图,并将属于该客户的所有订单行作为值。现在,通过执行以下操作,我现在管理的内容返回了Map<Customer>, List<Set<OrderLine>>>:orders .collect…