使用TensorFlow Dataset API和flat_map的并行线程 - python

我正在将TensorFlow代码从旧队列界面更改为新的Dataset API。使用旧的接口,我可以为num_threads队列指定tf.train.shuffle_batch参数。但是,控制Dataset API中线程数量的唯一方法似乎是使用map参数在num_parallel_calls函数中。但是,我改用flat_map函数,该函数没有这样的参数。

问:是否可以控制flat_map函数的线程/进程数?还是有办法将mapflat_map结合使用并且仍然指定并行调用的数量?

请注意,并行运行多个线程至关重要,因为我打算在数据进入队列之前在CPU上进行大量的预处理。

GitHub上有两个(here和here)相关文章,但我不认为他们回答了这个问题。

这是我的用例的最小代码示例,以供说明:

with tf.Graph().as_default():
    data = tf.ones(shape=(10, 512), dtype=tf.float32, name="data")
    input_tensors = (data,)

    def pre_processing_func(data_):
        # normally I would do data-augmentation here
        results = (tf.expand_dims(data_, axis=0),)
        return tf.data.Dataset.from_tensor_slices(results)

    dataset_source = tf.data.Dataset.from_tensor_slices(input_tensors)
    dataset = dataset_source.flat_map(pre_processing_func)
    # do something with 'dataset'

python大神给出的解决方案

据我所知,目前flat_map不提供并行选项。
鉴于大部分计算是在pre_processing_func中完成的,因此可以使用的解决方案是并行进行map调用,然后进行一些缓冲,然后使用带有身份lambda函数的flat_map调用来处理展平输出。

在代码中:

NUM_THREADS = 5
BUFFER_SIZE = 1000

def pre_processing_func(data_):
    # data-augmentation here
    # generate new samples starting from the sample `data_`
    artificial_samples = generate_from_sample(data_)
    return atificial_samples

dataset_source = (tf.data.Dataset.from_tensor_slices(input_tensors).
                  map(pre_processing_func, num_parallel_calls=NUM_THREADS).
                  prefetch(BUFFER_SIZE).
                  flat_map(lambda *x : tf.data.Dataset.from_tensor_slices(x)).
                  shuffle(BUFFER_SIZE)) # my addition, probably necessary though

注意(给我自己和任何试图理解管道的人):

由于pre_processing_func从初始样本(以形状为(?, 512)的矩阵组织)开始生成任意数量的新样本,因此必须使用flat_map调用将所有生成的矩阵转换为包含单个样本的Dataset(因此(lambda中的tf.data.Dataset.from_tensor_slices(x)),然后将所有这些数据集展平为一个包含各个样本的大Dataset

.shuffle()最好将数据集或生成的样本打包在一起。

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…

使用Tensorflow 2.0-AMI设置AWS EC2实例还是自己构建它? - python

我需要使用Tensorflow 2.0设置一个AWS EC2 GPU实例。我看到的所有文档都表明当前的AWS AMI映像仅支持Tensorflow 1.14或1.15,但不支持Tensorflow 2.0。因此,我想知道在AWS实例上获取Tensorflow-gpu 2.0的最佳方法是什么。我可以创建一个EC2 GPU实例,安装Nvidia驱动程序,然后使用…

Python pytz时区函数返回的时区为9分钟 - python

由于某些原因,我无法从以下代码中找出原因:>>> from pytz import timezone >>> timezone('America/Chicago') 我得到:<DstTzInfo 'America/Chicago' LMT-1 day, 18:09:00 STD…

用大写字母拆分字符串,但忽略AAA Python Regex - python

我的正则表达式:vendor = "MyNameIsJoe. I'mWorkerInAAAinc." ven = re.split(r'(?<=[a-z])[A-Z]|[A-Z](?=[a-z])', vendor) 以大写字母分割字符串,例如:'我的名字是乔。 I'mWorkerInAAAinc”变成…

如何打印浮点数的全精度[Python] - python

我编写了以下函数,其中传递了x,y的值:def check(x, y): print(type(x)) print(type(y)) print(x) print(y) if x == y: print "Yes" 现在当我打电话check(1.00000000000000001, 1.0000000000000002)它正在打印:<…