Source code for shuup.core.tasks

# -*- coding: utf-8 -*-
# This file is part of Shuup.
#
# Copyright (c) 2012-2021, Shuup Commerce Inc. All rights reserved.
#
# This source code is licensed under the OSL-3.0 license found in the
# LICENSE file in the root directory of this source tree.
import inspect
import json
import logging
from datetime import datetime
from django.utils.translation import ugettext_lazy as _
from typing import Any, Optional, Tuple, Union
from uuid import uuid4

from shuup.core.models import BackgroundTask, BackgroundTaskExecution, BackgroundTaskExecutionStatus
from shuup.utils.importing import cached_load, load

LOGGER = logging.getLogger(__name__)


[docs]class TaskNotSerializableError(Exception): """ Raised when the task can't be serialized. """ pass
[docs]class TaskResult: result = None # type: str error_log = None # type: str def __init__(self, result=None, error_log=None): if result: try: json.dumps(result) except TypeError: raise TaskNotSerializableError("Task result is not serializable as JSON.") self.result = result self.error_log = error_log
[docs]class Task: function = "" # str identifier = "" # str stored = False # bool queue = "default" # str kwargs = None # Optional[Dict[str, Any]] def __init__(self, function, identifier=None, stored=False, queue="default", **kwargs): """ :param function: A string that represents the function specification. It will be locaded dynamically and executed passing the given kwargs. E.g.: `myapp.my_lib.do_domething` The function can optionally return a `TaskResult` object which the result of the execution. It will be used to store the information in the database if the task is stored. :param kwargs: Set of parameter to pass to the `function`. The parameters must be JSON serializable to support multiple task runners implementations. :type function: str """ if not identifier: identifier = f"{queue}_{function}_{uuid4().hex}" assert isinstance(function, str) try: json.dumps(kwargs) except TypeError: raise TaskNotSerializableError("Task kwargs is not serializable as JSON.") self.function = function self.identifier = identifier self.queue = queue self.stored = stored self.kwargs = kwargs
[docs]class TaskRunner:
[docs] def create_task(self, function, stored=False, queue="default", **kwargs) -> Task: """ Create a task to run. :type function: str """ raise NotImplementedError()
[docs] def run_task(self, task) -> Optional[TaskResult]: """ Run the given task. :type task: Task """ raise NotImplementedError()
[docs]class DefaultTaskRunner(TaskRunner): """ The default implementation of a task runner. This task runner will execute the tasks received synchronously. """
[docs] def create_task(self, function, stored=False, queue="default", task_identifier=None, **kwargs) -> Task: task_identifier = task_identifier or f"{queue}_{uuid4().hex}" if stored: background_data = dict(queue=queue, identifier=task_identifier, function=function, arguments=kwargs) if kwargs.get("shop_id"): background_data["shop_id"] = kwargs["shop_id"] if kwargs.get("supplier_id"): background_data["supplier_id"] = kwargs["supplier_id"] if kwargs.get("user_id"): background_data["user_id"] = kwargs["user_id"] BackgroundTask.objects.create(**background_data) return Task(function, task_identifier, stored, queue, **kwargs)
[docs] def run_task(self, task) -> Optional[TaskResult]: task_identifier = task.identifier background_task_execution = None background_task = BackgroundTask.objects.filter(identifier=task_identifier).first() if background_task: background_task_execution = BackgroundTaskExecution.objects.create(task=background_task) function = load(task.function) task_result = None status = BackgroundTaskExecutionStatus.RUNNING try: arguments = task.kwargs # inject the _task_id into the kwargs arguments["_task_id"] = task_identifier # get the list or args of the function args_spec = inspect.getfullargspec(function) # go through all kwargs and check if they can be sent to the function # and remove those that can't be passed forward for arg in list(arguments.keys()): if arg not in args_spec.args and arg not in args_spec.kwonlyargs: arguments.pop(arg) task_result = function(**arguments) # type: Union[TaskResult, Any] if isinstance(task_result, TaskResult) and task_result.error_log: status = BackgroundTaskExecutionStatus.ERROR else: status = BackgroundTaskExecutionStatus.SUCCESS except Exception: LOGGER.exception(_("Failed to execute the task")) task_result = TaskResult(error_log=_("An unexpeted error occurred.")) status = BackgroundTaskExecutionStatus.ERROR if background_task_execution: result = None error = None if isinstance(task_result, TaskResult): result = task_result.result error = task_result.error_log elif task_result: result = str(task_result) background_task_execution.finished_on = datetime.now() background_task_execution.status = status background_task_execution.result = result background_task_execution.error_log = error background_task_execution.save() return task_result
[docs]def get_task_runner() -> TaskRunner: """ Returns the task runner configured in settings. :rtype: TaskRunner """ return cached_load("SHUUP_TASK_RUNNER")()
[docs]def run_task(function, **kwargs) -> Tuple[Task, Any]: """ Runs a function passing the given kwargs using the task runner configured in settings. Returns a tuple with the task and the result of the task execution :type function: str """ task_runner = get_task_runner() task = task_runner.create_task(function, **kwargs) return task, task_runner.run_task(task)