#
# BSD licence
#
# Author : Bro <bro.development@gmail.com>
#
def enum(*sequential, **named):
enums = dict(zip(sequential, range(len(sequential))), **named)
return type('Enum', (), enums)
try:
strinstance = basestring
except:
strinstance = str
def is_iterable_but_not_str(x):
return hasattr(x, '__iter__') and not isinstance(x, strinstance)
class Expression(object):
def __init__(self, **kwargs):
self.key = kwargs.get("key", None)
self.value = kwargs.get("value", None)
self.filter_value = self.value
self.operator = kwargs.get("operator", None)
if kwargs:
if self.operator == Filter.operations.LIKE:
self.filter_value = "'%%%s%%'" % self.value
elif self.operator == Filter.operations.ILIKE:
self.filter_value = "'*%s*'" % self.value
elif self.operator == Filter.operations.IN:
self.filter_value = "('%s')" % "','".join(self.value)
else:
if type(self.value) is bool:
self.filter_value = 1 if self.value else 0
else:
self.filter_value = "'%s'" % self.value
def filter_string(self):
filter_str = "%s %s %s" % (self.key, self.operator, self.filter_value)
return filter_str
def filter(self):
filter_str = "? %s ?" % (self.operator)
filter_values = (self.key, self.operator, self.filter_value)
return filter_str, filter_values
def __str__(self):
return self.filter_string()
class ExpressionGroup(object):
def __init__(self):
self.expression = None
self.exp_group1 = None
self.exp_group2 = None
self.exp_operator = None
def is_dummy(self):
return self.expression is None and self.exp_operator is None
def is_filtered(self):
return not self.is_dummy()
def __or__(self, exp_group):
if exp_group.is_dummy() or self.is_dummy():
return exp_group if self.is_dummy() else self
new_exp_group = type(self)()
new_exp_group.exp_group1 = exp_group
new_exp_group.exp_group2 = self
new_exp_group.exp_operator = Filter.operations.OR
return new_exp_group
def __and__(self, exp_group):
if exp_group.is_dummy() or self.is_dummy():
return exp_group if self.is_dummy() else self
new_exp_group = type(self)()
new_exp_group.exp_group1 = exp_group
new_exp_group.exp_group2 = self
new_exp_group.exp_operator = Filter.operations.AND
return new_exp_group
def __str__(self):
if self.is_dummy():
return ""
if self.expression:
return self.expression.filter_string()
else:
return "((%s) %s (%s))" % (self.exp_group1.filter_string(), self.exp_operator,
self.exp_group2.filter_string())
def filter(self):
if self.is_dummy():
return "", []
if self.expression:
return self.expression.filter()
else:
group1_str, group1_values = self.exp_group1.filter()
group2_str, group2_values = self.exp_group2.filter()
filter_str = "((%s) %s (%s))" % (group1_str, self.exp_operator, group2_str)
return filter_str, group1_values + group2_values
def filter_string(self):
return str(self)
[docs]class Filter(object):
"""A filter to be used to filter the results from a database query.
Users should not have to use this class."""
operations = enum(**{'AND': 'AND', 'OR': 'OR', 'LIKE': 'LIKE', 'ILIKE': "GLOB",
"IN": "IN", 'EQ': "=", 'NE': "!=", 'LT': "<", 'LE': "<=",
'GT': ">", 'GE': ">="})
def __init__(self, db, key):
self.db = db
self.key = key
self.expression_group = ExpressionGroup()
self.expression_t = Expression
[docs] def is_filtered(self):
"""If the filter contains any filters"""
return not self.expression_group.is_dummy()
def _comparison(self, value, operation):
self.expression_group.expression = self.expression_t(key=self.key,
value=value, operator=operation)
return self
[docs] def like(self, value):
"""Perform LIKE operation"""
return self._comparison(value, self.operations.LIKE)
[docs] def ilike(self, value):
"""Perform ILIKE operation"""
return self._comparison(value, self.operations.ILIKE)
[docs] def __eq__(self, value):
"""Perform EQUALS operation
When input value is an iterable, but not a string, it will match for
any of the values on the iterable
"""
# Iterable, so we use IN (X, X...) instead of =
if is_iterable_but_not_str(value):
return self._comparison(value, self.operations.IN)
else:
return self._comparison(value, self.operations.EQ)
[docs] def __ne__(self, value):
"""Perform NOT EQUALS operation"""
return self._comparison(value, self.operations.NE)
[docs] def __lt__(self, value):
"""Perform LESS THAN operation"""
return self._comparison(value, self.operations.LT)
[docs] def __le__(self, value):
"""Perform LESS THAN OR EQUALS operation"""
return self._comparison(value, self.operations.LE)
[docs] def __gt__(self, value):
"""Perform GREATER THAN operation"""
return self._comparison(value, self.operations.GT)
[docs] def __ge__(self, value):
"""Perform GREATER THAN OR EQUALS operation"""
return self._comparison(value, self.operations.GE)
[docs] def __and__(self, other_filter):
"""
Returns a new filter that combines this filter with other_filter using AND.
"""
new_filter = type(self)(self.db, None)
new_filter.expression_group = self.expression_group & other_filter.expression_group
return new_filter
[docs] def __or__(self, other_filter):
"""
Returns a new filter that combines this filter with other_filter using OR.
"""
new_filter = type(self)(self.db, None)
new_filter.expression_group = self.expression_group | other_filter.expression_group
return new_filter
[docs] def __len__(self):
"""Returns the number of records that matches this filter"""
if self.expression_group.is_dummy():
count = len(self.db)
else:
count = self.db._len(db_filter=self.expression_group)
return count
[docs] def __iter__(self):
"""Returns in iterator over the records for this filter"""
if self.expression_group.is_dummy():
res = self.db()
else:
res = self.db(self.expression_group)
return iter(res)
[docs] def __str__(self):
"""Returns a string representation of the filter"""
if self.expression_group:
return self.expression_group.filter_string()
else:
return ""
[docs] def filter(self):
"""Returns the filter"""
if self.expression_group:
return self.expression_group.filter()
else:
return "", []