Apache Beam with Python:如何在会话窗口中计算最小值并将其应用于所有相关的PCollections - python

我正在使用Apache Beam的Python SDK来处理字典,这些字典代表着流分析命中率。由于会话窗口的缘故,命中次数得以汇总。我的DataFlow真正要做的就是应用这些会话窗口,并将会话ID分配给所有相关的匹配。

作为会话ID,我已经确定我将使用首次匹配的时间戳(结合每个用户的Cookie ID)。这是我的管道:

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

windowed_hits = hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))

visit_id = (windowed_hits | 'ExtractTimestamp' >> beam.Map(my_extracter)
    | 'GetMinimum' >> beam.CombineGlobally(my_min).without_defaults())

windowed_hits | 'SetVisitId' >>
    beam.Map(set_visit_id, visit_id=beam.pvalue.AsSingleton(visit_id))

my_parser正在应用literal_eval将字符串转换为字典。 my_extracter正在删除匹配的时间戳。 set_visit_id只是接受一个参数并将其分配给键visit_id。

这似乎不起作用。调试时,似乎我的visit_id分支正常运行,并且它在计算最小值之前等待会话结束。但是当用作辅助输入时,我只会得到一个pvalue.EmptySideInput。如何获得所需的结果,为什么我的代码返回空侧输入?

编辑:我已经用AsIter替换了AsSingleton,以了解这里出了什么问题。我得到的是一个_FilteringIterable:

_iterable包含一个WindowedWalue。该值是我发送的唯一匹配的时间戳(我们将其称为TS1)。它与一个从TS1到TS1 + 60的窗口相关联。奇怪的是,此WindowedValue的timestamp属性为TS1 + 60(.238),但是我猜这是因为计算最小值的分支在计算会话之前等待会话完成最低。
_target_window包含一个窗口,范围从TS + 60(.24)到TS + 120(.24)。

所以我想问题是这个_target_window,但是我不明白为什么它的范围从TS + 60到TS +120。可能是因为WindowedValue的时间戳吗?似乎很有可能,因为_target_window的边界似乎是从其取整值得出的。

参考方案

最终,我通过丢弃所有Combine并将其替换为GroupByKey来实现了自己想做的事情。

def my_parser(msg):
    result = literal_eval(msg)
    return result

def set_key(hit):
    return (hit['cid'], hit)

def set_vid2(keyed_hits):
    k, hits = keyed_hits
    visit_id = min([h['time'] for h in hits])
    for h in hits:
        h['visit_id'] = visit_id
    return hits

def unpack_list(l):
    for d in l:
        yield d

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

keyed_hits = hits | 'SetKey' >> beam.Map(set_key)

windowed_hits = (keyed_hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
    | 'Grouping' >> beam.GroupByKey())

clean_hits = windowed_hits | 'ComputeMin' >> beam.Map(set_vid2)

clean_hits | 'Unpack' >> beam.FlatMap(unpack_list)

在GroupByKey之后,我有一个PCollection,其中包含命中列表(按Cookie ID +会话窗口分组)。然后,一旦计算了访问ID并在每个匹配上设置了访问ID,就可以将我的匹配列表PCollection转换为具有unpack_list的匹配PCollection。

我不确定这是否是正确的方法,但是它是否会对性能产生影响。

Selenium with Python:从具有只读功能的表单中收集电子邮件 - python

我正在尝试从内部具有只读内容的网站上的表单收集电子邮件地址。<input name="email" id="email" type="text" class="form-control" value="[email protected]" readonl…

如何使用文件和循环进入列表? - python

我有一个清单:alph = ['a','b','c','d','e','f','g'] 我需要使用for循环并使用文件之前创建的来打印它。alphabeth = ['a','b',…

在返回'Response'(Python)中传递多个参数 - python

我在Angular工作,正在使用Http请求和响应。是否可以在“响应”中发送多个参数。角度文件:this.http.get("api/agent/applicationaware").subscribe((data:any)... python文件:def get(request): ... return Response(seriali…

Python exchangelib在子文件夹中读取邮件 - python

我想从Outlook邮箱的子文件夹中读取邮件。Inbox ├──myfolder 我可以使用account.inbox.all()阅读收件箱,但我想阅读myfolder中的邮件我尝试了此页面folder部分中的内容,但无法正确完成https://pypi.python.org/pypi/exchangelib/ 参考方案 您需要首先掌握Folder的myfo…

如何修复AttributeError:模块'numpy'没有属性'square' - python

Improve this question 我已经将numpy更新为1.14.0。我使用Windows10。我尝试运行我的代码,但出现此错误: AttributeError:模块“ numpy”没有属性“ square”这是我的进口商品:%matplotlib inline import matplotlib.pyplot as plt import ten…