进程和线程

进程:一个在内存中运行的应用程序,每个进程有自己独立的一块内存空间。资源分配的最小单位

线程:进程中的一个执行单元,程序执行的最小单位。一个进程可以有多个线程。

Python的多线程特点:在Python中,由于GIL的存在,在多线程的时候,同一时间只能有一个线程在CPU上运行,而且是单个CPU,不管CPU核数为多少。所以,Python不能利用多线程发挥多核的优势,但是,可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

什么时候使用多线程/多进程:在python中,如果一个进程包含多个线程,做CPU密集型任务时,多线程并不能有多少效率提升,相反可能还会因为线程的频繁切换导致效率下降,此时推荐使用多进程;如果做IO密集型任务,多线程的进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

Python中单线程、多线程和多进程的效率对比实验 | 菜鸟教程 (runoob.com)

Python多进程实现方法

多进程的实现

Python的多进程是通过multiprocessing模块实现,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多,也有start(), run(), join()等方法

from multiprocessing importProcessdef fun1(name):print('测试%s多进程' %name)if __name__ == '__main__':process_list = []for i in range(5):# 开启 5 个子进程执行fun1函数p = Process(target=fun1,args=('Python',))# 实例化进程对象p.start()process_list.append(p)for i in process_list:p.join()print('结束测试')

多进程之间的通信

由于每个进程有自己独立的一块内存空间,系统独立分配资源(CPU、内存),因此进程之间是独立的。每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程使用时与多线程的区别。所以,不同的多进程之间需要通信

常用的多进程之间的通信方式有:队列Queue、管道Pipe、Managers。

Queue和Pipe实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,需要用到Managers来共享内存。

以Queue为例,Python中多进程的通信如下:

def func1(i):time.sleep(1)print(f'args {i}')def run__queue():from multiprocessing import Process, Queuequeue = Queue(maxsize=4)# the following attribute can call in anywherequeue.put(True)queue.put([0, None, object])# you can put deepcopy thingqueue.qsize()# the length of queueprint(queue.get())# First In First Outprint(queue.get())# First In First Outqueue.qsize()# the length of queueprocess = [Process(target=func1, args=(queue,)), Process(target=func1, args=(queue,)), ][p.start() for p in process][p.join() for p in process]if __name__ =='__main__':run__queue()

进程池

进程池维护一个进程序列,使用时去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:apply:同步,一般不使用;apply_async:异步,常用。

但是pool.apply_async不能和pytorch推理一起用,受到spwan的影响。因为GPU模型Pytorch规定多进程的启动方法必须是“spawn”,使用map_async或者apply_async这类方法都不行。

Pytorch 多进程在单卡上测试_咆哮的阿杰的博客-CSDN博客_单卡多进程计算

并且,进程池pool的两个父子进程之间通信不能用Queue,需要Manager。

  • Pool和Process的区别

  • Process需要自己管理进程,起一个Process就是起一个新进程;

  • Pool是进程池,它可以开启固定数量的进程,然后将任务放到一个池子里,系统来调度多进程执行池子里的任务;

参考

  • Python如何使用多进程Process、Pool、Queue、Manager等

一篇文章搞定Python多进程(全) – 知乎 (zhihu.com)

在Python中优雅地用多进程 – 知乎 (zhihu.com)

  • Python使用 Pool.apply_async 和 Manager.Queue实现进程池通信

Python高级——消息队列(Queue)与进程池(Pool)_HMMHMH的博客-CSDN博客

Python中的Queue与多进程(multiprocessing)_SQZHAO的博客-CSDN博客_python queue 多进程

Pytorch结合多进程的使用场景

场景1:读取图片数据,判断是否损坏

只需要给函数open_image使用多进程,不需要考虑进程通信,采用进程池Pool的imap方法

def open_image(img_name):try:Image.open(os.path.join(image_path, img_name))except:return img_namedef detect_broken_delete(img_path, delete=False):listiter = os.listdir(img_path)process_num = max(cpu_count() - 2, 1)with Pool(process_num) as pool:output = set(tqdm(pool.imap(open_image, listiter), total=len(listiter)))print("broken images:", output)if delete:# delete broken imagesfor img in output:if img:os.remove(os.path.join(img_path, img))print("broken images have been deleted.")

场景2:Pytorch减少模型推理时间

在有些时候,pytorch模型的性能瓶颈可能不在模型推断,而是在图像的预处理和后处理。这时候将torch模型转为onnx或者tensorRT收益不大,但可以使用多进程缩短前后处理时间和推理时间。

下面的例子为人脸解析face parsing模型推理案例:推理1w张图片大概从20min缩短为10min。

简单描述:

  1. 首先定义三个函数,分别为预处理preprocess_img,模型推断inference,后处理afterprocess_img(将人脸解析的分割mask作用到原图上,提取想要的人脸部分)

  1. 然后需要定义进程函数,用于读取Queue的数据并用前三个函数处理,实现进程之间的通信。进程函数的输入为queue相关的或者全局变量,然后通过queue的get和put方法实现数据传递。

  1. 总的流程为,第一个进程函数getimgpath_process获取图像,存到第一个队列img_path_queue;第二个进程函数preprocessimg_process,读取第一个队列,并预处理,结果存到第二个队列img_queue;第三个进程函数inference_process读取第二个队列,做出推断,结果存在第三个队列result_queue;第四个进程函数afterprocessimg_process读取第三个队列,做后处理并保存。

  1. 一共有三个Queue存数据、四个Process处理数据。在evaluate_multiprocess中,第一个函数读取全部图像路径,只采用单个进程;后面三个进程函数都采用了8个进程数量。最后关闭进程。

  1. 注意,需要通过 torch.multiprocessing.set_start_method(“spawn”)来设置pytorch多进程和cuda的使用。这样可以在单卡上同时处理多个图片,8个进程就可以同时单卡处理8张图片;当然,需要注意显存的使用,可以和onnx等结合加速推理和减少显存占用。

from model import BiSeNetimport torchimport osimport os.path as ospimport numpy as npfrom PIL import Image, ImageFilefrom tqdm import tqdmimport timeimport torchvision.transforms as transformsfrom multiprocessing import Pool, cpu_count, Process, Queueimport torch.multiprocessing as mpImageFile.LOAD_TRUNCATED_IMAGES = TrueImage.MAX_IMAGE_PIXELS = Noneto_tensor = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),])def load_net(model_path="res/cp/79999_iter.pth"):n_classes = 19net = BiSeNet(n_classes=n_classes)net.cuda()net.load_state_dict(torch.load(model_path))net.eval()return netdef preprocess_img(img_path):img = Image.open(img_path)image = img.resize((512, 512), Image.BILINEAR).convert('RGB')img_arr = np.array(image)img = to_tensor(image)img = torch.unsqueeze(img, 0)return img, img_arrdef inference(net, img):with torch.no_grad():img = img.cuda()out = net(img)[0]parsing = out.squeeze(0).cpu().numpy().argmax(0)return parsingdef afterprocess_img(parsing, img_arr, img_path, res_path):mask = np.zeros_like(img_arr)indices = np.isin(parsing, [1, 2, 3, 10, 12, 13])# face_dataset config: 1:skin, 2:l_brow, 3:r_brow, 4:nose, 12:u_lip, 13:l_lipmask[indices] = img_arr[indices]img_mask = Image.fromarray(mask)img_mask.save(osp.join(res_path, osp.basename(img_path)))print(f"save {osp.basename(img_path)}")# accelerate infer by multiprocess, useing Queue in communication between processes (4 Processes & 3 Queues)# ==================================================================================# P(getimgpath)-----P(preprocess)-----P(inference)-----P(afterprocess)-->save img#| ||#Queue(img_path)Queue(img) Queue(result)# ==================================================================================# get img_path processdef getimgpath_process(root_path, img_path_queue):for img_name in os.listdir(root_path):img_path = osp.join(root_path, img_name)img_path_queue.put(img_path)# preprocess img processdef preprocessimg_process(img_path_queue, img_queue):while True:img_path = img_path_queue.get()img, img_arr = preprocess_img(img_path)img_queue.put((img, img_arr, img_path))# inference processdef inference_process(net, img_queue, result_queue):while True:img, img_arr, img_path = img_queue.get()parsing = inference(net, img)result_queue.put((parsing, img_arr, img_path))# afterprocess img processdef afterprocessimg_process(result_queue, res_path):while True:parsing, img_arr, img_path = result_queue.get()afterprocess_img(parsing, img_arr, img_path, res_path)def evaluate_multiprocess(net, root_path, res_path):if not os.path.exists(res_path):os.makedirs(res_path)mp.set_start_method("spawn")img_path_queue, img_queue, result_queue = Queue(), Queue(), Queue()# pool.apply_async can not use in spawn.Imagepath_Process = Process(target=getimgpath_process, args=(root_path, img_path_queue))Imagepath_Process.start()for i in range(8):Preprocess_Process = Process(target=preprocessimg_process, args=(img_path_queue, img_queue))Preprocess_Process.start()for i in range(8):Inference_Process = Process(target=inference_process, args=(net, img_queue, result_queue))Inference_Process.start()for i in range(8):Afterprocess_Process = Process(target=afterprocessimg_process, args=(result_queue, res_path))Afterprocess_Process.start()# Imagepath_Process.start()# Preprocess_Process.start()# Inference_Process.start()# Afterprocess_Process.start()time.sleep(1)# wait process starting, replace of `join`stime = time.time()while True:if(result_queue.empty() and (img_queue.empty()) and (img_path_queue.empty())):time.sleep(1) # wait final process endingImagepath_Process.terminate()Preprocess_Process.terminate()Inference_Process.terminate()Afterprocess_Process.terminate()img_path_queue.close()img_queue.close()result_queue.close()breakelse:passetime = time.time()print(f"all images cost {etime - stime} seconds")if __name__ == "__main__":net = load_net()root_path="test"res_path='testtest'evaluate_multiprocess(net, root_path, res_path)

参考

多进程缩短推理时间 – 知乎 (zhihu.com)

pytorch多进程最佳实践_小篆的博客-CSDN博客_pytorch 多进程