English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
asyncio
in Python 2in der Ära, hohes Netzwerkprogrammieren ist hauptsächlich durch die drei Bibliotheken Twisted, Tornado und Gevent, aber ihre asynchronen Codes sind weder kompatibel noch portierbar. Wie im vorherigen Abschnitt erwähnt, wollte Gvanrossum, dass Python 3 Implementiert eine nativ basierte auf Generatoren koroutine-Bibliothek, die direkt die Unterstützung für asynchrone IO enthält, das ist asyncio, das in Python 3.4Wurde in die Standardbibliothek eingeführt.
Der Paket asyncio verwendet koroutine-basierte Synchronisation durch Ereigniszyklus für die Parallelität.
Das Paket asyncio trug vor seiner Aufnahme in die Standardbibliothek den Namen "Tulip" (Tulpe), daher finden Sie oft diesen Namen im Internet, wenn Sie nach Informationen suchen.
Was ist der Ereigniszyklus?63;
Wikipedia sagt: Der Ereigniszyklus ist "eine Programmarchitektur, die darauf wartet, dass das Programm Ereignisse oder Nachrichten zugeteilt bekommt". Im Grunde genommen ist der Ereigniszyklus: "Wenn A passiert, führt B aus". Oder mit einem einfachen Beispiel erklärt, dieses Konzept ist die JavaScript-Ereigniswiederholung in jedem Browser vorhanden. Wenn Sie etwas klicken („Wenn A passiert“), diese Klickaktion wird an die Ereigniswiederholung von JavaScript gesendet und überprüft, ob es existierende onclick-Abkömmlinge gibt, um diesen Klick zu verarbeiten (B auszuführen). Solange es registrierte Callback-Funktionen gibt, werden diese zusammen mit den Details der Klickaktion ausgeführt. Der Ereigniszyklus wird als flüchtig angesehen, weil er ständig Ereignisse abholt und durch den Zyklus diese Ereignisse weiterleitet, wie sie behandelt werden sollen.
Für Python ist asyncio, der in die Standardbibliothek aufgenommen wurde, bevor er als "Tulip" (Tulpe) bezeichnet wurde. Daher finden Sie oft diesen Namen im Internet, wenn Sie nach Informationen suchen./O ist bereit zu lesen und/oder als "wenn A passiert" (durch das Modul selectors). Neben GUI und I/O, die Ereigniswiederholung wird oft verwendet, um Code in anderen Threads oder Unterprozessen auszuführen und die Ereigniswiederholung als Regulierungsmechanismus (z.B. kooperativer Multitasking) zu verwenden. Wenn Sie genau verstehen, was das Python GIL ist, ist die Ereigniswiederholung für Orte sehr nützlich, wo der GIL freigegeben werden muss.
Threads und Koroutines
Wir schauen uns zwei Abschnitte von Code an, die durch das Modul threading und das Paket asyncio implementiert wurden.
# sinner_thread.py import threading import itertools import time import sys class Signal: # Diese Klasse definiert ein veränderliches Objekt, um von außen Threads zu steuern go = True def spin(msg, signal): # Diese Funktion wird in einem separaten Thread ausgeführt und der Parameter signal ist eine Instanz der zuvor definierten Signal-Klasse write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|)/-\"): # Die Funktion itertools.cycle generiert Elemente aus der angegebenen Sequenz unendlich oft status = char + ' ' + msg write(status) flush() write('\x0)8' * len(status)) # Verwende den Backspace-Operator, um den Cursor an den Anfang der Zeile zurückzusetzen time.sleep(.)1) # Jede 0.1 ein Sekunde aktualisieren if not signal.go: # Wenn das Attribut go nicht True ist, wird der Zyklus beendet break write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning def slow_function(): # Simuliert eine zeitaufwändige Operation # Assume waiting I/O for a while time.sleep(3) # Der Aufruf von sleep blockiert den Hauptthread, dies geschieht, um den GIL zu lösen und einen Unterthread zu erstellen return 42 def supervisor(): # Diese Funktion stellt die Unterthread, zeigt das Thread-Objekt an, führt die Zeitverbrauchsrechnung durch und tötet letztendlich den Prozess signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) # Zeige den Thread-Objekt Ausgabe spinner object: <Thread(Thread-1, initial)> spinner.start() # Starte den Unterprozess result = slow_function() # Führe die Zeilen von slow_function aus, blockiert den Hauptthread. Zeige den spinner als Animation im Hintergrund signal.go = False spinner.join() # Warte auf das Ende des spinner-Threads return result def main(): result = supervisor() print('Antwort', result) if __name__ == '__main__': main()
Führe dies aus, und das Ergebnis sieht ungefähr so aus:
Dies ist ein animierter Bildschirm, bei dem das Zeichen vor "thinking" blinkt (um das Aufzeichnen zu erleichtern, habe ich die Sleep-Zeit erhöht)
Python bietet keine API zum Beenden von Threads, daher muss eine Nachricht an den Thread gesendet werden, um ihn zu schließen. Hier verwenden wir das Attribut signal.go: Setze es im Hauptthread auf False, und der spinner-Thread wird dies empfangen und beenden
Nun schauen wir uns die Version mit dem Paket asyncio an:
# spinner_asyncio.py # Zeige Text-Spinne durch Coroutine an import asyncio import itertools import sys @asyncio.coroutine # Dekorator @asyncio.coroutine für Koroutines, die von asyncio bearbeitet werden sollen def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|)/-\"): # Die Funktion itertools.cycle generiert Elemente aus der angegebenen Sequenz unendlich oft status = char + ' ' + msg write(status) flush() write('\x0)8' * len(status)) # Verwende den Backspace-Operator, um den Cursor an den Anfang der Zeile zurückzusetzen try: yield from asyncio.sleep(0.)1) # Verwende yield from asyncio.sleep(0.)1) ersetzt time.sleep(.)1Diese Art der Ruhe wird den Ereigniszyklus nicht blockieren except asyncio.CancelledError: # If the spin function wakes up and throws asyncio.CancelledError exception, the reason is that a cancellation request was made break write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning @asyncio.coroutine def slow_function(): # 5 Now this function is a coroutine, and use sleep to simulate I/# O while performing operations, use yield from to continue the event loop # Assume waiting I/O for a while yield from asyncio.sleep(3) # This expression passes control to the main loop, and this coroutine is resumed after the sleep ends return 42 @asyncio.coroutine def supervisor(): # This function is also a coroutine, so it can use yield from to drive slow_function spinner = asyncio.async(spin('thinking!')) # The asyncio.async() function schedules the execution time of the coroutine, wraps the spin coroutine with a Task object and returns immediately print('spinner object:', spinner) # Task object, output similar to spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>> # Drive the slow_function() function, get the return value after it ends. At the same time, the event loop continues to run, # Because slow_function function uses yield from asyncio.sleep at the end3) Expression passes control to the main loop result = yield from slow_function() # The Task object can be canceled; after cancellation, asyncio.CancelledError exception will be thrown at the yield where the coroutine is currently paused # The coroutine can catch this exception, or delay cancellation, or even refuse cancellation spinner.cancel() return result def main(): loop = asyncio.get_event_loop() # Get the event loop reference # Drive the supervisor coroutine to run until completion; the return value of this coroutine is the return value of this call result = loop.run_until_complete(supervisor()) loop.close() print('Antwort', result) if __name__ == '__main__': main()
Verwenden Sie time.sleep() nicht in asyncio-Koroutines, es sei denn, Sie möchten die Hauptthread blockieren und den Ereigniszyklus oder die gesamte Anwendung einfrieren.
Wenn eine Koroutine für eine Weile nichts tun muss, sollte yield from asyncio.sleep(DELAY) verwendet werden.
Die Verwendung des Dekorators @asyncio.coroutine ist nicht obligatorisch, wird aber empfohlen, da dies die Koroutines im Code hervorhebt. Wenn die Koroutine noch keine Werte ausgegeben hat, wird der Müllrecycler ausgelöst (was bedeutet, dass die Operation nicht abgeschlossen ist und möglicherweise defekt ist), und eine Warnung kann ausgegeben werden. Dieser Dekorator aktiviert die Koroutine nicht vorzeitig.
Die Ausführungsergebnisse dieser beiden Codezeilen sind fast gleich. Lassen Sie uns nun die Hauptunterschiede im Core-Code supervisor betrachten:
Ein entscheidender Vorteil von Koroutines im Vergleich zu Threads ist, dass Threads gezwungen sind, Locks zu behalten, um wichtige Teile des Programms zu schützen, um zu verhindern, dass mehrstufige Operationen während der Ausführung unterbrochen werden und um zu verhindern, dass das Wasser im Zustand von Xiaoyao bleibt. Koroutines schützen standardmäßig, wir müssen explizit die Kontrolle abgeben (indem wir yield oder yield from verwenden), damit der Rest des Programms ausgeführt werden kann.
asyncio.Future: absichtlich nicht blockieren
Die Schnittstelle der Klasse asyncio.Future ist im Grunde identisch mit der der Klasse concurrent.futures.Future, aber die Implementierung ist unterschiedlich und kann nicht vertauscht werden.
Im letzten Artikel über [python Parallelität 1: Verwenden Sie futures zur Verarbeitung von Parallelität]() Wir haben bereits über den future der concurrent.futures.Future besprochen. In concurrent.futures.Future ist future nur das Ergebnis der Verwaltung der Ausführung eines bestimmten Elements. In der asyncio-Pakete akzeptiert das Verfahren BaseEventLoop.create_task(...) eine Koroutine, plant ihre Laufzeit und gibt eine Instanz von asyncio.Task zurück (die auch eine Instanz der Klasse asyncio.Future ist, da Task eine Unterklasse von Future ist und Koroutines umschließt). (Ein ähnliches Verfahren in concurrent.futures.Future ist Executor.submit(...)).
Ähnlich wie die Klasse concurrent.futures.Future bietet auch die Klasse asyncio.Future
Nachdem die Future des concurrent.futures.Future-Klassenobjekts abgeschlossen ist, wird result() aufgerufen, um das Ergebnis des aufrufbaren Objekts zurückzugeben oder eine Ausnahme auszulegen, die beim Ausführen des aufrufbaren Objekts ausgelöst wurde. Wenn das Future nicht abgeschlossen ist und das f.result()-Verfahren aufgerufen wird, wird die Ausführung der Thread, in der der Aufrufer sich befindet, blockiert, bis ein Ergebnis zurückgegeben wird. In diesem Fall kann das result()-Verfahren auch den timeout-Parameter akzeptieren. Wenn das Future innerhalb der angegebenen Zeit nicht abgeschlossen ist, wird eine TimeoutError-Ausnahme ausgelöst.
Wenn wir asyncio.Future verwenden, verwenden wir normalerweise yield from, um Ergebnisse zu erhalten, anstatt das result()-Verfahren zu verwenden. Der yield from-Ausdruck erzeugt einen Rückgabewert im angehaltenen Koroutine, um den Ablauf des Rückgabeprozesses fortzusetzen.
Das Ziel der Klasse asyncio.Future ist die Verwendung mit yield from, daher ist es in der Regel nicht erforderlich, die folgenden Methoden zu verwenden:
Im Paket asyncio kann mit yield from ein Ergebnis aus einem asyncio.Future-Objekt produziert werden. Dies bedeutet, dass wir so schreiben können:
res = yield from foo() # foo kann eine Koroutine-Funktion oder eine normale Funktion sein, die ein Future- oder Task-Objekt zurückgibt
asyncio.async(...)* Funktion
asyncio.async(coro_or_future, *, loop=None)
Diese Funktion vereinheitlicht Koroutines und Future: Der erste Parameter kann einer von beiden sein. Wenn es sich um ein Future- oder Task-Objekt handelt, wird es direkt zurückgegeben. Wenn es sich um eine Koroutine handelt, ruft die async-Funktion automatisch die Methode loop.create_task(...) auf, um ein Task-Objekt zu erstellen. Der Parameter loop ist optional und wird verwendet, um den Ereigniszyklus einzugeben; wenn er nicht übergeben wird, wird die async-Funktion durch Aufruf der Funktion asyncio.get_event_loop() ein Loop-Objekt erhalten.
BaseEventLoop.create_task(coro)
Diese Methode plant die Ausführungszeit von Koroutines und gibt ein asyncio.Task-Objekt zurück. Wenn sie auf einer eigenen BaseEventLoop-Unterklasse aufgerufen wird, könnte das zurückgegebene Objekt eine Instanz einer Klasse sein, die mit der Task-Klasse kompatibel ist, die in einer externen Bibliothek vorhanden ist.
Die Methode BaseEventLoop.create_task() ist nur in Python3.4.2 und höheren Versionen verfügbar. Python3.3 Nur die Funktion asyncio.async(...) kann verwendet werden.
Wenn Sie future und Koroutines in der Python-Konsole oder in kleinen Testskripten experimentieren möchten, können Sie den folgenden Abschnitt verwenden:
import asyncio def run_sync(coro_or_future): loop = asyncio.get_event_loop() return loop.run_until_complete(coro_or_future) a = run_sync(some_coroutine())
Downloaden Sie mit asyncio und dem Paket aiohttp
Jetzt, da wir die Grundlagen von asyncio kennen, ist es an der Zeit, asyncio zu verwenden, um unsere Konkurrenz in unserem letzten Beitrag [python concurrent] neu zu schreiben 1:Verwenden Sie futures, um den parallelen Download von Flaggen abzuhandeln]
Sehen wir uns einmal den Code an:
import asyncio import aiohttp # Sie müssen pip install aiohttp from flags import save_flag, show, main, BASE_URL @asyncio.coroutine # Wir wissen, dass Koroutinen mit asyncio.coroutine dekoriert werden sollten def get_flag(cc): url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) # Blockierende Operationen werden durch Koroutinen realisiert, und das Kundencode delegiert die Verantwortung an die Koroutine über yield from, um asynchrone Operationen durchzuführen resp = yield from aiohttp.request('GET', url) # Das Lesen ist ebenfalls eine asynchrone Operation image = yield from resp.read() return image @asyncio.coroutine def download_one(cc): # Diese Funktion muss auch eine Koroutine sein, da yield from verwendet wird image = yield from get_flag(cc) show(cc) save_flag(image, cc.lower()) + '.gif')} return cc def download_many(cc_list): loop = asyncio.get_event_loop() # Rufe den Beleg der Unterlage des Ereigniszyklus ab to_do = [download_one(cc) for cc in sorted(cc_list)] # Rufe download_one auf, um alle Flaggen zu erhalten und eine Liste von Generator-Objekten zu erstellen # Obwohl der Name der Funktion wait ist, ist sie keine blockierende Funktion. Wait ist eine Koroutine, die endet, wenn alle Koroutinen, die an ihn übergeben werden, ausgeführt sind. wait_coro = asyncio.wait(to_do) res, _ = loop.run_until_complete(wait_coro) # Führe den Ereigniszyklus aus, bis wait_coro beendet ist; während der Lauf des Ereigniszyklus wird dieser Skript hier blockiert. loop.close() # Schließe den Ereigniszyklus return len(res) if __name__ == '__main__': main(download_many)
Die Zusammenfassung der Ausführung dieses Codes ist wie folgt:
In der Funktion download_many verwenden wir die Funktion asyncio.wait(...), die eine Koroutine ist und als Parameter ein iterables Objekt aus future oder Koroutine ist; wait packt jede Koroutine in ein Task-Objekt. Das Endresultat ist, dass alle von wait verarbeiteten Objekte auf irgendeine Weise in Instanzen der Klasse Future umgewandelt werden.
wait ist eine Koroutine-Funktion, daher wird eine Koroutine oder ein Generator-Objekt zurückgegeben; der Variable waite_coro wird dieses Objekt gespeichert
Die Methode loop.run_until_complete nimmt ein future oder eine Koroutine als Parameter. Wenn es eine Koroutine ist, packt die Methode run_until_complete sie wie die Funktion wait in ein Task-Objekt. Hier packt die Methode run_until_complete wait_coro in ein Task-Objekt, das von yield from angetrieben wird. Nach dem Ende von wait_coro werden zwei Parameter zurückgegeben, der erste Parameter ist der abgeschlossene future und der zweite Parameter ist der unvollständige future.
<section class="caption">wait</section> hat zwei Namensparameter, timeout und return_when. Wenn sie gesetzt sind, kann es zu einer Rückgabe unvollständiger future kommen.
Sie haben vielleicht auch bemerkt, dass wir die Funktion get_flags neu geschrieben haben, weil die zuvor verwendete Bibliothek requests eine blockierende I/O-Operation. Um den Paket asyncio zu verwenden, müssen wir die Funktion in eine asynchrone Version ändern.
Tipp
Falls Sie den Eindruck haben, dass der Code nach der Verwendung von Koroutines schwer zu verstehen ist, können Sie den Rat des Vaters von Python (Guido van Rossum) befolgen und annehmen, dass es kein 'yield from' gibt.
Nehmen wir das obige Beispiel
@asyncio.coroutine def get_flag(cc): url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) image = yield from resp.read() return image # Entfernen Sie 'yield form' def get_flag(cc): url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = aiohttp.request('GET', url) image = resp.read() return image # Es ist jetzt klarer, oder?
Wissenspunkte
Bei der Verwendung von yield from im API des asyncio-Pakets gibt es eine Detail, das beachtet werden muss:
Bei der Verwendung des asyncio-Pakets enthalten wir in unserem asynchronen Code Koroutines (Delegierte Generatoren), die von asyncio selbst angetrieben werden, und die Generatoren leiten letztlich die Verantwortung an Koroutines im asyncio-Paket oder in Drittanbieterbibliotheken weiter. Diese Vorgehensweise entspricht dem Aufbau eines Rohres, das den asyncio-Ereigniszyklus anregt, um die unteren asynchronen I/Die Bibliotheksfunktion O.
Vermeiden Sie blockierende Aufrufe
Zunächst sehen wir ein Diagramm, das die Verzögerungen bei der Lesung von Daten aus verschiedenen Speichermedien durch den Computer zeigt:
Durch dieses Diagramm können wir sehen, dass blockierende Aufrufe für den CPU eine enorme Verschwendung sind. Gibt es einen Weg, um blockierende Aufrufe zu vermeiden, dass sie die gesamte Anwendung unterbrechen?
Es gibt zwei Methoden:
Natürlich empfehlen wir die zweite Methode, da die erste Methode zu teuer wäre, wenn jeder Verbindung ein Thread verwendet wird.
Eine zweite Möglichkeit, um asynchrone Programmierung zu realisieren, ist es, Generatoren als Koroutines zu verwenden. Für den Ereigniszyklus ist der Aufruf von Callbacks ähnlich wie das Aufrufen der Methode .send() auf einer angehaltenen Koroutine. Die von angehaltenen Koroutines verbrauchten Speicher sind viel geringer als die von Threads.
Nun sollten Sie verstehen können, warum der flags_asyncio.py-Skript viel schneller ist als flags.py.
Da flags.py nacheinander synchron heruntergeladen wird, müssen für jede Download-Vorgang mehrere Milliarden CPU-Zyklen auf das Ergebnis gewartet werden. In flags_asyncio.py wird bei der Aufrufung der Methode loop.run_until_complete im download_many-Funktion der Ereigniszyklus die verschiedenen download_one-Koroutines angetrieben, bis zum Ausdruck yield from, der wiederum die verschiedenen get_flag-Koroutines angetrieben, bis zum ersten Ausdruck yield from, die Funktion aiohttp.request() aufgerufen. Diese Aufrufe blockieren nicht, sodass alle Anfragen innerhalb weniger Sekunden vollständig gestartet werden können.
Verbessern Sie den asyncio-Download-Skript
Nun verbessern wir die flags_asyncio.py, indem wir darin Ausnahmebehandlung und Zähler hinzufügen
import asyncio import collections from collections import namedtuple from enum import Enum import aiohttp from aiohttp import web from flags import save_flag, show, main, BASE_URL DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 Result = namedtuple('Result', 'status data') HTTPStatus = Enum('Status', 'ok not_found error') # Benutzerdefinierte Ausnahme zum Verpacken anderer HTTP- oder Netzwerkfehler und zum Erhalten von country_code, um Fehler zu melden class FetchError(Exception): def __init__(self, country_code): self.country_code = country_code @asyncio.coroutine def get_flag(cc): # Diese Coroutine hat drei mögliche Rückgabewerte: # 1. Gibt das heruntergeladene Bild zurück # 2. HTTP-Antwort ist404 . Wird web.HTTPNotFound-Ausnahme ausgelöst # 3. Werden andere HTTP-Statuscodes zurückgegeben, wird aiohttp.HttpProcessingError ausgelöst url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) if resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers ) @asyncio.coroutine def download_one(cc, semaphore): # Der semaphore-Parameter ist eine Instanz der asyncio.Semaphore-Klasse # Die Semaphore-Klasse ist ein Synchronisationsmechanismus, der zur Begrenzung von parallelen Anfragen verwendet wird try: with (yield from semaphore): # Semaphore wird im yield from Ausdruck als Kontextmanager verwendet, um das Blockieren des gesamten Systems zu verhindern # Wenn der Wert des semaphore-Zählers der maximal erlaubte Wert ist, wird nur diese Coroutine blockieren image = yield from get_flag(cc) # Nach dem Verlassen des with-Statements wird der Wert des semaphore-Zählers verringert # Entfernen von Blockaden, die möglicherweise von anderen Coroutine-Instanzen, die denselben semaphore-Objekt erwarten, verursacht werden except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: save_flag(image, cc.lower()) + '.gif')} status = HTTPStatus.ok msg = 'ok' return Result(status, cc) @asyncio.coroutine def downloader_coro(cc_list): counter = collections.Counter() # Erstellen eines asyncio.Semaphore-Beispiels, das maximal MAX_CONCUR_REQ aktivierte Coroutines zulässt, die diesen Zähler verwenden semaphore = asyncio.Semaphore(MAX_CONCUR_REQ) # Mehrmalige Aufrufe von download_one Coroutine, um eine Liste von Coroutine-Objekten zu erstellen to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)] # Erhalten eines Iterators, der nach dem Abschluss des futures zurückgibt to_do_iter = asyncio.as_completed(to_do) for future in to_do_iter: # Iteration über die abgeschlossenen future try: res = yield from future # Ergebnisse des asyncio.Future-Objekts erhalten (kann auch future.result aufgerufen werden) except FetchError as exc: # Alle ausgelösten Ausnahmen werden in das Objekt FetchError verpackt country_code = exc.country_code try: # Versuchen, die Fehlermeldung aus der ursprünglichen Ausnahme (__cause__) zu erhalten error_msg = exc.__cause__.args[0] except IndexError: # Wenn keine Fehlermeldung im ursprünglichen Ausnahmequellcode gefunden wird, wird der Name der Klasse der angeschlossenen Ausnahme als Fehlermeldung verwendet error_msg = exc.__cause__.__class__.__name__ if error_msg: msg = '*** Fehler für {}: {}' print(msg.format(country_code, error_msg)) status = HTTPStatus.error else: status = res.status counter[status] += 1 return counter def download_many(cc_list): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list) counts = loop.run_until_complete(coro) loop.close() return counts if __name__ == '__main__': main(download_many)
Da die Anfragen, die von Koroutines initiiert werden, schnell durchgeführt werden, um sicherzustellen, dass keine zu viele gleichzeitig an den Server gesendeten Anfragen die Last erhöhen und den Server überlasten, erstellen wir in der Funktion download_coro eine Instanz von asyncio.Semaphore und übergeben sie an die Funktion download_one.
<secion class="caption">Semaphore</section> 对象维护着一个内部计数器,若在对象上调用 .acquire() 协程方法,计数器则递减;若在对象上调用 .release() 协程方法,计数器则递增。计数器的值是在初始化的时候设定。
如果计数器大于0,那么调用 .acquire() 方法不会阻塞,如果计数器为0, .acquire() 方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release() 方法,让计数器递增。
在上边的代码中,我们并没有手动调用 .acquire() 或 .release() 方法,而是在 download_one 函数中 把 semaphore 当做上下文管理器使用:
with (yield from semaphore): image = yield from get_flag(cc)
这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。
使用 asyncio.as_completed 函数
因为要使用 yield from 获取 asyncio.as_completed 函数产出的future的结果,所以 as_completed 函数秩序在协程中调用。由于 download_many 要作为参数传给非协程的main 函数,我已我们添加了一个新的 downloader_coro 协程,让download_many 函数只用于设置事件循环。
使用Executor 对象,防止阻塞事件循环
现在我们回去看下上边关于电脑从不同存储介质读取数据的延迟情况图,有一个实时需要注意,那就是访问本地文件系统也会阻塞。
上边的代码中,save_flag 函数阻塞了客户代码与 asyncio 事件循环公用的唯一线程,因此保存文件时,整个应用程序都会暂停。为了避免这个问题,可以使用事件循环对象的 run_in_executor 方法。
asyncio 的事件循环在后台维护着一个ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。
下边是我们改动后的代码:
@asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: # 这里是改动部分 loop = asyncio.get_event_loop() # 获取事件循环的引用 loop.run_in_executor(None, save_flag, image, cc.lower()) + '.gif')} status = HTTPStatus.ok msg = 'ok' return Result(status, cc)
Der erste Parameter der Methode run_in_executor ist ein Executor-Objekt; wenn er auf None gesetzt ist, wird der Standard ThreadPoolExecutor-Objekt des Event Loops verwendet.
Von Callbacks zu Future zu Koroutines
Bevor wir Koroutines verwendet haben, könnten wir eine gewisse Vorstellung von Callbacks haben. Was hat Koroutines im Vergleich zu Callbacks verbessert?
Python Callback-Code-Style:
def stage1(response1): request2 = step1(response1) api_call2(request2, stage2) def stage2(response2): request3 = step3(response3) api_call3(request3, stage3) def stage3(response3): step3(response3) api_call1(request1, stage1)
Die Defekte des obigen Codes:
In diesem Problem kann eine Koroutine eine große Rolle spielen. Wenn man asynchrone Code, der mit Koroutine und yield from erstellt wurde, verwendete, könnte der Code wie folgt aussehen:
@asyncio.coroutine def three_stages(request1): response1 = yield from api_call1(request1) request2 = step1(response1) response2 = yield from api_call2(requests) request3 = step2(response2) response3 = yield from api_call3(requests) step3(response3) loop.create_task(three_stages(request1)
Im Vergleich zu dem vorherigen Code ist dieser Code viel einfacher zu verstehen. Wenn asynchrone Aufrufe an api_call1,api_call2,api_call3 wird eine Exception ausgelöst, dann kann man den entsprechenden Ausdruck yield from in einen try-Block setzen./except-Block behandelt.
Um Koroutines zu verwenden, muss man sich an den Ausdruck yield from gewöhnen und Koroutines dürfen nicht direkt aufgerufen werden. Man muss die Ausführungszeit der Koroutine explizit einstellen oder sie in einer anderen Koroutine, die die Ausführungszeit hat, mit yield from aktivieren. Wenn man den Ausdruck yield from in einem try-Block verwendet, der die Exception behandelt, wird eine Exception ausgelöst.1)) dann passiert nichts.
Nun zeigen wir Ihnen mit einem praktischen Beispiel:
Bei jeder Download-Anfrage werden mehrere Anfragen发起
Wir ändern den Code zum Herunterladen der Nationalflagge, sodass gleichzeitig der Name des Landes abgerufen wird, der bei der Speicherung des Bildes verwendet wird.
Wir verwenden Koroutines und yield from, um dieses Problem zu lösen:
@asyncio.coroutine def http_get(url): resp = yield from aiohttp.request('GET', url) if resp.status == 200: ctype = resp.headers.get('Content-type', '').lower() Wenn 'json' in ctype oder url.endswith('json') enthalten ist: data = yield from resp.json() else: data = yield from resp.read() return data elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers) @asyncio.coroutine def get_country(cc): url = ""/{cc}/metadata.json".format(BASE_URL, cc=cc.lower()) metadata = yield from http_get(url) return metadata['country'] @asyncio.coroutine def get_flag(cc): url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) return (yield from http_get(url)) @asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) with (yield from semaphore): country = yield from get_country(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: country = country.replace(' ', '_') filename = '{}--{}.gif'.format(country, cc) print(filename) loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, filename) status = HTTPStatus.ok msg = 'ok' return Result(status, cc)
In diesem Codeaufruf rufen wir in der Funktion download_one get_flag und get_country in zwei mit Semaphore kontrollierten with-Blöcken auf, um Zeit zu sparen.
Die return-Anweisung von get_flag wird im äußeren Block durch Klammern hinzugefügt, weil die Operatorpriorität von () hoch ist und die yield from-Anweisung im Klammern zuerst ausgeführt wird. Ohne hinzuzufügen wird ein syntaktischer Fehler gemeldet
Mit () hinzufügen, entspricht
image = yield from http_get(url) return image
Falls () nicht hinzugefügt werden, wird das Programm an der Stelle von yield from unterbrochen und das Kontrollrecht abgegeben. In diesem Fall führt return zu einem syntaktischen Fehler.
Zusammenfassung
In diesem Artikel diskutieren wir:
Das ist der gesamte Inhalt dieses Artikels. Wir hoffen, dass er Ihnen bei Ihrem Lernen hilft und dass Sie die呐喊教程大力支持.
Erklärung: Der Inhalt dieses Artikels wurde aus dem Internet übernommen und gehört dem Urheberrecht des jeweiligen Autors. Der Inhalt wurde von Internetnutzern freiwillig beigesteuert und hochgeladen. Diese Website besitzt keine Eigentumsrechte und hat den Inhalt nicht manuell bearbeitet. Sie übernimmt auch keine rechtlichen Verantwortlichkeiten. Wenn Sie Inhalte finden, die möglicherweise urheberrechtlich geschützt sind, freuen wir uns über eine E-Mail an: notice#oldtoolbag.com (bei der E-Mail senden Sie bitte # durch @ ersetzen und geben Sie relevante Beweise an. Sobald nachgewiesen wird, dass Inhalte urheberrechtlich geschützt sind, wird diese Website die fraglichen Inhalte sofort löschen.)