# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, missing-class-docstring

import sys
from hashlib import sha256
from importlib import import_module
from os import listdir, makedirs, remove, stat, utime
from os.path import abspath, basename, dirname, exists, join
from shutil import copyfile
from pkgutil import iter_modules
from logging import getLogger
from typing import List, Tuple

from searx import logger, settings


class Plugin:  # pylint: disable=too-few-public-methods
    """This class is currently never initialized and only used for type hinting.

    It lists the attributes that the loader in this module reads from, or sets
    on, a plugin module.
    """

    # set by ``load_plugin`` to the name of the plugin's module
    id: str
    # required attribute, must be a ``str``
    name: str
    # required attribute, must be a ``str``
    description: str
    # required attribute, must be a ``bool``
    default_on: bool
    # optional attribute: any number of JavaScript resources
    js_dependencies: Tuple[str, ...]
    # optional attribute: any number of CSS resources
    css_dependencies: Tuple[str, ...]
    # optional attribute, the value "query" triggers additional checks in ``load_plugin``
    preference_section: str


# Logger used by the plugin loader: the child named "plugins" of the searx logger.
logger = logger.getChild("plugins")

# Attributes every plugin module has to define, as (name, expected type) pairs.
# ``load_plugin`` terminates the process when one of them is missing or has
# another type.
required_attrs = (
    # fmt: off
    ("name", str),
    ("description", str),
    ("default_on", bool)
    # fmt: on
)

# Attributes a plugin module may define, as (name, expected type) pairs.
# ``load_plugin`` substitutes a default of the expected type when one of them
# is missing or has another type.
optional_attrs = (
    # fmt: off
    ("js_dependencies", tuple),
    ("css_dependencies", tuple),
    ("preference_section", str),
    # fmt: on
)


def sha_sum(filename):
    """Return the hexadecimal SHA-256 digest of the content of ``filename``.

    The file is opened in binary mode and read completely into memory.
    """
    with open(filename, "rb") as stream:
        digest = sha256(stream.read())
        return digest.hexdigest()


def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
    """Copy one plugin resource into ``target_dir`` and return its web path.

    :param base_path: directory the plugin's ``resource_path`` is relative to
    :param resource_path: path of the resource file, joined onto ``base_path``
    :param name: plugin name, only used in the error message
    :param target_dir: directory the resource is copied into
    :param plugin_dir: directory component used to build the returned web path
    :return: ``plugins/external_plugins/<plugin_dir>/<file name>``

    The copy is skipped when the target already exists with the same SHA-256
    digest.  A failing copy is logged as critical and the process exits with
    status 3.
    """
    source_file = join(base_path, resource_path)
    file_name = basename(source_file)
    target_file = join(target_dir, file_name)

    up_to_date = exists(target_file) and sha_sum(source_file) == sha_sum(target_file)
    if not up_to_date:
        try:
            copyfile(source_file, target_file)
            # carry over atime_ns and mtime_ns, so the weak ETags (generated
            # by the HTTP server) do not change
            source_stat = stat(source_file)
            timestamps = (source_stat.st_atime_ns, source_stat.st_mtime_ns)
            utime(target_file, ns=timestamps)
        except IOError:
            logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name))
            sys.exit(3)

    # the caller gets the web path of the resource, not the file system path
    web_path = join("plugins/external_plugins", plugin_dir, file_name)
    return web_path


def prepare_package_resources(plugin, plugin_module_name):
    """Publish the JS / CSS resources of a plugin below the static path.

    :param plugin: the imported plugin module
    :param plugin_module_name: module name, used as directory name and in log messages

    The resources are copied to
    ``<static_path>/plugins/external_plugins/<plugin_module_name>`` and the
    plugin's ``js_dependencies`` / ``css_dependencies`` are replaced by lists
    of the web paths returned by ``sync_resource``.  Files in the target
    directory that are not one of these resources are removed.  A failure to
    create the directory or to remove a file is logged as critical and the
    process exits with status 3.
    """
    source_dir = dirname(abspath(plugin.__file__))

    plugin_dir = plugin_module_name
    static_root = settings["ui"]["static_path"]
    target_dir = join(static_root, "plugins/external_plugins", plugin_dir)
    try:
        makedirs(target_dir, exist_ok=True)
    except IOError:
        logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name))
        sys.exit(3)

    # file names that are allowed to stay in the target directory
    wanted_files = []

    # JavaScript first, then CSS
    for attr_name in ("js_dependencies", "css_dependencies"):
        if not hasattr(plugin, attr_name):
            continue
        dependencies = getattr(plugin, attr_name)
        wanted_files.extend(map(basename, dependencies))
        web_paths = [
            sync_resource(source_dir, dependency, plugin_module_name, target_dir, plugin_dir)
            for dependency in dependencies
        ]
        setattr(plugin, attr_name, web_paths)

    # drop everything in the target directory that is no longer referenced
    for entry in listdir(target_dir):
        entry_name = basename(entry)
        if entry_name in wanted_files:
            continue
        resource_path = join(target_dir, entry_name)
        try:
            remove(resource_path)
        except IOError:
            logger.critical(
                "failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name)
            )
            sys.exit(3)


def load_plugin(plugin_module_name, external):
    """Import the plugin module, validate its attributes and return it.

    :param plugin_module_name: dotted name of the module to import
    :param external: when true, the plugin's resources are published with
        ``prepare_package_resources``
    :return: the plugin module, or ``None`` when the import raised an
        exception that is not considered fatal (the plugin is disabled)
    :raises SystemExit: exit status 3 on a fatal import error, on a missing or
        wrongly typed required attribute and on a "query" plugin without
        ``query_keywords`` / ``query_examples``

    On the returned module ``id`` and ``logger`` are set, the optional
    attributes are present with the expected type and ``default_on`` reflects
    ``settings["enabled_plugins"]`` when that setting is not empty.
    """
    # pylint: disable=too-many-branches
    try:
        plugin = import_module(plugin_module_name)
    except (
        SyntaxError,
        KeyboardInterrupt,
        SystemExit,
        SystemError,
        ImportError,
        RuntimeError,
    ) as e:
        logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
        sys.exit(3)
    except BaseException:
        logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
        return None

    # difference with searx: use module name instead of the user name
    plugin.id = plugin_module_name

    # every plugin gets a logger named after its module
    plugin.logger = getLogger(plugin_module_name)

    for plugin_attr, plugin_attr_type in required_attrs:
        if not hasattr(plugin, plugin_attr):
            logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
            sys.exit(3)
        attr = getattr(plugin, plugin_attr)
        if not isinstance(attr, plugin_attr_type):
            type_attr = str(type(attr))
            # plugin first, attribute name second: same order as the
            # "missing attribute" message above
            logger.critical(
                '{0}: attribute "{1}" is of type {2}, must be of type {3}, cannot load plugin'.format(
                    plugin, plugin_attr, type_attr, plugin_attr_type
                )
            )
            sys.exit(3)

    # The default section has to be applied before the optional attributes are
    # normalized: the loop below creates every missing optional attribute, so
    # a later hasattr() check could never be true.
    if not hasattr(plugin, "preference_section"):
        plugin.preference_section = "general"

    # missing or wrongly typed optional attributes are replaced by the empty
    # value of the expected type
    for plugin_attr, plugin_attr_type in optional_attrs:
        if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
            setattr(plugin, plugin_attr, plugin_attr_type())

    # query plugin
    if plugin.preference_section == "query":
        for plugin_attr in ("query_keywords", "query_examples"):
            if not hasattr(plugin, plugin_attr):
                logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin))
                sys.exit(3)

    if settings.get("enabled_plugins"):
        # searx compatibility: plugin.name in settings['enabled_plugins']
        plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]

    # copy resources if this is an external plugin
    if external:
        prepare_package_resources(plugin, plugin_module_name)

    logger.debug("%s: loaded", plugin_module_name)

    return plugin


def load_and_initialize_plugin(plugin_module_name, external, init_args):
    """Load a plugin and, when it has an ``init`` function, call it.

    :param plugin_module_name: passed on to ``load_plugin``
    :param external: passed on to ``load_plugin``
    :param init_args: positional arguments for the plugin's ``init``
    :return: the plugin module, or ``None`` when loading failed, when ``init``
        returned a false value or when ``init`` raised an exception
    """
    plugin = load_plugin(plugin_module_name, external)

    # nothing to initialize: hand back whatever load_plugin returned
    if not plugin or not hasattr(plugin, 'init'):
        return plugin

    try:
        if plugin.init(*init_args):
            return plugin
        return None
    except Exception:  # pylint: disable=broad-except
        plugin.logger.exception("Exception while calling init, the plugin is disabled")
        return None


class PluginStore:
    """Ordered collection of registered plugins with a hook dispatcher."""

    def __init__(self):
        self.plugins: List[Plugin] = []

    def __iter__(self):
        """Iterate over the plugins in registration order."""
        yield from self.plugins

    def register(self, plugin):
        """Append ``plugin`` to the store."""
        self.plugins.append(plugin)

    def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
        """Call the hook ``plugin_type`` on each plugin that implements it.

        The plugins are visited in the order of ``ordered_plugin_list`` and the
        visit stops at the first hook that returns a false value.  An exception
        raised by a hook is logged on the plugin's logger and the next plugin
        is tried.

        :return: the value returned by the last hook that completed, ``True``
            when no hook completed
        """
        outcome = True
        for candidate in ordered_plugin_list:
            if not hasattr(candidate, plugin_type):
                continue
            try:
                hook = getattr(candidate, plugin_type)
                outcome = hook(*args, **kwargs)
                if not outcome:
                    break
            except Exception:  # pylint: disable=broad-except
                candidate.logger.exception("Exception while calling %s", plugin_type)
        return outcome


# Module-level store; ``initialize`` registers every plugin here that was
# loaded and initialized successfully.
plugins = PluginStore()


def plugin_module_names():
    """Yield ``(module name, external)`` for every plugin that should be loaded.

    The modules found next to this file come first, with their name prefixed
    by this package's name and ``external`` set to ``False``.  Then the entries
    of ``settings['plugins']`` follow with ``external`` set to ``True``; an
    entry is skipped when its name was already yielded or equals the
    unprefixed name of an embedded plugin.
    """
    seen = set()

    # embedded plugins
    package_dir = dirname(__file__)
    for embedded in iter_modules(path=[package_dir]):
        qualified_name = __name__ + "." + embedded.name
        yield (qualified_name, False)
        seen.add(embedded.name)

    # external plugins
    for external_name in settings['plugins']:
        if external_name in seen:
            continue
        yield (external_name, True)
        seen.add(external_name)


def initialize(app):
    """Load all plugins and register the usable ones in ``plugins``.

    :param app: passed, together with ``settings``, to each plugin's ``init``

    A plugin for which ``load_and_initialize_plugin`` returns a false value is
    not registered.
    """
    for module_name, external in plugin_module_names():
        candidate = load_and_initialize_plugin(module_name, external, (app, settings))
        if not candidate:
            continue
        plugins.register(candidate)