fetch emails through Zero MQ

This commit is contained in:
Yax 2018-01-16 18:49:19 +01:00
parent dd7ab75460
commit 2c5b63fcf5
10 changed files with 62 additions and 27 deletions

89
app/interface/api.py Normal file
View file

@ -0,0 +1,89 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import config
from flask import request, jsonify, abort
from app import app
from app.models.site import Site
from app.models.comment import Comment
from app.services import processor
logger = logging.getLogger(__name__)
@app.route("/comments", methods=['GET'])
def query_comments():
comments = []
try:
token = request.args.get('token', '')
url = request.args.get('url', '')
logger.info('retrieve comments for token %s, url %s' % (token, url))
for comment in Comment.select(Comment).join(Site).where(
(Comment.url == url) &
(Comment.published.is_null(False)) &
(Site.token == token)).order_by(+Comment.published):
d = {}
d['author'] = comment.author_name
d['content'] = comment.content
if comment.author_site:
d['site'] = comment.author_site
d['avatar'] = comment.author_gravatar
d['date'] = comment.published.strftime("%Y-%m-%d %H:%M:%S")
logger.debug(d)
comments.append(d)
r = jsonify({'data': comments})
r.status_code = 200
except:
logger.warn('bad request')
r = jsonify({'data': []})
r.status_code = 400
return r
@app.route("/comments/count", methods=['GET'])
def get_comments_count():
try:
token = request.args.get('token', '')
url = request.args.get('url', '')
count = Comment.select(Comment).join(Site).where(
(Comment.url == url) &
(Comment.published.is_null(False)) &
(Site.token == token)).count()
r = jsonify({'count': count})
r.status_code = 200
except:
r = jsonify({'count': 0})
r.status_code = 200
return r
@app.route("/comments", methods=['POST'])
def new_comment():
try:
data = request.get_json()
logger.info(data)
# validate token: retrieve site entity
token = data.get('token', '')
site = Site.select().where(Site.token == token).get()
if site is None:
logger.warn('Unknown site %s' % token)
abort(400)
# honeypot for spammers
captcha = data.get('captcha', '')
if captcha:
logger.warn('discard spam: data %s' % data)
abort(400)
processor.enqueue({'request': 'new_comment', 'data': data})
except:
logger.exception("new comment failure")
abort(400)
return "OK"

41
app/interface/form.py Normal file
View file

@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import config
from flask import request, jsonify, abort, redirect
from app import app
from app.models.site import Site
from app.models.comment import Comment
from app.helpers.hashing import md5
from app.services import processor
logger = logging.getLogger(__name__)
@app.route("/newcomment", methods=['POST'])
def new_form_comment():
try:
data = request.form
logger.info(data)
# validate token: retrieve site entity
token = data.get('token', '')
site = Site.select().where(Site.token == token).get()
if site is None:
logger.warn('Unknown site %s' % token)
abort(400)
# honeypot for spammers
captcha = data.get('captcha', '')
if captcha:
logger.warn('discard spam: data %s' % data)
abort(400)
processor.enqueue({'request': 'new_comment', 'data': data})
except:
logger.exception("new comment failure")
abort(400)
return redirect('/redirect/', code=302)

25
app/interface/mail.py Normal file
View file

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from flask import request, abort
from app import app
from app.services import processor
logger = logging.getLogger(__name__)
@app.route("/inbox", methods=['POST'])
def new_mail():
try:
data = request.get_json()
logger.debug(data)
processor.enqueue({'request': 'new_mail', 'data': data})
except:
logger.exception("new mail failure")
abort(400)
return "OK"

29
app/interface/reader.py Normal file
View file

@ -0,0 +1,29 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from flask import request, abort
from app import app
from app.services import processor
logger = logging.getLogger(__name__)
@app.route("/unsubscribe", methods=['GET'])
def unsubscribe():
try:
data = {
'token': request.args.get('token', ''),
'url': request.args.get('url', ''),
'email': request.args.get('email', '')
}
logger.debug(data)
processor.enqueue({'request': 'unsubscribe', 'data': data})
except:
logger.exception("unsubscribe failure")
abort(400)
return "OK"

78
app/interface/report.py Normal file
View file

@ -0,0 +1,78 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import config
from flask import request, jsonify, abort
from app import app
from app.models.site import Site
from app.models.comment import Comment
from app.helpers.hashing import md5
from app.services import processor
logger = logging.getLogger(__name__)
@app.route("/report", methods=['GET'])
def report():
try:
token = request.args.get('token', '')
secret = request.args.get('secret', '')
if secret != config.SECRET:
logger.warn('Unauthorized request')
abort(401)
site = Site.select().where(Site.token == token).get()
if site is None:
logger.warn('Unknown site %s' % token)
abort(404)
processor.enqueue({'request': 'report', 'data': token})
except:
logger.exception("report failure")
abort(500)
return "OK"
@app.route("/accept", methods=['GET'])
def accept_comment():
try:
id = request.args.get('comment', '')
secret = request.args.get('secret', '')
if secret != config.SECRET:
logger.warn('Unauthorized request')
abort(401)
processor.enqueue({'request': 'late_accept', 'data': id})
except:
logger.exception("accept failure")
abort(500)
return "PUBLISHED"
@app.route("/reject", methods=['GET'])
def reject_comment():
try:
id = request.args.get('comment', '')
secret = request.args.get('secret', '')
if secret != config.SECRET:
logger.warn('Unauthorized request')
abort(401)
processor.enqueue({'request': 'late_reject', 'data': id})
except:
logger.exception("reject failure")
abort(500)
return "REJECTED"

43
app/interface/zclient.py Normal file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import zmq
from conf import config
from threading import Thread
import logging
import json
from app.services import processor
logger = logging.getLogger(__name__)
context = zmq.Context()
def process(message):
data = json.loads(message)
if data['topic'] == 'email:newmail':
logger.info('newmail => {}'.format(data))
processor.enqueue({'request': 'new_mail', 'data': data})
class Consumer(Thread):
def run(self):
zsub = context.socket(zmq.SUB)
zsub.connect('tcp://127.0.0.1:{}'.format(config.zmq['pub_port']))
zsub.setsockopt_string(zmq.SUBSCRIBE, '')
self.loop = True
while self.loop:
message = zsub.recv()
try:
process(message)
except:
logger.exception('cannot process broker message')
def stop(self):
self.loop = False
def start():
c = Consumer()
c.start()