diff --git a/app/conf/config.py b/app/conf/config.py index 7ca283f..3a1213c 100644 --- a/app/conf/config.py +++ b/app/conf/config.py @@ -1,6 +1,3 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# TODO move to JSON config - -zmq = {'pub_port': 7701, 'sub_port':7702} \ No newline at end of file diff --git a/app/conf/schema.py b/app/conf/schema.py index bcd7d15..8cca55f 100644 --- a/app/conf/schema.py +++ b/app/conf/schema.py @@ -24,16 +24,16 @@ json_schema = """ "rss": { "$ref": "#/definitions/RSS" }, - "zmq": { - "$ref": "#/definitions/Zmq" + "rabbitmq": { + "$ref": "#/definitions/Rabbitmq" } }, "required": [ "general", "http", + "rabbitmq", "rss", - "security", - "zmq" + "security" ], "title": "stacosys" }, @@ -79,6 +79,43 @@ json_schema = """ ], "title": "http" }, + "Rabbitmq": { + "type": "object", + "additionalProperties": false, + "properties": { + "active": { + "type": "boolean" + }, + "host": { + "type": "string" + }, + "port": { + "type": "integer" + }, + "username": { + "type": "string" + }, + "password": { + "type": "string" + }, + "vhost": { + "type": "string" + }, + "exchange": { + "type": "string" + } + }, + "required": [ + "active", + "exchange", + "host", + "password", + "port", + "username", + "vhost" + ], + "title": "rabbitmq" + }, "RSS": { "type": "object", "additionalProperties": false, @@ -116,27 +153,6 @@ json_schema = """ "secret" ], "title": "security" - }, - "Zmq": { - "type": "object", - "additionalProperties": false, - "properties": { - "host": { - "type": "string" - }, - "pub_port": { - "type": "integer" - }, - "sub_port": { - "type": "integer" - } - }, - "required": [ - "host", - "pub_port", - "sub_port" - ], - "title": "zmq" } } } diff --git a/app/core/__init__.py b/app/core/__init__.py index a85d304..8a6cccb 100644 --- a/app/core/__init__.py +++ b/app/core/__init__.py @@ -25,7 +25,7 @@ import processor from interface import api from interface import form from interface import report -from interface import zclient +from interface import rmqclient # configure logging def configure_logging(level): @@ -50,7 +50,7 @@ logger = logging.getLogger(__name__) database.setup() # start broker client -zclient.start() +rmqclient.start() # start processor template_path = os.path.abspath(os.path.join(current_path, '../templates')) diff --git a/app/core/processor.py b/app/core/processor.py index 3f5f2fc..7c92dd4 100644 --- a/app/core/processor.py +++ b/app/core/processor.py @@ -18,7 +18,7 @@ import json from conf import config import PyRSS2Gen import markdown -import zmq +import pika logger = logging.getLogger(__name__) queue = Queue() @@ -26,11 +26,6 @@ proc = None env = None -context = zmq.Context() -zpub = context.socket(zmq.PUB) -zpub.connect('tcp://127.0.0.1:{}'.format(config.zmq['sub_port'])) - - class Processor(Thread): def stop(self): @@ -142,7 +137,13 @@ def reply_comment_email(data): token = m.group(2) # retrieve site and comment rows - comment = Comment.select().where(Comment.id == comment_id).get() + try: + comment = Comment.select().where(Comment.id == comment_id).get() + except: + logger.warn('unknown comment %d' % comment_id) + send_delete_command(data) + return + if comment.site.token != token: logger.warn('ignore corrupted email. Unknown token %d' % comment_id) return @@ -152,7 +153,7 @@ def reply_comment_email(data): return # accept email: request to delete - send_deletion_order(data) + send_delete_command(data) # safe logic: no answer or unknown answer is a go for publishing if message[:2].upper() == 'NO': @@ -394,26 +395,37 @@ def rss(token, onstart=False): rss.write_xml(open(config.rss['file'], 'w'), encoding='utf-8') +def get_rmq_channel(): + credentials = pika.PlainCredentials( + config.rabbitmq['username'], config.rabbitmq['password']) + connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[ + 'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost'])) + channel = connection.channel() + return channel + + def mail(to_email, subject, message): - zmsg = { - 'topic': 'email:send', + body = { 'to': to_email, 'subject': subject, 'content': message } - - # TODO test broker failure and find alternative - zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False)) + channel = get_rmq_channel() + channel.basic_publish(exchange=config.rabbitmq['exchange'], + routing_key='mail.command.send', + body=json.dumps(body, indent=False, sort_keys=False)) logger.debug('Email for %s posted' % to_email) - #logger.warn('Cannot post email for %s' % to_email) -def send_deletion_order(zmsg): - zmsg['topic'] = 'email:delete' - zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False)) - logger.debug('Email accepted. Deletion request sent for %s' % zmsg) +def send_delete_command(content): + + channel = get_rmq_channel() + channel.basic_publish(exchange=config.rabbitmq['exchange'], + routing_key='mail.command.delete', + body=json.dumps(content, indent=False, sort_keys=False)) + logger.debug('Email accepted. Delete request sent for %s' % content) def get_template(name): diff --git a/app/interface/rmqclient.py b/app/interface/rmqclient.py new file mode 100644 index 0000000..91857a6 --- /dev/null +++ b/app/interface/rmqclient.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pika +from conf import config +from threading import Thread +import logging +import json +from core import processor + +logger = logging.getLogger(__name__) + + +def process_message(chan, method, properties, body): + topic = method.routing_key + data = json.loads(body) + + if topic == 'mail.message': + logger.info('new message => {}'.format(data)) + processor.enqueue({'request': 'new_mail', 'data': data}) + else: + logger.warn('unsupported message [topic={}]'.format(topic)) + + +class MessageConsumer(Thread): + + def run(self): + + credentials = pika.PlainCredentials( + config.rabbitmq['username'], config.rabbitmq['password']) + connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[ + 'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost'])) + + channel = connection.channel() + channel.exchange_declare(exchange=config.rabbitmq['exchange'], + exchange_type='topic') + + result = channel.queue_declare(exclusive=True) + queue_name = result.method.queue + channel.queue_bind(exchange=config.rabbitmq['exchange'], + queue=queue_name, + routing_key='mail.message') + channel.basic_consume(process_message, + queue=queue_name, + no_ack=True) + channel.start_consuming() + + def stop(self): + self.loop = False + + +def start(): + logger.info('start rmqclient') + c = MessageConsumer() + c.start() diff --git a/app/interface/zclient.py b/app/interface/zclient.py deleted file mode 100644 index 3650b79..0000000 --- a/app/interface/zclient.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import zmq -from conf import config -from threading import Thread -import logging -import json -from core import processor - -logger = logging.getLogger(__name__) - -context = zmq.Context() - - -def process(message): - data = json.loads(message) - if data['topic'] == 'email:mail': - logger.info('newmail => {}'.format(data)) - processor.enqueue({'request': 'new_mail', 'data': data}) - - -class Consumer(Thread): - - def run(self): - zsub = context.socket(zmq.SUB) - zsub.connect('tcp://127.0.0.1:{}'.format(config.zmq['pub_port'])) - zsub.setsockopt_string(zmq.SUBSCRIBE, '') - self.loop = True - while self.loop: - message = zsub.recv() - try: - process(message) - except: - logger.exception('cannot process broker message') - - def stop(self): - self.loop = False - - -def start(): - logger.info('start zclient') - c = Consumer() - c.start() diff --git a/app/stacosys.py b/app/stacosys.py index 36d3553..d1cfec0 100644 --- a/app/stacosys.py +++ b/app/stacosys.py @@ -28,7 +28,7 @@ def stacosys_server(config_pathname): config.http = conf['http'] config.security = conf['security'] config.rss = conf['rss'] - config.zmq = conf['zmq'] + config.rabbitmq = conf['rabbitmq'] # start application from core import app diff --git a/config.json b/config.json index 0a2ec66..fac65d0 100644 --- a/config.json +++ b/config.json @@ -1,6 +1,6 @@ { "general" : { - "debug": true, + "debug": false, "lang": "fr", "db_url": "sqlite:///db.sqlite" }, @@ -18,9 +18,13 @@ "proto": "http", "file": "comments.xml" }, - "zmq": { + "rabbitmq": { + "active": true, "host": "127.0.0.1", - "pub_port": 7701, - "sub_port": 7702 + "port": 5672, + "username": "techuser", + "password": "tech", + "vhost": "devhub", + "exchange": "hub.topic" } } diff --git a/requirements.txt b/requirements.txt index c5fd0c1..992beb2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ Markdown==2.6.11 MarkupSafe==1.0 od==1.0 peewee==2.10.2 +pika==0.11.2 PyRSS2Gen==1.1 -pyzmq==16.0.3 sigtools==2.0.1 six==1.11.0 Werkzeug==0.14.1