Source code for shuup.importer.utils.importer

# -*- coding: utf-8 -*-
# This file is part of Shuup.
#
# Copyright (c) 2012-2021, Shuup Commerce Inc. All rights reserved.
#
# This source code is licensed under the OSL-3.0 license found in the
# LICENSE file in the root directory of this source tree.
import os
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from enumfields import Enum

from shuup.admin.utils.permissions import has_permission
from shuup.apps.provides import get_provide_objects
from shuup.importer.exceptions import ImporterError
from shuup.importer.transforms import transform_file


[docs]class ImportMode(Enum): CREATE_UPDATE = "create,update" CREATE = "create" UPDATE = "update" class Labels: CREATE_UPDATE = _("Allow create and update") CREATE = _("Only create (no updates)") UPDATE = _("Only update existing (no new ones are created)")
[docs]def get_importer_choices(user=None): # filter the importers by the user if user: return [ (importer.identifier, importer.name) for importer in get_provide_objects("importers") if has_permission(user, importer.get_permission_identifier()) ] return [(i.identifier, i.name) for i in get_provide_objects("importers")]
[docs]def get_importer(identifier): for i in get_provide_objects("importers"): if i.identifier == identifier: return i return None
[docs]def get_import_file_path(filename): return os.path.join(settings.MEDIA_ROOT, "import_temp", os.path.basename(filename))
[docs]class FileImporter: def __init__( self, importer: str, import_mode: ImportMode, file_name: str, language: str = None, mapping={}, shop=None, supplier=None, user=None, **kwargs ): self.importer = None self.importer_cls = get_importer(importer) self.import_mode = import_mode self.file_name = file_name self.language = language self.shop = shop self.supplier = supplier self.mapping = mapping self.user = user
[docs] def prepare(self): self.data = self._transform_request_file() if self.data is None: raise ImporterError(_("The file doesn't contain data.")) context = self.importer_cls.get_importer_context( request=None, shop=self.shop, language=self.language, supplier=self.supplier, user=self.user ) self.importer = self.importer_cls(self.data, context) self.importer.process_data() # check if mapping was done if self.mapping: for field in self.importer.unmatched_fields: values = self.mapping.get(field) if len(values): self.importer.manually_match(field, values[0]) self.importer.do_remap() return True
[docs] def import_file(self): if self.data is None: raise ImporterError(_("The file doesn't contain data.")) try: self.importer.do_import(self.import_mode) except Exception: raise ImporterError(_("Failed to import data."))
def _transform_request_file(self): try: filename = get_import_file_path(self.file_name) if not os.path.isfile(filename): raise ImporterError(_("{file_name} is not a valid file.").format(file_name=self.file_name)) except Exception: raise ImporterError(_("The file is missing.")) try: mode = "xls" if filename.endswith("xlsx"): mode = "xlsx" if filename.endswith("csv"): mode = "csv" if self.importer_cls.custom_file_transformer: return self.importer_cls.transform_file(mode, filename) return transform_file(mode, filename) except (Exception, RuntimeError) as e: raise ImporterError(str(e))