English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

python Queue Communication: Verwendung von RabbitMQ (Beispiel Erklärung)

(I) Einleitung

Warum eine Nachrichtenqueuer einführen?

1.Entkoppeln Sie das Programm

2.Steigern Sie die Leistung

3.Verringern Sie die Komplexität mehrerer Geschäftslogiken

(II) Python-Operationen an RabbitMQ

Die Konfiguration und Installation von RabbitMQ sowie die grundlegende Verwendung sind im vorherigen Artikel beschrieben und werden hier nicht nochmal erwähnt.

Um RabbitMQ mit Python zu verwenden, muss das pika-Modul installiert werden. Installieren Sie es direkt mit pip:

pip install pika

1.Die einfachste Kommunikation zwischen RabbitMQ-Producer und -Consumer:

producer:

#Author :ywq
import pika
auth = pika.PlainCredentials('ywq', 'qwe') #save auth info
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #Mit Rabbit verbinden
channel = connection.channel() #create channel
channel.queue_declare(queue='hello') #Queue erklären
#n Eine Nachricht kann in RabbitMQ niemals direkt an die Queue gesendet werden, sie muss immer durch einen Exchange gehen.
channel.basic_publish(exchange='',
   routing_key='hello',
   body='Hello World!') #Der Inhalt ist die Nachrichten-Information
print(" [x] Gesendet 'Hello World!'")
connection.close()

consumer:

#Author :ywq
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #Mit Rabbit verbinden
channel = connection.channel()  #Channel erstellen
channel.queue_declare(queue='hello') #Queue erklären
def callback(ch, method, properties, body):
 print(" [x] Empfangen %r" % body)
channel.basic_consume(callback,
   queue='hello',
   no_ack=True)
print(' [*] Wartet auf Nachrichten. Zum Beenden drücken Sie STRG+C')
channel.start_consuming()

Während des Nachrichten-Übersendungskonsums können Sie die Nachrichten-Queue-Informationen in der Rabbit Web-Management-Seite in Echtzeit ansehen.

2. Persistente Nachrichten-Queue, um den Verlust der Nachrichten-Queue durch Ausfall oder andere unerwartete Situationen zu vermeiden.

Consumer-Ende muss nicht geändert werden, fügen Sie im Producer-Ende-Code zwei Eigenschaften hinzu, um die Nachrichten- und Queue-Persistence zu aktivieren, nur eine davon auswählen führt zu Nachrichtenverlust, beide müssen gleichzeitig aktiviert werden:

delivery_mode=2 # make msg persistent
durable=True

Eigenschafts-Einfügestelle siehe folgenden Code (Producer-Ende):

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth_info
 ))
channel=connection.channel()
channel.queue_declare(queue='test1#durable=True, machen Queue persistent
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
 exchange='',
 routing_key='test1',
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2 # make msg persistent
 )
)
print('Send done:',msg)
connection.close()

3.Gerechte Verteilung

Bei mehreren Consumers ist Rabbit standardmäßig im Rundenmodus, aber einige Consumer verbrauchen schneller, einige langsamer. Um eine ausgewogenere Ressourcennutzung zu erreichen, wird ein Acknowledgment-Prüfmechanismus eingeführt. Nachdem der Consumer eine Nachricht verbraucht hat, sendet er eine Acknowledgment an Rabbit. Wenn die Anzahl der nicht akzeptierten Nachrichten über die festgelegte zulässige Anzahl hinausgeht, wird der Consumer nicht mehr Nachrichten gesendet und Nachrichten werden an andere Consumer gesendet.

producer端代码不用改变,需要给consumer端代码插入两个属性:

channel.basic_qos(prefetch_count= *) # define the max non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) # send ack to rabbitmq

属性插入位置见如下代码(consumer端):

#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 time.sleep(5)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_qos(prefetch_count=1)
'''
Hinweis, no_ack=False Hinweis, der no_ack-Typ hier ist nur dazu da, Rabbit zu sagen, ob dieser Consumer-Queue eine Acknowledgment zurückgibt. Wenn ein Acknowledgment zurückgegeben werden soll, muss es im Callback definiert werden
prefetch_count=1,未ack的msg数量超过1件,则此consumer不再接受msg,此配置需写在channel.basic_consume上方,否则会造成non_ack情况出现。
'''
channel.basic_consume(
 callback,
 queue='test2'
)
channel.start_consuming()

Drei, Nachricht veröffentlichen/Abonnement

Obige Modelle sind so, dass der Producer eine Nachricht sendet und der Consumer eine Nachricht empfängt. Können wir erreichen, dass ein Producer eine Nachricht sendet und mehrere verbundene Consumer gleichzeitig Nachrichten empfangen? Natürlich, Rabbit unterstützt Nachrichtenveröffentlichung und Abonnement und unterstützt drei Modelle, die durch den Komponenten Exchange-Forwarder realisiert werden3三种模式:

fanout: Alle Queues, die an diesen Exchange gebunden sind, können Nachrichten empfangen, ähnlich einem Broadcast.

direct: Der durch den routingKey und den exchange bestimmte eindeutige Queue, die Nachrichten empfangen kann, wird an den Consumer gesendet, der diese Queue gebunden hat, ähnlich einem Multicast.

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,类似前缀列表匹配路由。

1.fanout

publish端(producer):

#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
    exchange_type='fanout'
    )
msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time()
channel.basic_publish(
 exchange='hello',
 routing_key='',
 body=msg,
 properties=pika.BasicProperties(
 delivery_mode=2
 )
)
print('send done')
connection.close()

subscribe端(consumer):

#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(
 exchange='hello',
 exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) #随机与rabbit建立一个queue,comsumer断开后,该queue立即删除释放
queue_name=random_num.method.queue
channel.basic_qos(prefetch_count=1)
channel.queue_bind(
 queue=queue_name,
 exchange='hello'
)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_consume(
 callback,
 queue=queue_name,
)
channel.start_consuming()

实现producer一次发送,多个关联consumer接收。

使用exchange模式时:

1.producer端不再声明queue,直接声明exchange

2.consumer端仍需绑定队列并指定exchange来接收message

3.consumer最好创建随机queue,使用完后立即释放。

随机队列名在web下可以检测到:

2.direct

使用exchange同时consumer有选择性地接收消息。队列绑定关键字,producer将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,consumer相应接收。即在fanout基础上增加了routing key.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
   exchange_type='direct',
   )
while True:
 route_key=input('Geben Sie den Routing Key ein:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='direct_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='direct_log',
 exchange_type='direct',
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Geben Sie den Routing Key ein:')
channel.queue_bind(
 queue=queue_name,
 exchange='direct_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[level:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Mehrere consumer gleichzeitig aktivieren, davon zwei empfangen notice, zwei empfangen warning, die Ausgabe sieht so aus:

3.topic

Im Gegensatz zu direct ermöglicht topic eine verschachtelte Match-Methode (bei der consumer-seitigen Angabe der Match-Methode), sodass eine Nachricht an die gebundene Queue gesendet wird, wenn der routing key die angegebenen Schlüssel enthält.

RabbitMQ Wildcard-Regeln:

Das Zeichen ‘#’ matcht einen oder mehrere Wörter, das Leerzeichen matcht ein Wort. Daher kann ‘abc.#’ ‘abc.m.n’ matchen, aber ‘abc.' nicht.*‘'Nur 'abc.m' wird gematcht. ‘.’ als Trennzeichen. Bei der Verwendung von Wildcards muss ‘.’ als Trennzeichen verwendet werden.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
   exchange_type='topic',
   )
while True:
 route_key=input('Geben Sie den Routing Key ein:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='topic_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='topic_log',
 exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Geben Sie den Routing Key ein:')
channel.queue_bind(
 queue=queue_name,
 exchange='topic_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[type:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Laufende Ergebnisse:

Drei Arten von rabbitmq-publish/Die einführende Einführung in das subscribe-Modell ist abgeschlossen.

Dieser Artikel über die Verwendung von rabbitMQ in der Python-Queue-Kommunikation (Beispiel-Übersicht) ist alles, was der Redakteur weitergeben möchte. Hoffentlich bietet es Ihnen eine Referenz und wir hoffen, dass Sie die呐喊教程大力支持.

Erklärung: Der Inhalt dieses Artikels wurde aus dem Internet übernommen und gehört dem Urheberrecht des jeweiligen Autors. Der Inhalt wurde von Internetnutzern freiwillig bereitgestellt und hochgeladen. Diese Website besitzt keine Eigentumsrechte und hat den Inhalt nicht manuell bearbeitet. Sie übernimmt auch keine rechtlichen Verantwortlichkeiten. Wenn Sie Inhalte finden, die möglicherweise gegen das Urheberrecht verstoßen, freuen wir uns über eine E-Mail an: notice#w3Hinweis: Bitte ersetzen Sie bei der E-Mail-Adresse das # durch @ und geben Sie eine Beschwerde ein, sowie relevante Beweise. Sobald überprüft, wird die Website die fraglichen Inhalte sofort löschen.

Vermutlich gefällt Ihnen