• 日常搜索
  • 百度一下
  • Google
  • 在线工具
  • 搜转载

Python 并行和并发编程简介

python 是最流行的数据处理和数据科学语言之一。该生态系统提供了许多促进高性能计算的库和框架。不过,在 Python 中进行并行编程可能会非常棘手。

在本教程中,我们将研究为什么并行性很难,尤其是在 Python 上下文中,为此,我们将完成以下内容:

  • 为什么 Python 中的并行性很棘手?(提示:这是因为 GIL——全局解释器锁。)

  • 线程与进程:实现并行的不同方式。什么时候使用一个而不是另一个?

  • 并行与并发:为什么在某些情况下我们可以满足于并发而不是并行。

  • 使用所讨论的各种技术构建一个简单但 实用的示例。

Python 并行和并发编程简介  第1张

全局解释器锁

全局解释器锁 (GIL)是  Python 世界中最具争议的主题之一。在最流行的 Python 实现 CPython 中,GIL 是一个互斥量,它使事物成为线程安全的。GIL 使得与非线程安全的外部库集成变得容易,并且它使非并行代码更快。不过,这是有代价的。由于 GIL,我们无法通过多线程实现真正的并行。基本上,同一个进程的两个不同的本机线程不能同时运行 Python 代码。

不过,事情并没有那么糟糕,原因如下:发生在 GIL 领域之外的事情可以自由并行。在这一类中包括像 I/O 这样的长时间运行的任务,幸运的是,像numpy.

线程与进程

所以Python并不是真正的多线程。但什么是线程?让我们退后一步,换个角度看问题。

进程是一个基本的操作系统抽象。它是一个正在执行的程序——换句话说,是正在运行的代码。多个进程始终在一台计算机中运行,并且它们是并行执行的。

一个进程可以有多个线程。它们执行属于父进程的相同代码。理想情况下,它们并行运行,但不一定。进程不够的原因是因为应用程序需要响应并在更新显示和保存文件时***用户操作。

如果这仍然有点不清楚,这里有一个备忘单:

流程线程
进程不共享内存线程共享内存
生成/转换过程是昂贵的生成/切换线程更便宜
流程需要更多资源线程需要更少的资源(有时称为轻量级进程)
不需要内存同步您需要使用同步机制来确保您正确处理数据

没有一种食谱可以容纳一切。选择一个在很大程度上取决于上下文和您要完成的任务。

并行与并发

现在我们将更进一步,深入研究并发性。并发性经常被误解并误认为是并行性。事实并非如此。并发意味着安排独立代码以协作方式执行。利用一段代码正在等待 I/O 操作这一事实,并在此期间运行不同但独立的代码部分。

在 Python 中,我们可以通过 greenlets 实现轻量级的并发行为。从并行化的角度来看,使用线程或 greenlets 是等价的,因为它们都不是并行运行的。Greenlets 的创建成本甚至低于线程。正因为如此,greenlets 被大量用于执行大量简单的 I/O 任务,例如网络和 Web 服务器中常见的任务。

现在我们知道线程和进程、并行和并发之间的区别,我们可以说明如何在这两种范例上执行不同的任务。这是我们要做的:我们将多次运行 GIL 外的任务和 GIL 内的任务。我们使用线程和进程串行运行它们。让我们定义任务:

import os
import time
import threading
import multiprocessing
 
NUM_WORKERS = 4
 
def only_sleep():
    """ Do nothing, wait for a timer to expire """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    time.sleep(1)
  
def crunch_numbers():
    """ Do some computations """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    x = 0
    while x < 10000000:
        x += 1

我们创建了两个任务。它们都是长期运行的,但只 crunch_numbers 主动执行计算。让我们以 only_sleep 串行、多线程和使用多个进程的方式运行并比较结果:

## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
    only_sleep()
end_time = time.time()
 
print("Serial time=", end_time - start_time)
 
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print("Threads time=", end_time - start_time)
 
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print("Parallel time=", end_time - start_time)

这是我得到的输出(你的应该是相似的,尽管 PID 和时间会有所不同):

PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
 
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
 
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754

以下是一些观察结果:

  • 在 串行方法的情况下,事情是很明显的。我们正在一个接一个地执行任务。所有四次运行都由同一进程的同一线程执行。

  • 使用流程 ,我们将执行时间减少到原来的四分之一,这仅仅是因为任务是并行执行的。请注意每个任务是如何在不同的流程中以及在该 MainThread 流程中执行的。

  • 使用线程 ,我们利用了任务可以并发执行的事实。执行时间也减少了四分之一,即使没有并行运行。事情是这样的:我们生成第一个线程,它开始等待计时器到期。我们暂停它的执行,让它等待计时器到期,此时我们生成第二个线程。我们对所有线程重复此操作。有那么一刻,第一个线程的计时器到期,因此我们将执行切换到它并终止它。对第二个和所有其他线程重复该算法。最后,结果就像事情并行运行一样。您还会注意到,四个不同的线程从同一进程中分支出来并存在于同一进程中:  MainProcess。

您甚至可能注意到线程方法比真正的并行方法更快。这是由于产生过程的开销。正如我们之前提到的,生成和切换进程是一项昂贵的操作。

让我们执行相同的例程,但这次运行 crunch_numbers 任务:

start_time = time.time()
for _ in range(NUM_WORKERS):
    crunch_numbers()
end_time = time.time()
 
print("Serial time=", end_time - start_time)
 
start_time = time.time()
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print("Threads time=", end_time - start_time)
  
start_time = time.time()
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print("Parallel time=", end_time - start_time)

这是我得到的输出:

PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
Serial time= 2.705625057220459
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 2.6961309909820557
PID: 96289, Process Name: Process-1, Thread Name: MainThread
PID: 96290, Process Name: Process-2, Thread Name: MainThread
PID: 96291, Process Name: Process-3, Thread Name: MainThread
PID: 96292, Process Name: Process-4, Thread Name: MainThread
Parallel time= 0.8014059066772461

这里的主要区别在于多线程方法的结果。这次它的执行方式与串行方法非常相似,原因如下:由于它执行计算而 Python 不执行真正的并行性,因此线程基本上一个接一个地运行,一个接一个执行,直到它们全部完成。

Python 并行和并发编程生态系统

Python 有丰富的 api 用于并行和并发编程。在本教程中,我们涵盖了最流行的,但您必须知道,对于您在该领域的任何需求,可能已经有一些可以帮助您实现目标的东西。 

在下一节中,我们将使用提供的所有库构建多种形式的实际应用程序。事不宜迟,以下是我们将要介绍的模块/库:

  • threading: 在 Python 中使用线程的标准方式。它是模块所公开功能的高级 API 包装器 _thread ,是操作系统线程实现的低级接口。

  • concurrent.futures:标准库的一个模块部分,它在线程上提供更高级别的抽象层。线程被建模为异步任务。

  • multiprocessing:类似于 threading 模块,提供非常相似的接口,但使用进程而不是线程。

  • gevent and greenlets: Greenlets,也称为微线程,是可以协同调度的执行单元,可以并发执行任务而不会产生太多开销。

  • celery: 高级分布式任务队列。任务使用各种范例(如 multiprocessing 或 )并发排队和执行gevent。

构建实际应用程序

了解理论很好,但最好的学习方法是构建一些实用的东西,对吧?在本节中,我们将构建一个贯穿所有不同范例的经典类型的应用程序。

让我们构建一个检查网站正常运行时间的应用程序。有很多这样的解决方案,最著名的可能是Jetpack Monitor 和Uptime Robot。这些应用程序的目的是在您的网站关闭时通知您,以便您可以快速采取行动。它们是这样工作的:

  • 该应用程序非常频繁地浏览网站 URL 列表,并检查这些网站是否正常运行。

  • 每个网站应该每 5-10 分钟检查一次,这样停机时间就不会很长。

  • 它不执行经典的 HTTP GET 请求,而是执行 HEAD 请求,因此它不会显着影响您的流量。

  • 如果 HTTP 状态处于危险范围(400+、500+),所有者会收到通知。

  • 通过电子邮件、短信或推送通知通知所有者。

这就是为什么必须采用并行/并发方法来解决问题的原因。随着网站列表的增长,连续浏览列表并不能保证我们每五分钟左右检查一次每个网站。网站可能会关闭几个小时,而所有者不会收到通知。

让我们从编写一些实用程序开始:

# utils.py
 
import time
import logging
import requests
  
class WebsiteDownException(Exception):
    pass
  
def ping_website(address, timeout=20):
    """
    Check if a website is down. A website is considered down 
    if either the status_code >= 400 or if the timeout expires
     
    Throw a WebsiteDownException if any of the website down conditions are met
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()
          
def notify_owner(address):
    """ 
    Send the owner of the address a notification that their website is down 
     
    For now, we're just going to sleep for 0.5 seconds but this is where 
    you would send an email, push notification or text-message
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)
      
def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)

我们实际上需要一个网站列表来试用我们的系统。创建您自己的列表或使用我的列表:

# websites.py
 
WEBSITE_LIST = [
    'https://weixiaolive.com',
    'https://www.weixiaolive.com/en/',
    'https://www.weixiaolive.com',
    'http://wordpress.com',
    'http://bing.com',
]

通常,您会将此列表与所有者联系信息一起保存在数据库中,以便您可以联系他们。由于这不是本教程的主题,并且为了简单起见,我们将只使用这个 Python 列表。

如果您非常注意,您可能已经注意到列表中有两个非常长的域,它们不是有效的网站(我希望在您阅读本文时没有人购买它们以证明我错了!)。我添加了这两个域,以确保每次运行时我们都会关闭一些网站。另外,我们将我们的应用程序 命名为UptimeSquirrel。

串行方法

首先,让我们尝试串行方法,看看它的性能如何。我们会将此视为基准。

# serial_squirrel.py
 
import time
  
start_time = time.time()
 
for address in WEBSITE_LIST:
    check_website(address)
         
end_time = time.time()        
 
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for SerialSquirrel: 15.881232261657715secs

线程方法

我们将通过线程方法的实施获得更多创意。我们正在使用队列将地址放入并创建工作线程以将它们从队列中取出并进行处理。我们将等待队列为空,这意味着所有地址都已由我们的工作线程处理。

# threaded_squirrel.py
 
import time
from queue import Queue
from threading import Thread
 
NUM_WORKERS = 4
task_queue = Queue()
 
def worker():
    # constantly check the queue for addresses
    while True:
        address = task_queue.get()
        check_website(address)
         
        # Mark the processed task as done
        task_queue.task_done()
 
start_time = time.time()
         
# Create the worker threads
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
 
# Add the websites to the task queue
[task_queue.put(item) for item in WEBSITE_LIST]
 
# Start all the workers
[thread.start() for thread in threads]
 
# Wait for all the tasks in the queue to be processed
task_queue.join()
          
end_time = time.time()        
 
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for ThreadedSquirrel: 3.110753059387207secs

应用程序concurrent.futures接口

如前所述, concurrent.futures 是使用线程的高级 API。我们在这里采用的方法意味着使用 ThreadPoolExecutor. 我们将向池中提交任务并取回未来,这些结果将在未来提供给我们。当然,我们可以等待所有的期货成为实际结果。

# future_squirrel.py
 
import time
import concurrent.futures
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
    concurrent.futures.wait(futures)
 
end_time = time.time()        
 
print("Time for FutureSquirrel: %ssecs" % (end_time - start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for FutureSquirrel: 1.812899112701416secs

多处理方法

该 multiprocessing 库为该库提供了几乎直接替换的 API  threading 。在这种情况下,我们将采用一种更类似于上述方法的方法 concurrent.futures 。我们 通过将函数映射到地址列表(想想经典的 Python 函数)来设置multiprocessing.Pool 并向其提交任务 。map

# multiprocessing_squirrel.py
 
import time
import socket
import multiprocessing
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
 
end_time = time.time()        
 
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for MultiProcessingSquirrel: 2.8224599361419678secs

Gevent

Gevent 是实现大规模并发的流行替代方案。在使用它之前,您需要了解一些事项:

  • 由 greenlets 同时执行的代码是确定性的。与其他提供的替代方案相反,此范例保证对于任何两次相同的运行,您将始终以相同的顺序获得相同的结果。

  • 您需要猴子修补标准功能,以便它们与 gevent 合作。这就是我的意思。通常,套接字操作是阻塞的。我们正在等待手术完成。如果我们处于多线程环境中,调度程序将简单地切换到另一个线程,而另一个线程正在等待 I/O。由于我们不在多线程环境中,gevent 修补标准函数,使它们成为非阻塞并将控制权返回给 gevent 调度程序。

要安装 gevent,请运行: pip install gevent

以下是如何使用 gevent 执行我们的任务 gevent.pool.Pool:

# green_squirrel.py
 
import time
from gevent.pool import Pool
from gevent import monkey
 
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
NUM_WORKERS = 4
 
# Monkey-Patch socket module for HTTP requests
monkey.patch_socket()
 
start_time = time.time()
 
pool = Pool(NUM_WORKERS)
for address in WEBSITE_LIST:
    pool.spawn(check_website, address)
 
# Wait for stuff to finish
pool.join()
         
end_time = time.time()        
 
print("Time for GreenSquirrel: %ssecs" % (end_time - start_time))
# Time for GreenSquirrel: 3.8395519256591797secs

Celery

Celery 是一种与我们目前所见大不相同的方法。它在非常复杂和高性能的环境中经过了实战测试。与上述所有解决方案相比,设置 Celery 需要更多的修改。

首先,我们需要安装 Celery:

pip install celery

任务是 Celery 项目中的核心概念。你想要在 Celery 中运行的所有东西都需要是一个任务。Celery 为运行任务提供了极大的灵活性:您可以同步或异步、实时或计划、在同一台机器或多台机器上运行它们,并使用线程、进程、Eventlet 或 gevent。

安排会稍微复杂一些。Celery 使用其他服务来发送和接收消息。这些消息通常是任务或任务的结果。为此,我们将在本教程中使用 redis。Redis 是一个不错的选择,因为它真的很容易安装和配置,而且您真的有可能已经在您的应用程序中将它用于其他目的,例如缓存和发布/订阅。 

您可以按照Redis 快速入门页面上的说明安装 Redis 。不要忘记安装 redis Python 库, pip install redis以及使用 Redis 和 Celery 所需的包:  pip install celery[redis]。

像这样启动 Redis 服务器: $ redis-server

要开始使用 Celery 构建东西,我们首先需要创建一个 Celery 应用程序。之后,Celery 需要知道它可能执行什么样的任务。为此,我们需要将任务注册到 Celery 应用程序。我们将使用 @app.task 装饰器来做到这一点:

# celery_squirrel.py
 
import time
from utils import check_website
from data import WEBSITE_LIST
from celery import Celery
from celery.result import ResultSet
 
app = Celery('celery_squirrel',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
 
@app.task
def check_website_task(address):
    return check_website(address)
 
if __name__ == "__main__":
    start_time = time.time()
 
    # Using `delay` runs the task async
    rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])
     
    # Wait for the tasks to finish
    rs.get()
 
    end_time = time.time()
 
    print("CelerySquirrel:", end_time - start_time)
    # CelerySquirrel: 2.4979639053344727

如果什么都没有发生,请不要惊慌。请记住,Celery 是一项服务,我们需要运行它。到现在为止,我们只是把任务放到了Redis中,并没有启动Celery去执行。为此,我们需要在代码所在的文件夹中运行此命令:

celery worker -A do_celery --loglevel=debug --concurrency=4

现在重新运行 Python 脚本,看看会发生什么。需要注意的一件事:注意我们如何将 Redis 地址传递给我们的 Redis 应用程序两次。该 broker 参数指定将任务传递给 Celery backend 的位置,也是 Celery 放置结果的位置,以便我们可以在我们的应用程序中使用它们。如果我们不指定 result backend,我们就无法知道任务何时被处理以及结果是什么。

另外,请注意日志现在位于 Celery 进程的标准输出中,因此请务必在适当的终端中检查它们。

结论

我希望这对您来说是一次有趣的旅程,也是对 Python 并行/并发编程世界的一个很好的介绍。旅程到此结束,我们可以得出一些结论:

  • 有几种范例可以帮助我们在 Python 中实现高性能计算。

  • 对于多线程范例,我们有 threading 和 concurrent.futures 库。

  • multiprocessingthreading 为进程而不是线程 提供了一个非常相似的接口 。

  • 请记住,进程实现了真正的并行性,但创建它们的成本更高。

  • 请记住,一个进程可以有更多的线程在其中运行。

  • 不要将并行误认为是并发。请记住,只有并行方法才能利用多核处理器,而并发编程可以智能地调度任务,以便在并行执行实际计算的同时完成等待长时间运行的操作。


文章目录
  • 全局解释器锁
  • 线程与进程
  • 并行与并发
  • Python 并行和并发编程生态系统
  • 构建实际应用程序
    • 串行方法
    • 线程方法
    • 多处理方法
    • Gevent
    • Celery
  • 结论
  • 发表评论