use rabbitmq
This commit is contained in:
parent
feb280ed8c
commit
e0c9f335fc
9 changed files with 138 additions and 98 deletions
|
@ -1,6 +1,3 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# TODO move to JSON config
|
|
||||||
|
|
||||||
zmq = {'pub_port': 7701, 'sub_port':7702}
|
|
|
@ -24,16 +24,16 @@ json_schema = """
|
||||||
"rss": {
|
"rss": {
|
||||||
"$ref": "#/definitions/RSS"
|
"$ref": "#/definitions/RSS"
|
||||||
},
|
},
|
||||||
"zmq": {
|
"rabbitmq": {
|
||||||
"$ref": "#/definitions/Zmq"
|
"$ref": "#/definitions/Rabbitmq"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": [
|
"required": [
|
||||||
"general",
|
"general",
|
||||||
"http",
|
"http",
|
||||||
|
"rabbitmq",
|
||||||
"rss",
|
"rss",
|
||||||
"security",
|
"security"
|
||||||
"zmq"
|
|
||||||
],
|
],
|
||||||
"title": "stacosys"
|
"title": "stacosys"
|
||||||
},
|
},
|
||||||
|
@ -79,6 +79,43 @@ json_schema = """
|
||||||
],
|
],
|
||||||
"title": "http"
|
"title": "http"
|
||||||
},
|
},
|
||||||
|
"Rabbitmq": {
|
||||||
|
"type": "object",
|
||||||
|
"additionalProperties": false,
|
||||||
|
"properties": {
|
||||||
|
"active": {
|
||||||
|
"type": "boolean"
|
||||||
|
},
|
||||||
|
"host": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"port": {
|
||||||
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"username": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"password": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"vhost": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"exchange": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"active",
|
||||||
|
"exchange",
|
||||||
|
"host",
|
||||||
|
"password",
|
||||||
|
"port",
|
||||||
|
"username",
|
||||||
|
"vhost"
|
||||||
|
],
|
||||||
|
"title": "rabbitmq"
|
||||||
|
},
|
||||||
"RSS": {
|
"RSS": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"additionalProperties": false,
|
"additionalProperties": false,
|
||||||
|
@ -116,27 +153,6 @@ json_schema = """
|
||||||
"secret"
|
"secret"
|
||||||
],
|
],
|
||||||
"title": "security"
|
"title": "security"
|
||||||
},
|
|
||||||
"Zmq": {
|
|
||||||
"type": "object",
|
|
||||||
"additionalProperties": false,
|
|
||||||
"properties": {
|
|
||||||
"host": {
|
|
||||||
"type": "string"
|
|
||||||
},
|
|
||||||
"pub_port": {
|
|
||||||
"type": "integer"
|
|
||||||
},
|
|
||||||
"sub_port": {
|
|
||||||
"type": "integer"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"required": [
|
|
||||||
"host",
|
|
||||||
"pub_port",
|
|
||||||
"sub_port"
|
|
||||||
],
|
|
||||||
"title": "zmq"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ import processor
|
||||||
from interface import api
|
from interface import api
|
||||||
from interface import form
|
from interface import form
|
||||||
from interface import report
|
from interface import report
|
||||||
from interface import zclient
|
from interface import rmqclient
|
||||||
|
|
||||||
# configure logging
|
# configure logging
|
||||||
def configure_logging(level):
|
def configure_logging(level):
|
||||||
|
@ -50,7 +50,7 @@ logger = logging.getLogger(__name__)
|
||||||
database.setup()
|
database.setup()
|
||||||
|
|
||||||
# start broker client
|
# start broker client
|
||||||
zclient.start()
|
rmqclient.start()
|
||||||
|
|
||||||
# start processor
|
# start processor
|
||||||
template_path = os.path.abspath(os.path.join(current_path, '../templates'))
|
template_path = os.path.abspath(os.path.join(current_path, '../templates'))
|
||||||
|
|
|
@ -18,7 +18,7 @@ import json
|
||||||
from conf import config
|
from conf import config
|
||||||
import PyRSS2Gen
|
import PyRSS2Gen
|
||||||
import markdown
|
import markdown
|
||||||
import zmq
|
import pika
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
@ -26,11 +26,6 @@ proc = None
|
||||||
env = None
|
env = None
|
||||||
|
|
||||||
|
|
||||||
context = zmq.Context()
|
|
||||||
zpub = context.socket(zmq.PUB)
|
|
||||||
zpub.connect('tcp://127.0.0.1:{}'.format(config.zmq['sub_port']))
|
|
||||||
|
|
||||||
|
|
||||||
class Processor(Thread):
|
class Processor(Thread):
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -142,7 +137,13 @@ def reply_comment_email(data):
|
||||||
token = m.group(2)
|
token = m.group(2)
|
||||||
|
|
||||||
# retrieve site and comment rows
|
# retrieve site and comment rows
|
||||||
comment = Comment.select().where(Comment.id == comment_id).get()
|
try:
|
||||||
|
comment = Comment.select().where(Comment.id == comment_id).get()
|
||||||
|
except:
|
||||||
|
logger.warn('unknown comment %d' % comment_id)
|
||||||
|
send_delete_command(data)
|
||||||
|
return
|
||||||
|
|
||||||
if comment.site.token != token:
|
if comment.site.token != token:
|
||||||
logger.warn('ignore corrupted email. Unknown token %d' % comment_id)
|
logger.warn('ignore corrupted email. Unknown token %d' % comment_id)
|
||||||
return
|
return
|
||||||
|
@ -152,7 +153,7 @@ def reply_comment_email(data):
|
||||||
return
|
return
|
||||||
|
|
||||||
# accept email: request to delete
|
# accept email: request to delete
|
||||||
send_deletion_order(data)
|
send_delete_command(data)
|
||||||
|
|
||||||
# safe logic: no answer or unknown answer is a go for publishing
|
# safe logic: no answer or unknown answer is a go for publishing
|
||||||
if message[:2].upper() == 'NO':
|
if message[:2].upper() == 'NO':
|
||||||
|
@ -394,26 +395,37 @@ def rss(token, onstart=False):
|
||||||
rss.write_xml(open(config.rss['file'], 'w'), encoding='utf-8')
|
rss.write_xml(open(config.rss['file'], 'w'), encoding='utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
def get_rmq_channel():
|
||||||
|
credentials = pika.PlainCredentials(
|
||||||
|
config.rabbitmq['username'], config.rabbitmq['password'])
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
|
||||||
|
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
|
||||||
|
channel = connection.channel()
|
||||||
|
return channel
|
||||||
|
|
||||||
|
|
||||||
def mail(to_email, subject, message):
|
def mail(to_email, subject, message):
|
||||||
|
|
||||||
zmsg = {
|
body = {
|
||||||
'topic': 'email:send',
|
|
||||||
'to': to_email,
|
'to': to_email,
|
||||||
'subject': subject,
|
'subject': subject,
|
||||||
'content': message
|
'content': message
|
||||||
}
|
}
|
||||||
|
channel = get_rmq_channel()
|
||||||
# TODO test broker failure and find alternative
|
channel.basic_publish(exchange=config.rabbitmq['exchange'],
|
||||||
zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False))
|
routing_key='mail.command.send',
|
||||||
|
body=json.dumps(body, indent=False, sort_keys=False))
|
||||||
logger.debug('Email for %s posted' % to_email)
|
logger.debug('Email for %s posted' % to_email)
|
||||||
|
|
||||||
#logger.warn('Cannot post email for %s' % to_email)
|
#logger.warn('Cannot post email for %s' % to_email)
|
||||||
|
|
||||||
|
|
||||||
def send_deletion_order(zmsg):
|
def send_delete_command(content):
|
||||||
zmsg['topic'] = 'email:delete'
|
|
||||||
zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False))
|
channel = get_rmq_channel()
|
||||||
logger.debug('Email accepted. Deletion request sent for %s' % zmsg)
|
channel.basic_publish(exchange=config.rabbitmq['exchange'],
|
||||||
|
routing_key='mail.command.delete',
|
||||||
|
body=json.dumps(content, indent=False, sort_keys=False))
|
||||||
|
logger.debug('Email accepted. Delete request sent for %s' % content)
|
||||||
|
|
||||||
|
|
||||||
def get_template(name):
|
def get_template(name):
|
||||||
|
|
55
app/interface/rmqclient.py
Normal file
55
app/interface/rmqclient.py
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import pika
|
||||||
|
from conf import config
|
||||||
|
from threading import Thread
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from core import processor
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def process_message(chan, method, properties, body):
|
||||||
|
topic = method.routing_key
|
||||||
|
data = json.loads(body)
|
||||||
|
|
||||||
|
if topic == 'mail.message':
|
||||||
|
logger.info('new message => {}'.format(data))
|
||||||
|
processor.enqueue({'request': 'new_mail', 'data': data})
|
||||||
|
else:
|
||||||
|
logger.warn('unsupported message [topic={}]'.format(topic))
|
||||||
|
|
||||||
|
|
||||||
|
class MessageConsumer(Thread):
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
|
||||||
|
credentials = pika.PlainCredentials(
|
||||||
|
config.rabbitmq['username'], config.rabbitmq['password'])
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
|
||||||
|
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
|
||||||
|
|
||||||
|
channel = connection.channel()
|
||||||
|
channel.exchange_declare(exchange=config.rabbitmq['exchange'],
|
||||||
|
exchange_type='topic')
|
||||||
|
|
||||||
|
result = channel.queue_declare(exclusive=True)
|
||||||
|
queue_name = result.method.queue
|
||||||
|
channel.queue_bind(exchange=config.rabbitmq['exchange'],
|
||||||
|
queue=queue_name,
|
||||||
|
routing_key='mail.message')
|
||||||
|
channel.basic_consume(process_message,
|
||||||
|
queue=queue_name,
|
||||||
|
no_ack=True)
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.loop = False
|
||||||
|
|
||||||
|
|
||||||
|
def start():
|
||||||
|
logger.info('start rmqclient')
|
||||||
|
c = MessageConsumer()
|
||||||
|
c.start()
|
|
@ -1,44 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
import zmq
|
|
||||||
from conf import config
|
|
||||||
from threading import Thread
|
|
||||||
import logging
|
|
||||||
import json
|
|
||||||
from core import processor
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
context = zmq.Context()
|
|
||||||
|
|
||||||
|
|
||||||
def process(message):
|
|
||||||
data = json.loads(message)
|
|
||||||
if data['topic'] == 'email:mail':
|
|
||||||
logger.info('newmail => {}'.format(data))
|
|
||||||
processor.enqueue({'request': 'new_mail', 'data': data})
|
|
||||||
|
|
||||||
|
|
||||||
class Consumer(Thread):
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
zsub = context.socket(zmq.SUB)
|
|
||||||
zsub.connect('tcp://127.0.0.1:{}'.format(config.zmq['pub_port']))
|
|
||||||
zsub.setsockopt_string(zmq.SUBSCRIBE, '')
|
|
||||||
self.loop = True
|
|
||||||
while self.loop:
|
|
||||||
message = zsub.recv()
|
|
||||||
try:
|
|
||||||
process(message)
|
|
||||||
except:
|
|
||||||
logger.exception('cannot process broker message')
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.loop = False
|
|
||||||
|
|
||||||
|
|
||||||
def start():
|
|
||||||
logger.info('start zclient')
|
|
||||||
c = Consumer()
|
|
||||||
c.start()
|
|
|
@ -28,7 +28,7 @@ def stacosys_server(config_pathname):
|
||||||
config.http = conf['http']
|
config.http = conf['http']
|
||||||
config.security = conf['security']
|
config.security = conf['security']
|
||||||
config.rss = conf['rss']
|
config.rss = conf['rss']
|
||||||
config.zmq = conf['zmq']
|
config.rabbitmq = conf['rabbitmq']
|
||||||
|
|
||||||
# start application
|
# start application
|
||||||
from core import app
|
from core import app
|
||||||
|
|
12
config.json
12
config.json
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"general" : {
|
"general" : {
|
||||||
"debug": true,
|
"debug": false,
|
||||||
"lang": "fr",
|
"lang": "fr",
|
||||||
"db_url": "sqlite:///db.sqlite"
|
"db_url": "sqlite:///db.sqlite"
|
||||||
},
|
},
|
||||||
|
@ -18,9 +18,13 @@
|
||||||
"proto": "http",
|
"proto": "http",
|
||||||
"file": "comments.xml"
|
"file": "comments.xml"
|
||||||
},
|
},
|
||||||
"zmq": {
|
"rabbitmq": {
|
||||||
|
"active": true,
|
||||||
"host": "127.0.0.1",
|
"host": "127.0.0.1",
|
||||||
"pub_port": 7701,
|
"port": 5672,
|
||||||
"sub_port": 7702
|
"username": "techuser",
|
||||||
|
"password": "tech",
|
||||||
|
"vhost": "devhub",
|
||||||
|
"exchange": "hub.topic"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,8 @@ Markdown==2.6.11
|
||||||
MarkupSafe==1.0
|
MarkupSafe==1.0
|
||||||
od==1.0
|
od==1.0
|
||||||
peewee==2.10.2
|
peewee==2.10.2
|
||||||
|
pika==0.11.2
|
||||||
PyRSS2Gen==1.1
|
PyRSS2Gen==1.1
|
||||||
pyzmq==16.0.3
|
|
||||||
sigtools==2.0.1
|
sigtools==2.0.1
|
||||||
six==1.11.0
|
six==1.11.0
|
||||||
Werkzeug==0.14.1
|
Werkzeug==0.14.1
|
||||||
|
|
Loading…
Add table
Reference in a new issue