# -*- coding: utf-8 -*-
# This file is part of Shuup.
#
# Copyright (c) 2012-2021, Shuup Commerce Inc. All rights reserved.
#
# This source code is licensed under the OSL-3.0 license found in the
# LICENSE file in the root directory of this source tree.
import hashlib
import six
from collections import defaultdict
from django.conf import settings
from django.core.paginator import Paginator
from django.middleware.csrf import get_token
from django.utils.translation import get_language
from jinja2.utils import contextfunction
from shuup.core.models import Category, Manufacturer, Product, ShopProduct, Supplier
from shuup.core.utils import context_cache
from shuup.front.utils import cache as cache_utils
from shuup.front.utils.companies import allow_company_registration
from shuup.front.utils.product_statistics import get_best_selling_product_info
from shuup.front.utils.translation import get_language_choices
from shuup.front.utils.user import is_admin_user
from shuup.front.utils.views import cache_product_things
from shuup.utils import django_compat
from shuup.utils.django_compat import reverse
from shuup.utils.importing import cached_load
from shuup.utils.mptt import get_cached_trees
from shuup.utils.translation import cache_translations_for_tree
def _group_list_items(group_list, number):
for i in range(0, len(group_list), number):
yield tuple(group_list[i : i + number])
def _get_listed_products(  # noqa (C901)
    context, n_products, ordering=None, filter_dict=None, orderable_only=True, extra_filters=None
):
    """
    Returns products marked as listed that are determined to be
    visible based on the current context.

    At most ``n_products`` products are fetched from the database. When
    ``orderable_only`` is set, the fetched products are then narrowed
    down to the ones orderable from at least one enabled supplier, so
    the result may contain fewer than ``n_products`` items.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param n_products: Number of products to return
    :type n_products: int
    :param ordering: String specifying ordering
    :type ordering: str
    :param filter_dict: Dictionary of filter parameters
    :type filter_dict: dict[str, object]
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    :param extra_filters: Extra filters to be used in Product Queryset
    :type extra_filters: django.db.models.Q
    :rtype: list[shuup.core.models.Product]
    """
    request = context["request"]
    customer = request.customer
    shop = request.shop

    # TODO: Check if this should be cached
    queryset = Product.objects.listed(
        shop=shop,
        customer=customer,
        language=get_language(),
    ).filter(**(filter_dict or {}))

    if extra_filters:
        queryset = queryset.filter(extra_filters)
    if ordering:
        queryset = queryset.order_by(ordering)

    candidates = list(queryset.distinct()[:n_products])
    if not orderable_only:
        return candidates

    enabled_suppliers = Supplier.objects.enabled(shop=shop)
    orderable_products = []
    for candidate in candidates:
        if len(orderable_products) == n_products:
            break

        try:
            shop_product = candidate.get_shop_instance(shop, allow_cache=True)
        except ShopProduct.DoesNotExist:
            # Products without a shop product for this shop are skipped.
            continue

        # A product qualifies as soon as a single supplier can deliver
        # its minimum purchase quantity to the customer.
        if any(
            shop_product.is_orderable(enabled_supplier, customer, shop_product.minimum_purchase_quantity)
            for enabled_supplier in enabled_suppliers
        ):
            orderable_products.append(candidate)

    return orderable_products
@contextfunction
def get_listed_products(context, n_products, ordering=None, filter_dict=None, orderable_only=True, extra_filters=None):
    """
    A cached version of _get_listed_products

    All arguments take part in the cache key; ``extra_filters`` is
    represented there by the SHA-1 digest of its string form. On a cache
    miss the products are fetched, passed through
    ``cache_product_things`` and stored for
    ``settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION``.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param n_products: Number of products to return
    :type n_products: int
    :param ordering: String specifying ordering
    :type ordering: str
    :param filter_dict: Dictionary of filter parameters
    :type filter_dict: dict[str, object]
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    :param extra_filters: Extra filters to be used in Product Queryset
    :type extra_filters: django.db.models.Q
    """
    request = context["request"]
    cache_item = cache_utils.get_listed_products_cache_item(request.shop)
    extra_filters_digest = hashlib.sha1(str(extra_filters).encode("utf-8")).hexdigest()

    key, cached_products = context_cache.get_cached_value(
        identifier="listed_products",
        item=cache_item,
        context=request,
        n_products=n_products,
        ordering=ordering,
        filter_dict=filter_dict,
        orderable_only=orderable_only,
        extra_filters=extra_filters_digest,
    )
    if cached_products is not None:
        return cached_products

    listed_products = _get_listed_products(
        context,
        n_products,
        ordering=ordering,
        filter_dict=filter_dict,
        orderable_only=orderable_only,
        extra_filters=extra_filters,
    )
    listed_products = cache_product_things(request, listed_products)
    context_cache.set_cached_value(key, listed_products, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return listed_products
@contextfunction
def get_best_selling_products(context, n_products=12, cutoff_days=30, orderable_only=True, supplier=None):
    """
    A cached version of _get_best_selling_products

    The cache identifier carries the primary key of ``supplier`` (or an
    empty string when no supplier is given), so results are cached
    separately per supplier.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param n_products: Number of products to return
    :type n_products: int
    :param cutoff_days: Forwarded to ``get_best_selling_product_info``
    :type cutoff_days: int
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    :param supplier: Optional supplier to limit the sales statistics to
    :type supplier: shuup.core.models.Supplier|None
    """
    request = context["request"]
    supplier_suffix = supplier.pk if supplier else ""

    key, cached_products = context_cache.get_cached_value(
        identifier="best_selling_products_%s" % supplier_suffix,
        item=cache_utils.get_best_selling_products_cache_item(request.shop),
        context=request,
        n_products=n_products,
        cutoff_days=cutoff_days,
        orderable_only=orderable_only,
    )
    if cached_products is not None:
        return cached_products

    best_sellers = _get_best_selling_products(cutoff_days, n_products, orderable_only, request, supplier=supplier)
    context_cache.set_cached_value(key, best_sellers, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return best_sellers
def _get_best_selling_products(cutoff_days, n_products, orderable_only, request, supplier=None):  # noqa (C901)
    """
    Collect the best selling listed products of the request's shop.

    Sold quantities of variation children are summed up under their
    parent product. Up to ``n_products`` products are loaded, following
    the sales ranking in groups of ``n_products`` ids, and the result is
    returned ordered from the best selling product downwards.

    :param cutoff_days: Forwarded to ``get_best_selling_product_info``
    :type cutoff_days: int
    :param n_products: Maximum number of products to load
    :type n_products: int
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    :param request: Current request; its ``shop`` and ``customer`` are used
    :param supplier: Optional supplier; when given, sales statistics and
        the orderability check are limited to this supplier
    :type supplier: shuup.core.models.Supplier|None
    :return: Products as returned by ``cache_product_things``, sorted by
        sales rank
    :rtype: list
    """
    data = get_best_selling_product_info(shop_ids=[request.shop.pk], cutoff_days=cutoff_days, supplier=supplier)

    # Sales of a variation child count towards its parent product.
    combined_variation_products = defaultdict(int)
    for product_id, parent_id, qty in data:
        combined_variation_products[parent_id or product_id] += qty

    # get all the product ids, best selling first
    product_ids = [d[0] for d in sorted(six.iteritems(combined_variation_products), key=lambda i: i[1], reverse=True)]

    # Rank of every product id, so that the final sort does not have to
    # scan ``product_ids`` again for each product.
    sales_rank = {product_id: rank for rank, product_id in enumerate(product_ids)}

    # group product ids in groups of n_products
    # to prevent querying ALL products at once
    products = []
    for grouped_product_ids in _group_list_items(product_ids, n_products):
        valid_products_qs = Product.objects.listed(shop=request.shop, customer=request.customer).filter(
            id__in=grouped_product_ids, shop_products__shop=request.shop, shop_products__suppliers__enabled=True
        )
        for product in valid_products_qs.iterator():
            products.append(product)
            if len(products) == n_products:
                break

        if len(products) == n_products:
            break

    if orderable_only:
        if supplier:
            candidate_suppliers = [supplier]
        else:
            candidate_suppliers = Supplier.objects.enabled(shop=request.shop)

        valid_products = []
        for product in products:
            # this instance should always exist as the listed() queryset uses the current shop as a filter
            shop_product = product.get_shop_instance(request.shop, allow_cache=True)

            # One supplier able to deliver the minimum purchase quantity
            # is enough. The loop variable is deliberately not named
            # ``supplier`` to keep the function argument intact.
            if any(
                shop_product.is_orderable(
                    candidate_supplier, request.customer, shop_product.minimum_purchase_quantity
                )
                for candidate_supplier in candidate_suppliers
            ):
                valid_products.append(product)

        products = valid_products

    products = cache_product_things(request, products)
    # The querysets above do not follow the sales ranking, restore it here.
    products = sorted(products, key=lambda p: sales_rank[p.id])  # pragma: no branch
    return products
@contextfunction
def get_newest_products(context, n_products=6, orderable_only=True):
    """
    Return listed products ordered by descending primary key, cached.

    Only products without a variation parent are considered. On a cache
    miss the products are fetched with ``_get_listed_products``, passed
    through ``cache_product_things`` and stored for
    ``settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION``.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param n_products: Number of products to return
    :type n_products: int
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    """
    request = context["request"]
    cache_item = cache_utils.get_newest_products_cache_item(request.shop)

    key, cached_products = context_cache.get_cached_value(
        identifier="newest_products",
        item=cache_item,
        context=request,
        n_products=n_products,
        orderable_only=orderable_only,
    )
    if cached_products is not None:
        return cached_products

    newest_products = _get_listed_products(
        context,
        n_products,
        ordering="-pk",
        filter_dict={"variation_parent": None},
        orderable_only=orderable_only,
    )
    newest_products = cache_product_things(request, newest_products)
    context_cache.set_cached_value(key, newest_products, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return newest_products
@contextfunction
def get_random_products(context, n_products=6, orderable_only=True):
    """
    Return randomly ordered listed products, cached.

    Only products without a variation parent are considered. Because the
    result is stored in the context cache, the same selection is
    returned until the cached value is no longer available.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param n_products: Number of products to return
    :type n_products: int
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    """
    request = context["request"]
    cache_item = cache_utils.get_random_products_cache_item(request.shop)

    key, cached_products = context_cache.get_cached_value(
        identifier="random_products",
        item=cache_item,
        context=request,
        n_products=n_products,
        orderable_only=orderable_only,
    )
    if cached_products is not None:
        return cached_products

    random_products = _get_listed_products(
        context,
        n_products,
        ordering="?",
        filter_dict={"variation_parent": None},
        orderable_only=orderable_only,
    )
    random_products = cache_product_things(request, random_products)
    context_cache.set_cached_value(key, random_products, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return random_products
@contextfunction
def get_products_for_categories(context, categories, n_products=6, orderable_only=True):
    """
    Return randomly ordered listed products of the given categories, cached.

    Only products without a variation parent whose shop products belong
    to one of ``categories`` are considered. ``categories`` is also part
    of the cache key.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param categories: Categories used for the
        ``shop_products__categories__in`` lookup
    :param n_products: Number of products to return
    :type n_products: int
    :param orderable_only: Boolean limiting results to orderable products
    :type orderable_only: bool
    """
    request = context["request"]
    cache_item = cache_utils.get_products_for_category_cache_item(request.shop)

    key, cached_products = context_cache.get_cached_value(
        identifier="products_for_category",
        item=cache_item,
        context=request,
        n_products=n_products,
        categories=categories,
        orderable_only=orderable_only,
    )
    if cached_products is not None:
        return cached_products

    category_filters = {"variation_parent": None, "shop_products__categories__in": categories}
    category_products = _get_listed_products(
        context,
        n_products,
        ordering="?",
        filter_dict=category_filters,
        orderable_only=orderable_only,
    )
    category_products = cache_product_things(request, category_products)
    context_cache.set_cached_value(key, category_products, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return category_products
@contextfunction
def get_all_manufacturers(context):
    """
    Return the manufacturers of the products listed for the current
    shop and customer, using the context cache.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: Manufacturer queryset, or the previously cached value
    """
    request = context["request"]
    cache_item = cache_utils.get_all_manufacturers_cache_item(request.shop)

    key, cached_manufacturers = context_cache.get_cached_value(
        identifier="all_manufacturers", item=cache_item, context=request
    )
    if cached_manufacturers is not None:
        return cached_manufacturers

    listed_products = Product.objects.listed(shop=request.shop, customer=request.customer)
    manufacturers = Manufacturer.objects.filter(
        pk__in=listed_products.values_list("manufacturer__id").distinct()
    )
    context_cache.set_cached_value(key, manufacturers, settings.SHUUP_TEMPLATE_HELPERS_CACHE_DURATION)
    return manufacturers
@contextfunction
def get_root_categories(context):
    """
    Return the root nodes of the category tree visible to the current
    customer in the current shop and language.

    Translations for the active language are cached on the tree before
    it is returned.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: Result of ``get_cached_trees`` for the visible categories
    """
    request = context["request"]
    language = get_language()

    visible_categories = Category.objects.all_visible(
        customer=request.customer, shop=request.shop, language=language
    )
    roots = get_cached_trees(visible_categories)
    cache_translations_for_tree(roots, languages=[language])
    return roots
@contextfunction
def _get_page_range(current_page, num_pages, range_gap=5):
"""
Get page range around given page for a given number of pages.
>>> list(_get_page_range(1, 10))
[1, 2, 3, 4, 5]
>>> list(_get_page_range(3, 10))
[1, 2, 3, 4, 5]
>>> list(_get_page_range(4, 10))
[2, 3, 4, 5, 6]
>>> list(_get_page_range(7, 10))
[5, 6, 7, 8, 9]
>>> list(_get_page_range(10, 10))
[6, 7, 8, 9, 10]
>>> list(_get_page_range(1, 1))
[1]
>>> list(_get_page_range(1, 4))
[1, 2, 3, 4]
>>> list(_get_page_range(3, 4))
[1, 2, 3, 4]
>>> list(_get_page_range(4, 4))
[1, 2, 3, 4]
"""
assert isinstance(num_pages, int)
assert isinstance(current_page, int)
assert num_pages >= 1
assert current_page >= 1
assert current_page <= num_pages
max_start = max(num_pages - range_gap + 1, 1)
start = min(max(current_page - (range_gap // 2), 1), max_start)
end = min(start + range_gap - 1, num_pages)
return six.moves.range(start, end + 1)
@contextfunction
def get_shop_language_choices(context):
    """
    Return the language choices of the current request's shop.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: Result of ``get_language_choices`` for the shop
    """
    shop = context["request"].shop
    return get_language_choices(shop)
@contextfunction
def is_shop_admin(context):
    """
    Tell whether the current request is made by an admin user.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: Result of ``is_admin_user`` for the current request
    """
    request = context["request"]
    return is_admin_user(request)
@contextfunction
def is_company_registration_allowed(context, request=None):
    """
    Tell whether company registration is allowed for the current shop.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param request: Optional request to use instead of the one in the
        context
    :return: Result of ``allow_company_registration`` for the shop
    """
    # From macros it doesn't seem to always pass context correctly,
    # so an explicitly given request takes precedence.
    if not request:
        request = context["request"]
    return allow_company_registration(request.shop)
@contextfunction
def can_toggle_all_seeing(context):
    """
    Tell whether the current user may toggle the "all seeing" mode.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: ``False`` for anonymous customers and company members,
        otherwise the ``is_superuser`` attribute of the request user
        (``False`` when the attribute is missing)
    """
    request = context["request"]
    if not (request.customer.is_anonymous or request.is_company_member):
        return getattr(request.user, "is_superuser", False)

    # Looks like the user is currently forcing anonymous or company
    # mode which means that the visibility limit can't be used since
    # 'is all seeing' is purely person contact feature.
    return False
@contextfunction
def get_admin_edit_url(context, intance_or_model):
    """
    Return the admin URL and a display name for a model or an instance.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :param intance_or_model: Model class or model instance to link to
    :return: Dictionary with the keys ``url`` and ``name`` (title-cased
        verbose name of the model), or ``None`` when ``model_url``
        returns no URL
    :rtype: dict|None
    """
    from shuup.admin.template_helpers.shuup_admin import model_url

    admin_url = model_url(context, intance_or_model)
    if not admin_url:
        return None

    return {
        "url": admin_url,
        "name": intance_or_model._meta.verbose_name.title(),
    }
@contextfunction
def get_powered_by_content(context):
    """
    Return the value of the ``SHUUP_FRONT_POWERED_BY_CONTENT`` setting.

    :param context: Rendering context (not used)
    :type context: jinja2.runtime.Context
    """
    powered_by_content = settings.SHUUP_FRONT_POWERED_BY_CONTENT
    return powered_by_content
@contextfunction
def get_config(context):
    """
    Build the configuration dictionary for the current request.

    :param context: Rendering context
    :type context: jinja2.runtime.Context
    :return: Dictionary with ``uploadUrl`` (the media upload URL, or
        ``None`` for unauthenticated users) and ``csrf`` (the CSRF token
        of the request)
    :rtype: dict
    """
    request = context["request"]

    if request.user.is_authenticated:
        upload_url = reverse("shuup:media-upload")
    else:
        upload_url = None

    return {
        "uploadUrl": upload_url,
        "csrf": get_token(request),
    }
def is_authenticated(user):
    """
    Tell whether the given user is authenticated.

    :param user: User object to check
    :return: Result of ``django_compat.is_authenticated`` for the user
    """
    authenticated = django_compat.is_authenticated(user)
    return authenticated