remove IMAP part
This commit is contained in:
parent
1ae37ff18e
commit
7f2ff74ebe
21 changed files with 24 additions and 535 deletions
9
run.py
9
run.py
|
@ -72,11 +72,6 @@ def stacosys_server(config_pathname):
|
|||
|
||||
# configure mailer
|
||||
mailer = Mailer(
|
||||
conf.get(ConfigParameter.IMAP_HOST),
|
||||
conf.get_int(ConfigParameter.IMAP_PORT),
|
||||
conf.get_bool(ConfigParameter.IMAP_SSL),
|
||||
conf.get(ConfigParameter.IMAP_LOGIN),
|
||||
conf.get(ConfigParameter.IMAP_PASSWORD),
|
||||
conf.get(ConfigParameter.SMTP_HOST),
|
||||
conf.get_int(ConfigParameter.SMTP_PORT),
|
||||
conf.get_bool(ConfigParameter.SMTP_STARTTLS),
|
||||
|
@ -94,14 +89,10 @@ def stacosys_server(config_pathname):
|
|||
# configure scheduler
|
||||
conf.put(ConfigParameter.SITE_TOKEN, hashlib.sha1(conf.get(ConfigParameter.SITE_NAME).encode('utf-8')).hexdigest())
|
||||
scheduler.configure(
|
||||
conf.get_int(ConfigParameter.IMAP_POLLING),
|
||||
conf.get_int(ConfigParameter.COMMENT_POLLING),
|
||||
conf.get(ConfigParameter.LANG),
|
||||
conf.get(ConfigParameter.SITE_NAME),
|
||||
conf.get(ConfigParameter.SITE_TOKEN),
|
||||
conf.get(ConfigParameter.SITE_ADMIN_EMAIL),
|
||||
mailer,
|
||||
rss,
|
||||
)
|
||||
|
||||
# inject config parameters into flask
|
||||
|
|
|
@ -17,13 +17,6 @@ class ConfigParameter(Enum):
|
|||
RSS_PROTO = "rss.proto"
|
||||
RSS_FILE = "rss.file"
|
||||
|
||||
IMAP_POLLING = "imap.polling"
|
||||
IMAP_SSL = "imap.ssl"
|
||||
IMAP_HOST = "imap.host"
|
||||
IMAP_PORT = "imap.port"
|
||||
IMAP_LOGIN = "imap.login"
|
||||
IMAP_PASSWORD = "imap.password"
|
||||
|
||||
SMTP_STARTTLS = "smtp.starttls"
|
||||
SMTP_SSL = "smtp.ssl"
|
||||
SMTP_HOST = "smtp.host"
|
||||
|
|
|
@ -2,92 +2,13 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
from stacosys.core.mailer import Mailer
|
||||
from stacosys.core.rss import Rss
|
||||
from stacosys.core.templater import Templater, Template
|
||||
from stacosys.db import dao
|
||||
from stacosys.model.email import Email
|
||||
|
||||
REGEX_EMAIL_SUBJECT = r".*STACOSYS.*\[(\d+)\:(\w+)\]"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
current_path = os.path.dirname(__file__)
|
||||
template_path = os.path.abspath(os.path.join(current_path, "../templates"))
|
||||
templater = Templater(template_path)
|
||||
|
||||
|
||||
def fetch_mail_answers(lang, mailer: Mailer, rss: Rss, site_token):
|
||||
while True:
|
||||
msgs = mailer.fetch()
|
||||
if len(msgs) == 0:
|
||||
break
|
||||
msg = msgs[0]
|
||||
_process_answer_msg(msg, lang, mailer, rss, site_token)
|
||||
mailer.delete(msg.id)
|
||||
|
||||
|
||||
def _process_answer_msg(msg, lang, mailer: Mailer, rss: Rss, site_token):
|
||||
# filter stacosys e-mails
|
||||
m = re.search(REGEX_EMAIL_SUBJECT, msg.subject, re.DOTALL)
|
||||
if not m:
|
||||
return
|
||||
|
||||
comment_id = int(m.group(1))
|
||||
submitted_token = m.group(2)
|
||||
|
||||
# validate token
|
||||
if submitted_token != site_token:
|
||||
logger.warning("ignore corrupted email. Unknown token %d" % comment_id)
|
||||
return
|
||||
|
||||
if not msg.plain_text_content:
|
||||
logger.warning("ignore empty email")
|
||||
return
|
||||
|
||||
_reply_comment_email(lang, mailer, rss, msg, comment_id)
|
||||
|
||||
|
||||
def _reply_comment_email(lang, mailer: Mailer, rss: Rss, email: Email, comment_id):
|
||||
# retrieve comment
|
||||
comment = dao.find_comment_by_id(comment_id)
|
||||
if not comment:
|
||||
logger.warning("unknown comment %d" % comment_id)
|
||||
return
|
||||
|
||||
if comment.published:
|
||||
logger.warning("ignore already published email. token %d" % comment_id)
|
||||
return
|
||||
|
||||
# safe logic: no answer or unknown answer is a go for publishing
|
||||
if email.plain_text_content[:2].upper() == "NO":
|
||||
logger.info("discard comment: %d" % comment_id)
|
||||
dao.delete_comment(comment)
|
||||
new_email_body = templater.get_template(lang, Template.DROP_COMMENT).render(
|
||||
original=email.plain_text_content
|
||||
)
|
||||
if not mailer.send(email.from_addr, "Re: " + email.subject, new_email_body):
|
||||
logger.warning("minor failure. cannot send rejection mail " + email.subject)
|
||||
else:
|
||||
# save publishing datetime
|
||||
dao.publish_comment(comment)
|
||||
logger.info("commit comment: %d" % comment_id)
|
||||
|
||||
# rebuild RSS
|
||||
rss.generate()
|
||||
|
||||
# send approval confirmation email to admin
|
||||
new_email_body = templater.get_template(lang, Template.APPROVE_COMMENT).render(
|
||||
original=email.plain_text_content
|
||||
)
|
||||
if not mailer.send(email.from_addr, "Re: " + email.subject, new_email_body):
|
||||
logger.warning("minor failure. cannot send approval email " + email.subject)
|
||||
|
||||
|
||||
def submit_new_comment(lang, site_name, site_token, site_admin_email, mailer):
|
||||
def submit_new_comment(site_name, site_admin_email, mailer):
|
||||
for comment in dao.find_not_notified_comments():
|
||||
comment_list = (
|
||||
"author: %s" % comment.author_name,
|
||||
|
@ -98,13 +19,10 @@ def submit_new_comment(lang, site_name, site_token, site_admin_email, mailer):
|
|||
"%s" % comment.content,
|
||||
"",
|
||||
)
|
||||
comment_text = "\n".join(comment_list)
|
||||
email_body = templater.get_template(lang, Template.NEW_COMMENT).render(
|
||||
url=comment.url, comment=comment_text
|
||||
)
|
||||
email_body = "\n".join(comment_list)
|
||||
|
||||
# send email to notify admin
|
||||
subject = "STACOSYS %s: [%d:%s]" % (site_name, comment.id, site_token)
|
||||
subject = "STACOSYS %s" % site_name
|
||||
if mailer.send(site_admin_email, subject, email_body):
|
||||
logger.debug("new comment processed ")
|
||||
|
||||
|
|
|
@ -1,161 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import email
|
||||
import imaplib
|
||||
import logging
|
||||
import re
|
||||
from email.message import Message
|
||||
|
||||
from stacosys.model.email import Attachment, Email, Part
|
||||
|
||||
filename_re = re.compile('filename="(.+)"|filename=([^;\n\r"\']+)', re.I | re.S)
|
||||
|
||||
|
||||
class Mailbox(object):
|
||||
def __init__(self, host, port, ssl, login, password):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ssl = ssl
|
||||
self.login = login
|
||||
self.password = password
|
||||
|
||||
def __enter__(self):
|
||||
if self.ssl:
|
||||
self.imap = imaplib.IMAP4_SSL(self.host, self.port)
|
||||
else:
|
||||
self.imap = imaplib.IMAP4(self.host, self.port)
|
||||
self.imap.login(self.login, self.password)
|
||||
return self
|
||||
|
||||
def __exit__(self, _type, value, traceback):
|
||||
self.imap.close()
|
||||
self.imap.logout()
|
||||
|
||||
def get_count(self):
|
||||
self.imap.select("Inbox")
|
||||
_, data = self.imap.search(None, "ALL")
|
||||
return sum(1 for _ in data[0].split())
|
||||
|
||||
def fetch_raw_message(self, num):
|
||||
self.imap.select("Inbox")
|
||||
_, data = self.imap.fetch(str(num), "(RFC822)")
|
||||
email_msg = email.message_from_bytes(data[0][1])
|
||||
return email_msg
|
||||
|
||||
def fetch_message(self, num):
|
||||
raw_msg = self.fetch_raw_message(num)
|
||||
|
||||
parts = []
|
||||
attachments = []
|
||||
plain_text_content = "no plain-text part"
|
||||
for part in raw_msg.walk():
|
||||
if part.is_multipart():
|
||||
continue
|
||||
|
||||
if _is_part_attachment(part):
|
||||
attachments.append(_get_attachment(part))
|
||||
else:
|
||||
try:
|
||||
content = _to_plain_text_content(part)
|
||||
parts.append(
|
||||
Part(content=content, content_type=part.get_content_type())
|
||||
)
|
||||
if part.get_content_type() == "text/plain":
|
||||
plain_text_content = content
|
||||
except Exception:
|
||||
logging.exception("cannot extract content from mail part")
|
||||
|
||||
return Email(
|
||||
id=num,
|
||||
encoding="UTF-8",
|
||||
date=_parse_date(raw_msg["Date"]).strftime("%Y-%m-%d %H:%M:%S"),
|
||||
from_addr=raw_msg["From"],
|
||||
to_addr=raw_msg["To"],
|
||||
subject=_email_non_ascii_to_uft8(raw_msg["Subject"]),
|
||||
parts=parts,
|
||||
attachments=attachments,
|
||||
plain_text_content=plain_text_content,
|
||||
)
|
||||
|
||||
def delete_message(self, num):
|
||||
self.imap.select("Inbox")
|
||||
self.imap.store(str(num), "+FLAGS", r"\Deleted")
|
||||
self.imap.expunge()
|
||||
|
||||
def delete_all(self):
|
||||
self.imap.select("Inbox")
|
||||
_, data = self.imap.search(None, "ALL")
|
||||
for num in data[0].split():
|
||||
self.imap.store(num, "+FLAGS", r"\Deleted")
|
||||
self.imap.expunge()
|
||||
|
||||
def print_msgs(self):
|
||||
self.imap.select("Inbox")
|
||||
_, data = self.imap.search(None, "ALL")
|
||||
for num in reversed(data[0].split()):
|
||||
status, data = self.imap.fetch(num, "(RFC822)")
|
||||
self.logger.debug("Message %s\n%s\n" % (num, data[0][1]))
|
||||
|
||||
|
||||
def _parse_date(v):
|
||||
if v is None:
|
||||
return datetime.datetime.now()
|
||||
tt = email.utils.parsedate_tz(v)
|
||||
if tt is None:
|
||||
return datetime.datetime.now()
|
||||
timestamp = email.utils.mktime_tz(tt)
|
||||
date = datetime.datetime.fromtimestamp(timestamp)
|
||||
return date
|
||||
|
||||
|
||||
def _to_utf8(string, charset):
|
||||
return string.decode(charset).encode("UTF-8").decode("UTF-8")
|
||||
|
||||
|
||||
def _email_non_ascii_to_uft8(string):
|
||||
# RFC 1342 is a recommendation that provides a way to represent non ASCII
|
||||
# characters inside e-mail in a way that won’t confuse e-mail servers
|
||||
subject = ""
|
||||
for v, charset in email.header.decode_header(string):
|
||||
if charset is None or charset == 'unknown-8bit':
|
||||
if type(v) is bytes:
|
||||
v = v.decode()
|
||||
subject = subject + v
|
||||
else:
|
||||
subject = subject + _to_utf8(v, charset)
|
||||
return subject
|
||||
|
||||
|
||||
def _to_plain_text_content(part: Message) -> str:
|
||||
content = part.get_payload(decode=True)
|
||||
charset = part.get_param("charset", None)
|
||||
if charset:
|
||||
content = _to_utf8(content, charset)
|
||||
elif type(content) == bytes:
|
||||
content = content.decode("utf8")
|
||||
# RFC 3676: remove automatic word-wrapping
|
||||
return content.replace(" \r\n", " ")
|
||||
|
||||
|
||||
def _is_part_attachment(part):
|
||||
return part.get("Content-Disposition", None)
|
||||
|
||||
|
||||
def _get_attachment(part) -> Attachment:
|
||||
content_disposition = part.get("Content-Disposition", None)
|
||||
r = filename_re.findall(content_disposition)
|
||||
if r:
|
||||
filename = sorted(r[0])[1]
|
||||
else:
|
||||
filename = "undefined"
|
||||
content = base64.b64encode(part.get_payload(decode=True))
|
||||
content = content.decode()
|
||||
return Attachment(
|
||||
filename=_email_non_ascii_to_uft8(filename),
|
||||
content=content,
|
||||
content_type=part.get_content_type(),
|
||||
)
|
|
@ -8,19 +8,12 @@ from email.mime.text import MIMEText
|
|||
from email.message import EmailMessage
|
||||
from logging.handlers import SMTPHandler
|
||||
|
||||
from stacosys.core import imap
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Mailer:
|
||||
def __init__(
|
||||
self,
|
||||
imap_host,
|
||||
imap_port,
|
||||
imap_ssl,
|
||||
imap_login,
|
||||
imap_password,
|
||||
smtp_host,
|
||||
smtp_port,
|
||||
smtp_starttls,
|
||||
|
@ -29,11 +22,6 @@ class Mailer:
|
|||
smtp_password,
|
||||
site_admin_email,
|
||||
):
|
||||
self._imap_host = imap_host
|
||||
self._imap_port = imap_port
|
||||
self._imap_ssl = imap_ssl
|
||||
self._imap_login = imap_login
|
||||
self._imap_password = imap_password
|
||||
self._smtp_host = smtp_host
|
||||
self._smtp_port = smtp_port
|
||||
self._smtp_starttls = smtp_starttls
|
||||
|
@ -42,26 +30,6 @@ class Mailer:
|
|||
self._smtp_password = smtp_password
|
||||
self._site_admin_email = site_admin_email
|
||||
|
||||
def _open_mailbox(self):
|
||||
return imap.Mailbox(
|
||||
self._imap_host,
|
||||
self._imap_port,
|
||||
self._imap_ssl,
|
||||
self._imap_login,
|
||||
self._imap_password,
|
||||
)
|
||||
|
||||
def fetch(self):
|
||||
msgs = []
|
||||
try:
|
||||
with self._open_mailbox() as mbox:
|
||||
count = mbox.get_count()
|
||||
for num in range(count):
|
||||
msgs.append(mbox.fetch_message(num + 1))
|
||||
except Exception:
|
||||
logger.exception("fetch mail exception")
|
||||
return msgs
|
||||
|
||||
def send(self, to_email, subject, message):
|
||||
|
||||
# Create the container (outer) email message.
|
||||
|
@ -87,13 +55,6 @@ class Mailer:
|
|||
success = False
|
||||
return success
|
||||
|
||||
def delete(self, id):
|
||||
try:
|
||||
with self._open_mailbox() as mbox:
|
||||
mbox.delete_message(id)
|
||||
except Exception:
|
||||
logger.exception("delete mail exception")
|
||||
|
||||
def get_error_handler(self):
|
||||
if self._smtp_ssl:
|
||||
mail_handler = SSLSMTPHandler(
|
||||
|
|
|
@ -1,52 +1,44 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import markdown
|
||||
import PyRSS2Gen
|
||||
import markdown
|
||||
|
||||
from stacosys.core.templater import Templater, Template
|
||||
from stacosys.model.comment import Comment
|
||||
|
||||
|
||||
class Rss:
|
||||
def __init__(
|
||||
self,
|
||||
lang,
|
||||
rss_file,
|
||||
rss_proto,
|
||||
site_name,
|
||||
site_url,
|
||||
self,
|
||||
lang,
|
||||
rss_file,
|
||||
rss_proto,
|
||||
site_name,
|
||||
site_url,
|
||||
):
|
||||
self._lang = lang
|
||||
self._rss_file = rss_file
|
||||
self._rss_proto = rss_proto
|
||||
self._site_name = site_name
|
||||
self._site_url = site_url
|
||||
current_path = os.path.dirname(__file__)
|
||||
template_path = os.path.abspath(os.path.join(current_path, "../templates"))
|
||||
self._templater = Templater(template_path)
|
||||
|
||||
def generate(self):
|
||||
rss_title = self._templater.get_template(
|
||||
self._lang, Template.RSS_TITLE_MESSAGE
|
||||
).render(site=self._site_name)
|
||||
md = markdown.Markdown()
|
||||
|
||||
items = []
|
||||
for row in (
|
||||
Comment.select()
|
||||
.where(Comment.published)
|
||||
.order_by(-Comment.published)
|
||||
.limit(10)
|
||||
Comment.select()
|
||||
.where(Comment.published)
|
||||
.order_by(-Comment.published)
|
||||
.limit(10)
|
||||
):
|
||||
item_link = "%s://%s%s" % (self._rss_proto, self._site_url, row.url)
|
||||
items.append(
|
||||
PyRSS2Gen.RSSItem(
|
||||
title="%s - %s://%s%s"
|
||||
% (self._rss_proto, row.author_name, self._site_url, row.url),
|
||||
% (self._rss_proto, row.author_name, self._site_url, row.url),
|
||||
link=item_link,
|
||||
description=md.convert(row.content),
|
||||
guid=PyRSS2Gen.Guid("%s/%d" % (item_link, row.id)),
|
||||
|
@ -54,10 +46,11 @@ class Rss:
|
|||
)
|
||||
)
|
||||
|
||||
rss_title = 'Commentaires du site "%s"' % self._site_name
|
||||
rss = PyRSS2Gen.RSS2(
|
||||
title=rss_title,
|
||||
link="%s://%s" % (self._rss_proto, self._site_url),
|
||||
description='Commentaires du site "%s"' % self._site_name,
|
||||
description=rss_title,
|
||||
lastBuildDate=datetime.now(),
|
||||
items=items,
|
||||
)
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from enum import Enum
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
|
||||
class Template(Enum):
|
||||
DROP_COMMENT = "drop_comment"
|
||||
APPROVE_COMMENT = "approve_comment"
|
||||
NEW_COMMENT = "new_comment"
|
||||
NOTIFY_MESSAGE = "notify_message"
|
||||
RSS_TITLE_MESSAGE = "rss_title_message"
|
||||
WEB_COMMENT_APPROVAL = "web_comment_approval"
|
||||
|
||||
|
||||
class Templater:
|
||||
def __init__(self, template_path):
|
||||
self._env = Environment(loader=FileSystemLoader(template_path))
|
||||
|
||||
def get_template(self, lang, template: Template):
|
||||
return self._env.get_template(lang + "/" + template.value + ".tpl")
|
||||
|
|
@ -9,31 +9,20 @@ class JobConfig(object):
|
|||
|
||||
JOBS: list = []
|
||||
|
||||
SCHEDULER_EXECUTORS = {"default": {"type": "threadpool", "max_workers": 4}}
|
||||
SCHEDULER_EXECUTORS = {"default": {"type": "threadpool", "max_workers": 1}}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
imap_polling_seconds,
|
||||
new_comment_polling_seconds,
|
||||
lang,
|
||||
site_name,
|
||||
site_token,
|
||||
site_admin_email,
|
||||
mailer,
|
||||
rss,
|
||||
):
|
||||
self.JOBS = [
|
||||
{
|
||||
"id": "fetch_mail",
|
||||
"func": "stacosys.core.cron:fetch_mail_answers",
|
||||
"args": [lang, mailer, rss, site_token],
|
||||
"trigger": "interval",
|
||||
"seconds": imap_polling_seconds,
|
||||
},
|
||||
{
|
||||
"id": "submit_new_comment",
|
||||
"func": "stacosys.core.cron:submit_new_comment",
|
||||
"args": [lang, site_name, site_token, site_admin_email, mailer],
|
||||
"args": [site_name, site_admin_email, mailer],
|
||||
"trigger": "interval",
|
||||
"seconds": new_comment_polling_seconds,
|
||||
},
|
||||
|
@ -41,25 +30,17 @@ class JobConfig(object):
|
|||
|
||||
|
||||
def configure(
|
||||
imap_polling,
|
||||
comment_polling,
|
||||
lang,
|
||||
site_name,
|
||||
site_token,
|
||||
site_admin_email,
|
||||
mailer,
|
||||
rss,
|
||||
):
|
||||
app.config.from_object(
|
||||
JobConfig(
|
||||
imap_polling,
|
||||
comment_polling,
|
||||
lang,
|
||||
site_name,
|
||||
site_token,
|
||||
site_admin_email,
|
||||
mailer,
|
||||
rss,
|
||||
)
|
||||
)
|
||||
scheduler = APScheduler()
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
Hi,
|
||||
|
||||
The comment should be published soon. It has been approved.
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
||||
{{ original }}
|
|
@ -1,9 +0,0 @@
|
|||
Hi,
|
||||
|
||||
The comment will not be published. It has been dropped.
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
||||
{{ original }}
|
|
@ -1,16 +0,0 @@
|
|||
Hi,
|
||||
|
||||
A new comment has been submitted for post {{ url }}
|
||||
|
||||
You have two choices:
|
||||
- reject the comment by replying NO (or no),
|
||||
- accept the comment by sending back the email as it is.
|
||||
|
||||
If you choose the latter option, Stacosys is going to publish the commennt.
|
||||
|
||||
Please find comment details below:
|
||||
|
||||
{{ comment }}
|
||||
|
||||
--
|
||||
Stacosys
|
|
@ -1 +0,0 @@
|
|||
New comment
|
|
@ -1 +0,0 @@
|
|||
{{ site }} : comments
|
|
@ -1,9 +0,0 @@
|
|||
Bonjour,
|
||||
|
||||
Le commentaire sera bientôt publié. Il a été approuvé.
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
||||
{{ original }}
|
|
@ -1,9 +0,0 @@
|
|||
Bonjour,
|
||||
|
||||
Le commentaire ne sera pas publié. Il a été rejeté.
|
||||
|
||||
--
|
||||
Stacosys
|
||||
|
||||
|
||||
{{ original }}
|
|
@ -1,16 +0,0 @@
|
|||
Bonjour,
|
||||
|
||||
Un nouveau commentaire a été posté pour l'article {{ url }}
|
||||
|
||||
Vous avez deux réponses possibles :
|
||||
- rejeter le commentaire en répondant NO (ou no),
|
||||
- accepter le commentaire en renvoyant cet email tel quel.
|
||||
|
||||
Si cette dernière option est choisie, Stacosys publiera le commentaire très bientôt.
|
||||
|
||||
Voici les détails concernant le commentaire :
|
||||
|
||||
{{ comment }}
|
||||
|
||||
--
|
||||
Stacosys
|
|
@ -1 +0,0 @@
|
|||
Nouveau commentaire
|
|
@ -1 +0,0 @@
|
|||
{{ site }} : commentaires
|
|
@ -7,8 +7,7 @@ from stacosys.conf.config import Config, ConfigParameter
|
|||
|
||||
EXPECTED_DB_SQLITE_FILE = "db.sqlite"
|
||||
EXPECTED_HTTP_PORT = 8080
|
||||
EXPECTED_IMAP_PORT = "5000"
|
||||
EXPECTED_IMAP_LOGIN = "user"
|
||||
EXPECTED_LANG = "fr"
|
||||
|
||||
|
||||
class ConfigTestCase(unittest.TestCase):
|
||||
|
@ -17,24 +16,18 @@ class ConfigTestCase(unittest.TestCase):
|
|||
self.conf = Config()
|
||||
self.conf.put(ConfigParameter.DB_SQLITE_FILE, EXPECTED_DB_SQLITE_FILE)
|
||||
self.conf.put(ConfigParameter.HTTP_PORT, EXPECTED_HTTP_PORT)
|
||||
self.conf.put(ConfigParameter.IMAP_PORT, EXPECTED_IMAP_PORT)
|
||||
self.conf.put(ConfigParameter.SMTP_STARTTLS, "yes")
|
||||
self.conf.put(ConfigParameter.IMAP_SSL, "false")
|
||||
|
||||
def test_exists(self):
|
||||
self.assertTrue(self.conf.exists(ConfigParameter.DB_SQLITE_FILE))
|
||||
self.assertFalse(self.conf.exists(ConfigParameter.IMAP_HOST))
|
||||
|
||||
def test_get(self):
|
||||
self.assertEqual(self.conf.get(ConfigParameter.DB_SQLITE_FILE), EXPECTED_DB_SQLITE_FILE)
|
||||
self.assertEqual(self.conf.get(ConfigParameter.HTTP_PORT), EXPECTED_HTTP_PORT)
|
||||
self.assertIsNone(self.conf.get(ConfigParameter.HTTP_HOST))
|
||||
self.assertEqual(self.conf.get(ConfigParameter.HTTP_PORT), EXPECTED_HTTP_PORT)
|
||||
self.assertEqual(self.conf.get(ConfigParameter.IMAP_PORT), EXPECTED_IMAP_PORT)
|
||||
self.assertEqual(self.conf.get_int(ConfigParameter.IMAP_PORT), int(EXPECTED_IMAP_PORT))
|
||||
self.assertEqual(self.conf.get_int(ConfigParameter.HTTP_PORT), 8080)
|
||||
self.assertTrue(self.conf.get_bool(ConfigParameter.SMTP_STARTTLS))
|
||||
self.assertFalse(self.conf.get_bool(ConfigParameter.IMAP_SSL))
|
||||
try:
|
||||
self.conf.get_bool(ConfigParameter.DB_SQLITE_FILE)
|
||||
self.assertTrue(False)
|
||||
|
@ -42,7 +35,7 @@ class ConfigTestCase(unittest.TestCase):
|
|||
pass
|
||||
|
||||
def test_put(self):
|
||||
self.assertFalse(self.conf.exists(ConfigParameter.IMAP_LOGIN))
|
||||
self.conf.put(ConfigParameter.IMAP_LOGIN, EXPECTED_IMAP_LOGIN)
|
||||
self.assertTrue(self.conf.exists(ConfigParameter.IMAP_LOGIN))
|
||||
self.assertEqual(self.conf.get(ConfigParameter.IMAP_LOGIN), EXPECTED_IMAP_LOGIN)
|
||||
self.assertFalse(self.conf.exists(ConfigParameter.LANG))
|
||||
self.conf.put(ConfigParameter.LANG, EXPECTED_LANG)
|
||||
self.assertTrue(self.conf.exists(ConfigParameter.LANG))
|
||||
self.assertEqual(self.conf.get(ConfigParameter.LANG), EXPECTED_LANG)
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
from email.header import Header
|
||||
from email.message import Message
|
||||
|
||||
from stacosys.core import imap
|
||||
|
||||
|
||||
class ImapTestCase(unittest.TestCase):
|
||||
|
||||
def test_utf8_decode(self):
|
||||
h = Header(s="Chez Darty vous avez re\udcc3\udca7u un nouvel aspirateur Vacuum gratuit jl8nz",
|
||||
charset="unknown-8bit")
|
||||
decoded = imap._email_non_ascii_to_uft8(h)
|
||||
self.assertEqual(decoded, "Chez Darty vous avez reçu un nouvel aspirateur Vacuum gratuit jl8nz")
|
||||
|
||||
def test_parse_date(self):
|
||||
now = datetime.datetime.now()
|
||||
self.assertGreaterEqual(imap._parse_date(None), now)
|
||||
parsed = imap._parse_date("Wed, 8 Dec 2021 20:05:20 +0100")
|
||||
self.assertEqual(parsed.day, 8)
|
||||
self.assertEqual(parsed.month, 12)
|
||||
self.assertEqual(parsed.year, 2021)
|
||||
# do not compare hours. don't care about timezone
|
||||
|
||||
def test_to_plain_text_content(self):
|
||||
msg = Message()
|
||||
payload = b"non\r\n\r\nLe 08/12/2021 \xc3\xa0 20:04, kianby@free.fr a \xc3\xa9crit\xc2\xa0:\r\n> Bonjour,\r\n>\r\n> Un nouveau commentaire a \xc3\xa9t\xc3\xa9 post\xc3\xa9 pour l'article /2021/rester-discret-sur-github//\r\n>\r\n> Vous avez deux r\xc3\xa9ponses possibles :\r\n> - rejeter le commentaire en r\xc3\xa9pondant NO (ou no),\r\n> - accepter le commentaire en renvoyant cet email tel quel.\r\n>\r\n> Si cette derni\xc3\xa8re option est choisie, Stacosys publiera le commentaire tr\xc3\xa8s bient\xc3\xb4t.\r\n>\r\n> Voici les d\xc3\xa9tails concernant le commentaire :\r\n>\r\n> author: ET Rate\r\n> site:\r\n> date: 2021-12-08 20:03:58\r\n> url: /2021/rester-discret-sur-github//\r\n>\r\n> gfdgdgf\r\n>\r\n>\r\n> --\r\n> Stacosys\r\n"
|
||||
msg.set_payload(payload, "UTF-8")
|
||||
self.assertTrue(imap._to_plain_text_content(msg))
|
|
@ -1,52 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from stacosys.core.templater import Templater, Template
|
||||
|
||||
|
||||
class TemplateTestCase(unittest.TestCase):
|
||||
|
||||
def get_template_content(self, lang, template_name, **kwargs):
|
||||
current_path = os.path.dirname(__file__)
|
||||
template_path = os.path.abspath(os.path.join(current_path, "../stacosys/templates"))
|
||||
template = Templater(template_path).get_template(lang, template_name)
|
||||
return template.render(kwargs)
|
||||
|
||||
def test_approve_comment(self):
|
||||
content = self.get_template_content("fr", Template.APPROVE_COMMENT, original="[texte]")
|
||||
self.assertTrue(content.startswith("Bonjour,\n\nLe commentaire sera bientôt publié."))
|
||||
self.assertTrue(content.endswith("[texte]"))
|
||||
content = self.get_template_content("en", Template.APPROVE_COMMENT, original="[texte]")
|
||||
self.assertTrue(content.startswith("Hi,\n\nThe comment should be published soon."))
|
||||
self.assertTrue(content.endswith("[texte]"))
|
||||
|
||||
def test_drop_comment(self):
|
||||
content = self.get_template_content("fr", Template.DROP_COMMENT, original="[texte]")
|
||||
self.assertTrue(content.startswith("Bonjour,\n\nLe commentaire ne sera pas publié."))
|
||||
self.assertTrue(content.endswith("[texte]"))
|
||||
content = self.get_template_content("en", Template.DROP_COMMENT, original="[texte]")
|
||||
self.assertTrue(content.startswith("Hi,\n\nThe comment will not be published."))
|
||||
self.assertTrue(content.endswith("[texte]"))
|
||||
|
||||
def test_new_comment(self):
|
||||
content = self.get_template_content("fr", Template.NEW_COMMENT, comment="[comment]")
|
||||
self.assertTrue(content.startswith("Bonjour,\n\nUn nouveau commentaire a été posté"))
|
||||
self.assertTrue(content.endswith("[comment]\n\n--\nStacosys"))
|
||||
content = self.get_template_content("en", Template.NEW_COMMENT, comment="[comment]")
|
||||
self.assertTrue(content.startswith("Hi,\n\nA new comment has been submitted"))
|
||||
self.assertTrue(content.endswith("[comment]\n\n--\nStacosys"))
|
||||
|
||||
def test_notify_message(self):
|
||||
content = self.get_template_content("fr", Template.NOTIFY_MESSAGE)
|
||||
self.assertEqual("Nouveau commentaire", content)
|
||||
content = self.get_template_content("en", Template.NOTIFY_MESSAGE)
|
||||
self.assertEqual("New comment", content)
|
||||
|
||||
def test_rss_title(self):
|
||||
content = self.get_template_content("fr", Template.RSS_TITLE_MESSAGE, site="[site]")
|
||||
self.assertEqual("[site] : commentaires", content)
|
||||
content = self.get_template_content("en", Template.RSS_TITLE_MESSAGE, site="[site]")
|
||||
self.assertEqual("[site] : comments", content)
|
Loading…
Add table
Reference in a new issue