Python多处理和共享变量 - python

我不是python专家,但是我设法编写了一个多处理代码,该代码使用了PC中的所有cpus和内核。我的代码加载了一个非常大的数组,大约1.6 GB,并且我需要在每个进程中更新该数组。幸运的是,此更新包括向图像添加一些人造星,并且每个过程在添加人造星的位置都有一组不同的图像位置。

图片太大,每次调用流程时我都无法创建一个新图片。我的解决方案是在共享内存中创建一个变量,这样可以节省大量内存。出于某种原因,它适用于90%的图像,但是在某些区域中,我的代码在我之前发送给进程的某些位置添加了随机数。它与我创建共享变量的方式有关吗?在我的代码执行过程中,进程之间是否相互干扰?

奇怪的是,当使用单个cpu和单个内核时,图像是100%完美的,并且没有向图像添加随机数。您是否建议我在多个进程之间共享大型阵列的方法?这里是我代码的相关部分。请在定义变量im_data时阅读该行。

import warnings
warnings.filterwarnings("ignore")

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes

class Worker(multiprocessing.Process):


def __init__(self, work_queue, result_queue):

    # base class initialization
    multiprocessing.Process.__init__(self)

    # job management stuff
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.kill_received = False

def run(self):
    while not self.kill_received:

        # get a task
        try:
            i_range, psf_file = self.work_queue.get_nowait()
        except Queue.Empty:
            break

        # the actual processing
        print "Adding artificial stars - index range=", i_range

        radius=16
        x_c,y_c=( (psf_size[1]-1)/2, (psf_size[2]-1)/2 )
        x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
        distance = np.sqrt(x**2 + y**2)

        for i in range(i_range[0],i_range[1]):
            psf_xy=np.zeros(psf_size[1:3], dtype=float)
            j=0
            for i_order in range(psf_order+1):
                j_order=0
                while (i_order+j_order < psf_order+1):
                    psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
                    j_order+=1
                    j+=1


            psf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(psf_xy)
            psf_xy *= psf_factor

            npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
            npsf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
            npsf_xy *= npsf_factor

            im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
            im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
            npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
            npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]

            im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.


        self.result_queue.put(id)

if __name__ == "__main__":

  n_cpu=2
  n_core=6
  n_processes=n_cpu*n_core*1
  input_mock_file=sys.argv[1]

  print "Reading file ", im_file[i]
  hdu=pyfits.open(im_file[i])
  data=hdu[0].data
  im_size=data.shape

  im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
  im_data = np.ctypeslib.as_array(im_data_base.get_obj())
  im_data = im_data.reshape(im_size[0], im_size[1])
  im_data[:] = data
  data=0
  assert im_data.base.base is im_data_base.get_obj()

  # run
  # load up work queue
  tic=time.time()
  j_step=np.int(np.ceil( mock_n*1./n_processes ))
  j_range=range(0,mock_n,j_step)
  j_range.append(mock_n)


  work_queue = multiprocessing.Queue()
  for j in range(np.size(j_range)-1):
    if work_queue.full():
      print "Oh no! Queue is full after only %d iterations" % j
    work_queue.put( (j_range[j:j+2], psf_file[i]) )

  # create a queue to pass to workers to store the results
  result_queue = multiprocessing.Queue()

  # spawn workers
  for j in range(n_processes):
    worker = Worker(work_queue, result_queue)
    worker.start()

  # collect the results off the queue
  while not work_queue.empty():
    result_queue.get()

  print "Writing file ", mock_im_file[i]
  hdu[0].data=im_data
  hdu.writeto(mock_im_file[i])
  print "%f s for parallel computation." % (time.time() - tic)

参考方案

我认为问题(正如您在问题中所建议的那样)来自以下事实:您正在从多个线程在同一数组中进行写入。

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data

尽管我非常确定您可以以“过程安全”的方式写入im_data_base(python使用隐式锁来同步对数组的访问),但我不确定您可以通过以下方式写入im_data过程安全的方式。

因此,即使您不确定我是否会解决您的问题,我还是建议您围绕im_data创建一个明确的锁定

# Disable python implicit lock, we are going to use our own
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False)
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
# Create our own lock
im_data_lock = Lock()

然后在流程中,每次需要修改im_data时都要获取锁

self.im_data_lock.acquire()
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10
self.im_data_lock.release()

为了简洁起见,我省略了将锁传递给过程的构造函数并将其存储为成员字段(self.im_data_lock)的代码。您还应该将im_data数组传递给流程的构造函数,并将其存储为成员字段。

从Azure Data Factory执行python脚本 - python

有人可以帮我从Azure数据工厂执行python函数吗?我已经将python函数存储在blob中,并且我试图触发同样的功能。但是我无法做到这一点。请协助。第二,我可以从ADF参数化python函数调用吗? python参考方案 您可能会发现ADF中的Azure Function Activity概念,它允许您在Data Factory管道中运行Azure F…

Python sqlite3数据库已锁定 - python

我在Windows上使用Python 3和sqlite3。我正在开发一个使用数据库存储联系人的小型应用程序。我注意到,如果应用程序被强制关闭(通过错误或通过任务管理器结束),则会收到sqlite3错误(sqlite3.OperationalError:数据库已锁定)。我想这是因为在应用程序关闭之前,我没有正确关闭数据库连接。我已经试过了: connectio…

Python:集群作业管理 - python

我在具有两个阶段的计算群集(Slurm)上运行python脚本,它们是顺序的。我编写了两个python脚本,一个用于阶段1,另一个用于阶段2。每天早上,我检查所有第1阶段的工作是否都以视觉方式完成。只有这样,我才开始第二阶段。通过在单个python脚本中组合所有阶段和作业管理,是否有一种更优雅/自动化的方法?我如何知道工作是否完成?工作流程类似于以下内容:w…

Python:传递记录器是个好主意吗? - python

我的Web服务器的API日志如下:started started succeeded failed 那是同时收到的两个请求。很难说哪一个成功或失败。为了彼此分离请求,我为每个请求创建了一个随机数,并将其用作记录器的名称logger = logging.getLogger(random_number) 日志变成[111] started [222] start…

Python-Excel导出 - python

我有以下代码:import pandas as pd import requests from bs4 import BeautifulSoup res = requests.get("https://www.bankier.pl/gielda/notowania/akcje") soup = BeautifulSoup(res.cont…