PySpark排序排序 - python

请帮助初学者。
通常的db放在一个表中。

使用Python分析Apache Spark中的数据。想要编写一个查询,该查询将提取所有按订单分类的电子邮件的客户的所有交易,这些订单现在订购的产品已经停产,并且尚未发货。基本上,“&item_in_list(F.lit(“ NotShipped”),ShippedStatus)“不能正常工作。

%python
import pyspark.sql.functions as F
from pyspark.sql.types import *

list_len = F.udf(lambda x: len(x), IntegerType())
item_in_list = F.udf(lambda x, y: x in y, BooleanType())
df = spark.sql("select * from orderdb")
df1 = df.select("email", "OrderedProduct","ShippedStatus").groupBy("email")
df1 = df1.agg(F.collect_set("OrderedProduct"))\
       .withColumnRenamed("collect_set(OrderedProduct)", "OrderedProduct")
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

df1 = df1.select("email")
df = df1.join(df, "email", "left_outer")
display(df)

ID字符串为空
date DateTimestamp null
OrderedProduct字符串为null
ShippedStatus布尔值null

参考方案

首先,udf在pyspark中表现很差。如果要更改类型,请使用以下内容:

from pyspark.sql.types import IntegerType

df = df.withColumn("column", df["column"].cast(IntegerType()))

话虽如此,剩下的我们需要一个可复制的示例,但是我想您可以使用'where'子句解决它。

# Your code
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

# My code
condition1 = F.col('OrderedProduct') > 1
condition2 = F.col('ShippedStatus') == F.lit('NotShipped')
condition3 = F.col('OrderedProduct') == F.lit('DiscontinuedProduct')

df1 = df1.where(condition 1 & condition2 & condition3)

Python:在不更改段落顺序的情况下在文件的每个段落中反向单词? - python

我想通过反转text_in.txt文件中的单词来生成text_out.txt文件,如下所示:text_in.txt具有两段,如下所示:Hello world, I am Here. I am eighteen years old. text_out.txt应该是这样的:Here. am I world, Hello old. years eighteen a…

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…

如何在python中将从PDF提取的文本格式化为json - python

我已经使用pyPDF2提取了一些文本格式的发票PDF。我想将此文本文件转换为仅包含重要关键字和令牌的json文件。输出应该是这样的:#PurchaseOrder {"doctype":"PO", "orderingcompany":"Demo Company", "su…

查找字符串中的行数 - python

我正在创建一个python电影播放器​​/制作器,我想在多行字符串中找到行数。我想知道是否有任何内置函数或可以编写代码的函数来做到这一点:x = """ line1 line2 """ getLines(x) python大神给出的解决方案 如果换行符是'\n',则nlines …