我想使用Stream
并行处理未知数目的异类远程存储的JSON文件集(文件数事先未知)。文件的大小差异很大,从每个文件1个JSON记录到其他文件中的100,000个记录。在这种情况下,JSON记录表示在文件中表示为一行的自包含JSON对象。
我真的很想为此使用Streams,所以我实现了Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
我遇到的问题是,虽然Stream一开始很漂亮地实现了并行化,但最终最大的文件仍留在单个线程中进行处理。我认为,近端原因已得到充分证明:分离器“不平衡”。
更具体地说,似乎在trySplit
生命周期中的某个点之后未调用Stream.forEach
方法,因此很少执行在trySplit
末尾分配小批量的额外逻辑。
请注意,从trySplit返回的所有拆分器如何共享相同的paths
迭代器。我认为这是在所有拆分器之间平衡工作的非常聪明的方法,但还不足以实现完全并行。
我希望并行处理首先在文件之间进行,然后在仍然拆分几个大文件时,我希望在剩余文件的大块之间进行并行处理。这就是else
末尾的trySplit
块的意图。
解决这个问题是否有简单/简单/规范的方法?
参考方案
不管基础文件的大小如何,您的trySplit
应该输出相等大小的拆分。您应该将所有文件视为一个单元,并每次用相同数量的JSON对象填充ArrayList
支持的分隔符。对象的数量应使得处理一次拆分所需的时间介于1到10毫秒之间:小于1毫秒,并且您开始接近将批处理交给工作线程的成本,高于此成本,并且由于以下原因而开始冒着CPU负载不均匀的风险过于粗粒度的任务。
分隔符没有义务报告大小估计,并且您已经正确执行了此操作:您的估计值为Long.MAX_VALUE
,这是一个特殊值,表示“无界”。但是,如果您有多个带有单个JSON对象的文件,导致批量大小为1,则这将以两种方式损害您的性能:打开,读取,关闭文件的开销可能会成为瓶颈,并且,如果设法逃脱这就是说,与处理一项的成本相比,线程移交的成本可能很高,这再次造成了瓶颈。
五年前,我正在解决类似的问题,您可以看一下my solution。
Java中的“ <<”运算符 - java最喜欢的语句来自Java的Character类:(1 << Character.PARAGRAPH_SEPARATOR)) >> type PARAGRAPH_SEPARATOR是字节,type是整数。这句话中的操作员,他们做什么?如何以及在哪里可以使用这些运算符?这是oracles java.lang.Character文档。该类中…
java:继承 - java有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…
JAVA:如何检查对象数组中的所有对象是否都是子类的对象? - java我有一个对象数组。现在,我要检查所有这些对象是否都是MyObject的实例。有没有比这更好的选择:boolean check = true; for (Object o : justAList){ if (!(o instanceof MyObject)){ check = false; break; } } java大神给出的解决方案 如果您不喜欢循环,则…
Java-如何将此字符串转换为日期? - java我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…
展平地图中的列表列表 - java我有订单流(来源是订单列表)。每个订单都有一个客户和一个OrderLine列表。我要实现的目标是在一张简单的列表中以客户为关键的地图,并将属于该客户的所有订单行作为值。现在,通过执行以下操作,我现在管理的内容返回了Map<Customer>, List<Set<OrderLine>>>:orders .collect…