add type checking

This commit is contained in:
Yax 2020-03-31 19:53:59 +02:00
parent f1c5d83495
commit e95c6264a0
3 changed files with 71 additions and 69 deletions

View file

@ -8,6 +8,8 @@ import imaplib
import logging import logging
import re import re
from model.email import Attachment, Email, Part
filename_re = re.compile('filename="(.+)"|filename=([^;\n\r"\']+)', re.I | re.S) filename_re = re.compile('filename="(.+)"|filename=([^;\n\r"\']+)', re.I | re.S)
@ -33,90 +35,93 @@ class Mailbox(object):
self.imap.logout() self.imap.logout()
def get_count(self): def get_count(self):
self.imap.select('Inbox') self.imap.select("Inbox")
_, data = self.imap.search(None, 'ALL') _, data = self.imap.search(None, "ALL")
return sum(1 for num in data[0].split()) return sum(1 for num in data[0].split())
def fetch_raw_message(self, num): def fetch_raw_message(self, num):
self.imap.select('Inbox') self.imap.select("Inbox")
_, data = self.imap.fetch(str(num), '(RFC822)') _, data = self.imap.fetch(str(num), "(RFC822)")
email_msg = email.message_from_bytes(data[0][1]) email_msg = email.message_from_bytes(data[0][1])
return email_msg return email_msg
def fetch_message(self, num): def fetch_message(self, num):
raw_msg = self.fetch_raw_message(num) raw_msg = self.fetch_raw_message(num)
msg = {}
msg['encoding'] = 'UTF-8' msg = Email(
msg['index'] = num id=num,
dt = parse_date(raw_msg['Date']).strftime('%Y-%m-%d %H:%M:%S') encoding="UTF-8",
msg['datetime'] = dt date=parse_date(raw_msg["Date"]).strftime("%Y-%m-%d %H:%M:%S"),
msg['from'] = raw_msg['From'] from_addr=raw_msg["From"],
msg['to'] = raw_msg['To'] to_addr=raw_msg["To"],
subject = email_nonascii_to_uft8(raw_msg['Subject']) subject=email_nonascii_to_uft8(raw_msg["Subject"]),
msg['subject'] = subject )
parts = [] parts = []
attachments = [] attachments = []
for part in raw_msg.walk(): for part in raw_msg.walk():
if part.is_multipart(): if part.is_multipart():
continue continue
content_disposition = part.get('Content-Disposition', None) content_disposition = part.get("Content-Disposition", None)
if content_disposition: if content_disposition:
# we have attachment # we have attachment
r = filename_re.findall(content_disposition) r = filename_re.findall(content_disposition)
if r: if r:
filename = sorted(r[0])[1] filename = sorted(r[0])[1]
else: else:
filename = 'undefined' filename = "undefined"
content = base64.b64encode(part.get_payload(decode=True)) content = base64.b64encode(part.get_payload(decode=True))
content = content.decode() content = content.decode()
a = { attachments.append(
'filename': email_nonascii_to_uft8(filename), Attachment(
'content': content, filename=email_nonascii_to_uft8(filename),
'content-type': part.get_content_type(), content=content,
} content_type=part.get_content_type(),
attachments.append(a) )
)
else: else:
part_item = {} part_item = {}
content = part.get_payload(decode=True) content = part.get_payload(decode=True)
content_type = part.get_content_type()
try: try:
charset = part.get_param('charset', None) charset = part.get_param("charset", None)
if charset: if charset:
content = to_utf8(content, charset) content = to_utf8(content, charset)
elif type(content) == bytes: elif type(content) == bytes:
content = content.decode('utf8') content = content.decode("utf8")
except: except:
self.logger.exception() self.logger.exception()
# RFC 3676: remove automatic word-wrapping # RFC 3676: remove automatic word-wrapping
content = content.replace(' \r\n', ' ') content = content.replace(" \r\n", " ")
part_item['content'] = content
part_item['content-type'] = content_type parts.append(
parts.append(part_item) Part(content=content, content_type=part.get_content_type())
if parts: )
msg['parts'] = parts
if attachments: if part.get_content_type() == "text/plain":
msg['attachments'] = attachments msg.plain_text_content = content
msg.parts = parts
msg.attachments = attachments
return msg return msg
def delete_message(self, num): def delete_message(self, num):
self.imap.select('Inbox') self.imap.select("Inbox")
self.imap.store(str(num), '+FLAGS', r'\Deleted') self.imap.store(str(num), "+FLAGS", r"\Deleted")
self.imap.expunge() self.imap.expunge()
def delete_all(self): def delete_all(self):
self.imap.select('Inbox') self.imap.select("Inbox")
_, data = self.imap.search(None, 'ALL') _, data = self.imap.search(None, "ALL")
for num in data[0].split(): for num in data[0].split():
self.imap.store(num, '+FLAGS', r'\Deleted') self.imap.store(num, "+FLAGS", r"\Deleted")
self.imap.expunge() self.imap.expunge()
def print_msgs(self): def print_msgs(self):
self.imap.select('Inbox') self.imap.select("Inbox")
_, data = self.imap.search(None, 'ALL') _, data = self.imap.search(None, "ALL")
for num in reversed(data[0].split()): for num in reversed(data[0].split()):
status, data = self.imap.fetch(num, '(RFC822)') status, data = self.imap.fetch(num, "(RFC822)")
self.logger.debug('Message %s\n%s\n' % (num, data[0][1])) self.logger.debug("Message %s\n%s\n" % (num, data[0][1]))
def parse_date(v): def parse_date(v):
@ -134,14 +139,14 @@ def parse_date(v):
def to_utf8(string, charset): def to_utf8(string, charset):
return string.decode(charset).encode('UTF-8').decode('UTF-8') return string.decode(charset).encode("UTF-8").decode("UTF-8")
def email_nonascii_to_uft8(string): def email_nonascii_to_uft8(string):
# RFC 1342 is a recommendation that provides a way to represent non ASCII # RFC 1342 is a recommendation that provides a way to represent non ASCII
# characters inside e-mail in a way that wont confuse e-mail servers # characters inside e-mail in a way that wont confuse e-mail servers
subject = '' subject = ""
for v, charset in email.header.decode_header(string): for v, charset in email.header.decode_header(string):
if charset is None: if charset is None:
if type(v) is bytes: if type(v) is bytes:

View file

@ -25,33 +25,15 @@ def _open_mailbox():
) )
def _to_dto(msg):
content = 'no plain-text part found in email'
for part in msg['parts']:
if part['content-type'] == 'text/plain':
content = part['content']
break
return Email(
id=msg['index'],
encoding=msg['encoding'],
date=msg['datetime'],
from_addr=msg['from'],
to_addr=msg['to'],
subject=msg['subject'],
content=content,
)
def fetch(): def fetch():
msgs = [] msgs = []
try: try:
with _open_mailbox() as mbox: with _open_mailbox() as mbox:
count = mbox.get_count() count = mbox.get_count()
for num in range(count): for num in range(count):
msg = _to_dto(mbox.fetch_message(num + 1)) msgs.append(mbox.fetch_message(num + 1))
msgs.append(msg)
except: except:
logger.exception('fetch mail exception') logger.exception("fetch mail exception")
return msgs return msgs
@ -59,9 +41,9 @@ def send(to_email, subject, message):
# Create the container (outer) email message. # Create the container (outer) email message.
msg = MIMEText(message) msg = MIMEText(message)
msg['Subject'] = subject msg["Subject"] = subject
msg['To'] = to_email msg["To"] = to_email
msg['From'] = config.get(config.SMTP_LOGIN) msg["From"] = config.get(config.SMTP_LOGIN)
success = True success = True
try: try:
@ -72,7 +54,7 @@ def send(to_email, subject, message):
s.send_message(msg) s.send_message(msg)
s.quit() s.quit()
except: except:
logger.exception('send mail exception') logger.exception("send mail exception")
success = False success = False
return success return success
@ -82,4 +64,4 @@ def delete(id):
with _open_mailbox() as mbox: with _open_mailbox() as mbox:
mbox.delete_message(id) mbox.delete_message(id)
except: except:
logger.exception('delete mail exception') logger.exception("delete mail exception")

View file

@ -2,8 +2,21 @@
# -*- coding: UTF-8 -*- # -*- coding: UTF-8 -*-
from typing import NamedTuple from typing import NamedTuple
from typing import List
from datetime import datetime from datetime import datetime
class Part(NamedTuple):
content: str
content_type: str
class Attachment(NamedTuple):
filename: str
content: str
content_type: str
class Email(NamedTuple): class Email(NamedTuple):
id: int id: int
encoding: str encoding: str
@ -11,4 +24,6 @@ class Email(NamedTuple):
from_addr: str from_addr: str
to_addr: str to_addr: str
subject: str subject: str
content: str parts: List[Part]
attachments: List[Attachment]
plain_text_content: str = 'no plain-text part'