Python多进程multiprocessing用法实例分析

发布时间 - 2026-01-11 02:51:20    点击率:

本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

简单的创建进程:

import multiprocessing
def worker(num):
  """thread worker function"""
  print 'Worker:', num
  return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = multiprocessing.Process(target=worker, args=(i,))
    jobs.append(p)
    p.start()

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing
import time
def worker():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(2)
  print name, 'Exiting'
def my_service():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(3)
  print name, 'Exiting'
if __name__ == '__main__':
  service = multiprocessing.Process(name='my_service',
                   target=my_service)
  worker_1 = multiprocessing.Process(name='worker 1',
                    target=worker)
  worker_2 = multiprocessing.Process(target=worker) # default name
  worker_1.start()
  worker_2.start()
  service.start()

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

守护进程:

import multiprocessing
import time
import sys
def daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  time.sleep(2)
  print 'Exiting :', name
def non_daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  print 'Exiting :', name
if __name__ == '__main__':
  d = multiprocessing.Process(name='daemon',
                target=daemon)
  d.daemon = True
  n = multiprocessing.Process(name='non-daemon',
                target=non_daemon)
  n.daemon = False
  d.start()
  n.start()
  d.join(1)
  print 'd.is_alive()', d.is_alive()
  n.join()

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

终止进程:

import multiprocessing
import time
def slow_worker():
  print 'Starting worker'
  time.sleep(0.1)
  print 'Finished worker'
if __name__ == '__main__':
  p = multiprocessing.Process(target=slow_worker)
  print 'BEFORE:', p, p.is_alive()
  p.start()
  print 'DURING:', p, p.is_alive()
  p.terminate()
  print 'TERMINATED:', p, p.is_alive()
  p.join()
  print 'JOINED:', p, p.is_alive()

①. == 0 未生成任何错误 
②. 0 进程有一个错误,并以该错误码退出
③. < 0 进程由一个-1 * exitcode信号结束

进程的退出状态:

import multiprocessing
import sys
import time
def exit_error():
  sys.exit(1)
def exit_ok():
  return
def return_value():
  return 1
def raises():
  raise RuntimeError('There was an error!')
def terminated():
  time.sleep(3)
if __name__ == '__main__':
  jobs = []
  for f in [exit_error, exit_ok, return_value, raises, terminated]:
    print 'Starting process for', f.func_name
    j = multiprocessing.Process(target=f, name=f.func_name)
    jobs.append(j)
    j.start()
  jobs[-1].terminate()
  for j in jobs:
    j.join()
    print '%15s.exitcode = %s' % (j.name, j.exitcode)

方便的调试,可以用logging

日志:

import multiprocessing
import logging
import sys
def worker():
  print 'Doing some work'
  sys.stdout.flush()
if __name__ == '__main__':
  multiprocessing.log_to_stderr()
  logger = multiprocessing.get_logger()
  logger.setLevel(logging.INFO)
  p = multiprocessing.Process(target=worker)
  p.start()
  p.join()

利用class来创建进程,定制子类

派生进程:

import multiprocessing
class Worker(multiprocessing.Process):
  def run(self):
    print 'In %s' % self.name
    return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = Worker()
    jobs.append(p)
    p.start()
  for j in jobs:
    j.join()

python进程间传递消息:

import multiprocessing
class MyFancyClass(object):
  def __init__(self, name):
    self.name = name
  def do_something(self):
    proc_name = multiprocessing.current_process().name
    print 'Doing something fancy in %s for %s!' % \
      (proc_name, self.name)
def worker(q):
  obj = q.get()
  obj.do_something()
if __name__ == '__main__':
  queue = multiprocessing.Queue()
  p = multiprocessing.Process(target=worker, args=(queue,))
  p.start()
  queue.put(MyFancyClass('Fancy Dan'))
  # Wait for the worker to finish
  queue.close()
  queue.join_thread()
  p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
  def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue
  def run(self):
    proc_name = self.name
    while True:
      next_task = self.task_queue.get()
      if next_task is None:
        # Poison pill means shutdown
        print '%s: Exiting' % proc_name
        self.task_queue.task_done()
        break
      print '%s: %s' % (proc_name, next_task)
      answer = next_task()
      self.task_queue.task_done()
      self.result_queue.put(answer)
    return
class Task(object):
  def __init__(self, a, b):
    self.a = a
    self.b = b
  def __call__(self):
    time.sleep(0.1) # pretend to take some time to do the work
    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
  def __str__(self):
    return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
  # Establish communication queues
  tasks = multiprocessing.JoinableQueue()
  results = multiprocessing.Queue()
  # Start consumers
  num_consumers = multiprocessing.cpu_count() * 2
  print 'Creating %d consumers' % num_consumers
  consumers = [ Consumer(tasks, results)
         for i in xrange(num_consumers) ]
  for w in consumers:
    w.start()
  # Enqueue jobs
  num_jobs = 10
  for i in xrange(num_jobs):
    tasks.put(Task(i, i))
  # Add a poison pill for each consumer
  for i in xrange(num_consumers):
    tasks.put(None)
  # Wait for all of the tasks to finish
  tasks.join()
  # Start printing results
  while num_jobs:
    result = results.get()
    print 'Result:', result
    num_jobs -= 1

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

进程间信号传递:

import multiprocessing
import time
def wait_for_event(e):
  """Wait for the event to be set before doing anything"""
  print 'wait_for_event: starting'
  e.wait()
  print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
  """Wait t seconds and then timeout"""
  print 'wait_for_event_timeout: starting'
  e.wait(t)
  print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
  e = multiprocessing.Event()
  w1 = multiprocessing.Process(name='block', 
                 target=wait_for_event,
                 args=(e,))
  w1.start()
  w2 = multiprocessing.Process(name='nonblock', 
                 target=wait_for_event_timeout, 
                 args=(e, 2))
  w2.start()
  print 'main: waiting before calling Event.set()'
  time.sleep(3)
  e.set()
  print 'main: event is set'

Python多进程,一般的情况是Queue来传递。

Queue:

from multiprocessing import Process, Queue
def f(q):
  q.put([42, None, 'hello'])
if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print q.get()  # prints "[42, None, 'hello']"
  p.join()

多线程优先队列Queue:

import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
  def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
  def run(self):
    print "Starting " + self.name
    process_data(self.name, self.q)
    print "Exiting " + self.name
def process_data(threadName, q):
  while not exitFlag:
    queueLock.acquire()
    if not workQueue.empty():
      data = q.get()
      queueLock.release()
      print "%s processing %s" % (threadName, data)
    else:
      queueLock.release()
    time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
  thread = myThread(threadID, tName, workQueue)
  thread.start()
  threads.append(thread)
  threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
  workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
  pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
  t.join()
print "Exiting Main Thread"

多进程使用Queue通信的例子

import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
  while True:
    if msgQueue.empty() > 0:
      print ('queue is empty %d' % (msgQueue.qsize()))
    else:
      msg = msgQueue.get()
      print( 'get msg %s' % (msg,))
    time.sleep(1)
def startB(msgQueue):
  while True:
    msgQueue.put('hello world')
    print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
    time.sleep(3)
if __name__ == '__main__':
  processA = Process(target=startA,args=(MSG_QUEUE,))
  processB = Process(target=startB,args=(MSG_QUEUE,))
  processA.start()
  print( 'processA start..')

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python Socket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。


# Python  # 多进程  # multiprocessing  # Python多进程并发(multiprocessing)用法实例详解  # Python3多进程 multiprocessing 模块实例详解  # python multiprocessing多进程变量共享与加锁的实现  # Python标准库之多进程(multiprocessing包)介绍  # python基于multiprocessing的多进程创建方法  # python multiprocessing 多进程并行计算的操作  # 简单学习Python多进程Multiprocessing  # Python使用multiprocessing实现多进程的详细步骤记录  # 操作技巧  # 多核  # 自己的  # 进阶  # 相关内容  # 子类  # 浮点  # 就不  # 主程序  # 可以用  # 感兴趣  # 数据结构  # 给大家  # 使其  # 可选  # 等了  # 即是  # 用户可以  # 会比  # 更多关于 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: 如何在腾讯云免费申请建站?  详解免费开源的.NET多类型文件解压缩组件SharpZipLib(.NET组件介绍之七)  如何在 Pandas 中基于一列条件计算另一列的分组均值  Laravel如何使用Blade模板引擎?(完整语法和示例)  免费制作统计图的网站有哪些,如何看待现如今年轻人买房难的情况?  如何快速重置建站主机并恢复默认配置?  Laravel Octane如何提升性能_使用Laravel Octane加速你的应用  Gemini怎么用新功能实时问答_Gemini实时问答使用【步骤】  Laravel如何实现事件和监听器?(Event & Listener实战)  Laravel如何部署到服务器_线上部署Laravel项目的完整流程与步骤  三星网站视频制作教程下载,三星w23网页如何全屏?  活动邀请函制作网站有哪些,活动邀请函文案?  Laravel如何记录自定义日志?(Log频道配置)  焦点电影公司作品,电影焦点结局是什么?  极客网站有哪些,DoNews、36氪、爱范儿、虎嗅、雷锋网、极客公园这些互联网媒体网站有什么差异?  米侠浏览器网页背景异常怎么办 米侠显示修复  Laravel如何处理表单验证?(Requests代码示例)  Laravel中Service Container是做什么的_Laravel服务容器与依赖注入核心概念解析  教你用AI润色文章,让你的文字表达更专业  公司网站制作需要多少钱,找人做公司网站需要多少钱?  Laravel模型关联查询教程_Laravel Eloquent一对多关联写法  Laravel如何操作JSON类型的数据库字段?(Eloquent示例)  如何使用 Go 正则表达式精准提取括号内首个纯字母标识符(忽略数字与嵌套)  uc浏览器二维码扫描入口_uc浏览器扫码功能使用地址  Laravel如何使用API Resources格式化JSON响应_Laravel数据资源封装与格式化输出  矢量图网站制作软件,用千图网的一张矢量图做公司app首页,该网站并未说明版权等问题,这样做算不算侵权?应该如何解决?  laravel怎么实现图片的压缩和裁剪_laravel图片压缩与裁剪方法  Laravel怎么调用外部API_Laravel Http Client客户端使用  在线制作视频的网站有哪些,电脑如何制作视频短片?  Laravel队列由Redis驱动怎么配置_Laravel Redis队列使用教程  Laravel如何处理CORS跨域问题_Laravel项目CORS配置与解决方案  清除minerd进程的简单方法  Python结构化数据采集_字段抽取解析【教程】  Laravel如何发送邮件和通知_Laravel邮件与通知系统发送步骤  西安市网站制作公司,哪个相亲网站比较好?西安比较好的相亲网站?  青岛网站建设如何选择本地服务器?  香港网站服务器数量如何影响SEO优化效果?  Laravel软删除怎么实现_Laravel Eloquent SoftDeletes功能使用教程  用v-html解决Vue.js渲染中html标签不被解析的问题  Laravel怎么自定义错误页面_Laravel修改404和500页面模板  简历在线制作网站免费版,如何创建个人简历?  如何在 Telegram Web View(iOS)中防止键盘遮挡底部输入框  如何制作新型网站程序文件,新型止水鱼鳞网要拆除吗?  晋江文学城电脑版官网 晋江文学城网页版直接进入  HTML5空格和margin有啥区别_空格与外边距的使用场景【说明】  如何快速上传建站程序避免常见错误?  如何在万网ECS上快速搭建专属网站?  高性能网站服务器部署指南:稳定运行与安全配置优化方案  如何快速使用云服务器搭建个人网站?  香港代理服务器配置指南:高匿IP选择、跨境加速与SEO优化技巧