English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Python-Parallelität2Verwendung von asyncio zur Verarbeitung von Parallelität

asyncio

in Python 2in der Ära, hohes Netzwerkprogrammieren ist hauptsächlich durch die drei Bibliotheken Twisted, Tornado und Gevent, aber ihre asynchronen Codes sind weder kompatibel noch portierbar. Wie im vorherigen Abschnitt erwähnt, wollte Gvanrossum, dass Python 3 Implementiert eine nativ basierte auf Generatoren koroutine-Bibliothek, die direkt die Unterstützung für asynchrone IO enthält, das ist asyncio, das in Python 3.4Wurde in die Standardbibliothek eingeführt.

Der Paket asyncio verwendet koroutine-basierte Synchronisation durch Ereigniszyklus für die Parallelität.

Das Paket asyncio trug vor seiner Aufnahme in die Standardbibliothek den Namen "Tulip" (Tulpe), daher finden Sie oft diesen Namen im Internet, wenn Sie nach Informationen suchen.

Was ist der Ereigniszyklus?63;

Wikipedia sagt: Der Ereigniszyklus ist "eine Programmarchitektur, die darauf wartet, dass das Programm Ereignisse oder Nachrichten zugeteilt bekommt". Im Grunde genommen ist der Ereigniszyklus: "Wenn A passiert, führt B aus". Oder mit einem einfachen Beispiel erklärt, dieses Konzept ist die JavaScript-Ereigniswiederholung in jedem Browser vorhanden. Wenn Sie etwas klicken („Wenn A passiert“), diese Klickaktion wird an die Ereigniswiederholung von JavaScript gesendet und überprüft, ob es existierende onclick-Abkömmlinge gibt, um diesen Klick zu verarbeiten (B auszuführen). Solange es registrierte Callback-Funktionen gibt, werden diese zusammen mit den Details der Klickaktion ausgeführt. Der Ereigniszyklus wird als flüchtig angesehen, weil er ständig Ereignisse abholt und durch den Zyklus diese Ereignisse weiterleitet, wie sie behandelt werden sollen.

Für Python ist asyncio, der in die Standardbibliothek aufgenommen wurde, bevor er als "Tulip" (Tulpe) bezeichnet wurde. Daher finden Sie oft diesen Namen im Internet, wenn Sie nach Informationen suchen./O ist bereit zu lesen und/oder als "wenn A passiert" (durch das Modul selectors). Neben GUI und I/O, die Ereigniswiederholung wird oft verwendet, um Code in anderen Threads oder Unterprozessen auszuführen und die Ereigniswiederholung als Regulierungsmechanismus (z.B. kooperativer Multitasking) zu verwenden. Wenn Sie genau verstehen, was das Python GIL ist, ist die Ereigniswiederholung für Orte sehr nützlich, wo der GIL freigegeben werden muss.

Threads und Koroutines

Wir schauen uns zwei Abschnitte von Code an, die durch das Modul threading und das Paket asyncio implementiert wurden.

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # Diese Klasse definiert ein veränderliches Objekt, um von außen Threads zu steuern
 go = True
def spin(msg, signal): # Diese Funktion wird in einem separaten Thread ausgeführt und der Parameter signal ist eine Instanz der zuvor definierten Signal-Klasse
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # Die Funktion itertools.cycle generiert Elemente aus der angegebenen Sequenz unendlich oft
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # Verwende den Backspace-Operator, um den Cursor an den Anfang der Zeile zurückzusetzen
  time.sleep(.)1) # Jede 0.1 ein Sekunde aktualisieren
  if not signal.go: # Wenn das Attribut go nicht True ist, wird der Zyklus beendet
   break
 write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning
def slow_function(): # Simuliert eine zeitaufwändige Operation
 # Assume waiting I/O for a while
 time.sleep(3) # Der Aufruf von sleep blockiert den Hauptthread, dies geschieht, um den GIL zu lösen und einen Unterthread zu erstellen
 return 42
def supervisor(): # Diese Funktion stellt die Unterthread, zeigt das Thread-Objekt an, führt die Zeitverbrauchsrechnung durch und tötet letztendlich den Prozess
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # Zeige den Thread-Objekt Ausgabe spinner object: <Thread(Thread-1, initial)>
 spinner.start() # Starte den Unterprozess
 result = slow_function() # Führe die Zeilen von slow_function aus, blockiert den Hauptthread. Zeige den spinner als Animation im Hintergrund
 signal.go = False
 spinner.join() # Warte auf das Ende des spinner-Threads
 return result
def main():
 result = supervisor() 
 print('Antwort', result)
if __name__ == '__main__':
 main()

Führe dies aus, und das Ergebnis sieht ungefähr so aus:

Dies ist ein animierter Bildschirm, bei dem das Zeichen vor "thinking" blinkt (um das Aufzeichnen zu erleichtern, habe ich die Sleep-Zeit erhöht)

Python bietet keine API zum Beenden von Threads, daher muss eine Nachricht an den Thread gesendet werden, um ihn zu schließen. Hier verwenden wir das Attribut signal.go: Setze es im Hauptthread auf False, und der spinner-Thread wird dies empfangen und beenden

Nun schauen wir uns die Version mit dem Paket asyncio an:

# spinner_asyncio.py
# Zeige Text-Spinne durch Coroutine an
import asyncio
import itertools
import sys
@asyncio.coroutine # Dekorator @asyncio.coroutine für Koroutines, die von asyncio bearbeitet werden sollen
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # Die Funktion itertools.cycle generiert Elemente aus der angegebenen Sequenz unendlich oft
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # Verwende den Backspace-Operator, um den Cursor an den Anfang der Zeile zurückzusetzen
  try:
   yield from asyncio.sleep(0.)1) # Verwende yield from asyncio.sleep(0.)1) ersetzt time.sleep(.)1Diese Art der Ruhe wird den Ereigniszyklus nicht blockieren
  except asyncio.CancelledError: # If the spin function wakes up and throws asyncio.CancelledError exception, the reason is that a cancellation request was made
   break
 write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning
@asyncio.coroutine
def slow_function(): # 5 Now this function is a coroutine, and use sleep to simulate I/# O while performing operations, use yield from to continue the event loop
 # Assume waiting I/O for a while
 yield from asyncio.sleep(3) # This expression passes control to the main loop, and this coroutine is resumed after the sleep ends
 return 42
@asyncio.coroutine
def supervisor(): # This function is also a coroutine, so it can use yield from to drive slow_function
 spinner = asyncio.async(spin('thinking!')) # The asyncio.async() function schedules the execution time of the coroutine, wraps the spin coroutine with a Task object and returns immediately
 print('spinner object:', spinner) # Task object, output similar to spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # Drive the slow_function() function, get the return value after it ends. At the same time, the event loop continues to run,
 # Because slow_function function uses yield from asyncio.sleep at the end3) Expression passes control to the main loop
 result = yield from slow_function()
 # The Task object can be canceled; after cancellation, asyncio.CancelledError exception will be thrown at the yield where the coroutine is currently paused
 # The coroutine can catch this exception, or delay cancellation, or even refuse cancellation
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # Get the event loop reference
 # Drive the supervisor coroutine to run until completion; the return value of this coroutine is the return value of this call
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Antwort', result)
if __name__ == '__main__':
 main()

Verwenden Sie time.sleep() nicht in asyncio-Koroutines, es sei denn, Sie möchten die Hauptthread blockieren und den Ereigniszyklus oder die gesamte Anwendung einfrieren.

Wenn eine Koroutine für eine Weile nichts tun muss, sollte yield from asyncio.sleep(DELAY) verwendet werden.

Die Verwendung des Dekorators @asyncio.coroutine ist nicht obligatorisch, wird aber empfohlen, da dies die Koroutines im Code hervorhebt. Wenn die Koroutine noch keine Werte ausgegeben hat, wird der Müllrecycler ausgelöst (was bedeutet, dass die Operation nicht abgeschlossen ist und möglicherweise defekt ist), und eine Warnung kann ausgegeben werden. Dieser Dekorator aktiviert die Koroutine nicht vorzeitig.

Die Ausführungsergebnisse dieser beiden Codezeilen sind fast gleich. Lassen Sie uns nun die Hauptunterschiede im Core-Code supervisor betrachten:

  1. Das asyncio.Task-Objekt ist fast äquivalent zum threading.Thread-Objekt (Task-Objekte sind wie grüne Threads in Bibliotheken für schreibende Multitasking)
  2. Das Task-Objekt wird verwendet, um Koroutines anzuregen, und das Thread-Objekt wird verwendet, um aufrufbare Objekte aufzurufen.
  3. Das Task-Objekt wird nicht selbst instanziert, sondern durch das Übergeben der Koroutine an die Funktion asyncio.async(...) oder die Methode loop.create_task(...) erhalten.
  4. Der erhaltene Task-Objekt ist bereits geplant, um ausgeführt zu werden; das Thread-Objekt muss die Methode start aufrufen, um es zu informieren, dass es ausgeführt werden soll.
  5. In der Thread-Version der supervisor-Funktion ist slow_function eine gewöhnliche Funktion, die direkt von dem Thread aufgerufen wird, während die asynchrone Version der slow_function-Funktion eine Koroutine ist, die von yield from angetrieben wird.
  6. Es gibt keine API, um Threads von außen abzubrechen, da Threads jederzeit unterbrochen werden können. Um eine Aufgabe abzubrechen, kann die Instanzmethode Task.cancel() verwendet werden, um eine CancelledError-Exception im Koroutine-Interna auszulösen. Koroutines können diese Ausnahme an der yield-Pause erfassen und den Abbruchantrag bearbeiten.
  7. Der Supervisor-Koroutine muss von der main-Funktion mittels der Methode loop.run_until_complete ausgeführt werden.

Ein entscheidender Vorteil von Koroutines im Vergleich zu Threads ist, dass Threads gezwungen sind, Locks zu behalten, um wichtige Teile des Programms zu schützen, um zu verhindern, dass mehrstufige Operationen während der Ausführung unterbrochen werden und um zu verhindern, dass das Wasser im Zustand von Xiaoyao bleibt. Koroutines schützen standardmäßig, wir müssen explizit die Kontrolle abgeben (indem wir yield oder yield from verwenden), damit der Rest des Programms ausgeführt werden kann.

asyncio.Future: absichtlich nicht blockieren

Die Schnittstelle der Klasse asyncio.Future ist im Grunde identisch mit der der Klasse concurrent.futures.Future, aber die Implementierung ist unterschiedlich und kann nicht vertauscht werden.

Im letzten Artikel über [python Parallelität 1: Verwenden Sie futures zur Verarbeitung von Parallelität]() Wir haben bereits über den future der concurrent.futures.Future besprochen. In concurrent.futures.Future ist future nur das Ergebnis der Verwaltung der Ausführung eines bestimmten Elements. In der asyncio-Pakete akzeptiert das Verfahren BaseEventLoop.create_task(...) eine Koroutine, plant ihre Laufzeit und gibt eine Instanz von asyncio.Task zurück (die auch eine Instanz der Klasse asyncio.Future ist, da Task eine Unterklasse von Future ist und Koroutines umschließt). (Ein ähnliches Verfahren in concurrent.futures.Future ist Executor.submit(...)).

Ähnlich wie die Klasse concurrent.futures.Future bietet auch die Klasse asyncio.Future

  1. .done() gibt einen Boolean-Wert zurück, der angibt, ob der Future bereits ausgeführt wurde.
  2. Das .add_done_callback()-Verfahren hat nur einen Parameter, der vom Typ eines aufrufbaren Objekts ist. Nachdem der Future abgeschlossen ist, wird dieser Objekt aufgerufen.
  3. Das .result()-Verfahren hat keine Parameter, daher kann keine Zeitüberschreitung angegeben werden. Wenn das .result()-Verfahren aufgerufen wird und das Verfahren noch nicht abgeschlossen ist, wird eine asyncio.InvalidStateError-Ausnahme ausgelöst.

Nachdem die Future des concurrent.futures.Future-Klassenobjekts abgeschlossen ist, wird result() aufgerufen, um das Ergebnis des aufrufbaren Objekts zurückzugeben oder eine Ausnahme auszulegen, die beim Ausführen des aufrufbaren Objekts ausgelöst wurde. Wenn das Future nicht abgeschlossen ist und das f.result()-Verfahren aufgerufen wird, wird die Ausführung der Thread, in der der Aufrufer sich befindet, blockiert, bis ein Ergebnis zurückgegeben wird. In diesem Fall kann das result()-Verfahren auch den timeout-Parameter akzeptieren. Wenn das Future innerhalb der angegebenen Zeit nicht abgeschlossen ist, wird eine TimeoutError-Ausnahme ausgelöst.

Wenn wir asyncio.Future verwenden, verwenden wir normalerweise yield from, um Ergebnisse zu erhalten, anstatt das result()-Verfahren zu verwenden. Der yield from-Ausdruck erzeugt einen Rückgabewert im angehaltenen Koroutine, um den Ablauf des Rückgabeprozesses fortzusetzen.

Das Ziel der Klasse asyncio.Future ist die Verwendung mit yield from, daher ist es in der Regel nicht erforderlich, die folgenden Methoden zu verwenden:

  1. Es ist nicht erforderlich, my_future.add_down_callback(...), da die Operationen, die nach dem Abschluss des Futures ausgeführt werden sollen, direkt hinter dem Ausdruck yield from my_future in der Koroutine platziert werden können. (Da Koroutines Funktionen pausieren und wieder fortsetzen können)
  2. Es ist nicht erforderlich, my_future.result() aufzurufen, da das durch yield from erzeugte Ergebnis genau das ist ((result = yield from my_future))

Im Paket asyncio kann mit yield from ein Ergebnis aus einem asyncio.Future-Objekt produziert werden. Dies bedeutet, dass wir so schreiben können:

res = yield from foo() # foo kann eine Koroutine-Funktion oder eine normale Funktion sein, die ein Future- oder Task-Objekt zurückgibt

asyncio.async(...)* Funktion

asyncio.async(coro_or_future, *, loop=None)

Diese Funktion vereinheitlicht Koroutines und Future: Der erste Parameter kann einer von beiden sein. Wenn es sich um ein Future- oder Task-Objekt handelt, wird es direkt zurückgegeben. Wenn es sich um eine Koroutine handelt, ruft die async-Funktion automatisch die Methode loop.create_task(...) auf, um ein Task-Objekt zu erstellen. Der Parameter loop ist optional und wird verwendet, um den Ereigniszyklus einzugeben; wenn er nicht übergeben wird, wird die async-Funktion durch Aufruf der Funktion asyncio.get_event_loop() ein Loop-Objekt erhalten.

BaseEventLoop.create_task(coro)

Diese Methode plant die Ausführungszeit von Koroutines und gibt ein asyncio.Task-Objekt zurück. Wenn sie auf einer eigenen BaseEventLoop-Unterklasse aufgerufen wird, könnte das zurückgegebene Objekt eine Instanz einer Klasse sein, die mit der Task-Klasse kompatibel ist, die in einer externen Bibliothek vorhanden ist.

Die Methode BaseEventLoop.create_task() ist nur in Python3.4.2 und höheren Versionen verfügbar. Python3.3 Nur die Funktion asyncio.async(...) kann verwendet werden.
Wenn Sie future und Koroutines in der Python-Konsole oder in kleinen Testskripten experimentieren möchten, können Sie den folgenden Abschnitt verwenden:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

Downloaden Sie mit asyncio und dem Paket aiohttp

Jetzt, da wir die Grundlagen von asyncio kennen, ist es an der Zeit, asyncio zu verwenden, um unsere Konkurrenz in unserem letzten Beitrag [python concurrent] neu zu schreiben 1:Verwenden Sie futures, um den parallelen Download von Flaggen abzuhandeln]

Sehen wir uns einmal den Code an:

import asyncio
import aiohttp # Sie müssen pip install aiohttp
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # Wir wissen, dass Koroutinen mit asyncio.coroutine dekoriert werden sollten
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # Blockierende Operationen werden durch Koroutinen realisiert, und das Kundencode delegiert die Verantwortung an die Koroutine über yield from, um asynchrone Operationen durchzuführen
 resp = yield from aiohttp.request('GET', url) 
 # Das Lesen ist ebenfalls eine asynchrone Operation
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # Diese Funktion muss auch eine Koroutine sein, da yield from verwendet wird
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower()) + '.gif')}
 return cc
def download_many(cc_list):
 loop = asyncio.get_event_loop() # Rufe den Beleg der Unterlage des Ereigniszyklus ab
 to_do = [download_one(cc) for cc in sorted(cc_list)] # Rufe download_one auf, um alle Flaggen zu erhalten und eine Liste von Generator-Objekten zu erstellen
 # Obwohl der Name der Funktion wait ist, ist sie keine blockierende Funktion. Wait ist eine Koroutine, die endet, wenn alle Koroutinen, die an ihn übergeben werden, ausgeführt sind.
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # Führe den Ereigniszyklus aus, bis wait_coro beendet ist; während der Lauf des Ereigniszyklus wird dieser Skript hier blockiert.
 loop.close() # Schließe den Ereigniszyklus
 return len(res)
if __name__ == '__main__':
 main(download_many)

Die Zusammenfassung der Ausführung dieses Codes ist wie folgt:

  1. In der Funktion download_many wird ein Ereigniszyklus abgerufen, um mehrere Koroutinen zu verarbeiten, die durch die Ausführung der Funktion download_one erstellt werden.
  2. Der Ereigniszyklus von asyncio aktiviert einmalig alle Koroutinen.
  3. Wenn der Koroutinen (get_flag) im Kundencode die Verantwortung an die Koroutinen (aiohttp.request) in der Bibliothek übergeben wird, die mit yield from verwaltet werden, wird das Kontrollrecht an den Ereigniszyklus zurückgegeben und die Koroutinen, die vor der Ausführung eingereiht sind, ausgeführt.
  4. Der Event-Loop erhält Benachrichtigungen über den Untergrund-API basierend auf Callbacks, nachdem blockierende Operationen abgeschlossen sind.
  5. Nachdem die Benachrichtigung erhalten wurde, sendet der Hauptzyklus das Ergebnis an die angehaltene Koroutine
  6. Die Koroutine führt vorwärts bis zum nächsten Ausdruck yield from aus, z.B. get_flag functions yield from resp.read(). Der Event-Loop erhält wieder die Kontrolle und wiederholt Schritt4~6Schritt fortzusetzen, bis der Zyklus endet.

In der Funktion download_many verwenden wir die Funktion asyncio.wait(...), die eine Koroutine ist und als Parameter ein iterables Objekt aus future oder Koroutine ist; wait packt jede Koroutine in ein Task-Objekt. Das Endresultat ist, dass alle von wait verarbeiteten Objekte auf irgendeine Weise in Instanzen der Klasse Future umgewandelt werden.

wait ist eine Koroutine-Funktion, daher wird eine Koroutine oder ein Generator-Objekt zurückgegeben; der Variable waite_coro wird dieses Objekt gespeichert

Die Methode loop.run_until_complete nimmt ein future oder eine Koroutine als Parameter. Wenn es eine Koroutine ist, packt die Methode run_until_complete sie wie die Funktion wait in ein Task-Objekt. Hier packt die Methode run_until_complete wait_coro in ein Task-Objekt, das von yield from angetrieben wird. Nach dem Ende von wait_coro werden zwei Parameter zurückgegeben, der erste Parameter ist der abgeschlossene future und der zweite Parameter ist der unvollständige future.

<section class="caption">wait</section> hat zwei Namensparameter, timeout und return_when. Wenn sie gesetzt sind, kann es zu einer Rückgabe unvollständiger future kommen.

Sie haben vielleicht auch bemerkt, dass wir die Funktion get_flags neu geschrieben haben, weil die zuvor verwendete Bibliothek requests eine blockierende I/O-Operation. Um den Paket asyncio zu verwenden, müssen wir die Funktion in eine asynchrone Version ändern.

Tipp

Falls Sie den Eindruck haben, dass der Code nach der Verwendung von Koroutines schwer zu verstehen ist, können Sie den Rat des Vaters von Python (Guido van Rossum) befolgen und annehmen, dass es kein 'yield from' gibt.

Nehmen wir das obige Beispiel

@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# Entfernen Sie 'yield form'
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# Es ist jetzt klarer, oder?

Wissenspunkte

Bei der Verwendung von yield from im API des asyncio-Pakets gibt es eine Detail, das beachtet werden muss:

Bei der Verwendung des asyncio-Pakets enthalten wir in unserem asynchronen Code Koroutines (Delegierte Generatoren), die von asyncio selbst angetrieben werden, und die Generatoren leiten letztlich die Verantwortung an Koroutines im asyncio-Paket oder in Drittanbieterbibliotheken weiter. Diese Vorgehensweise entspricht dem Aufbau eines Rohres, das den asyncio-Ereigniszyklus anregt, um die unteren asynchronen I/Die Bibliotheksfunktion O.

Vermeiden Sie blockierende Aufrufe

Zunächst sehen wir ein Diagramm, das die Verzögerungen bei der Lesung von Daten aus verschiedenen Speichermedien durch den Computer zeigt:

Durch dieses Diagramm können wir sehen, dass blockierende Aufrufe für den CPU eine enorme Verschwendung sind. Gibt es einen Weg, um blockierende Aufrufe zu vermeiden, dass sie die gesamte Anwendung unterbrechen?

Es gibt zwei Methoden:

  1. Führen Sie jede blockierende Operation in einem separaten Thread aus
  2. Konvertieren Sie jede blockierende Operation in eine nicht-blockierende asynchrone Aufruf

Natürlich empfehlen wir die zweite Methode, da die erste Methode zu teuer wäre, wenn jeder Verbindung ein Thread verwendet wird.

Eine zweite Möglichkeit, um asynchrone Programmierung zu realisieren, ist es, Generatoren als Koroutines zu verwenden. Für den Ereigniszyklus ist der Aufruf von Callbacks ähnlich wie das Aufrufen der Methode .send() auf einer angehaltenen Koroutine. Die von angehaltenen Koroutines verbrauchten Speicher sind viel geringer als die von Threads.

Nun sollten Sie verstehen können, warum der flags_asyncio.py-Skript viel schneller ist als flags.py.

Da flags.py nacheinander synchron heruntergeladen wird, müssen für jede Download-Vorgang mehrere Milliarden CPU-Zyklen auf das Ergebnis gewartet werden. In flags_asyncio.py wird bei der Aufrufung der Methode loop.run_until_complete im download_many-Funktion der Ereigniszyklus die verschiedenen download_one-Koroutines angetrieben, bis zum Ausdruck yield from, der wiederum die verschiedenen get_flag-Koroutines angetrieben, bis zum ersten Ausdruck yield from, die Funktion aiohttp.request() aufgerufen. Diese Aufrufe blockieren nicht, sodass alle Anfragen innerhalb weniger Sekunden vollständig gestartet werden können.

Verbessern Sie den asyncio-Download-Skript

Nun verbessern wir die flags_asyncio.py, indem wir darin Ausnahmebehandlung und Zähler hinzufügen

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# Benutzerdefinierte Ausnahme zum Verpacken anderer HTTP- oder Netzwerkfehler und zum Erhalten von country_code, um Fehler zu melden
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # Diese Coroutine hat drei mögliche Rückgabewerte:
 # 1. Gibt das heruntergeladene Bild zurück
 # 2. HTTP-Antwort ist404 . Wird web.HTTPNotFound-Ausnahme ausgelöst
 # 3. Werden andere HTTP-Statuscodes zurückgegeben, wird aiohttp.HttpProcessingError ausgelöst
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # Der semaphore-Parameter ist eine Instanz der asyncio.Semaphore-Klasse
 # Die Semaphore-Klasse ist ein Synchronisationsmechanismus, der zur Begrenzung von parallelen Anfragen verwendet wird
 try:
  with (yield from semaphore):
    # Semaphore wird im yield from Ausdruck als Kontextmanager verwendet, um das Blockieren des gesamten Systems zu verhindern
    # Wenn der Wert des semaphore-Zählers der maximal erlaubte Wert ist, wird nur diese Coroutine blockieren
    image = yield from get_flag(cc)
    # Nach dem Verlassen des with-Statements wird der Wert des semaphore-Zählers verringert
    # Entfernen von Blockaden, die möglicherweise von anderen Coroutine-Instanzen, die denselben semaphore-Objekt erwarten, verursacht werden
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # Erstellen eines asyncio.Semaphore-Beispiels, das maximal MAX_CONCUR_REQ aktivierte Coroutines zulässt, die diesen Zähler verwenden
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # Mehrmalige Aufrufe von download_one Coroutine, um eine Liste von Coroutine-Objekten zu erstellen
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # Erhalten eines Iterators, der nach dem Abschluss des futures zurückgibt
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # Iteration über die abgeschlossenen future 
  try:
   res = yield from future # Ergebnisse des asyncio.Future-Objekts erhalten (kann auch future.result aufgerufen werden)
  except FetchError as exc:
   # Alle ausgelösten Ausnahmen werden in das Objekt FetchError verpackt
   country_code = exc.country_code
   try:
    # Versuchen, die Fehlermeldung aus der ursprünglichen Ausnahme (__cause__) zu erhalten
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # Wenn keine Fehlermeldung im ursprünglichen Ausnahmequellcode gefunden wird, wird der Name der Klasse der angeschlossenen Ausnahme als Fehlermeldung verwendet
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Fehler für {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter
def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

Da die Anfragen, die von Koroutines initiiert werden, schnell durchgeführt werden, um sicherzustellen, dass keine zu viele gleichzeitig an den Server gesendeten Anfragen die Last erhöhen und den Server überlasten, erstellen wir in der Funktion download_coro eine Instanz von asyncio.Semaphore und übergeben sie an die Funktion download_one.

<secion class="caption">Semaphore</section> 对象维护着一个内部计数器,若在对象上调用 .acquire() 协程方法,计数器则递减;若在对象上调用 .release() 协程方法,计数器则递增。计数器的值是在初始化的时候设定。

如果计数器大于0,那么调用 .acquire() 方法不会阻塞,如果计数器为0, .acquire() 方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release() 方法,让计数器递增。

在上边的代码中,我们并没有手动调用 .acquire() 或 .release() 方法,而是在 download_one 函数中 把 semaphore 当做上下文管理器使用:

with (yield from semaphore):
 image = yield from get_flag(cc)

这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。

使用 asyncio.as_completed 函数

因为要使用 yield from 获取 asyncio.as_completed 函数产出的future的结果,所以 as_completed 函数秩序在协程中调用。由于 download_many 要作为参数传给非协程的main 函数,我已我们添加了一个新的 downloader_coro 协程,让download_many 函数只用于设置事件循环。

使用Executor 对象,防止阻塞事件循环

现在我们回去看下上边关于电脑从不同存储介质读取数据的延迟情况图,有一个实时需要注意,那就是访问本地文件系统也会阻塞。

上边的代码中,save_flag 函数阻塞了客户代码与 asyncio 事件循环公用的唯一线程,因此保存文件时,整个应用程序都会暂停。为了避免这个问题,可以使用事件循环对象的 run_in_executor 方法。

asyncio 的事件循环在后台维护着一个ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。

下边是我们改动后的代码:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # 这里是改动部分
  loop = asyncio.get_event_loop() # 获取事件循环的引用
  loop.run_in_executor(None, save_flag, image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

Der erste Parameter der Methode run_in_executor ist ein Executor-Objekt; wenn er auf None gesetzt ist, wird der Standard ThreadPoolExecutor-Objekt des Event Loops verwendet.

Von Callbacks zu Future zu Koroutines

Bevor wir Koroutines verwendet haben, könnten wir eine gewisse Vorstellung von Callbacks haben. Was hat Koroutines im Vergleich zu Callbacks verbessert?

Python Callback-Code-Style:

def stage1(response1):
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2):
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3):
  step3(response3) 
api_call1(request1, stage1)

Die Defekte des obigen Codes:

  1. kann Callback-Hölle verursachen
  2. Der Code ist schwer lesbar

In diesem Problem kann eine Koroutine eine große Rolle spielen. Wenn man asynchrone Code, der mit Koroutine und yield from erstellt wurde, verwendete, könnte der Code wie folgt aussehen:

@asyncio.coroutine
def three_stages(request1):
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

Im Vergleich zu dem vorherigen Code ist dieser Code viel einfacher zu verstehen. Wenn asynchrone Aufrufe an api_call1,api_call2,api_call3 wird eine Exception ausgelöst, dann kann man den entsprechenden Ausdruck yield from in einen try-Block setzen./except-Block behandelt.

Um Koroutines zu verwenden, muss man sich an den Ausdruck yield from gewöhnen und Koroutines dürfen nicht direkt aufgerufen werden. Man muss die Ausführungszeit der Koroutine explizit einstellen oder sie in einer anderen Koroutine, die die Ausführungszeit hat, mit yield from aktivieren. Wenn man den Ausdruck yield from in einem try-Block verwendet, der die Exception behandelt, wird eine Exception ausgelöst.1)) dann passiert nichts.

Nun zeigen wir Ihnen mit einem praktischen Beispiel:

Bei jeder Download-Anfrage werden mehrere Anfragen发起

Wir ändern den Code zum Herunterladen der Nationalflagge, sodass gleichzeitig der Name des Landes abgerufen wird, der bei der Speicherung des Bildes verwendet wird.
Wir verwenden Koroutines und yield from, um dieses Problem zu lösen:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  Wenn 'json' in ctype oder url.endswith('json') enthalten ist:
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = ""/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

In diesem Codeaufruf rufen wir in der Funktion download_one get_flag und get_country in zwei mit Semaphore kontrollierten with-Blöcken auf, um Zeit zu sparen.

Die return-Anweisung von get_flag wird im äußeren Block durch Klammern hinzugefügt, weil die Operatorpriorität von () hoch ist und die yield from-Anweisung im Klammern zuerst ausgeführt wird. Ohne hinzuzufügen wird ein syntaktischer Fehler gemeldet

Mit () hinzufügen, entspricht

image = yield from http_get(url)
return image

Falls () nicht hinzugefügt werden, wird das Programm an der Stelle von yield from unterbrochen und das Kontrollrecht abgegeben. In diesem Fall führt return zu einem syntaktischen Fehler.

Zusammenfassung

In diesem Artikel diskutieren wir:

  1. Ein Vergleich zwischen einem Multithreading-Programm und einer Version von asyncio zeigte die Beziehung zwischen Multithreading und asynchronen Aufgaben
  2. Der Unterschied zwischen der Klasse asyncio.Future und der Klasse concurrent.futures.Future wurde verglichen
  3. Wie kann asynchrone Programmierung verwendet werden, um die Hochkonfiguration in Netzwerkanwendungen zu verwalten
  4. Im Vergleich zum Callback in asynchroner Programmierung verbessert der Koroutine die Leistung erheblich

Das ist der gesamte Inhalt dieses Artikels. Wir hoffen, dass er Ihnen bei Ihrem Lernen hilft und dass Sie die呐喊教程大力支持.

Erklärung: Der Inhalt dieses Artikels wurde aus dem Internet übernommen und gehört dem Urheberrecht des jeweiligen Autors. Der Inhalt wurde von Internetnutzern freiwillig beigesteuert und hochgeladen. Diese Website besitzt keine Eigentumsrechte und hat den Inhalt nicht manuell bearbeitet. Sie übernimmt auch keine rechtlichen Verantwortlichkeiten. Wenn Sie Inhalte finden, die möglicherweise urheberrechtlich geschützt sind, freuen wir uns über eine E-Mail an: notice#oldtoolbag.com (bei der E-Mail senden Sie bitte # durch @ ersetzen und geben Sie relevante Beweise an. Sobald nachgewiesen wird, dass Inhalte urheberrechtlich geschützt sind, wird diese Website die fraglichen Inhalte sofort löschen.)

Gefällt mir