Spark使用上一行的值将新列添加到数据框 - python

我想知道如何在Spark(Pyspark)中实现以下目标

初始数据框:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

结果数据框:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

我设法通过使用类似的东西将新的列通常“追加”到数据框:
df.withColumn("new_Col", df.num * 10)

但是,我不知道如何为新列实现这种“行移位”,以便新列具有上一行的字段值(如示例中所示)。我也无法在API文档中找到有关如何通过索引访问DF中特定行的任何内容。

任何帮助,将不胜感激。

参考方案

您可以按以下方式使用lag窗口函数

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

但是有一些重要的问题:

如果您需要全局操作(不被其他一个或多个列分区),则效率极低。
您需要一种自然的方式来订购数据。

尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您只需将DataFrame转换为RDD并手动计算lag。参见例如:

How to transform data with sliding window over time series data in Pyspark
Apache Spark Moving Average(用Scala编写,但可以针对PySpark进行调整。请务必先阅读注释)。

其他有用的链接:

https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
Spark Window Functions - rangeBetween dates

Python uuid4,如何限制唯一字符的长度 - python

在Python中,我正在使用uuid4()方法创建唯一的字符集。但是我找不到将其限制为10或8个字符的方法。有什么办法吗?uuid4()ffc69c1b-9d87-4c19-8dac-c09ca857e3fc谢谢。 参考方案 尝试:x = uuid4() str(x)[:8] 输出:"ffc69c1b" Is there a way to…

Python GPU资源利用 - python

我有一个Python脚本在某些深度学习模型上运行推理。有什么办法可以找出GPU资源的利用率水平?例如,使用着色器,float16乘法器等。我似乎在网上找不到太多有关这些GPU资源的文档。谢谢! 参考方案 您可以尝试在像Renderdoc这样的GPU分析器中运行pyxthon应用程序。它将分析您的跑步情况。您将能够获得有关已使用资源,已用缓冲区,不同渲染状态上…

Python-crontab模块 - python

我正在尝试在Linux OS(CentOS 7)上使用Python-crontab模块我的配置文件如下:{ "ossConfigurationData": { "work1": [ { "cronInterval": "0 0 0 1 1 ?", "attribute&…

Python:检查是否存在维基百科文章 - python

我试图弄清楚如何检查Wikipedia文章是否存在。例如,https://en.wikipedia.org/wiki/Food 存在,但是https://en.wikipedia.org/wiki/Fod 不会,页面只是说:“维基百科没有此名称的文章。”谢谢! 参考方案 >>> import urllib >>> prin…

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…