将dask数据框中的列转换为Doc2Vec的TaggedDocument - python

介绍
目前,我正在尝试与gensim一起使用dask进行NLP文档计算,并且在将我的语料库转换为“ TaggedDocument”时遇到了问题。
因为我已经尝试了多种解决该问题的方法,所以将列出我的尝试。
每次处理此问题的尝试都会遇到一些稍有不同的麻烦。
首先是一些初始给定。
数据

df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)

  claim_no   claim_txt I                                    CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0

期望的输出

>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])

错误编号1不允许生成器

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

TypeError:无法序列化类型生成器的对象。
在仍然使用发电机的情况下,我发现无法解决此问题。解决这个问题的方法非常棒!因为这对于普通的熊猫来说效果很好。
错误号2仅每个分区的第一个元素

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

这个有点笨,因为该函数不会进行迭代(我知道),但是给出所需的格式,但是只返回每个分区的第一行。
错误3函数调用挂起100%cpu

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
        tagged_list.append(tagged)
    return tagged_list

据我所知,在循环外重构返回值时,此函数挂起将在dask客户端中建立内存,并且我的CPU利用率达到100%,但未计算任何任务。请记住,我以相同的方式调用该函数。
熊猫解决方案

def tag_corp(corp,tag):
    return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(corp), ([tag]))

tagged_document = [tag_corp(x,y) for x,y in list(zip(df_smple['claim_txt'],df_smple['claim_no']))]

List comp我还没时间测试这个解决方案
其他熊猫解决方案

tagged_document = list(read_corpus_tag_sub(df))

这个解决方案将持续几个小时。但是,完成后,我没有足够的内存来处理这个问题。
结论(?)
我觉得现在超级迷路了。这是我看过的线程列表。我承认自己真的是个新手,我花了这么多时间,感觉就像是在做一个傻瓜。

Dask Bag from generator
Processing Text With Dask
Speed up Pandas apply using Dask
How do you parallelize apply() on Pandas Dataframes making use of all cores on one machine?
python dask DataFrame, support for (trivially parallelizable) row apply?
What is map_partitions doing?
simple dask map_partitions example
The Docs

参考方案

我不熟悉Dask API /限制,但通常:

如果您可以将数据作为(单词,标签)元组进行遍历–甚至忽略Doc2Vec / TaggedDocument步骤–那么Dask端就已经处理好了,将这些元组转换为TaggedDocument实例应该很简单
通常,对于大型数据集,您不想(并且可能没有足够的RAM来)将完整数据集实例化为内存中的list –因此,涉及list().append()的尝试可能会起作用,达到一定程度,但是耗尽了本地内存(导致严重的交换)和/或只是没有达到数据的末尾。

大型数据集的首选方法是创建一个可迭代的对象,该对象每次被要求对数据进行迭代(因为Doc2Vec训练将需要多次通过)可以依次提供每个项目,但从不读取将整个数据集存储到一个内存对象中。

关于此模式的一个不错的博客文章是:Data streaming in Python: generators, iterators, iterables

给定您显示的代码,我怀疑适合您的方法可能是:

from gensim.utils import simple_preprocess

class MyDataframeCorpus(object):
    def __init__(self, source_df, text_col, tag_col):
        self.source_df = source_df
        self.text_col = text_col
        self.tag_col = tag_col

    def __iter__(self):
        for i, row in self.source_df.iterrows():
            yield TaggedDocument(words=simple_preprocess(row[self.text_col]), 
                                 tags=[row[self.tag_col]])

corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')

R'relaimpo'软件包的Python端口 - python

我需要计算Lindeman-Merenda-Gold(LMG)分数,以进行回归分析。我发现R语言的relaimpo包下有该文件。不幸的是,我对R没有任何经验。我检查了互联网,但找不到。这个程序包有python端口吗?如果不存在,是否可以通过python使用该包? python参考方案 最近,我遇到了pingouin库。

我不明白为什么sum(df ['series'])!= df ['series']。sum() - python

我正在汇总一系列值,但是根据我的操作方式,我会得到不同的结果。我尝试过的两种方法是:sum(df['series']) df['series'].sum() 他们为什么会返回不同的值?示例代码。s = pd.Series([ 0.428229 , -0.948957 , -0.110125 , 0.791305 , 0…

如何用'-'解析字符串到节点js本地脚本? - python

我正在使用本地节点js脚本来处理字符串。我陷入了将'-'字符串解析为本地节点js脚本的问题。render.js:#! /usr/bin/env -S node -r esm let argv = require('yargs') .usage('$0 [string]') .argv; console.log(argv…

Python:传递记录器是个好主意吗? - python

我的Web服务器的API日志如下:started started succeeded failed 那是同时收到的两个请求。很难说哪一个成功或失败。为了彼此分离请求,我为每个请求创建了一个随机数,并将其用作记录器的名称logger = logging.getLogger(random_number) 日志变成[111] started [222] start…

Python-Excel导出 - python

我有以下代码:import pandas as pd import requests from bs4 import BeautifulSoup res = requests.get("https://www.bankier.pl/gielda/notowania/akcje") soup = BeautifulSoup(res.cont…