sp1.join() 检查 sp1 是否还在运行:如果还在运行,主进程就阻塞等待,直到 sp1 结束后 join() 才返回;如果 sp1 已经结束,join() 立即返回。因此主进程会在 sp1、sp2 都结束之后,才打印"主进程结束。"并退出。
from multiprocessing import Process
import os


def spfunc(spname):
    """Print the given child-process label together with the current PID.

    Used below as the ``target`` of two Process objects, so in the demo it
    runs inside each child process and prints that child's PID.

    Args:
        spname: Label identifying the child (e.g. ``'sp1'``).
    """
    print("子进程标识:%s,ID:%s" % (spname, os.getpid()))


if __name__ == "__main__":
    print("主进程开始:")
    print("主进程 ID:%s" % os.getpid())
    sp1 = Process(target=spfunc, args=('sp1',))
    sp2 = Process(target=spfunc, args=('sp2',))
    sp1.start()
    sp2.start()
    # join() blocks until that child has exited, so the final message is
    # printed only after both children have finished.
    sp1.join()
    sp2.join()
    print("主进程结束。")
使用 Process 类的派生类创建进程:
import os, time
from multiprocessing import Process
class SubProcess(Process):
    """Process subclass whose run() reports the process name and PID."""

    def __init__(self):
        super().__init__()

    def run(self):
        """Body executed in the child process after start().

        Prints the process name and PID, then sleeps 0.1 s.
        """
        print("子进程名称 = {0},ID = {1}".format(self.name, self.pid))
        time.sleep(0.1)
if __name__ == '__main__':
    print("主进程开始:")
    for _ in range(3):
        sp = SubProcess()
        sp.start()
        # join() inside the loop: each child finishes before the next one is
        # started, so the three children run one after another.
        sp.join()
    print("主进程结束。")
使用 Pool 类创建进程
如果要启动大量子进程,可以用 Pool 类的方式批量创建子进程。
import multiprocessing, time, os
def spfunc(i, delay=1):
    """Print the current PID and ``i ** 2``, pause, and return the square.

    Args:
        i: Number to square.
        delay: Seconds to sleep after printing (default 1, as before).

    Returns:
        ``i ** 2``; callers that ignore the result are unaffected.
    """
    result = i ** 2
    print("子进程 ID = {},结果 = {}".format(os.getpid(), result))
    time.sleep(delay)
    return result
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    # apply() blocks until each task finishes, so tasks would run one at a
    # time; apply_async() submits them all and lets the workers run in parallel.
    results = [pool.apply_async(spfunc, (i,)) for i in range(1, 4)]
    pool.close()
    pool.join()
    for res in results:
        # Re-raise any exception a worker hit; otherwise it would be lost.
        res.get()
进程通信
使用 Queue 实现进程间通信
from multiprocessing import Process, Queue
import os, time, random
def writeQueue(q, count=5, delay=1):
    """Put ``count`` random integers in [1, 100] onto ``q``, printing each one.

    Args:
        q: Queue-like object with a ``put`` method.
        count: Number of values to produce (default 5, as before).
        delay: Seconds to sleep after each put (default 1, as before).
    """
    for _ in range(count):
        n = random.randint(1, 100)
        print("放进队列的随机数为:%d" % n)
        q.put(n)
        time.sleep(delay)
def readQueue(q, count=5):
    """Take ``count`` items from ``q`` (blocking on each) and print them.

    Args:
        q: Queue-like object with a ``get(block)`` method.
        count: Number of items to read (default 5, as before); should match
            the number the writer produces, or this call blocks forever.
    """
    for _ in range(count):
        d = q.get(True)
        print("从队列中读取的数据为:%d" % d)
if __name__ == '__main__':
    # A multiprocessing Queue is shared between the writer and reader processes.
    q = Queue()
    spw = Process(target=writeQueue, args=(q,))
    spr = Process(target=readQueue, args=(q,))
    spw.start()
    spr.start()
    spw.join()
    spr.join()
from concurrent.futures import ThreadPoolExecutor
def executetask(task):
    """Print a line announcing that ``task`` is being executed.

    Args:
        task: Any object; it is formatted with ``%s``.
    """
    # Wrap in a 1-tuple: "(task)" is just task, and a tuple task would be
    # unpacked by % and raise TypeError.
    print("执行:%s" % (task,))
if __name__ == '__main__':
    tasklist = ["任务" + str(i) for i in range(5)]
    with ThreadPoolExecutor(5) as ect:
        # Executor.map returns a lazy iterator; consuming it re-raises any
        # exception a task raised instead of silently discarding it.
        results = list(ect.map(executetask, tasklist))
线程通信
import threading, queue, time
class mythread(threading.Thread):
    """Thread that puts one message identifying itself onto a shared queue."""

    def __init__(self, q, i):
        """
        Args:
            q: Queue-like object the message is put on.
            i: Thread number included in the message.
        """
        super().__init__()
        self.q = q
        self.i = i

    def run(self):
        self.q.put("这是第%d个线程..." % (self.i))
if __name__ == '__main__':
    q = queue.Queue()
    threads = [mythread(q, i) for i in range(3)]
    for t in threads:
        t.start()
    # Wait for every producer before draining: otherwise q.empty() can be
    # True while a thread has not yet put its message, and it would be missed.
    for t in threads:
        t.join()
    while not q.empty():
        print(q.get())
线程同步
lock 实现同步
加了锁之后,先拿到锁的线程(通常是先启动的 A)会独自把 num 从 1 数到 3;之后 B 拿到锁时 num 已经大于 3,直接退出,所以输出全部来自线程 A。不加锁时两个线程交替执行,输出如下:
当前线程是 A,num=1
当前线程是 B,num=2
当前线程是 A,num=3
import threading, time
def countfunc(tname, lock, delay=1):
    """Print and increment the module-level ``num`` while it is <= 3.

    The whole counting loop runs while holding ``lock``, so with a shared lock
    whichever thread acquires it first does all the counting.

    Args:
        tname: Thread label used in the printed line.
        lock: Lock-like object supporting the context-manager protocol.
        delay: Seconds to sleep after each increment (default 1, as before).

    Note: ``num`` must already exist as a module-level global (the demo's main
    block sets ``num = 1``).
    """
    global num
    # "with" releases the lock even if print()/sleep() raises, unlike a bare
    # acquire()/release() pair.
    with lock:
        while num <= 3:
            print("当前线程是%s,num=%s" % (tname, num))
            num += 1
            time.sleep(delay)
if __name__ == '__main__':
    num = 1  # global counter shared by both threads via "global num"
    lock = threading.Lock()
    t1 = threading.Thread(target=countfunc, args=('A', lock))
    t2 = threading.Thread(target=countfunc, args=('B', lock))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
Condition 实现同步
原版:两个线程都是"先等待,后通知",导致第一次打印后就互相等待
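修改版:奇数线程"先通知,后等待",确保了总有线程能唤醒对方。注意:这一版仍然依赖 th1 先拿到锁;如果 th2 先运行,它的 notify() 没有等待者可唤醒,随后两个线程仍会互相等待。更稳妥的做法是用一个共享的"轮次"变量配合 cond.wait_for() 判断是否轮到自己。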
import threading, time
# Numbers 0 .. _STOP-1 are printed alternately by thfunc1 (even) and thfunc2 (odd).
_STOP = 6

# Next number to print, keyed by the Condition object so independent runs using
# different Condition instances do not interfere. Only accessed while holding
# that Condition; an absent key means "start at 0".
_next_number = {}


def _print_in_turn(thname, cond, start):
    """Print start, start+2, ... (< _STOP), each only when it is this thread's turn."""
    with cond:
        for i in range(start, _STOP, 2):
            # wait_for() re-checks the shared turn counter, so it no longer
            # matters which thread grabs the lock first or whether a notify()
            # was sent before this thread started waiting (the old
            # wait()/notify() ping-pong deadlocked if thfunc2 ran first).
            cond.wait_for(lambda i=i: _next_number.get(cond, 0) == i)
            print("线程%s:%d." % (thname, i))
            if i + 1 < _STOP:
                _next_number[cond] = i + 1
            else:
                # Last number printed: reset so the Condition can be reused.
                _next_number.pop(cond, None)
            cond.notify_all()


def thfunc1(thname, cond):
    """Print the even numbers 0, 2, 4, alternating with thfunc2 on ``cond``."""
    _print_in_turn(thname, cond, 0)


def thfunc2(thname, cond):
    """Print the odd numbers 1, 3, 5, alternating with thfunc1 on ``cond``."""
    _print_in_turn(thname, cond, 1)
if __name__ == '__main__':
    cond = threading.Condition()
    th1 = threading.Thread(target=thfunc1, args=("th1", cond))
    th2 = threading.Thread(target=thfunc2, args=("th2", cond))
    th1.start()
    th2.start()
    th1.join()
    th2.join()
Event 对象实现同步
event.is_set()(旧写法 isSet(),Python 3.10 起已弃用)只判断内部标志是否为 True,不会修改它;event.set() 把标志设为 True,表示事件已发生,并唤醒所有在 wait() 上阻塞的线程;event.clear() 把标志重新设为 False。
event.wait():标志为 False 时阻塞,直到标志变为 True 才返回。本例中员工线程第一次 wait() 等待加班通知;随后调用 event.clear() 把标志复位,第二次 wait() 再等待吃夜宵通知。
import time, threading
def bossfunc(thname, event, delay=2):
    """Announce overtime, signal ``event``, wait, then announce dinner and signal again.

    Args:
        thname: Label printed in front of each message.
        event: Event-like object with ``set()``.
        delay: Seconds between the two announcements (default 2, as before).
    """
    print("%s:今晚加班到 11 点" % thname)
    event.set()
    time.sleep(delay)
    print("%s:大家一起吃夜宵" % thname)
    event.set()
def employeefunc(thname, i, event, delay=1):
    """React to two consecutive signals on ``event`` (overtime, then dinner).

    Waits for the first set(), prints, sleeps ``delay`` seconds, clears the
    event and waits for the second set().

    Args:
        thname: Label printed in front of each message.
        i: Employee number printed after the label.
        event: Event-like object with ``wait()``/``clear()``.
        delay: Seconds to sleep before clearing (default 1, as before).

    NOTE(review): this is timing-based. It assumes every employee calls
    clear() before the boss's second set() (1 s vs 2 s with the defaults).
    A clear() arriving after that set() would leave later waiters blocked.
    """
    event.wait()
    print("%s %d:好痛苦..." % (thname, i))
    time.sleep(delay)
    event.clear()
    event.wait()
    # Use the same colon as the first message (was a ';' typo).
    print("%s %d:太好了" % (thname, i))
if __name__ == '__main__':
    event = threading.Event()
    # Thread.start() returns None, so keep the Thread objects themselves in
    # order to join() them afterwards.
    thboss = threading.Thread(target=bossfunc, args=("thBoss", event))
    thboss.start()
    themployees = []
    for i in range(4):
        themployee = threading.Thread(target=employeefunc, args=("thEmployee", i, event))
        themployee.start()
        themployees.append(themployee)
    thboss.join()
    for themployee in themployees:
        themployee.join()
案例分析
使用多进程导入/导出数据
使用多进程将多个 excel 文件的书籍信息导入 SQLite3 数据库
import multiprocessing, sqlite3, os, openpyxl
def create_connect_database(db_path=None):
    """Create the SQLite database file (if needed) and the ``table_book`` table.

    Args:
        db_path: Database file path. Defaults to
            ``<cwd>/resources/bookDB.db`` (built portably with os.path.join
            instead of Windows-only backslashes).

    Errors are printed, not raised, as before.
    """
    if db_path is None:
        db_path = os.path.join(os.getcwd(), 'resources', 'bookDB.db')
    try:
        # sqlite3 does not create missing parent directories.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        conn = sqlite3.connect(db_path)
        try:
            cur = conn.cursor()
            cur.execute(
                """create table if not exists table_book(ID text PRIMARY KEY,Name text not null,Autore text,Price float);"""
            )
            conn.commit()
        finally:
            # The original never closed the connection.
            conn.close()
    except Exception as e:
        print("创建/链接数据库或者创建数据表失败:", e)
def eachxlsx(xlsxfn):
    """Yield the data rows of the first worksheet of ``xlsxfn`` as value tuples.

    The first row is treated as a header and skipped. Rows whose cells are all
    empty (e.g. formatted but blank trailing rows) are skipped too, since they
    cannot satisfy the table's ``Name not null`` constraint.

    Args:
        xlsxfn: Path of an Excel workbook readable by openpyxl.

    Yields:
        One tuple of cell values per data row.
    """
    wb = openpyxl.load_workbook(xlsxfn)
    ws = wb.worksheets[0]
    for row in ws.iter_rows(min_row=2, values_only=True):
        if all(value is None for value in row):
            continue
        yield row
def spfunc(filename, db_path=None):
    """Insert every data row of the Excel file ``filename`` into ``table_book``.

    All rows of one file go in a single transaction: on any error the
    transaction is rolled back, so no part of that file is kept, and the error
    is printed. (Previously the error was caught inside ``with conn``, which
    then committed the rows inserted before the failure.)

    Args:
        filename: Path of the Excel file to import.
        db_path: Database file path; defaults to ``<cwd>/resources/bookDB.db``.
    """
    if db_path is None:
        db_path = os.path.join(os.getcwd(), 'resources', 'bookDB.db')
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        sql = 'INSERT INTO table_book VALUES(?,?,?,?)'
        cur.executemany(sql, eachxlsx(filename))
        conn.commit()
    except Exception as e:
        conn.rollback()
        print("导入数据失败.", e)
    finally:
        # "with sqlite3.connect(...)" only commits/rolls back; it never closes.
        conn.close()
if __name__ == '__main__':
    print("Excel 文件导入数据库开始...")
    create_connect_database()
    excel_path = os.path.join(os.getcwd(), "excel_files_dir")
    # Only real workbooks; skip other files and Excel's "~$" lock files.
    excel_File_list = [
        filename for filename in os.listdir(excel_path)
        if filename.lower().endswith((".xlsx", ".xlsm")) and not filename.startswith("~$")
    ]
    pool = multiprocessing.Pool()
    # apply() blocks on each file in turn, so only one worker was ever busy.
    # apply_async() lets the workers read files in parallel; SQLite still
    # serializes the inserts (concurrent writers wait on the database lock).
    results = [
        pool.apply_async(spfunc, (os.path.join(excel_path, filename),))
        for filename in excel_File_list
    ]
    pool.close()
    pool.join()
    for res in results:
        # Re-raise errors spfunc does not handle itself (e.g. connect failure).
        res.get()
    print("Excel 文件数据导入数据库结束!")