minimalism
This commit is contained in:
parent
83cd8725c3
commit
a06db608bc
64 changed files with 205 additions and 7153 deletions
|
|
@ -6,9 +6,10 @@
|
|||
|
||||
json_schema = """
|
||||
{
|
||||
"$ref": "#/definitions/Stacosys",
|
||||
"$schema": "http://json-schema.org/draft-06/schema#",
|
||||
"$ref": "#/definitions/Welcome",
|
||||
"definitions": {
|
||||
"Stacosys": {
|
||||
"Welcome": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
|
|
@ -23,19 +24,15 @@ json_schema = """
|
|||
},
|
||||
"rss": {
|
||||
"$ref": "#/definitions/RSS"
|
||||
},
|
||||
"rabbitmq": {
|
||||
"$ref": "#/definitions/Rabbitmq"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"general",
|
||||
"http",
|
||||
"rabbitmq",
|
||||
"rss",
|
||||
"security"
|
||||
],
|
||||
"title": "stacosys"
|
||||
"title": "Welcome"
|
||||
},
|
||||
"General": {
|
||||
"type": "object",
|
||||
|
|
@ -56,7 +53,7 @@ json_schema = """
|
|||
"debug",
|
||||
"lang"
|
||||
],
|
||||
"title": "general"
|
||||
"title": "General"
|
||||
},
|
||||
"HTTP": {
|
||||
"type": "object",
|
||||
|
|
@ -77,44 +74,7 @@ json_schema = """
|
|||
"port",
|
||||
"root_url"
|
||||
],
|
||||
"title": "http"
|
||||
},
|
||||
"Rabbitmq": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"active": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"host": {
|
||||
"type": "string"
|
||||
},
|
||||
"port": {
|
||||
"type": "integer"
|
||||
},
|
||||
"username": {
|
||||
"type": "string"
|
||||
},
|
||||
"password": {
|
||||
"type": "string"
|
||||
},
|
||||
"vhost": {
|
||||
"type": "string"
|
||||
},
|
||||
"exchange": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"active",
|
||||
"exchange",
|
||||
"host",
|
||||
"password",
|
||||
"port",
|
||||
"username",
|
||||
"vhost"
|
||||
],
|
||||
"title": "rabbitmq"
|
||||
"title": "HTTP"
|
||||
},
|
||||
"RSS": {
|
||||
"type": "object",
|
||||
|
|
@ -131,7 +91,7 @@ json_schema = """
|
|||
"file",
|
||||
"proto"
|
||||
],
|
||||
"title": "rss"
|
||||
"title": "RSS"
|
||||
},
|
||||
"Security": {
|
||||
"type": "object",
|
||||
|
|
@ -142,17 +102,13 @@ json_schema = """
|
|||
},
|
||||
"secret": {
|
||||
"type": "string"
|
||||
},
|
||||
"private": {
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"private",
|
||||
"salt",
|
||||
"secret"
|
||||
],
|
||||
"title": "security"
|
||||
"title": "Security"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,9 +5,9 @@ import os
|
|||
import sys
|
||||
import logging
|
||||
from flask import Flask
|
||||
from flask.ext.cors import CORS
|
||||
from conf import config
|
||||
from jsonschema import validate
|
||||
from flask_apscheduler import APScheduler
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
|
@ -24,8 +24,6 @@ import database
|
|||
import processor
|
||||
from interface import api
|
||||
from interface import form
|
||||
from interface import report
|
||||
from interface import rmqclient
|
||||
|
||||
# configure logging
|
||||
def configure_logging(level):
|
||||
|
|
@ -46,26 +44,37 @@ configure_logging(logging_level)
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Config(object):
|
||||
JOBS = [
|
||||
{
|
||||
'id': 'fetch_mail',
|
||||
'func': 'core.cron:fetch_mail_answers',
|
||||
'trigger': 'interval',
|
||||
'seconds': 120
|
||||
},
|
||||
{
|
||||
'id': 'submit_new_comment',
|
||||
'func': 'core.cron:submit_new_comment',
|
||||
'trigger': 'interval',
|
||||
'seconds': 60
|
||||
},
|
||||
]
|
||||
|
||||
# initialize database
|
||||
database.setup()
|
||||
|
||||
# start broker client
|
||||
rmqclient.start()
|
||||
|
||||
# start processor
|
||||
template_path = os.path.abspath(os.path.join(current_path, '../templates'))
|
||||
processor.start(template_path)
|
||||
|
||||
# less feature in private mode
|
||||
if not config.security['private']:
|
||||
# enable CORS
|
||||
cors = CORS(app, resources={r"/comments/*": {"origins": "*"}})
|
||||
from app.controllers import reader
|
||||
logger.debug('imported: %s ' % reader.__name__)
|
||||
# cron
|
||||
app.config.from_object(Config())
|
||||
scheduler = APScheduler()
|
||||
scheduler.init_app(app)
|
||||
scheduler.start()
|
||||
|
||||
# tune logging level
|
||||
if not config.general['debug']:
|
||||
logging.getLogger('app.cors').level = logging.WARNING
|
||||
logging.getLogger('werkzeug').level = logging.WARNING
|
||||
|
||||
logger.info("Start Stacosys application")
|
||||
|
|
|
|||
47
app/core/cron.py
Normal file
47
app/core/cron.py
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
import time
|
||||
from core import app
|
||||
from core import processor
|
||||
from models.comment import Comment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def fetch_mail_answers():
|
||||
|
||||
logger.info('DEBUT POP MAIL')
|
||||
time.sleep(80)
|
||||
logger.info('FIN POP MAIL')
|
||||
#data = request.get_json()
|
||||
#logger.debug(data)
|
||||
|
||||
#processor.enqueue({'request': 'new_mail', 'data': data})
|
||||
|
||||
def submit_new_comment():
|
||||
|
||||
for comment in Comment.select().where(Comment.notified.is_null()):
|
||||
# render email body template
|
||||
comment_list = (
|
||||
"author: %s" % comment.author_name,
|
||||
"site: %s" % comment.author_site,
|
||||
"date: %s" % comment.create,
|
||||
"url: %s" % comment.url,
|
||||
"",
|
||||
"%s" % comment.message,
|
||||
"",
|
||||
)
|
||||
comment_text = "\n".join(comment_list)
|
||||
email_body = get_template("new_comment").render(url=url, comment=comment_text)
|
||||
|
||||
if clientip:
|
||||
client_ips[comment.id] = clientip
|
||||
|
||||
# send email
|
||||
subject = "STACOSYS %s: [%d:%s]" % (site.name, comment.id, token)
|
||||
mailer.send_mail(site.admin_email, subject, email_body)
|
||||
logger.debug("new comment processed ")
|
||||
|
||||
def get_template(name):
|
||||
return env.get_template(config.general["lang"] + "/" + name + ".tpl")
|
||||
|
|
@ -23,7 +23,5 @@ def provide_db(func):
|
|||
def setup(db):
|
||||
from models.site import Site
|
||||
from models.comment import Comment
|
||||
from models.reader import Reader
|
||||
from models.report import Report
|
||||
|
||||
db.create_tables([Site, Comment, Reader, Report], safe=True)
|
||||
db.create_tables([Site, Comment], safe=True)
|
||||
|
|
|
|||
3
app/core/mailer.py
Normal file
3
app/core/mailer.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
|
@ -4,142 +4,64 @@
|
|||
import os
|
||||
import logging
|
||||
import re
|
||||
import PyRSS2Gen
|
||||
import markdown
|
||||
import json
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
from queue import Queue
|
||||
from jinja2 import Environment
|
||||
from jinja2 import FileSystemLoader
|
||||
from models.site import Site
|
||||
from models.reader import Reader
|
||||
from models.report import Report
|
||||
from models.comment import Comment
|
||||
from helpers.hashing import md5
|
||||
import json
|
||||
from conf import config
|
||||
from util import rabbit
|
||||
import PyRSS2Gen
|
||||
import markdown
|
||||
import pika
|
||||
from core import mailer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
queue = Queue()
|
||||
proc = None
|
||||
env = None
|
||||
|
||||
# store client IP in memory until classification
|
||||
# keep client IP in memory until classified
|
||||
client_ips = {}
|
||||
|
||||
|
||||
class Processor(Thread):
|
||||
|
||||
def stop(self):
|
||||
logger.info("stop requested")
|
||||
self.is_running = False
|
||||
|
||||
def run(self):
|
||||
|
||||
logger.info('processor thread started')
|
||||
logger.info("processor thread started")
|
||||
self.is_running = True
|
||||
while self.is_running:
|
||||
try:
|
||||
msg = queue.get()
|
||||
if msg['request'] == 'new_comment':
|
||||
new_comment(msg['data'], msg.get('clientip', ''))
|
||||
elif msg['request'] == 'new_mail':
|
||||
reply_comment_email(msg['data'])
|
||||
send_delete_command(msg['data'])
|
||||
elif msg['request'] == 'unsubscribe':
|
||||
unsubscribe_reader(msg['data'])
|
||||
elif msg['request'] == 'report':
|
||||
report(msg['data'])
|
||||
elif msg['request'] == 'late_accept':
|
||||
late_accept_comment(msg['data'])
|
||||
elif msg['request'] == 'late_reject':
|
||||
late_reject_comment(msg['data'])
|
||||
if msg["request"] == "new_mail":
|
||||
reply_comment_email(msg["data"])
|
||||
send_delete_command(msg["data"])
|
||||
else:
|
||||
logger.info("throw unknown request " + str(msg))
|
||||
except:
|
||||
logger.exception("processing failure")
|
||||
|
||||
|
||||
def new_comment(data, clientip):
|
||||
|
||||
logger.info('new comment received: %s' % data)
|
||||
|
||||
token = data.get('token', '')
|
||||
url = data.get('url', '')
|
||||
author_name = data.get('author', '').strip()
|
||||
author_email = data.get('email', '').strip()
|
||||
author_site = data.get('site', '').strip()
|
||||
message = data.get('message', '')
|
||||
subscribe = data.get('subscribe', '')
|
||||
|
||||
# private mode: email contains gravar md5 hash
|
||||
if config.security['private']:
|
||||
author_gravatar = author_email
|
||||
author_email = ''
|
||||
else:
|
||||
author_gravatar = md5(author_email.lower())
|
||||
|
||||
# create a new comment row
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
|
||||
if author_site and author_site[:4] != 'http':
|
||||
author_site = 'http://' + author_site
|
||||
|
||||
created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# add a row to Comment table
|
||||
comment = Comment(site=site, url=url, author_name=author_name,
|
||||
author_site=author_site, author_email=author_email,
|
||||
author_gravatar=author_gravatar,
|
||||
content=message, created=created, published=None)
|
||||
comment.save()
|
||||
|
||||
article_url = "http://" + site.url + url
|
||||
|
||||
# render email body template
|
||||
comment_list = (
|
||||
'author: %s' % author_name,
|
||||
'email: %s' % author_email,
|
||||
'site: %s' % author_site,
|
||||
'date: %s' % created,
|
||||
'url: %s' % url,
|
||||
'',
|
||||
'%s' % message,
|
||||
''
|
||||
)
|
||||
comment_text = '\n'.join(comment_list)
|
||||
email_body = get_template('new_comment').render(
|
||||
url=article_url, comment=comment_text)
|
||||
|
||||
if clientip:
|
||||
client_ips[comment.id] = clientip
|
||||
|
||||
# send email
|
||||
subject = 'STACOSYS %s: [%d:%s]' % (site.name, comment.id, token)
|
||||
mail(site.admin_email, subject, email_body)
|
||||
|
||||
# Reader subscribes to further comments
|
||||
if not config.security['private'] and subscribe and author_email:
|
||||
subscribe_reader(author_email, token, url)
|
||||
|
||||
logger.debug("new comment processed ")
|
||||
|
||||
|
||||
def reply_comment_email(data):
|
||||
|
||||
from_email = data['from']
|
||||
subject = data['subject']
|
||||
message = ''
|
||||
for part in data['parts']:
|
||||
if part['content-type'] == 'text/plain':
|
||||
message = part['content']
|
||||
from_email = data["from"]
|
||||
subject = data["subject"]
|
||||
message = ""
|
||||
for part in data["parts"]:
|
||||
if part["content-type"] == "text/plain":
|
||||
message = part["content"]
|
||||
break
|
||||
|
||||
m = re.search('\[(\d+)\:(\w+)\]', subject)
|
||||
m = re.search("\[(\d+)\:(\w+)\]", subject)
|
||||
if not m:
|
||||
logger.warn('ignore corrupted email. No token %s' % subject)
|
||||
logger.warn("ignore corrupted email. No token %s" % subject)
|
||||
return
|
||||
comment_id = int(m.group(1))
|
||||
token = m.group(2)
|
||||
|
|
@ -148,317 +70,110 @@ def reply_comment_email(data):
|
|||
try:
|
||||
comment = Comment.select().where(Comment.id == comment_id).get()
|
||||
except:
|
||||
logger.warn('unknown comment %d' % comment_id)
|
||||
logger.warn("unknown comment %d" % comment_id)
|
||||
return
|
||||
|
||||
if comment.published:
|
||||
logger.warn('ignore already published email. token %d' % comment_id)
|
||||
logger.warn("ignore already published email. token %d" % comment_id)
|
||||
return
|
||||
|
||||
if comment.site.token != token:
|
||||
logger.warn('ignore corrupted email. Unknown token %d' % comment_id)
|
||||
logger.warn("ignore corrupted email. Unknown token %d" % comment_id)
|
||||
return
|
||||
|
||||
if not message:
|
||||
logger.warn('ignore empty email')
|
||||
logger.warn("ignore empty email")
|
||||
return
|
||||
|
||||
# safe logic: no answer or unknown answer is a go for publishing
|
||||
if message[:2].upper() in ('NO', 'SP'):
|
||||
if message[:2].upper() in ("NO", "SP"):
|
||||
|
||||
# put a log to help fail2ban
|
||||
if message[:2].upper() == 'SP': # SPAM
|
||||
if message[:2].upper() == "SP": # SPAM
|
||||
if comment_id in client_ips:
|
||||
logger.info('SPAM comment from %s: %d' %
|
||||
(client_ips[comment_id], comment_id))
|
||||
logger.info(
|
||||
"SPAM comment from %s: %d" % (client_ips[comment_id], comment_id)
|
||||
)
|
||||
else:
|
||||
logger.info('cannot identify SPAM source: %d' % comment_id)
|
||||
logger.info("cannot identify SPAM source: %d" % comment_id)
|
||||
|
||||
# forget client IP
|
||||
if comment_id in client_ips:
|
||||
del client_ips[comment_id]
|
||||
|
||||
# report event
|
||||
report_rejected(comment)
|
||||
|
||||
logger.info('discard comment: %d' % comment_id)
|
||||
logger.info("discard comment: %d" % comment_id)
|
||||
comment.delete_instance()
|
||||
email_body = get_template('drop_comment').render(original=message)
|
||||
mail(from_email, 'Re: ' + subject, email_body)
|
||||
email_body = get_template("drop_comment").render(original=message)
|
||||
mail(from_email, "Re: " + subject, email_body)
|
||||
else:
|
||||
# report event
|
||||
report_published(comment)
|
||||
|
||||
# update Comment row
|
||||
comment.published = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
comment.save()
|
||||
logger.info('commit comment: %d' % comment_id)
|
||||
logger.info("commit comment: %d" % comment_id)
|
||||
|
||||
# rebuild RSS
|
||||
rss(token)
|
||||
|
||||
# send approval confirmation email to admin
|
||||
email_body = get_template('approve_comment').render(original=message)
|
||||
mail(from_email, 'Re: ' + subject, email_body)
|
||||
|
||||
# notify reader once comment is published
|
||||
if not config.security['private']:
|
||||
reader_email = get_email_metadata(message)
|
||||
if reader_email:
|
||||
notify_reader(from_email, reader_email, comment.site.token,
|
||||
comment.site.url, comment.url)
|
||||
|
||||
# notify subscribers every time a new comment is published
|
||||
notify_subscribed_readers(
|
||||
comment.site.token, comment.site.url, comment.url)
|
||||
|
||||
|
||||
def late_reject_comment(id):
|
||||
|
||||
# retrieve site and comment rows
|
||||
comment = Comment.select().where(Comment.id == id).get()
|
||||
|
||||
# report event
|
||||
report_rejected(comment)
|
||||
|
||||
# delete Comment row
|
||||
comment.delete_instance()
|
||||
|
||||
logger.info('late reject comment: %s' % id)
|
||||
|
||||
|
||||
def late_accept_comment(id):
|
||||
|
||||
# retrieve site and comment rows
|
||||
comment = Comment.select().where(Comment.id == id).get()
|
||||
|
||||
# report event
|
||||
report_published(comment)
|
||||
|
||||
# update Comment row
|
||||
comment.published = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
comment.save()
|
||||
|
||||
logger.info('late accept comment: %s' % id)
|
||||
email_body = get_template("approve_comment").render(original=message)
|
||||
mail(from_email, "Re: " + subject, email_body)
|
||||
|
||||
|
||||
def get_email_metadata(message):
|
||||
# retrieve metadata reader email from email body sent by admin
|
||||
email = ""
|
||||
m = re.search('email:\s(.+@.+\..+)', message)
|
||||
m = re.search(r"email:\s(.+@.+\..+)", message)
|
||||
if m:
|
||||
email = m.group(1)
|
||||
return email
|
||||
|
||||
|
||||
def subscribe_reader(email, token, url):
|
||||
logger.info('subscribe reader %s to %s [%s]' % (email, url, token))
|
||||
recorded = Reader.select().join(Site).where(Site.token == token,
|
||||
Reader.email == email,
|
||||
Reader.url == url).count()
|
||||
if recorded:
|
||||
logger.debug('reader %s is already recorded' % email)
|
||||
else:
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
reader = Reader(site=site, email=email, url=url)
|
||||
reader.save()
|
||||
|
||||
# report event
|
||||
report_subscribed(reader)
|
||||
|
||||
|
||||
def unsubscribe_reader(data):
|
||||
token = data.get('token', '')
|
||||
url = data.get('url', '')
|
||||
email = data.get('email', '')
|
||||
logger.info('unsubscribe reader %s from %s (%s)' % (email, url, token))
|
||||
for reader in Reader.select().join(Site).where(Site.token == token,
|
||||
Reader.email == email,
|
||||
Reader.url == url):
|
||||
# report event
|
||||
report_unsubscribed(reader)
|
||||
|
||||
reader.delete_instance()
|
||||
|
||||
|
||||
def notify_subscribed_readers(token, site_url, url):
|
||||
logger.info('notify subscribers for %s (%s)' % (url, token))
|
||||
article_url = "http://" + site_url + url
|
||||
for reader in Reader.select().join(Site).where(Site.token == token,
|
||||
Reader.url == url):
|
||||
to_email = reader.email
|
||||
logger.info('notify reader %s' % to_email)
|
||||
unsubscribe_url = '%s/unsubscribe?email=%s&token=%s&url=%s' % (
|
||||
config.http['root_url'], to_email, token, reader.url)
|
||||
email_body = get_template(
|
||||
'notify_subscriber').render(article_url=article_url,
|
||||
unsubscribe_url=unsubscribe_url)
|
||||
subject = get_template('notify_message').render()
|
||||
mail(to_email, subject, email_body)
|
||||
|
||||
|
||||
def notify_reader(from_email, to_email, token, site_url, url):
|
||||
logger.info('notify reader: email %s about URL %s' % (to_email, url))
|
||||
article_url = "http://" + site_url + url
|
||||
email_body = get_template('notify_reader').render(article_url=article_url)
|
||||
subject = get_template('notify_message').render()
|
||||
mail(to_email, subject, email_body)
|
||||
|
||||
|
||||
def report_rejected(comment):
|
||||
report = Report(site=comment.site, url=comment.url,
|
||||
name=comment.author_name, email=comment.author_email,
|
||||
rejected=True)
|
||||
report.save()
|
||||
|
||||
|
||||
def report_published(comment):
|
||||
report = Report(site=comment.site, url=comment.url,
|
||||
name=comment.author_name, email=comment.author_email,
|
||||
published=True)
|
||||
report.save()
|
||||
|
||||
|
||||
def report_subscribed(reader):
|
||||
report = Report(site=reader.site, url=reader.url,
|
||||
name='', email=reader.email,
|
||||
subscribed=True)
|
||||
report.save()
|
||||
|
||||
|
||||
def report_unsubscribed(reader):
|
||||
report = Report(site=reader.site, url=reader.url,
|
||||
name='', email=reader.email,
|
||||
unsubscribed=True)
|
||||
report.save()
|
||||
|
||||
|
||||
def report(token):
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
|
||||
standbys = []
|
||||
for row in Comment.select().join(Site).where(
|
||||
Site.token == token, Comment.published.is_null(True)):
|
||||
standbys.append({'url': "http://" + site.url + row.url,
|
||||
'created': row.created.strftime('%d/%m/%y %H:%M'),
|
||||
'name': row.author_name, 'content': row.content,
|
||||
'id': row.id})
|
||||
|
||||
published = []
|
||||
for row in Report.select().join(Site).where(
|
||||
Site.token == token, Report.published):
|
||||
published.append({'url': "http://" + site.url + row.url,
|
||||
'name': row.name, 'email': row.email})
|
||||
|
||||
rejected = []
|
||||
for row in Report.select().join(Site).where(
|
||||
Site.token == token, Report.rejected):
|
||||
rejected.append({'url': "http://" + site.url + row.url,
|
||||
'name': row.name, 'email': row.email})
|
||||
|
||||
subscribed = []
|
||||
for row in Report.select().join(Site).where(
|
||||
Site.token == token, Report.subscribed):
|
||||
subscribed.append({'url': "http://" + site.url + row.url,
|
||||
'name': row.name, 'email': row.email})
|
||||
|
||||
unsubscribed = []
|
||||
for row in Report.select().join(Site).where(
|
||||
Site.token == token, Report.subscribed):
|
||||
unsubscribed.append({'url': "http://" + site.url + row.url,
|
||||
'name': row.name, 'email': row.email})
|
||||
|
||||
email_body = get_template('report').render(secret=config.security['secret'],
|
||||
root_url=config.http[
|
||||
'root_url'],
|
||||
standbys=standbys,
|
||||
published=published,
|
||||
rejected=rejected,
|
||||
subscribed=subscribed,
|
||||
unsubscribed=unsubscribed)
|
||||
subject = get_template('report_message').render(site=site.name)
|
||||
|
||||
mail(site.admin_email, subject, email_body)
|
||||
|
||||
# delete report table
|
||||
Report.delete().execute()
|
||||
|
||||
|
||||
def rss(token, onstart=False):
|
||||
|
||||
if onstart and os.path.isfile(config.rss['file']):
|
||||
if onstart and os.path.isfile(config.rss["file"]):
|
||||
return
|
||||
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
rss_title = get_template('rss_title_message').render(site=site.name)
|
||||
rss_title = get_template("rss_title_message").render(site=site.name)
|
||||
md = markdown.Markdown()
|
||||
|
||||
items = []
|
||||
for row in Comment.select().join(Site).where(
|
||||
Site.token == token, Comment.published).order_by(
|
||||
-Comment.published).limit(10):
|
||||
item_link = "%s://%s%s" % (config.rss['proto'], site.url, row.url)
|
||||
items.append(PyRSS2Gen.RSSItem(
|
||||
title='%s - %s://%s%s' % (config.rss['proto'],
|
||||
row.author_name, site.url, row.url),
|
||||
link=item_link,
|
||||
description=md.convert(row.content),
|
||||
guid=PyRSS2Gen.Guid('%s/%d' % (item_link, row.id)),
|
||||
pubDate=row.published
|
||||
))
|
||||
for row in (
|
||||
Comment.select()
|
||||
.join(Site)
|
||||
.where(Site.token == token, Comment.published)
|
||||
.order_by(-Comment.published)
|
||||
.limit(10)
|
||||
):
|
||||
item_link = "%s://%s%s" % (config.rss["proto"], site.url, row.url)
|
||||
items.append(
|
||||
PyRSS2Gen.RSSItem(
|
||||
title="%s - %s://%s%s"
|
||||
% (config.rss["proto"], row.author_name, site.url, row.url),
|
||||
link=item_link,
|
||||
description=md.convert(row.content),
|
||||
guid=PyRSS2Gen.Guid("%s/%d" % (item_link, row.id)),
|
||||
pubDate=row.published,
|
||||
)
|
||||
)
|
||||
|
||||
rss = PyRSS2Gen.RSS2(
|
||||
title=rss_title,
|
||||
link='%s://%s' % (config.rss['proto'], site.url),
|
||||
link="%s://%s" % (config.rss["proto"], site.url),
|
||||
description="Commentaires du site '%s'" % site.name,
|
||||
lastBuildDate=datetime.now(),
|
||||
items=items)
|
||||
rss.write_xml(open(config.rss['file'], 'w'), encoding='utf-8')
|
||||
|
||||
|
||||
def get_rabbitmq_connection():
|
||||
|
||||
credentials = pika.PlainCredentials(
|
||||
config.rabbitmq['username'], config.rabbitmq['password'])
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=config.rabbitmq['host'],
|
||||
port=config.rabbitmq['port'],
|
||||
credentials=credentials,
|
||||
virtual_host=config.rabbitmq['vhost']
|
||||
items=items,
|
||||
)
|
||||
return rabbit.Connection(parameters)
|
||||
|
||||
def mail(to_email, subject, message):
|
||||
|
||||
body = {
|
||||
'to': to_email,
|
||||
'subject': subject,
|
||||
'content': message
|
||||
}
|
||||
connector = get_rabbitmq_connection()
|
||||
connection = connector.open()
|
||||
channel = connection.channel()
|
||||
channel.basic_publish(exchange=config.rabbitmq['exchange'],
|
||||
routing_key='mail.command.send',
|
||||
body=json.dumps(body, indent=False, sort_keys=False))
|
||||
connector.close()
|
||||
logger.debug('Email for %s posted' % to_email)
|
||||
rss.write_xml(open(config.rss["file"], "w"), encoding="utf-8")
|
||||
|
||||
|
||||
def send_delete_command(content):
|
||||
|
||||
connector = get_rabbitmq_connection()
|
||||
connection = connector.open()
|
||||
channel = connection.channel()
|
||||
channel.basic_publish(exchange=config.rabbitmq['exchange'],
|
||||
routing_key='mail.command.delete',
|
||||
body=json.dumps(content, indent=False, sort_keys=False))
|
||||
connector.close()
|
||||
logger.debug('Email accepted. Delete request sent for %s' % content)
|
||||
# TODO delete mail
|
||||
pass
|
||||
|
||||
|
||||
def get_template(name):
|
||||
return env.get_template(config.general['lang'] + '/' + name + '.tpl')
|
||||
return env.get_template(config.general["lang"] + "/" + name + ".tpl")
|
||||
|
||||
|
||||
def enqueue(something):
|
||||
|
|
|
|||
|
|
@ -2,46 +2,70 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from flask import request, jsonify, abort, redirect
|
||||
from datetime import datetime
|
||||
from flask import request, abort, redirect
|
||||
from core import app
|
||||
from models.site import Site
|
||||
from models.comment import Comment
|
||||
from helpers.hashing import md5
|
||||
from core import processor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@app.route("/newcomment", methods=['POST'])
|
||||
|
||||
@app.route("/newcomment", methods=["POST"])
|
||||
def new_form_comment():
|
||||
|
||||
try:
|
||||
data = request.form
|
||||
|
||||
# add client IP if provided by HTTP proxy
|
||||
clientip = ''
|
||||
if 'X-Forwarded-For' in request.headers:
|
||||
clientip = request.headers['X-Forwarded-For']
|
||||
|
||||
# log
|
||||
ip = ""
|
||||
if "X-Forwarded-For" in request.headers:
|
||||
ip = request.headers["X-Forwarded-For"]
|
||||
|
||||
# log
|
||||
logger.info(data)
|
||||
|
||||
# validate token: retrieve site entity
|
||||
token = data.get('token', '')
|
||||
token = data.get("token", "")
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
if site is None:
|
||||
logger.warn('Unknown site %s' % token)
|
||||
logger.warn("Unknown site %s" % token)
|
||||
abort(400)
|
||||
|
||||
# honeypot for spammers
|
||||
captcha = data.get('captcha', '')
|
||||
captcha = data.get("captcha", "")
|
||||
if captcha:
|
||||
logger.warn('discard spam: data %s' % data)
|
||||
logger.warn("discard spam: data %s" % data)
|
||||
abort(400)
|
||||
|
||||
processor.enqueue({'request': 'new_comment', 'data': data, 'clientip': clientip})
|
||||
url = data.get("url", "")
|
||||
author_name = data.get("author", "").strip()
|
||||
author_gravatar = data.get("email", "").strip()
|
||||
author_site = data.get("site", "").to_lower().strip()
|
||||
if author_site and author_site[:4] != "http":
|
||||
author_site = "http://" + author_site
|
||||
message = data.get("message", "")
|
||||
|
||||
created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# add a row to Comment table
|
||||
comment = Comment(
|
||||
site=site,
|
||||
url=url,
|
||||
author_name=author_name,
|
||||
author_site=author_site,
|
||||
author_gravatar=author_gravatar,
|
||||
content=message,
|
||||
created=created,
|
||||
notified=None,
|
||||
published=None,
|
||||
ip=ip,
|
||||
)
|
||||
comment.save()
|
||||
|
||||
except:
|
||||
logger.exception("new comment failure")
|
||||
abort(400)
|
||||
|
||||
return redirect('/redirect/', code=302)
|
||||
return redirect("/redirect/", code=302)
|
||||
|
|
|
|||
|
|
@ -1,25 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from flask import request, abort
|
||||
from core import app
|
||||
from core import processor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app.route("/inbox", methods=['POST'])
|
||||
def new_mail():
|
||||
|
||||
try:
|
||||
data = request.get_json()
|
||||
logger.debug(data)
|
||||
|
||||
processor.enqueue({'request': 'new_mail', 'data': data})
|
||||
|
||||
except:
|
||||
logger.exception("new mail failure")
|
||||
abort(400)
|
||||
|
||||
return "OK"
|
||||
|
|
@ -1,29 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from flask import request, abort
|
||||
from core import app
|
||||
from core import processor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app.route("/unsubscribe", methods=['GET'])
|
||||
def unsubscribe():
|
||||
|
||||
try:
|
||||
data = {
|
||||
'token': request.args.get('token', ''),
|
||||
'url': request.args.get('url', ''),
|
||||
'email': request.args.get('email', '')
|
||||
}
|
||||
logger.debug(data)
|
||||
|
||||
processor.enqueue({'request': 'unsubscribe', 'data': data})
|
||||
|
||||
except:
|
||||
logger.exception("unsubscribe failure")
|
||||
abort(400)
|
||||
|
||||
return "OK"
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from conf import config
|
||||
from flask import request, jsonify, abort
|
||||
from core import app
|
||||
from models.site import Site
|
||||
from models.comment import Comment
|
||||
from helpers.hashing import md5
|
||||
from core import processor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@app.route("/report", methods=['GET'])
|
||||
def report():
|
||||
|
||||
try:
|
||||
token = request.args.get('token', '')
|
||||
secret = request.args.get('secret', '')
|
||||
|
||||
if secret != config.security['secret']:
|
||||
logger.warn('Unauthorized request')
|
||||
abort(401)
|
||||
|
||||
site = Site.select().where(Site.token == token).get()
|
||||
if site is None:
|
||||
logger.warn('Unknown site %s' % token)
|
||||
abort(404)
|
||||
|
||||
processor.enqueue({'request': 'report', 'data': token})
|
||||
|
||||
|
||||
except:
|
||||
logger.exception("report failure")
|
||||
abort(500)
|
||||
|
||||
return "OK"
|
||||
|
||||
|
||||
@app.route("/accept", methods=['GET'])
|
||||
def accept_comment():
|
||||
|
||||
try:
|
||||
id = request.args.get('comment', '')
|
||||
secret = request.args.get('secret', '')
|
||||
|
||||
if secret != config.security['secret']:
|
||||
logger.warn('Unauthorized request')
|
||||
abort(401)
|
||||
|
||||
processor.enqueue({'request': 'late_accept', 'data': id})
|
||||
|
||||
except:
|
||||
logger.exception("accept failure")
|
||||
abort(500)
|
||||
|
||||
return "PUBLISHED"
|
||||
|
||||
|
||||
@app.route("/reject", methods=['GET'])
|
||||
def reject_comment():
|
||||
|
||||
try:
|
||||
id = request.args.get('comment', '')
|
||||
secret = request.args.get('secret', '')
|
||||
|
||||
if secret != config.security['secret']:
|
||||
logger.warn('Unauthorized request')
|
||||
abort(401)
|
||||
|
||||
processor.enqueue({'request': 'late_reject', 'data': id})
|
||||
|
||||
except:
|
||||
logger.exception("reject failure")
|
||||
abort(500)
|
||||
|
||||
return "REJECTED"
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pika
|
||||
from conf import config
|
||||
from threading import Thread
|
||||
import logging
|
||||
import json
|
||||
from core import processor
|
||||
from util import rabbit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MailConsumer(rabbit.Consumer):
|
||||
|
||||
def process(self, channel, method, properties, body):
|
||||
try:
|
||||
topic = method.routing_key
|
||||
data = json.loads(body)
|
||||
|
||||
if topic == 'mail.message':
|
||||
if "STACOSYS" in data['subject']:
|
||||
logger.info('new message => {}'.format(data))
|
||||
processor.enqueue({'request': 'new_mail', 'data': data})
|
||||
else:
|
||||
logger.info('ignore message => {}'.format(data))
|
||||
else:
|
||||
logger.warn('unsupported message [topic={}]'.format(topic))
|
||||
except:
|
||||
logger.exception('cannot process message')
|
||||
|
||||
|
||||
def start():
|
||||
|
||||
logger.info('start rmqclient')
|
||||
|
||||
credentials = pika.PlainCredentials(
|
||||
config.rabbitmq['username'], config.rabbitmq['password'])
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=config.rabbitmq['host'],
|
||||
port=config.rabbitmq['port'],
|
||||
credentials=credentials,
|
||||
virtual_host=config.rabbitmq['vhost']
|
||||
)
|
||||
|
||||
connection = rabbit.Connection(parameters)
|
||||
c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message')
|
||||
c.start()
|
||||
|
|
@ -13,11 +13,12 @@ from core.database import get_db
|
|||
class Comment(Model):
|
||||
url = CharField()
|
||||
created = DateTimeField()
|
||||
notified = DateTimeField(null=True,default=None)
|
||||
published = DateTimeField(null=True, default=None)
|
||||
author_name = CharField()
|
||||
author_email = CharField(default='')
|
||||
author_site = CharField(default='')
|
||||
author_gravatar = CharField(default='')
|
||||
ip = CharField(default='')
|
||||
content = TextField()
|
||||
site = ForeignKeyField(Site, related_name='site')
|
||||
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
from peewee import Model
|
||||
from peewee import CharField
|
||||
from peewee import ForeignKeyField
|
||||
from core.database import get_db
|
||||
from models.site import Site
|
||||
|
||||
|
||||
class Reader(Model):
|
||||
url = CharField()
|
||||
email = CharField(default='')
|
||||
site = ForeignKeyField(Site, related_name='reader_site')
|
||||
|
||||
class Meta:
|
||||
database = get_db()
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
from peewee import Model
|
||||
from peewee import CharField
|
||||
from peewee import BooleanField
|
||||
from peewee import ForeignKeyField
|
||||
from core.database import get_db
|
||||
from models.site import Site
|
||||
|
||||
class Report(Model):
|
||||
name = CharField()
|
||||
email = CharField()
|
||||
url = CharField()
|
||||
published = BooleanField(default=False)
|
||||
rejected = BooleanField(default=False)
|
||||
subscribed = BooleanField(default=False)
|
||||
unsubscribed = BooleanField(default=False)
|
||||
site = ForeignKeyField(Site, related_name='report_site')
|
||||
|
||||
class Meta:
|
||||
database = get_db()
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import logging
|
||||
import json
|
||||
from clize import clize, run
|
||||
from clize import Clize, run
|
||||
from jsonschema import validate
|
||||
from conf import config, schema
|
||||
|
||||
|
|
@ -15,20 +15,19 @@ def load_json(filename):
|
|||
return jsondoc
|
||||
|
||||
|
||||
@clize
|
||||
@Clize
|
||||
def stacosys_server(config_pathname):
|
||||
|
||||
# load and validate startup config
|
||||
conf = load_json(config_pathname)
|
||||
json_schema = json.loads(schema.json_schema)
|
||||
v = validate(conf, json_schema)
|
||||
validate(conf, json_schema)
|
||||
|
||||
# set configuration
|
||||
config.general = conf['general']
|
||||
config.http = conf['http']
|
||||
config.security = conf['security']
|
||||
config.rss = conf['rss']
|
||||
config.rabbitmq = conf['rabbitmq']
|
||||
|
||||
# start application
|
||||
from core import app
|
||||
|
|
|
|||
|
|
@ -1,9 +0,0 @@
|
|||
Hi,
|
||||
|
||||
Your comment has been approved. It should be published in few minutes.
|
||||
|
||||
{{ article_url }}
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
Hi,
|
||||
|
||||
A new comment has been published for an article you have subscribed to.
|
||||
|
||||
{{ article_url }}
|
||||
|
||||
You can unsubscribe at any time using this link:
|
||||
|
||||
{{ unsubscribe_url }}
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
{% if subscribed %}
|
||||
{% if subscribed|length > 1 %}NEW SUBSCRIPTIONS{% else %}NEW SUBSCRIPTION{% endif %} :
|
||||
{% for c in subscribed %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if unsubscribed %}
|
||||
{% if unsubscribed|length > 1 %}CANCELLED SUBSCRIPTIONS{% else %}CANCELLED SUBSCRIPTION{% endif %} :
|
||||
{% for c in unsubscribed %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if published %}
|
||||
{% if published|length > 1 %}PUBLISHED COMMENTS{% else %}PUBLISHED COMMENT{% endif %} :
|
||||
{% for c in published %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if rejected %}
|
||||
{% if rejected|length > 1 %}REJECTED COMMENTS{% else %}REJECTED COMMENT{% endif %} :
|
||||
{% for c in rejected %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if standbys %}
|
||||
{% if standbys|length > 1 %}STANDBY COMMENTS{% else %}STANDBY COMMENT{% endif %} :
|
||||
{% for c in standbys %}
|
||||
- {{ c.name }} ({{ c.created }}) => {{ c.url }}
|
||||
{{ c.content }}
|
||||
|
||||
Accepter : {{ root_url}}/accept?secret={{ secret}}&comment={{ c.id }}
|
||||
Rejeter : {{ root_url}}/reject?secret={{ secret}}&comment={{ c.id }}
|
||||
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1 +0,0 @@
|
|||
Status report : {{ site }}
|
||||
|
|
@ -1,2 +0,0 @@
|
|||
Your request has been sent. In case of issue please contact site
|
||||
administrator.
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
Bonjour,
|
||||
|
||||
Votre commentaire a été approuvé. Il sera publié dans quelques minutes.
|
||||
|
||||
{{ article_url }}
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
Bonjour,
|
||||
|
||||
Un nouveau commentaire a été publié pour un article auquel vous êtes abonné.
|
||||
|
||||
{{ article_url }}
|
||||
|
||||
Vous pouvez vous désinscrire à tout moment en suivant ce lien :
|
||||
|
||||
{{ unsubscribe_url }}
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
{% if subscribed %}
|
||||
{% if subscribed|length > 1 %}NOUVEAUX ABONNEMENTS{% else %}NOUVEL ABONNEMENT{% endif %} :
|
||||
{% for c in subscribed %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if unsubscribed %}
|
||||
{% if unsubscribed|length > 1 %}ABONNEMENTS RESILIES{% else %}ABONNEMENT RESILIE{% endif %} :
|
||||
{% for c in unsubscribed %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if published %}
|
||||
{% if published|length > 1 %}COMMENTAIRES PUBLIES{% else %}COMMENTAIRE PUBLIE{% endif %} :
|
||||
{% for c in published %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if rejected %}
|
||||
{% if rejected|length > 1 %}COMMENTAIRES REJETES{% else %}COMMENTAIRE REJETE{% endif %} :
|
||||
{% for c in rejected %}
|
||||
- {{ c.name }} ({{ c.email }}) => {{ c.url }}
|
||||
{% endfor %}
|
||||
|
||||
{% endif %}
|
||||
{% if standbys %}
|
||||
{% if standbys|length > 1 %}COMMENTAIRES EN ATTENTE{% else %}COMMENTAIRE EN ATTENTE{% endif %} :
|
||||
{% for c in standbys %}
|
||||
- {{ c.name }} ({{ c.created }}) => {{ c.url }}
|
||||
{{ c.content }}
|
||||
|
||||
Accepter : {{ root_url}}/accept?secret={{ secret}}&comment={{ c.id }}
|
||||
Rejeter : {{ root_url}}/reject?secret={{ secret}}&comment={{ c.id }}
|
||||
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
|
@ -1 +0,0 @@
|
|||
Rapport d'activité : {{ site }}
|
||||
|
|
@ -1,2 +0,0 @@
|
|||
Votre requête a été envoyée. En cas de problème, contactez l'administrateur du
|
||||
site.
|
||||
|
|
@ -1,84 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 - *-
|
||||
|
||||
import logging
|
||||
import pika
|
||||
import time
|
||||
from threading import Thread
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
EXCHANGE_TYPE = "topic"
|
||||
CONNECT_DELAY = 3
|
||||
|
||||
class Connection:
|
||||
|
||||
def __init__(self, connection_parameters):
|
||||
self._connection_parameters = connection_parameters
|
||||
|
||||
def open(self):
|
||||
|
||||
self._connection = None
|
||||
while True:
|
||||
try:
|
||||
self._connection = pika.BlockingConnection(
|
||||
self._connection_parameters)
|
||||
break
|
||||
except:
|
||||
time.sleep(CONNECT_DELAY)
|
||||
logger.exception('rabbitmq connection failure. try again...')
|
||||
return self._connection
|
||||
|
||||
def close(self):
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
|
||||
def get(self):
|
||||
return self._connection
|
||||
|
||||
|
||||
class Consumer(Thread):
|
||||
|
||||
_connector = None
|
||||
_channel = None
|
||||
_queue_name = None
|
||||
|
||||
def __init__(self, connector, exchange_name, routing_key):
|
||||
Thread.__init__(self)
|
||||
self._connector = connector
|
||||
self._exchange_name = exchange_name
|
||||
self._routing_key = routing_key
|
||||
|
||||
def configure(self, connection):
|
||||
|
||||
self._channel = None
|
||||
while True:
|
||||
try:
|
||||
|
||||
self._channel = connection.channel()
|
||||
self._channel.exchange_declare(
|
||||
exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE
|
||||
)
|
||||
|
||||
result = self._channel.queue_declare(exclusive=True)
|
||||
self._queue_name = result.method.queue
|
||||
self._channel.queue_bind(
|
||||
exchange=self._exchange_name,
|
||||
queue=self._queue_name,
|
||||
routing_key=self._routing_key,
|
||||
)
|
||||
break
|
||||
except:
|
||||
logger.exception('configuration failure. try again...')
|
||||
time.sleep(CONNECT_DELAY)
|
||||
|
||||
def run(self):
|
||||
|
||||
self._connector.open()
|
||||
self.configure(self._connector.get())
|
||||
self._channel.basic_consume(
|
||||
self.process, queue=self._queue_name, no_ack=True)
|
||||
self._channel.start_consuming()
|
||||
|
||||
def process(self, channel, method, properties, body):
|
||||
raise NotImplemented
|
||||
Loading…
Add table
Add a link
Reference in a new issue