rabbitmq util
This commit is contained in:
		
							parent
							
								
									281c10ec3f
								
							
						
					
					
						commit
						6874f4d5ca
					
				
					 2 changed files with 113 additions and 42 deletions
				
			
		| 
						 | 
					@ -7,56 +7,44 @@ from threading import Thread
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
from core import processor
 | 
					from core import processor
 | 
				
			||||||
 | 
					from util import rabbit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class MailConsumer(rabbit.Consumer):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def process_message(chan, method, properties, body):
 | 
					    def process(self, channel, method, properties, body):
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            topic = method.routing_key
 | 
				
			||||||
 | 
					            data = json.loads(body)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					            if topic == 'mail.message':
 | 
				
			||||||
        topic = method.routing_key
 | 
					                if "STACOSYS" in data['subject']:
 | 
				
			||||||
        data = json.loads(body)
 | 
					                    logger.info('new message => {}'.format(data))
 | 
				
			||||||
 | 
					                    processor.enqueue({'request': 'new_mail', 'data': data})
 | 
				
			||||||
        if topic == 'mail.message':
 | 
					                else:
 | 
				
			||||||
            if "STACOSYS" in data['subject']:
 | 
					                    logger.info('ignore message => {}'.format(data))
 | 
				
			||||||
                logger.info('new message => {}'.format(data))
 | 
					 | 
				
			||||||
                processor.enqueue({'request': 'new_mail', 'data': data})
 | 
					 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                logger.info('ignore message => {}'.format(data))
 | 
					                logger.warn('unsupported message [topic={}]'.format(topic))
 | 
				
			||||||
        else:
 | 
					        except:
 | 
				
			||||||
            logger.warn('unsupported message [topic={}]'.format(topic))
 | 
					            logger.exception('cannot process message')
 | 
				
			||||||
    except:
 | 
					 | 
				
			||||||
        logger.exception('cannot process message')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class MessageConsumer(Thread):
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def run(self):
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        credentials = pika.PlainCredentials(
 | 
					 | 
				
			||||||
            config.rabbitmq['username'], config.rabbitmq['password'])
 | 
					 | 
				
			||||||
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
 | 
					 | 
				
			||||||
                                             'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        channel = connection.channel()
 | 
					 | 
				
			||||||
        channel.exchange_declare(exchange=config.rabbitmq['exchange'],
 | 
					 | 
				
			||||||
                                 exchange_type='topic')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        result = channel.queue_declare(exclusive=True)
 | 
					 | 
				
			||||||
        queue_name = result.method.queue
 | 
					 | 
				
			||||||
        channel.queue_bind(exchange=config.rabbitmq['exchange'],
 | 
					 | 
				
			||||||
                           queue=queue_name,
 | 
					 | 
				
			||||||
                           routing_key='mail.message')
 | 
					 | 
				
			||||||
        channel.basic_consume(process_message,
 | 
					 | 
				
			||||||
                              queue=queue_name,
 | 
					 | 
				
			||||||
                              no_ack=True)
 | 
					 | 
				
			||||||
        channel.start_consuming()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def stop(self):
 | 
					 | 
				
			||||||
        self.loop = False
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def start():
 | 
					def start():
 | 
				
			||||||
    logger.info('start rmqclient')
 | 
					    logger.info('start rmqclient')
 | 
				
			||||||
    c = MessageConsumer()
 | 
					    #c = MessageConsumer()
 | 
				
			||||||
 | 
					    #c.start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    credentials = pika.PlainCredentials(config.rabbitmq['username'], config.rabbitmq['password'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    parameters = pika.ConnectionParameters(
 | 
				
			||||||
 | 
					        host=config.rabbitmq['host'], 
 | 
				
			||||||
 | 
					        port=config.rabbitmq['port'], 
 | 
				
			||||||
 | 
					        credentials=credentials, 
 | 
				
			||||||
 | 
					        virtual_host=config.rabbitmq['vhost']
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    connection = rabbit.Connection(parameters)
 | 
				
			||||||
 | 
					    c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message')
 | 
				
			||||||
    c.start()
 | 
					    c.start()
 | 
				
			||||||
 | 
					    #print('exit rmqclient ' + str(c))
 | 
				
			||||||
							
								
								
									
										83
									
								
								app/util/rabbit.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								app/util/rabbit.py
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,83 @@
 | 
				
			||||||
 | 
					#!/usr/bin/env python
 | 
				
			||||||
 | 
					# -*- coding: utf-8 - *-
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import pika
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					from threading import Thread
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					EXCHANGE_TYPE = "topic"
 | 
				
			||||||
 | 
					CONNECT_DELAY = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Connection:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self, connection_parameters):
 | 
				
			||||||
 | 
					        self._connection_parameters = connection_parameters
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def open(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self._connection = None
 | 
				
			||||||
 | 
					        while True:
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                self._connection = pika.BlockingConnection(
 | 
				
			||||||
 | 
					                    self._connection_parameters)
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					            except:
 | 
				
			||||||
 | 
					                time.sleep(CONNECT_DELAY)
 | 
				
			||||||
 | 
					                logger.warn("rabbitmq connection failure. try again...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def close(self):
 | 
				
			||||||
 | 
					        self._connection.close()
 | 
				
			||||||
 | 
					        self._connection = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def get(self):
 | 
				
			||||||
 | 
					        return self._connection
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Consumer(Thread):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    _connection = None
 | 
				
			||||||
 | 
					    _channel = None
 | 
				
			||||||
 | 
					    _queue_name = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self, connection, exchange_name, routing_key):
 | 
				
			||||||
 | 
					        Thread.__init__(self)
 | 
				
			||||||
 | 
					        self._connection = connection
 | 
				
			||||||
 | 
					        self._exchange_name = exchange_name
 | 
				
			||||||
 | 
					        self._routing_key = routing_key
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def configure(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self._connection = None
 | 
				
			||||||
 | 
					        self._channel = None
 | 
				
			||||||
 | 
					        while True:
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                self._channel = self._connection.channel()
 | 
				
			||||||
 | 
					                self._channel.exchange_declare(
 | 
				
			||||||
 | 
					                    exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                result = self._channel.queue_declare(exclusive=True)
 | 
				
			||||||
 | 
					                self._queue_name = result.method.queue
 | 
				
			||||||
 | 
					                self._channel.queue_bind(
 | 
				
			||||||
 | 
					                    exchange=self._exchange_name,
 | 
				
			||||||
 | 
					                    queue=self._queue_name,
 | 
				
			||||||
 | 
					                    routing_key=self._routing_key,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					            except:
 | 
				
			||||||
 | 
					                time.sleep(CONNECT_DELAY)
 | 
				
			||||||
 | 
					                logger.warn("connection failure. try again...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def run(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.configure()
 | 
				
			||||||
 | 
					        self._channel.basic_consume(
 | 
				
			||||||
 | 
					            self.process, queue=self._queue_name, no_ack=True)
 | 
				
			||||||
 | 
					        self._channel.start_consuming()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def process(self, channel, method, properties, body):
 | 
				
			||||||
 | 
					        raise NotImplemented
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue