# -*- coding: utf-8 -*-
# This file is part of Shuup.
#
# Copyright (c) 2012-2021, Shuup Commerce Inc. All rights reserved.
#
# This source code is licensed under the OSL-3.0 license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import unicode_literals
import datetime
import json
import six
from django.core.paginator import EmptyPage, Paginator
from django.db.models import Manager, Q, QuerySet
from django.http.response import HttpResponse, JsonResponse
from django.template.defaultfilters import yesno
from django.utils.html import escape
from django.utils.translation import ugettext_lazy as _
from django.views.generic import View
from easy_thumbnails.files import get_thumbnailer
from filer.models import Image
from shuup.admin.utils.urls import NoModelUrl, get_model_url
from shuup.apps.provides import get_provide_objects
from shuup.core.models import ProductMedia
from shuup.utils.dates import try_parse_datetime
from shuup.utils.django_compat import force_text
from shuup.utils.i18n import format_money, get_locally_formatted_datetime
from shuup.utils.importing import load
from shuup.utils.money import Money
from shuup.utils.objects import compact
from shuup.utils.serialization import ExtendedJSONEncoder
[docs]def maybe_callable(thing, context=None):
"""
If `thing` is callable, return it.
If `thing` names a callable attribute of `context`, return it.
"""
if callable(thing):
return thing
if isinstance(thing, six.string_types):
callable_thing = getattr(context, thing, None)
if callable(callable_thing):
# prevent returning a callable that is defined in a base View class
# this won't allow leaking methods like: dispatch, options, get, post etc
base_view_thing = getattr(View, thing, None)
if not callable(base_view_thing):
return callable_thing
return None
[docs]def maybe_call(thing, context, args=None, kwargs=None):
"""
If `thing` is callable, call it with `args` and `kwargs` and return the value.
If `thing` names a callable attribute of `context`, call it with args and kwargs and return the value.
Otherwise return `thing`.
"""
func = maybe_callable(context=context, thing=thing)
if func:
thing = func(*(args or ()), **(kwargs or {}))
return thing
[docs]class Filter(object):
type = None
filter_field = None
[docs] def get_filter_field(self, column, context):
if self.filter_field:
return self.filter_field
if column.context and not maybe_callable(column.display, context=context):
return column.display
return column.id
[docs] def to_json(self, context):
return None
[docs] def filter_queryset(self, queryset, column, value, context):
return queryset # pragma: no cover
[docs]class ChoicesFilter(Filter):
type = "choices"
def __init__(self, choices=None, filter_field=None, default=None):
self.filter_field = filter_field
self.choices = choices
self.default = default
def _flatten_choices(self, context):
if not self.choices:
return None
choices = maybe_call(self.choices, context=context)
if isinstance(choices, QuerySet):
choices = [(c.pk, c) for c in choices]
return [("_all", "---------")] + [
(force_text(value, strings_only=True), force_text(display)) for (value, display) in choices
]
[docs] def to_json(self, context):
choices = self._flatten_choices(context)
default_choice = self.default
if default_choice is None and choices:
default_choice = choices[0][0]
return {"choices": choices, "defaultChoice": default_choice}
[docs] def filter_queryset(self, queryset, column, value, context):
if value == "_all":
return queryset
filter_field = self.get_filter_field(column, context)
return queryset.filter(**{filter_field: value})
[docs]class Select2Filter(ChoicesFilter):
type = "select2"
[docs] def to_json(self, context):
json_dict = super(Select2Filter, self).to_json(context)
json_dict["select2"] = True
return json_dict
[docs]class MPTTFilter(Select2Filter):
type = "mptt"
[docs] def filter_queryset(self, queryset, column, value, context):
qs = super(MPTTFilter, self).filter_queryset(queryset, column, value, context)
return qs.get_descendants(include_self=True)
[docs]class RangeFilter(Filter):
type = "range"
def __init__(self, min=None, max=None, step=None, field_type=None, filter_field=None):
"""
:param filter_field: Filter field (Django query expression). If None, column ID is used.
:type filter_field: str|None
:param min: Minimum value.
:param max: Maximum value.
:param step: Step value. See the HTML5 documentation for semantics.
:param field_type: Field type string. See the HTML5 documentation for semantics.
:type field_type: str|None
"""
self.filter_field = filter_field
self.min = min
self.max = max
self.step = step
self.field_type = field_type
[docs] def to_json(self, context):
return {
"range": compact(
{
"min": maybe_call(self.min, context=context),
"max": maybe_call(self.max, context=context),
"step": maybe_call(self.step, context=context),
"type": self.field_type,
}
)
}
[docs] def filter_queryset(self, queryset, column, value, context):
if value:
min = value.get("min")
max = value.get("max")
# strip string values
if type(max) in six.string_types:
max = max.strip()
if type(min) in six.string_types:
min = min.strip()
q = {}
filter_field = self.get_filter_field(column, context)
if min is not None:
q["%s__gte" % filter_field] = min
if max is not None:
q["%s__lte" % filter_field] = max
if q:
queryset = queryset.filter(**q)
return queryset
[docs]class DateRangeFilter(RangeFilter):
def __init__(self, *args, **kwargs):
super(DateRangeFilter, self).__init__(*args, **kwargs)
if not self.field_type:
self.field_type = "date"
[docs] def filter_queryset(self, queryset, column, value, context):
if value:
value = {"min": try_parse_datetime(value.get("min")), "max": try_parse_datetime(value.get("max"))}
return super(DateRangeFilter, self).filter_queryset(queryset, column, value, context)
[docs]class TextFilter(Filter):
type = "text"
def __init__(self, field_type=None, placeholder=None, operator="icontains", filter_field=None):
"""
:param filter_field: Filter field (Django query expression). If None, column ID is used.
:type filter_field: str|None
:param field_type: Field type string. See the HTML5 documentation for semantics.
:type field_type: str|None
:param placeholder: Field placeholder string.
:type placeholder: str|None
:param operator: Django operator for the queryset.
:type operator: str
"""
self.filter_field = filter_field
self.field_type = field_type
self.placeholder = placeholder
self.operator = operator
[docs] def to_json(self, context):
return {
"text": compact(
{
"type": self.field_type,
"placeholder": force_text(self.placeholder) if self.placeholder else None,
}
)
}
[docs] def filter_queryset(self, queryset, column, value, context):
if value:
value = force_text(value).strip()
field = self.get_filter_field(column, context)
if value:
return queryset.filter(**{"%s__%s" % (field, self.operator): value})
return queryset
[docs]class MultiFieldTextFilter(TextFilter):
def __init__(self, filter_fields, **kwargs):
"""
:param filter_field: List of Filter fields (Django query expression).
:type filter_field: list<str>
:param kwargs: Kwargs for `TextFilter`.
"""
super(MultiFieldTextFilter, self).__init__(**kwargs)
self.filter_fields = tuple(filter_fields)
[docs] def filter_queryset(self, queryset, column, value, context):
if value:
q = Q()
for filter_field in self.filter_fields:
q |= Q(**{"%s__%s" % (filter_field, self.operator): value})
return queryset.filter(q)
return queryset
true_or_false_filter = ChoicesFilter([(False, _("no")), (True, _("yes"))])
[docs]class Column(object):
def __init__(self, id, title, **kwargs):
self.id = id
self.title = title
self.display = kwargs.pop("display", id)
self.class_name = kwargs.pop("class_name", None)
self.filter_config = kwargs.pop("filter_config", None)
self.sortable = bool(kwargs.pop("sortable", True))
self.linked = bool(kwargs.pop("linked", True))
self.raw = bool(kwargs.pop("raw", False))
self.ordering = kwargs.pop("ordering", 9999)
self.context = None # will be set after initializing
self.sort_field = kwargs.pop("sort_field", None)
self.allow_highlight = kwargs.pop("allow_highlight", True)
if kwargs and type(self) is Column: # If we're not derived, validate that client code doesn't fail
raise NameError("Unexpected kwarg(s): %s" % kwargs.keys())
[docs] def to_json(self, context=None):
out = {
"id": force_text(self.id),
"title": force_text(self.title),
"className": force_text(self.class_name) if self.class_name else None,
"filter": self.filter_config.to_json(context=context) if self.filter_config else None,
"sortable": bool(self.sortable),
"linked": bool(self.linked),
"allowHighlight": bool(self.allow_highlight),
"raw": bool(self.raw),
}
return dict((key, value) for (key, value) in six.iteritems(out) if value is not None)
[docs] def get_sort_field(self, sort_field):
if self.sort_field:
return self.sort_field
if self.filter_config and self.filter_config.filter_field:
return self.filter_config.filter_field
if self.display and not maybe_callable(self.display, context=self.context):
return self.display
return self.id
[docs] def set_sort_field(self):
self.sort_field = self.get_sort_field(None)
[docs] def set_context(self, context):
self.context = context
self.set_sort_field()
[docs] def sort_queryset(self, queryset, desc=False):
order_by = ("-" if desc else "") + self.sort_field
return queryset.order_by(order_by)
[docs] def filter_queryset(self, queryset, value):
if self.filter_config:
queryset = self.filter_config.filter_queryset(queryset, self, value, self.context)
return queryset
[docs] def get_display_value(self, context, object):
# Look for callable from view context
display_callable = maybe_callable(self.display, context=context)
if display_callable:
return display_callable(object)
# Look for callable from provided column objects contexts
display_callable = self.search_from_provided_contexts(object)
if display_callable:
return display_callable(object)
value = object
for bit in self.display.split("__"):
value = getattr(value, bit, None)
return_value = self.check_different_types(value)
if return_value is not None:
return return_value
if not value:
value = ""
return force_text(value)
[docs] def check_different_types(self, value):
if isinstance(value, ProductMedia):
return "<img src='%s'>" % value.get_thumbnail().url
if isinstance(value, Image):
thumbnailer = get_thumbnailer(value)
options = {"size": (64, 64)}
thumbnail = thumbnailer.get_thumbnail(options, generate=True)
return "<img src='%s'>" % thumbnail.url
if isinstance(value, bool):
value = yesno(value)
if isinstance(value, Manager):
value = ", ".join("%s" % x for x in value.all())
return value
if isinstance(value, datetime.datetime):
return get_locally_formatted_datetime(value)
if isinstance(value, Money):
return escape(format_money(value))
[docs] def search_from_provided_contexts(self, object):
provide_object_key = "provided_columns_%s" % type(object).__name__
for provided_column_object in get_provide_objects(provide_object_key):
obj = provided_column_object()
display_callable = maybe_callable(self.display, context=obj)
if display_callable:
return display_callable(object)
def __repr__(self):
return "<Column: %s> %s" % (self.title, self.id)
[docs]class Picotable(object):
def __init__(self, request, columns, mass_actions, queryset, context):
for column in columns:
column.set_context(context)
self.request = request
self.columns = columns
self.mass_actions = mass_actions
self.queryset = queryset
self.context = context
self.columns_by_id = dict((c.id, c) for c in self.columns)
self.get_object_url = maybe_callable("get_object_url", context=self.context)
self.get_object_abstract = maybe_callable("get_object_abstract", context=self.context)
self.get_object_extra = maybe_callable("get_object_extra", context=self.context)
self.default_filters = self._get_default_filters()
def _get_default_filter(self, column):
filter_config = getattr(column, "filter_config")
if filter_config and hasattr(filter_config, "default") and filter_config.default is not None:
field = filter_config.filter_field or column.id
return (field, filter_config.default)
else:
return None
def _get_default_filters(self):
filters = {}
for column in self.columns:
default_filter = self._get_default_filter(column)
if default_filter:
filters[default_filter[0]] = default_filter[1]
return filters
[docs] def process_queryset(self, query):
queryset = self.queryset
ordered = getattr(self.queryset, "ordered", None)
if ordered is not None and not ordered:
queryset = self.queryset.order_by("-id")
filters = query.get("filters") or self._get_default_filters()
for column, value in six.iteritems(filters):
column = self.columns_by_id.get(column)
if column:
queryset = column.filter_queryset(queryset, value)
sort = query.get("sort")
if sort:
desc = sort[0] == "-"
column = self.columns_by_id.get(sort[1:])
if not (column and column.sortable):
raise ValueError("Error! Can't sort by column %r." % sort[1:])
queryset = column.sort_queryset(queryset, desc=desc)
return queryset
[docs] def get_data(self, query):
paginator = Paginator(self.process_queryset(query), query["perPage"])
try:
page = paginator.page(int(query["page"]))
except EmptyPage:
page = paginator.page(paginator.num_pages)
out = {
"columns": [c.to_json(context=self.context) for c in self.columns],
"pagination": {
"perPage": paginator.per_page,
"nPages": paginator.num_pages,
"nItems": paginator.count,
"pageNum": page.number,
},
"massActions": self.mass_actions,
"items": [self.process_item(item) for item in page],
"itemInfo": _("Showing %(per_page)s of %(n_items)s %(verbose_name_plural)s")
% {
"per_page": min(paginator.per_page, paginator.count),
"n_items": paginator.count,
"verbose_name_plural": self.get_verbose_name_plural(),
},
}
return out
[docs] def process_item(self, object):
object_url = self.get_object_url(object) if callable(self.get_object_url) else None
object_extra = self.get_object_extra(object) if callable(self.get_object_extra) else None
out = {
"_id": object.id,
"_url": object_url,
"_linked_in_mobile": True if object_url else False,
"_extra": object_extra,
}
for column in self.columns:
out[column.id] = column.get_display_value(context=self.context, object=object)
out["type"] = type(object).__name__
out["_abstract"] = self.get_object_abstract(object, item=out) if callable(self.get_object_abstract) else None
return out
[docs] def get_verbose_name_plural(self):
try:
return self.queryset.model._meta.verbose_name_plural
except AttributeError:
return _("objects")
[docs]class PicotableViewMixin(object):
url_identifier = None
default_columns = []
columns = []
mass_actions = []
picotable_class = Picotable
related_objects = []
template_name = "shuup/admin/base_picotable.jinja"
toolbar_buttons_provider_key = None
mass_actions_provider_key = None
[docs] def process_picotable(self, query_json):
mass_actions = self.load_mass_actions()
pico = self.picotable_class(
request=self.request,
columns=self.columns,
mass_actions=mass_actions,
queryset=self.get_queryset(),
context=self,
)
return JsonResponse(pico.get_data(json.loads(query_json)), encoder=ExtendedJSONEncoder)
[docs] def get(self, request, *args, **kwargs):
query = request.GET.get("jq")
if query:
return self.process_picotable(query)
return super(PicotableViewMixin, self).get(request, *args, **kwargs)
[docs] def post(self, request, *args, **kwargs):
"""
Post action is where Mass Actions post their data.
"""
data = request.body.decode("utf-8")
data = json.loads(data)
action_identifier = data.get("action", None)
ids = data.get("values", [])
mass_action = self._get_mass_action(action_identifier)
if mass_action is None:
return JsonResponse({"error": force_text(_("Mass Action encountered an unknown error."))})
if isinstance(mass_action, PicotableFileMassAction):
return mass_action.process(request, ids)
mass_action.process(request, ids)
return JsonResponse({"ok": True})
def _get_mass_actions(self):
mass_actions = self.mass_actions[:] # copy
# add mass actions from the view mass action provider
if getattr(self, "mass_actions_provider_key", None):
for mass_action_provider in get_provide_objects(self.mass_actions_provider_key):
mass_actions.extend(list(mass_action_provider.get_mass_actions_for_view(self)))
# add mass actions from the global mass action provider
for mass_action_provider in get_provide_objects("admin_mass_actions_provider"):
mass_actions.extend(list(mass_action_provider.get_mass_actions_for_view(self)))
return mass_actions
def _get_mass_action(self, action_identifier):
for mass_action in self._get_mass_actions():
loaded_action = load(mass_action)()
if loaded_action.identifier == action_identifier:
return loaded_action
return None
[docs] def get_object_url(self, instance):
try:
return get_model_url(instance, user=self.request.user, shop=self.request.shop)
except NoModelUrl:
pass
return None
[docs] def get_object_abstract(self, instance, item):
"""
Get the object abstract lines (used for mobile layouts) for this object.
Supported keys in abstract line dicts are:
* text (required)
* title
* class (CSS class name -- `header` for instance)
* raw (boolean; whether or not the `text` is raw HTML)
:param instance: The instance.
:param item: The item dict so far. Useful for reusing precalculated values.
:return: Iterable of dicts to pass through to the picotable javascript.
:rtype: Iterable[dict]
"""
return None
[docs] def get_filter(self):
filter_string = self.request.GET.get("filter")
return json.loads(filter_string) if filter_string else {}
[docs] def load_mass_actions(self):
actions = []
for action in self._get_mass_actions():
obj = load(action)()
action_data = {}
extra_data = obj.get_action_info(self.request)
if extra_data and isinstance(extra_data, dict):
action_data.update(extra_data)
action_data.update({"key": obj.identifier, "value": obj.label})
actions.append(action_data)
return actions
[docs]class PicotableMassAction(object):
"""
Simple Mass Action.
This action only processes the given id's in subclass.
Examples:
* `shuup.admin.modules.orders.mass_actions.CancelOrderAction`
* `shuup.admin.modules.products.mass_actions.VisibleMassAction`
"""
label = _("Mass Action")
identifier = "mass_action"
def __repr__(self):
return "Mass Action: %s" % force_text(self.label)
[docs] def process(self, request, ids):
"""
Process the given ids in masses.
:param request: `WSGIRequest`
:param ids: list of ids.
:return: None
"""
pass
[docs] def get_action_info(self, request):
"""
Returns a dict with additional action data to be rendered
in html action option element as data-xxx attribute.
:param request: `WSGIRequest`
:return dict: dictionary with extra info to be rendered in option element.
"""
return {}
[docs]class PicotableMassActionProvider(object):
@classmethod
[docs] def get_mass_actions_for_view(cls, view):
"""
Returns a list of mass actions for a given `view`.
:param view: `django.views.View`
:return list[PicotableMassAction]: list of picotable mass actions definition (strings).
"""
return []
[docs]class PicotableFileMassAction(PicotableMassAction):
"""
File Mass Action.
This action returns file as a response.
Examples:
* `shuup.admin.modules.orders.mass_actions.OrderConfirmationPdfAction`
* `shuup.admin.modules.products.mass_actions.ExportProductsCSVAction`
"""
[docs] def process(self, request, ids):
"""
Process and return `HttpResponse`.
Example:
response = HttpResponse(content_type="text/csv")
response['Content-Disposition'] = 'attachment; filename="mass_action.csv"'
writer = csv.writer(response)
writer.writerow(['First row', 'Foo', 'Bar', 'Baz'])
return response
:param request: `WSGIRequest`
:param ids: list of ids.
:return: `HttpResponse`
"""
pass
[docs]class PicotableRedirectMassAction(PicotableMassAction):
"""
Redirect Mass Action.
This view saves selected id's into session which are then
further processed in the mass action view.
Redirect of this view is handled in `picotable.js`.
To use this action, your admin module must supply admin_url
and a view for the action.
Examples:
* `shuup.admin.modules.contacts.mass_actions.EditContactsAction`
* `shuup.admin.modules.products.mass_actions.EditProductAttributesAction`
"""
redirect_url = None
[docs] def process(self, request, ids):
request.session["mass_action_ids"] = ids
return HttpResponse("ok")
[docs] def get_action_info(self, request):
if self.redirect_url:
return {"redirects": True, "redirect_url": self.redirect_url}
return {}
[docs]class PicotableJavascriptMassAction(PicotableMassAction):
"""
Javascript Mass Action.
This view saves invokes a pre-defined javascript function
with the list of object ids.
Set the function call in `callback`, e.g. `deleteProducts`.
The mass action will then invoce the callback as `deleteProducts(ids)`
"""
callback = None
[docs] def get_action_info(self, request):
return {"callback": self.callback}