互斥锁 进程之间的通信: 队列. 进程之间的通信实例 生产者消费者模型(常用于并发)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了互斥锁 进程之间的通信: 队列. 进程之间的通信实例 生产者消费者模型(常用于并发),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5654字,纯文字阅读大概需要9分钟。
内容图文
![互斥锁 进程之间的通信: 队列. 进程之间的通信实例 生产者消费者模型(常用于并发)](/upload/InfoBanner/zyjiaocheng/944/c1fc0bf7ac104936a2bc0c7c7ea8b16f.jpg)
day33
一丶互斥锁
含义:
每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象(串行)
目的:
来保证共享数据操作的完整性和安全性(文本数据),保证数据的公平性
区别join:
共同点: 都能实现cpu的进程串行
不同点: join是人为指定顺序, 不能保证公平性. 互斥锁能够保证公平性
### 加锁处理
from multiprocessing import Lock
?
def task1(loc):
loc.acquire() # 上锁
print('task1: 开始打印')
time.sleep(random.randint(1,3))
print('task1: 结束打印')
loc.release() # 解锁
?
def task2(loc):
loc.acquire()
print('task2: 开始打印')
time.sleep(random.randint(1,3))
print('task2: 结束打印')
loc.release()
?
?
def task3(loc):
loc.acquire()
print('task3: 开始打印')
time.sleep(random.randint(1,3))
print('task3: 结束打印')
loc.release()
?
?
if __name__ == '__main__':
loc=Lock() # 生成锁对象
p1=Process(target=task1,args=(loc,)).start() #把锁对象作为参数传给具体的方法
p2=Process(target=task2,args=(loc,)).start()
p3=Process(target=task3,args=(loc,)).start()
锁死:
锁嵌套锁
### 锁中嵌套了锁, 会造成锁死现象
from multiprocessing import Lock
?
def task1(loc):
print('task1')
loc.acquire()
print('task1: 开始打印')
time.sleep(random.randint(1,3))
print('task1: 结束打印')
loc.release()
?
def task2(loc):
print('task2')
loc.acquire() # 第一层锁
loc.acquire() #第二层锁,形成嵌套锁,会造成锁死 (程序被卡主)~~~
loc.release()
print('task2: 开始打印')
time.sleep(random.randint(1,3))
print('task2: 结束打印')
loc.release()
?
?
def task3(loc):
print('task3')
loc.acquire()
print('task3: 开始打印')
time.sleep(random.randint(1,3))
print('task3: 结束打印')
loc.release()
?
?
if __name__ == '__main__':
loc=Lock()
p1=Process(target=task1,args=(loc,)).start()
p2=Process(target=task2,args=(loc,)).start()
p3=Process(target=task3,args=(loc,)).start()
案例:模拟抢票(多进程串行执行够任务.)
### db.json 自己提前创建好
with open('db.json', 'w', encoding='utf-8') as f:
dic={'count':1}
json.dump(dic, f)
?
### searc方法 打印剩余票数
def search():
time.sleep(random.random())
with open('db.json', encoding='utf-8') as f:
dic = json.load(f)
print(f'剩余票数:{dic["count"]}')
?
?
### 模拟多用户(多进程)抢票
def get():
with open('db.json', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(random.randint(0, 2))
if dic['count'] > 0:
dic['count'] -= 1
with open('db.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print(f'用户:{os.getpid()} ,购买成功~~')
else:
print(f'{os.getpid()} 没票了~~~~')
?
?
def task(lock):
search()
lock.acquire() #给抢票购买, 加锁 . 既保证了数据的安全性,也保证了数据公平性
get()
lock.release()# 解锁
?
?
if __name__ == '__main__':
lock = Lock()
for i in range(5):
p1 = Process(target=task, args=(lock,)) # 模拟5个用户进程
p1.start()
二丶进程之间的通信: 队列.
含义:
队列就是存在于内存中一个数据容器,一种特殊的线性表
特点:先进先出(FIFO),Queue是多进程安全的队列,自动加锁,自动阻塞
目的:
实现进程之间的通信
multiprocessing模块:
模块支持两种形式:队列和管道,这两种方式都是用于进程间消息传递
### 队列Queue基本用法
# 1.放值 put(值,block=False,timeout=X) block是否阻塞, timeout是否超时
# 2.取值 get() #get完队列里的所有数据时,程序卡出. 如果队列中有新的数据时,会继续执行
# 3.maxsize 队列中允许最大存放数
# 4.empty():调用此方法时q为空则返回True,该结果不可靠,
# 5.full():调用此方法时q已满则返回True,该结果不可靠,
# 6.qsize():返回队列中目前项目的正确数量,结果也不可靠,
# 7.get_nowait() 和 put_nowait() 同 block=False 不阻塞,不等待
?
from multiprocessing import Queue
q=Queue(3) # 设置队列里最大的元素个数
q.put('1')
q.put('2')
q.put('3')
q.put('4') # 夯住 ,只能放3个,不允许继续添加,程序卡在此处. 下面的程序不再执行
?
print(q.get())
print(q.get())
print(q.get())
print(q.get()) #### 夯住 只能取3个,程序卡在此处. 如果队列中有新的数据时,会继续执行
?
?
# 原理同上
# timeout 超时抛出异常(Full or Empty) , block默认阻塞,block=Fasle不会阻塞
q=Queue(3)
q.put(1)
q.put(3)
q.put(2)
q.put(4,block=False,timeout=3)
?
print(q.get())
print(q.get())
print(q.get())
q.get(block=False,timeout=3)
三丶进程之间的通信实例
### 队列模拟进程之间 ,30个进程,队列只获取10个.
?
from multiprocessing import Process
from multiprocessing import Queue
import os
?
def task(q):
try:
q.put(os.getpid(),block=False)
except Exception:
return
?
if __name__ == '__main__':
q=Queue(10) # 生成Queue队列
for i in range(30):
Process(target=task,args=(q,)).start()
?
for j in range(1,11):
print(f'第{j}用户:{q.get()}')
四丶生产者消费者模型(常用于并发)
含义:
完完全全的实现进程之间的通信.有三个主体:生产者,消费者,存数据的容器(队列).
No BB see 代码:
# -*-coding:utf-8-*-
# Author:Ds
?
### 合理的去调控多个进程去生成数据以及提取数据,中间有个必不可少的环节容器队列.
?
from multiprocessing import Process
from multiprocessing import Queue
import time
import random
?
# 生产者
def Producer(name,q):
for el in range(1,11):
time.sleep(random.randint(1,2)) # 随机
res=f'生产者:{name} , 生产的---第 {el} 号包子 '
q.put(res) #放到队列容器中
print(f'\033[0;35m {res} \033[0m')
# 消费者
def Consumer(name,q):
while 1:
try:
time.sleep(random.randint(1,3))
ret=q.get(timeout=5)
print(f'消费者{name}: 吃了 {ret}')
except Exception:
return
?
?
###
if __name__ == '__main__':
?
q=Queue() # 实例化队列对象
?
# 2 生产者对象
for i in range(1,3):
Process(target=Producer,args=(i,q)).start()
?
?
# 3 个消费者对象
for j in range(1,4):
Process(target=Consumer,args=(j,q)).start()
?
内容总结
以上是互联网集市为您收集整理的互斥锁 进程之间的通信: 队列. 进程之间的通信实例 生产者消费者模型(常用于并发)全部内容,希望文章能够帮你解决互斥锁 进程之间的通信: 队列. 进程之间的通信实例 生产者消费者模型(常用于并发)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。