Source code for shuup.default_importer.importers.product

# This file is part of Shuup.
#
# Copyright (c) 2012-2021, Shuup Commerce Inc. All rights reserved.
#
# This source code is licensed under the OSL-3.0 license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import unicode_literals

import decimal
import os
import six
from django.db.models import ForeignKey, ManyToManyField, Q
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _

from shuup.admin.utils.permissions import has_permission
from shuup.core.models import (
    MediaFile,
    Product,
    ProductMedia,
    ProductMediaKind,
    ProductMode,
    ProductType,
    ProductVariationVariable,
    ProductVariationVariableValue,
    SalesUnit,
    ShopProduct,
    Supplier,
    SupplierModule,
    TaxClass,
)
from shuup.importer.importing import DataImporter, ImporterExampleFile, ImportMetaBase
from shuup.importer.utils import fold_mapping_name
from shuup.simple_supplier.module import SimpleSupplierModule
from shuup.utils.django_compat import force_text
from shuup.utils.djangoenv import has_installed
from shuup.utils.properties import PriceProperty


[docs]class ProductMetaBase(ImportMetaBase): aliases = { "type": ["product_type", "producttype"], "tax_class": ["tax_class_name"], "name": ["title"], "keywords": ["tags"], "qty": ["quantity", "stock_amount", "stock_qty", "stock_quantity", "qty"], "suppliers": ["supplier"], "primary_category": ["category", "main_category"], "categories": ["extra_categories"], "media": ["media", "images"], "image": ["image", "main_image"], "parent_sku": ["parent sku"], "variation_value_1": ["variation value 1"], "variation_value_2": ["variation value 2"], "variation_value_3": ["variation value 3"], "variation_value_4": ["variation value 4"], "manufacturer": ["mfgr"], } fields_to_skip = ["ignore", "shop_primary_image", "primary_image"] post_save_handlers = { "handle_variations": [ "parent_sku", "variation_value_1", "variation_value_2", "variation_value_3", "variation value 4", ], "handle_stocks": ["qty"], "handle_images": ["image", "media"], }
[docs] def handle_variations(self, fields, sess): # noqa (C901) row = {k.lower(): v for k, v in sess.row.items()} product = sess.instance parent_sku = None for parent_sku_field in self.aliases["parent_sku"]: parent_sku = row.get(parent_sku_field) if not parent_sku: # No parent -> skip return parent_product = Product.objects.filter( sku=parent_sku, variation_parent__isnull=True # prevent linking to another child ) if product.pk: parent_product = parent_product.exclude(pk=product.pk) parent_product = parent_product.first() if not parent_product: msg = _("Parent SKU set for the row, but couldn't find product to match with the given SKU.") sess.log_messages.append(msg) return variables = {} value_names = [] for field_key in ["variation_value_1", "variation_value_2", "variation_value_3", "variation_value 4"]: field_value = None if field_key not in self.aliases: continue for actual_field_key in self.aliases[field_key]: field_value = row.get(actual_field_key) if not (field_value and "/" in field_value): continue variable_name, value_name = field_value.split("/", 1) if not (variable_name and value_name): continue variable, variable_created = ProductVariationVariable.objects.update_or_create( product=parent_product, identifier=slugify(variable_name), defaults=dict(name=variable_name) ) value, value_created = ProductVariationVariableValue.objects.update_or_create( variable=variable, identifier=slugify(value_name), defaults=dict(value=value_name) ) value_names.append(value_name) variables[variable.identifier] = value.identifier if not variables.keys(): msg = _("Parent SKU set for the row, but no variation variables found.") sess.log_messages.append(msg) return try: # This is a variation children which can't have any ProductVariationVariables ProductVariationVariable.objects.filter(product=product).delete() if product.name == product.sku or product.name.lower().strip() == "x": product.name = " - ".join([parent_product.name] + value_names) # Variable linking does the save product.mode = ProductMode.VARIATION_CHILD product.link_to_parent(parent_product, variables) except Exception as e: sess.log_messages.append(str(e))
def _handle_image(self, shop, product, image_source, is_primary=False): product_media = None img_path, img_name = os.path.split(image_source) # fetch all images that are candidates using the file name images_candidate = list( MediaFile.objects.filter( Q(shops=shop), Q(file__original_filename__iexact=img_name) | Q(file__name__iexact=img_name) ).distinct() ) image = None if not images_candidate: return None if img_path: for image_candidate in images_candidate: # if there is any path in the image_source string, # we should compare that with the Filer file logical path # this enable users to use the 'folder1/folder2/image.jpeg' as image source folder_paths = image_candidate.file.logical_path logical_path = os.path.join(*[f.name for f in folder_paths]).lower() if logical_path.endswith(img_path.lower()): image = image_candidate break else: # just use the first one image = images_candidate[0] if image: product_media = ProductMedia.objects.filter(product=product, file=image.file, shops=shop).first() if not product_media: product_media = ProductMedia( product=product, kind=ProductMediaKind.IMAGE, enabled=True, public=True, file=image.file ) if product_media: product_media.full_clean() product_media.save() product_media.shops.add(shop) if is_primary: product.primary_image = product_media product.save(update_fields=["primary_image"]) return product_media
[docs] def handle_images(self, fields, sess): """ Handle images for product. """ # convert all keys to lowercase row = {k.lower(): v for k, v in sess.row.items()} product = sess.instance for image_field in self.aliases["image"]: image = row.get(image_field) if image and not self._handle_image(sess.shop, product, row[image_field], is_primary=True): msg = _("Image '%s' was not found, please check whether the image exists.") % row[image_field] sess.log_messages.append(msg) for image_field in self.aliases["media"]: images = row.get(image_field) if images: for image_source in images.split(","): if not self._handle_image(sess.shop, product, image_source): msg = _("Image '%s' was not found, please check whether the image exists.") % image_source sess.log_messages.append(msg) # check whether the product has media but doesn't have a primary image # set the first available media as primary product_medias = ProductMedia.objects.filter(product=product, shops=sess.shop) if not product.primary_image and product_medias.exists(): product.primary_image = product_medias.first() product.save()
[docs] def handle_stocks(self, fields, sess): # noqa (C901) """ Handle stocks for product. If stock quantity has been given, expect that a supplier with stock management must be available. """ # convert all keys to lowercase row = {k.lower(): v for k, v in sess.row.items()} # check if row even has these fields we are requiring field_found = False for qty_field in self.aliases["qty"]: if qty_field in row: field_found = True break if not field_found: # no need to process this as qty was not available return supplier = row.get("supplier") if not supplier: msg = _("Please add supplier to the row, before importing stock quantities.") sess.log_messages.append(msg) return if isinstance(supplier, str): supplier = Supplier.objects.filter(name=supplier).first() else: supplier = sess.importer.resolve_object(Supplier, supplier) if not supplier: msg = _("No supplier found, please check that the supplier exists.") sess.log_messages.append(msg) return qty = None for qty_field in self.aliases["qty"]: qty_field = qty_field.lower() qty = row.get(qty_field, None) if not qty: return supplier_changed = False if not supplier.stock_managed: supplier.stock_managed = True supplier_changed = True if not supplier.supplier_modules.all().exists() and has_installed("shuup.simple_supplier"): supplier_module = SupplierModule.objects.get_or_create(module_identifier=SimpleSupplierModule.identifier)[0] supplier.supplier_modules.add(supplier_module) supplier_changed = True if not supplier.supplier_modules: msg = _("No supplier module set, please check that the supplier module is set.") sess.log_messages.append(msg) return if supplier_changed: supplier.save() product = sess.instance stock_status = supplier.get_stock_status(product.pk) stock_delta = decimal.Decimal(qty) if stock_status: stock_delta = decimal.Decimal(qty) - stock_status.logical_count if stock_delta != 0: supplier.adjust_stock(product.pk, stock_delta)
[docs] def presave_hook(self, sess): # ensure tax_class id is there product = sess.instance if not product.name: product.name = product.sku if not product.description: product.description = ""
[docs] def postsave_hook(self, sess): # noqa (C901) # get all the special values shop_product = ShopProduct.objects.get_or_create(product=sess.instance, shop=sess.shop)[0] matched_fields = [] for k, v in six.iteritems(sess.importer.extra_matches): if k.startswith("ShopProduct:"): x, field = k.split(":") matched_fields.append(field) val = sess.row.get(v) setattr(shop_product, field, val) for k, v in sess.importer.data_map.items(): field_name = v.get("id") if field_name in self.fields_to_skip: continue if hasattr(shop_product, field_name): field = getattr(shop_product, field_name) value = sess.row.get(k, None) if isinstance(field, PriceProperty): value = sess.shop.create_price(value) value, is_related = self._find_related_values(field_name, sess, value) if is_related and isinstance(value, six.string_types): continue if field_name in [ "suppliers", "visibility_groups", "shipping_methods", "payment_methods", "categories", ]: getattr(shop_product, field_name).set(value) else: setattr(shop_product, field_name, value) shop_product.save() # add shop relation to the manufacturer if sess.instance.manufacturer: sess.instance.manufacturer.shops.add(sess.shop) # add shop relation to all categories for category in shop_product.categories.all(): category.shops.add(sess.shop)
def _find_related_values(self, field_name, sess, value): is_related_field = False field_mapping = sess.importer.mapping.get(field_name) for related_field, relmapper in sess.importer.relation_map_cache.items(): if related_field.name != field_name: continue is_related_field = True if isinstance(related_field, ManyToManyField) and value is None: return ([], related_field) if isinstance(related_field, ForeignKey): try: value = int(value) # this is because xlrd causes 1 to be 1.0 except Exception: pass value = relmapper.fk_cache.get(str(value)) break else: value = sess.importer.relation_map_cache.get(field_mapping.get("field")).map_cache[value] break if field_mapping.get("is_enum_field"): field = field_mapping.get("field") for k, v in field.get_choices(): if fold_mapping_name(force_text(v)) == fold_mapping_name(value): value = k break return (value, is_related_field)
[docs] def get_import_defaults(self): """ Get default values for import time. """ data = { "type_id": ProductType.objects.values_list("pk", flat=True).first(), "tax_class_id": TaxClass.objects.values_list("pk", flat=True).first(), "sales_unit_id": SalesUnit.objects.values_list("pk", flat=True).first(), } return data
[docs]class ProductImporter(DataImporter): identifier = "product_importer" name = _("Product Importer") meta_base_class = ProductMetaBase model = Product relation_field = "product" help_template = "shuup/default_importers/product_help.jinja" example_files = [ ImporterExampleFile("product_sample_import.xls", ("application/vnd.ms-excel", "application/excel")), ImporterExampleFile( "product_sample_import.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ImporterExampleFile( "product_sample_complex_import.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ImporterExampleFile( "product_sample_import_with_images.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ), ImporterExampleFile( "product_sample_import_with_variations.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ), ImporterExampleFile("product_sample_import.csv", "text/csv"), ] @classmethod
[docs] def get_example_file_content(cls, example_file, request): from shuup.default_importer.samples import get_sample_file_content return get_sample_file_content(example_file.file_name)
@classmethod
[docs] def get_help_context_data(cls, request): from shuup.admin.shop_provider import get_shop from shuup.admin.supplier_provider import get_supplier return { "has_media_browse_permission": has_permission(request.user, "media.browse"), "supplier": get_supplier(request) or Supplier.objects.enabled(shop=get_shop(request)).first(), }