redis-om-python/redis_developer/orm/model.py

1204 lines
45 KiB
Python
Raw Normal View History

import abc
import dataclasses
import decimal
import json
import logging
import operator
2021-10-14 02:16:20 +02:00
from copy import deepcopy, copy
from enum import Enum
from functools import reduce
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Mapping,
Optional,
Set,
Tuple,
TypeVar,
Union,
2021-09-01 21:56:06 +02:00
Sequence,
no_type_check,
Protocol,
List,
get_origin,
get_args, Type
)
import redis
2021-09-01 21:56:06 +02:00
from pydantic import BaseModel, validator
from pydantic.fields import FieldInfo as PydanticFieldInfo
from pydantic.fields import ModelField, Undefined, UndefinedType
from pydantic.main import ModelMetaclass
2021-10-14 02:16:20 +02:00
from pydantic.typing import NoArgAnyCallable
from pydantic.utils import Representation
from ulid import ULID
from .encoders import jsonable_encoder
from .render_tree import render_tree
from .token_escaper import TokenEscaper
# Registry of concrete (non-abstract, non-embedded) model classes, keyed by
# "<module>.<qualname>". Populated by ModelMeta when a model class is created.
model_registry = {}
_T = TypeVar("_T")
log = logging.getLogger(__name__)
escaper = TokenEscaper()
# For basic exact-match field types like an indexed string, we create a TAG
# field in the RediSearch index. TAG is designed for multi-value fields
# separated by a "separator" character. We're using the field for single values
# (multi-value TAGs will be exposed as a separate field type), and we use the
# pipe character (|) as the separator. There is no way to escape this character
# in hash fields or JSON objects, so if someone indexes a value that includes
# the pipe, we'll warn but allow, and then warn again if they try to query for
# values that contain this separator.
SINGLE_VALUE_TAG_FIELD_SEPARATOR = "|"
# This is the default field separator in RediSearch. We need it to determine if
# someone has accidentally passed in the field separator with the string value
# of a multi-value field lookup, like an IN or NOT_IN.
DEFAULT_REDISEARCH_FIELD_SEPARATOR = ","
class RedisModelError(Exception):
    """Signals an invalid RedisModel definition (for example, a bad primary key setup)."""
class QuerySyntaxError(Exception):
    """Signals that a query expression was put together incorrectly."""
class NotFoundError(Exception):
    """Signals that a lookup or query matched nothing."""
class Operators(Enum):
    """Comparison and logical operators that a query expression can carry."""

    # Comparisons
    EQ = 1
    NE = 2
    LT = 3
    LE = 4
    GT = 5
    GE = 6
    # Logical combinators
    OR = 7
    AND = 8
    NOT = 9
    # Membership / text matching
    IN = 10
    NOT_IN = 11
    LIKE = 12
    # Matches everything (used when a query has no expressions)
    ALL = 13

    def __str__(self):
        # Render as the bare member name ("EQ"), not "Operators.EQ".
        return f"{self.name}"
# Anything that may appear as an operand of an expression: another (possibly
# negated) expression, or a pydantic ModelField.
ExpressionOrModelField = Union['Expression', 'NegatedExpression', ModelField]
2021-10-12 23:22:57 +02:00
def embedded(cls):
    """
    Mark a model as embedded to avoid creating multiple indexes if the model is
    only ever used embedded within other models.

    Sets ``embedded = True`` on the class's ``Meta`` object and returns the
    class, so this can be used either as a plain call (``embedded(Model)``) or
    as a class decorator (``@embedded``) without rebinding the name to None.
    """
    setattr(cls.Meta, 'embedded', True)
    # A decorator must hand the class back; otherwise `@embedded` replaces the
    # decorated class with None.
    return cls
class ExpressionProtocol(Protocol):
    """Structural type describing a node in a query expression tree.

    A node carries an operator plus left and right operands, supports the
    ``~``, ``&`` and ``|`` operators, and exposes ``name`` and ``tree``
    properties.
    """
    op: Operators
    left: ExpressionOrModelField
    right: ExpressionOrModelField

    def __invert__(self) -> 'Expression':
        ...

    def __and__(self, other: ExpressionOrModelField):
        ...

    def __or__(self, other: ExpressionOrModelField):
        ...

    @property
    def name(self) -> str:
        raise NotImplementedError

    @property
    def tree(self) -> str:
        raise NotImplementedError
@dataclasses.dataclass
class NegatedExpression:
    """A negated Expression object.

    For now, this is a separate dataclass from Expression that acts as a facade
    around an Expression, indicating to model code (specifically, code
    responsible for querying) to negate the logic in the wrapped Expression. A
    better design is probably possible, maybe at least an ExpressionProtocol?
    """
    expression: 'Expression'

    def __invert__(self):
        # Negating a negation hands back the wrapped expression.
        return self.expression

    def __and__(self, other):
        inherited_parents = self.expression.parents
        return Expression(left=self, op=Operators.AND, right=other, parents=inherited_parents)

    def __or__(self, other):
        inherited_parents = self.expression.parents
        return Expression(left=self, op=Operators.OR, right=other, parents=inherited_parents)

    # The operand/operator accessors simply delegate to the wrapped expression.
    @property
    def left(self):
        return self.expression.left

    @property
    def right(self):
        return self.expression.right

    @property
    def op(self):
        return self.expression.op

    @property
    def name(self):
        inner = self.expression
        if inner.op is Operators.EQ:
            return f"NOT {inner.name}"
        return f"{inner.name} NOT"

    @property
    def tree(self):
        return render_tree(self)
2021-09-16 21:03:03 +02:00
@dataclasses.dataclass
class Expression:
    """A single node of a query expression tree.

    ``op`` is the operator, ``left`` and ``right`` are its operands, and
    ``parents`` lists the (field name, model) pairs leading to the field when
    it was reached through embedded models.
    """
    op: Operators
    left: Optional[ExpressionOrModelField]
    right: Optional[ExpressionOrModelField]
    parents: List[Tuple[str, 'RedisModel']]

    def __invert__(self):
        return NegatedExpression(self)

    def __and__(self, other: ExpressionOrModelField):
        combined = Expression(left=self, op=Operators.AND, right=other, parents=self.parents)
        return combined

    def __or__(self, other: ExpressionOrModelField):
        combined = Expression(left=self, op=Operators.OR, right=other, parents=self.parents)
        return combined

    @property
    def name(self):
        # The node's display name is its operator's name.
        return str(self.op)

    @property
    def tree(self):
        return render_tree(self)
2021-09-16 21:03:03 +02:00
# Either form of a resolved expression node: plain or negated.
ExpressionOrNegated = Union[Expression, NegatedExpression]
class ExpressionProxy:
    """Stand-in for a model field that turns Python operators into Expressions.

    Comparison operators (``==``, ``!=``, ``<``, ``<=``, ``>``, ``>=``) plus
    ``%`` (LIKE) and ``<<`` (IN) each build an Expression whose left operand is
    the wrapped field. Attribute access is forwarded to the field's type so
    that fields of embedded models can be referenced.
    """

    def __init__(self, field: ModelField, parents: List[Tuple[str, 'RedisModel']]):
        self.field = field
        self.parents = parents

    def _build(self, op: Operators, other: Any) -> Expression:
        """Create the Expression `<field> <op> <other>` with this proxy's parents."""
        return Expression(left=self.field, op=op, right=other, parents=self.parents)

    def __eq__(self, other: Any) -> Expression:  # type: ignore[override]
        return self._build(Operators.EQ, other)

    def __ne__(self, other: Any) -> Expression:  # type: ignore[override]
        return self._build(Operators.NE, other)

    def __lt__(self, other: Any) -> Expression:
        return self._build(Operators.LT, other)

    def __le__(self, other: Any) -> Expression:
        return self._build(Operators.LE, other)

    def __gt__(self, other: Any) -> Expression:
        return self._build(Operators.GT, other)

    def __ge__(self, other: Any) -> Expression:
        return self._build(Operators.GE, other)

    def __mod__(self, other: Any) -> Expression:
        return self._build(Operators.LIKE, other)

    def __lshift__(self, other: Any) -> Expression:
        return self._build(Operators.IN, other)

    def __getattr__(self, item):
        # Only reached for attributes not found on the proxy itself: look the
        # name up on the field's type (or on the element type of a list field).
        if get_origin(self.field.outer_type_) == list:
            type_args = get_args(self.field.outer_type_)
            if not type_args:
                raise QuerySyntaxError("In order to query on a list field, you must define "
                                       "the contents of the list with a type annotation, like: "
                                       "orders: List[Order]. Docs: TODO")
            target = type_args[0]
        else:
            target = self.field.outer_type_
        attr = getattr(target, item)

        if not isinstance(attr, self.__class__):
            return attr

        # The attribute is itself a proxy: record this field as one of its
        # parents, then prepend any of our own parents it does not know yet.
        new_parent = (self.field.name, self.field.outer_type_)
        if new_parent not in attr.parents:
            attr.parents.append(new_parent)
        missing = list(set(self.parents) - set(attr.parents))
        if missing:
            attr.parents = missing + attr.parents
        return attr
class QueryNotSupportedError(Exception):
    """Signals a query that this library cannot (or does not yet) express."""
class RediSearchFieldTypes(Enum):
    """Field types used when mapping model fields onto a RediSearch index."""

    TEXT = 'TEXT'
    TAG = 'TAG'
    NUMERIC = 'NUMERIC'
    GEO = 'GEO'
# TODO: How to handle Geo fields?
# Python types whose fields are queried as RediSearch NUMERIC fields.
NUMERIC_TYPES = (float, int, decimal.Decimal)
# Default for both the LIMIT count and the page size of a FindQuery.
DEFAULT_PAGE_SIZE = 10
2021-09-30 05:23:39 +02:00
class FindQuery:
    """Builds and runs a RediSearch ``ft.search`` query for one model class.

    A FindQuery holds a sequence of expressions (AND-ed together), pagination
    settings and optional sort fields. The public "mutating" helpers
    (``copy``, ``sort_by``, ``first``, ``all`` ...) return or operate on new
    FindQuery instances, which is what makes caching the resolved query string
    safe.
    """

    def __init__(self,
                 expressions: Sequence[ExpressionOrNegated],
                 model: Type['RedisModel'],
                 offset: int = 0,
                 limit: int = DEFAULT_PAGE_SIZE,
                 page_size: int = DEFAULT_PAGE_SIZE,
                 sort_fields: Optional[List[str]] = None):
        self.expressions = expressions
        self.model = model
        self.offset = offset
        self.limit = limit
        self.page_size = page_size

        if sort_fields:
            self.sort_fields = self.validate_sort_fields(sort_fields)
        else:
            self.sort_fields = []

        # Lazily-populated caches; see the `expression`, `query` and
        # `pagination` properties and `execute()`.
        self._expression = None
        self._query: Optional[str] = None
        self._pagination: List[str] = []
        self._model_cache: List['RedisModel'] = []

    def dict(self) -> Dict[str, Any]:
        """Return the constructor arguments needed to recreate this query."""
        return dict(
            model=self.model,
            offset=self.offset,
            page_size=self.page_size,
            limit=self.limit,
            expressions=copy(self.expressions),
            sort_fields=copy(self.sort_fields)
        )

    def copy(self, **kwargs):
        """Return a new FindQuery like this one, with ``kwargs`` overriding its settings."""
        original = self.dict()
        original.update(**kwargs)
        return FindQuery(**original)

    @property
    def pagination(self):
        """The cached ``LIMIT`` arguments for this query."""
        if self._pagination:
            return self._pagination
        self._pagination = self.resolve_redisearch_pagination()
        return self._pagination

    @property
    def expression(self):
        """All expressions AND-ed into one, or a match-all expression if there are none."""
        if self._expression:
            return self._expression
        if self.expressions:
            self._expression = reduce(operator.and_, self.expressions)
        else:
            self._expression = Expression(left=None, right=None, op=Operators.ALL, parents=[])
        return self._expression

    @property
    def query(self):
        """
        Resolve and return the RediSearch query for this FindQuery.

        NOTE: We cache the resolved query string after generating it. This should be OK
        because all mutations of FindQuery through public APIs return a new FindQuery instance.
        """
        if self._query:
            return self._query
        self._query = self.resolve_redisearch_query(self.expression)
        return self._query

    def validate_sort_fields(self, sort_fields):
        """Check that every sort field exists on the model and is sortable.

        A leading "-" (descending marker) is ignored for the lookup.

        Returns:
            The ``sort_fields`` argument, unchanged.

        Raises:
            QueryNotSupportedError: if a field is unknown or not declared sortable.
        """
        for sort_field in sort_fields:
            field_name = sort_field.lstrip("-")
            if field_name not in self.model.__fields__:
                raise QueryNotSupportedError(f"You tried sort by {field_name}, but that field "
                                             f"does not exist on the model {self.model}")
            field_proxy = getattr(self.model, field_name)
            if not getattr(field_proxy.field.field_info, 'sortable', False):
                raise QueryNotSupportedError(f"You tried sort by {field_name}, but {self.model} does "
                                             "not define that field as sortable. See docs: XXX")
        return sort_fields

    @staticmethod
    def resolve_field_type(field: ModelField, op: Operators) -> RediSearchFieldTypes:
        """Decide which RediSearch field type a field is queried as for ``op``.

        Primary keys are TAG fields; a LIKE comparison requires (and yields) a
        TEXT field; numeric Python types yield NUMERIC; everything else is TAG.

        Raises:
            QuerySyntaxError: for a LIKE comparison on a field that was not
                declared with ``full_text_search=True``.
        """
        if getattr(field.field_info, 'primary_key', None) is True:
            return RediSearchFieldTypes.TAG
        elif op is Operators.LIKE:
            fts = getattr(field.field_info, 'full_text_search', None)
            if fts is not True:  # Could be PydanticUndefined
                raise QuerySyntaxError(f"You tried to do a full-text search on the field '{field.name}', "
                                       f"but the field is not indexed for full-text search. Use the "
                                       f"full_text_search=True option. Docs: TODO")
            return RediSearchFieldTypes.TEXT

        field_type = field.outer_type_

        # TODO: GEO
        # outer_type_ may be a typing construct (e.g. List[str]) rather than a
        # class; issubclass() would raise TypeError on those, so only test
        # real classes and let everything else fall through to TAG.
        if isinstance(field_type, type) and issubclass(field_type, NUMERIC_TYPES):
            return RediSearchFieldTypes.NUMERIC
        else:
            # TAG fields are the default field type.
            # TODO: A ListField or ArrayField that supports multiple values
            #  and contains logic should allow IN and NOT_IN queries.
            return RediSearchFieldTypes.TAG

    @staticmethod
    def expand_tag_value(value):
        """Turn an IN/NOT_IN operand into a "|"-separated TAG value list.

        Strings are returned untouched; any other iterable has each element
        escaped and joined with "|".

        Raises:
            QuerySyntaxError: if ``value`` is not iterable.
        """
        if isinstance(value, str):
            return value
        try:
            expanded_value = "|".join([escaper.escape(v) for v in value])
        except TypeError:
            raise QuerySyntaxError("Values passed to an IN query must be iterables, "
                                   "like a list of strings. For more information, see: "
                                   "TODO: doc.")
        return expanded_value

    @classmethod
    def resolve_value(cls, field_name: str, field_type: RediSearchFieldTypes,
                      field_info: PydanticFieldInfo, op: Operators, value: Any,
                      parents: List[Tuple[str, 'RedisModel']]) -> str:
        """Render a single `<field> <op> <value>` comparison as RediSearch syntax.

        When ``parents`` is non-empty, the parent field names are joined with
        "_" and prefixed to ``field_name``.

        Returns:
            The query fragment. It is an empty string when the TAG value
            consists solely of the separator character, or when the operator
            has no rendering for a NUMERIC/TAG field.

        Raises:
            QueryNotSupportedError: for an unsupported operator on a TEXT field.
            QuerySyntaxError: from ``expand_tag_value`` for a non-iterable
                IN/NOT_IN operand.
        """
        if parents:
            prefix = "_".join([p[0] for p in parents])
            field_name = f"{prefix}_{field_name}"

        result = ""
        if field_type is RediSearchFieldTypes.TEXT:
            result = f"@{field_name}_fts:"
            if op is Operators.EQ:
                result += f'"{value}"'
            elif op is Operators.NE:
                result = f'-({result}"{value}")'
            elif op is Operators.LIKE:
                result += value
            else:
                raise QueryNotSupportedError("Only equals (=), not-equals (!=), and like() "
                                             "comparisons are supported for TEXT fields. See "
                                             "docs: TODO.")
        elif field_type is RediSearchFieldTypes.NUMERIC:
            if op is Operators.EQ:
                result += f"@{field_name}:[{value} {value}]"
            elif op is Operators.NE:
                result += f"-(@{field_name}:[{value} {value}])"
            elif op is Operators.GT:
                result += f"@{field_name}:[({value} +inf]"
            elif op is Operators.LT:
                result += f"@{field_name}:[-inf ({value}]"
            elif op is Operators.GE:
                result += f"@{field_name}:[{value} +inf]"
            elif op is Operators.LE:
                result += f"@{field_name}:[-inf {value}]"
        # TODO: How will we know the difference between a multi-value use of a TAG
        #  field and our hidden use of TAG for exact-match queries?
        elif field_type is RediSearchFieldTypes.TAG:
            if op is Operators.EQ:
                separator_char = getattr(field_info, 'separator',
                                         SINGLE_VALUE_TAG_FIELD_SEPARATOR)
                if value == separator_char:
                    # The value is ONLY the TAG field separator character --
                    # this is not going to work.
                    log.warning("Your query against the field %s is for a single character, %s, "
                                "that is used internally by redis-developer-python. We must ignore "
                                "this portion of the query. Please review your query to find "
                                "an alternative query that uses a string containing more than "
                                "just the character %s.", field_name, separator_char, separator_char)
                    return ""
                if separator_char in value:
                    # The value contains the TAG field separator. We can work
                    # around this by breaking apart the values and unioning them
                    # with multiple field:{} queries.
                    parts = filter(None, value.split(separator_char))
                    for part in parts:
                        part = escaper.escape(part)
                        result += f"@{field_name}:{{{part}}}"
                else:
                    value = escaper.escape(value)
                    result += f"@{field_name}:{{{value}}}"
            elif op is Operators.NE:
                value = escaper.escape(value)
                result += f"-(@{field_name}:{{{value}}})"
            elif op is Operators.IN:
                # TODO: Implement IN, test this...
                expanded_value = cls.expand_tag_value(value)
                result += f"(@{field_name}:{{{expanded_value}}})"
            elif op is Operators.NOT_IN:
                # TODO: Implement NOT_IN, test this...
                expanded_value = cls.expand_tag_value(value)
                result += f"-(@{field_name}:{{{expanded_value}}})"

        return result

    def resolve_redisearch_pagination(self):
        """Resolve pagination options for a query."""
        return ["LIMIT", self.offset, self.limit]

    def resolve_redisearch_sort_fields(self):
        """Resolve sort options for a query.

        Returns None when no sort fields are set; otherwise a list of the form
        ``["SORTBY", field, direction, ...]`` where a leading "-" on a field
        name selects "desc" and anything else "asc".
        """
        if not self.sort_fields:
            return
        fields = []
        for f in self.sort_fields:
            direction = "desc" if f.startswith('-') else 'asc'
            fields.extend([f.lstrip('-'), direction])
        return ["SORTBY", *fields]

    @classmethod
    def resolve_redisearch_query(cls, expression: ExpressionOrNegated) -> str:
        """
        Resolve an arbitrarily deep expression into a single RediSearch query string.

        This method is complex. Note the following:

        1. This method makes a recursive call to itself when it finds that
           either the left or right operand contains another expression.

        2. An expression might be in a "negated" form, which means that the user
           gave us an expression like ~(Member.age == 30), or in other words,
           "Members whose age is NOT 30." Thus, a negated expression is one in
           which the meaning of an expression is inverted. If we find a negated
           expression, we need to add the appropriate "NOT" syntax but can
           otherwise use the resolved RediSearch query for the expression as-is.

        3. The final resolution of an expression should be a left operand that's
           a ModelField, an operator, and a right operand that's NOT a ModelField.
           With an IN or NOT_IN operator, the right operand can be a sequence
           type, but otherwise, sequence types are converted to strings.

        TODO: When the operator is not IN or NOT_IN, detect a sequence type (other
         than strings, which are allowed) and raise an exception.
        """
        field_type = None
        field_name = None
        field_info = None
        encompassing_expression_is_negated = False
        result = ""

        if isinstance(expression, NegatedExpression):
            encompassing_expression_is_negated = True
            expression = expression.expression

        if expression.op is Operators.ALL:
            if encompassing_expression_is_negated:
                # TODO: Is there a use case for this, perhaps for dynamic
                #  scoring purposes with full-text search?
                raise QueryNotSupportedError("You cannot negate a query for all results.")
            return "*"

        if isinstance(expression.left, (Expression, NegatedExpression)):
            result += f"({cls.resolve_redisearch_query(expression.left)})"
        elif isinstance(expression.left, ModelField):
            field_type = cls.resolve_field_type(expression.left, expression.op)
            field_name = expression.left.name
            field_info = expression.left.field_info
            if not field_info or not getattr(field_info, "index", None):
                raise QueryNotSupportedError(f"You tried to query by a field ({field_name}) "
                                             f"that isn't indexed. See docs: TODO")
        else:
            raise QueryNotSupportedError(f"A query expression should start with either a field "
                                         f"or an expression enclosed in parenthesis. See docs: "
                                         f"TODO")

        right = expression.right

        if isinstance(right, (Expression, NegatedExpression)):
            if expression.op == Operators.AND:
                result += " "
            elif expression.op == Operators.OR:
                result += "| "
            else:
                raise QueryNotSupportedError("You can only combine two query expressions with "
                                             "AND (&) or OR (|). See docs: TODO")

            if isinstance(right, NegatedExpression):
                result += "-"
                # We're handling the RediSearch operator in this call ("-"), so resolve the
                # inner expression instead of the NegatedExpression.
                right = right.expression

            result += f"({cls.resolve_redisearch_query(right)})"
        else:
            if not field_name:
                raise QuerySyntaxError("Could not resolve field name. See docs: TODO")
            elif not field_type:
                raise QuerySyntaxError("Could not resolve field type. See docs: TODO")
            elif not field_info:
                raise QuerySyntaxError("Could not resolve field info. See docs: TODO")
            elif isinstance(right, ModelField):
                raise QueryNotSupportedError("Comparing fields is not supported. See docs: TODO")
            else:
                result += cls.resolve_value(field_name, field_type, field_info,
                                            expression.op, right, expression.parents)

        if encompassing_expression_is_negated:
            result = f"-({result})"

        return result

    def execute(self, exhaust_results=True):
        """Run the query and return the matching model instances.

        With ``exhaust_results=True`` (the default) further pages are fetched,
        advancing the offset by ``page_size`` each time, until a page comes
        back empty. With ``exhaust_results=False`` only one request is made.

        Returns:
            This query's accumulated result cache (a list of model instances).
        """
        args = ["ft.search", self.model.Meta.index_name, self.query, *self.pagination]
        if self.sort_fields:
            args += self.resolve_redisearch_sort_fields()

        # Reset the cache if we're executing from offset 0.
        if self.offset == 0:
            self._model_cache.clear()

        # If the offset is greater than 0, we're paginating through a result set,
        # so append the new results to results already in the cache.
        raw_result = self.model.db().execute_command(*args)
        count = raw_result[0]
        results = self.model.from_redis(raw_result)
        self._model_cache += results

        if not exhaust_results:
            return self._model_cache

        # The query returned all results, so we have no more work to do.
        if count <= len(results):
            return self._model_cache

        # Transparently (to the user) make subsequent requests to paginate
        # through the results and finally return them all.
        query = self
        while True:
            # Make a query for each pass of the loop, with a new offset equal to the
            # current offset plus `page_size`, until we stop getting results back.
            query = query.copy(offset=query.offset + query.page_size)
            _results = query.execute(exhaust_results=False)
            if not _results:
                break
            self._model_cache += _results
        return self._model_cache

    def first(self):
        """Return the first matching model.

        Raises:
            NotFoundError: if the query matches nothing.
        """
        query = self.copy(offset=0, limit=1, sort_fields=self.sort_fields)
        # Only one record is wanted, so do not page through the rest of the
        # result set.
        results = query.execute(exhaust_results=False)
        if not results:
            raise NotFoundError()
        return results[0]

    def all(self, batch_size=10):
        """Return every matching model, fetching ``batch_size`` records per request."""
        if batch_size != self.page_size:
            query = self.copy(page_size=batch_size, limit=batch_size)
            return query.execute()
        return self.execute()

    def sort_by(self, *fields: str):
        """Return a query sorted by ``fields`` (this query itself if none are given)."""
        if not fields:
            return self
        return self.copy(sort_fields=list(fields))

    def update(self, **kwargs):
        """Update all matching records in this query."""
        # TODO

    def delete(self, **field_values):
        """Delete all matching records in this query.

        NOTE: deletion itself is not implemented yet; this currently only
        validates that each keyword names an attribute of the model and then
        returns the query.

        Raises:
            RedisModelError: if a keyword is not an attribute of the model.
        """
        # Iterate items(): iterating the dict directly yields only the keys,
        # which cannot be unpacked into (name, value) pairs.
        for field_name, value in field_values.items():
            valid_attr = hasattr(self.model, field_name)
            if not valid_attr:
                raise RedisModelError(f"Can't update field {field_name} because "
                                      f"the field does not exist on the model {self}")
        return self

    def __iter__(self):
        """Iterate over cached results, executing the query first if the cache is empty."""
        if self._model_cache:
            for m in self._model_cache:
                yield m
        else:
            for m in self.execute():
                yield m

    def __getitem__(self, item: int):
        """
        Given this code:
            Model.find()[1000]

        We should return only the 1000th result.

            1. If the result is loaded in the query cache for this query,
               we can return it directly from the cache.

            2. If the query cache does not have enough elements to return
               that result, then we should clone the current query and
               give it a new offset and limit: offset=n, limit=1.

        Raises:
            IndexError: if the one-record query for ``item`` returns nothing.
        """
        # Index `item` is only present when the cache holds MORE than `item`
        # elements.
        if self._model_cache and len(self._model_cache) > item:
            return self._model_cache[item]

        query = self.copy(offset=item, limit=1)

        # A single record is wanted; do not page through everything after it.
        return query.execute(exhaust_results=False)[0]
class PrimaryKeyCreator(Protocol):
    """Structural type for objects that can generate primary keys."""

    def create_pk(self, *args, **kwargs) -> str:
        """Create a new primary key"""
class UlidPrimaryKey:
    """A client-side generated primary key that follows the ULID spec.

    https://github.com/ulid/javascript#specification
    """

    @staticmethod
    def create_pk(*args, **kwargs) -> str:
        # Arguments are accepted for interface compatibility and ignored.
        new_ulid = ULID()
        return str(new_ulid)
def __dataclass_transform__(
    *,
    eq_default: bool = True,
    order_default: bool = False,
    kw_only_default: bool = False,
    field_descriptors: Tuple[Union[type, Callable[..., Any]], ...] = (()),
) -> Callable[[_T], _T]:
    """Return a decorator that hands its argument back unchanged.

    The keyword arguments are accepted but not used at runtime.
    """
    def passthrough(target):
        return target

    return passthrough
class FieldInfo(PydanticFieldInfo):
    """Pydantic FieldInfo extended with ORM options.

    The extra keyword arguments ``primary_key``, ``sortable``, ``index`` and
    ``full_text_search`` are stripped from ``kwargs`` before pydantic sees
    them and stored as attributes on the instance.
    """

    def __init__(self, default: Any = Undefined, **kwargs: Any) -> None:
        # Pull the ORM-only options out first so that the parent initializer
        # receives only the remaining keyword arguments.
        orm_options = {
            "primary_key": kwargs.pop("primary_key", False),
            "sortable": kwargs.pop("sortable", Undefined),
            "index": kwargs.pop("index", Undefined),
            "full_text_search": kwargs.pop("full_text_search", Undefined),
        }
        super().__init__(default=default, **kwargs)
        for option_name, option_value in orm_options.items():
            setattr(self, option_name, option_value)
class RelationshipInfo(Representation):
    """Plain holder for relationship settings (``back_populates``, ``link_model``)."""

    def __init__(
        self,
        *,
        back_populates: Optional[str] = None,
        link_model: Optional[Any] = None,
    ) -> None:
        self.back_populates, self.link_model = back_populates, link_model
def Field(
    default: Any = Undefined,
    *,
    default_factory: Optional[NoArgAnyCallable] = None,
    alias: str = None,
    title: str = None,
    description: str = None,
    exclude: Union[
        AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any
    ] = None,
    include: Union[
        AbstractSet[Union[int, str]], Mapping[Union[int, str], Any], Any
    ] = None,
    const: bool = None,
    gt: float = None,
    ge: float = None,
    lt: float = None,
    le: float = None,
    multiple_of: float = None,
    min_items: int = None,
    max_items: int = None,
    min_length: int = None,
    max_length: int = None,
    allow_mutation: bool = True,
    regex: str = None,
    primary_key: bool = False,
    sortable: Union[bool, UndefinedType] = Undefined,
    index: Union[bool, UndefinedType] = Undefined,
    full_text_search: Union[bool, UndefinedType] = Undefined,
    schema_extra: Optional[Dict[str, Any]] = None,
) -> Any:
    """Declare a model field, with ORM options on top of the pydantic ones.

    Every argument is forwarded to ``FieldInfo``; the entries of
    ``schema_extra`` (if any) are forwarded as additional keyword arguments.
    The resulting FieldInfo is validated via its ``_validate()`` method and
    returned.
    """
    extra_kwargs = schema_extra or {}
    info = FieldInfo(
        default,
        default_factory=default_factory,
        alias=alias,
        title=title,
        description=description,
        exclude=exclude,
        include=include,
        const=const,
        gt=gt,
        ge=ge,
        lt=lt,
        le=le,
        multiple_of=multiple_of,
        min_items=min_items,
        max_items=max_items,
        min_length=min_length,
        max_length=max_length,
        allow_mutation=allow_mutation,
        regex=regex,
        primary_key=primary_key,
        sortable=sortable,
        index=index,
        full_text_search=full_text_search,
        **extra_kwargs,
    )
    info._validate()
    return info
@dataclasses.dataclass
class PrimaryKey:
    """Pairs a primary-key field's attribute name with its pydantic ModelField."""
    name: str
    field: ModelField
class MetaProtocol(Protocol):
    """Structural type listing the settings a fully-resolved model Meta carries."""
    # Key construction
    global_key_prefix: str
    model_key_prefix: str
    primary_key_pattern: str
    # Connection
    database: redis.Redis
    # Primary key handling
    primary_key: PrimaryKey
    primary_key_creator_cls: Type[PrimaryKeyCreator]
    # Indexing / registration
    index_name: str
    abstract: bool
    embedded: bool
@dataclasses.dataclass
class DefaultMeta:
    """A default placeholder Meta object.

    TODO: Revisit whether this is really necessary, and whether making
    these all optional here is the right choice.
    """
    # Key construction
    global_key_prefix: Optional[str] = None
    model_key_prefix: Optional[str] = None
    primary_key_pattern: Optional[str] = None
    # Connection
    database: Optional[redis.Redis] = None
    # Primary key handling
    primary_key: Optional[PrimaryKey] = None
    primary_key_creator_cls: Optional[Type[PrimaryKeyCreator]] = None
    # Indexing / registration
    index_name: Optional[str] = None
    abstract: Optional[bool] = False
    embedded: Optional[bool] = False
class ModelMeta(ModelMetaclass):
    """Metaclass that resolves each model's Meta settings and query proxies."""
    _meta: MetaProtocol

    @staticmethod
    def _clone_meta(source_meta, model_name):
        """Return a Meta object private to one model, inheriting from ``source_meta``.

        ``copy.deepcopy`` treats classes as atomic and returns the very same
        class object, so deep-copying a Meta *class* would leave parent and
        child models sharing (and overwriting) one set of settings. For a
        class we therefore create an empty subclass, which inherits every
        value but stores new assignments on itself. Non-class Meta objects are
        still deep-copied.
        """
        if isinstance(source_meta, type):
            return type(f"{model_name}Meta", (source_meta,), {})
        return deepcopy(source_meta)

    def __new__(cls, name, bases, attrs, **kwargs):  # noqa C901
        """Create the model class, then attach its resolved ``_meta``/``Meta``.

        After the pydantic class is built this:
          * picks the Meta to use (explicit, inherited, or default),
          * replaces every model field attribute with an ExpressionProxy so
            fields can be used in query expressions,
          * records the primary key field,
          * fills in any unset Meta values (falling back to the inherited
            Meta, then to built-in defaults), and
          * registers the class in ``model_registry`` unless ``abc.ABC`` is
            one of its direct bases or it is marked embedded.
        """
        meta = attrs.pop('Meta', None)
        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        # The fact that there is a Meta field and _meta field is important: a
        # user may have given us a Meta object with their configuration, while
        # we might have inherited _meta from a parent class, and should
        # therefore use some of the inherited fields.
        meta = meta or getattr(new_class, 'Meta', None)
        base_meta = getattr(new_class, '_meta', None)

        if meta and meta != DefaultMeta and meta != base_meta:
            new_class.Meta = meta
            new_class._meta = meta
        elif base_meta:
            new_class._meta = cls._clone_meta(base_meta, name)
            new_class.Meta = new_class._meta
            # Unset inherited values we don't want to reuse (typically based on
            # the model name).
            new_class._meta.embedded = False
            new_class._meta.model_key_prefix = None
            new_class._meta.index_name = None
        else:
            new_class._meta = cls._clone_meta(DefaultMeta, name)
            new_class.Meta = new_class._meta

        # Create proxies for each model field so that we can use the field
        # in queries, like Model.get(Model.field_name == 1)
        for field_name, field in new_class.__fields__.items():
            setattr(new_class, field_name, ExpressionProxy(field, []))
            # Check if this is our FieldInfo version with extended ORM metadata.
            if isinstance(field.field_info, FieldInfo):
                if field.field_info.primary_key:
                    new_class._meta.primary_key = PrimaryKey(name=field_name, field=field)

        if not getattr(new_class._meta, 'global_key_prefix', None):
            new_class._meta.global_key_prefix = getattr(base_meta, "global_key_prefix", "")
        if not getattr(new_class._meta, 'model_key_prefix', None):
            # Don't look at the base class for this.
            new_class._meta.model_key_prefix = f"{new_class.__module__}.{new_class.__name__}"
        if not getattr(new_class._meta, 'primary_key_pattern', None):
            new_class._meta.primary_key_pattern = getattr(base_meta, "primary_key_pattern",
                                                          "{pk}")
        if not getattr(new_class._meta, 'database', None):
            new_class._meta.database = getattr(base_meta, "database",
                                               redis.Redis(decode_responses=True))
        if not getattr(new_class._meta, 'primary_key_creator_cls', None):
            new_class._meta.primary_key_creator_cls = getattr(base_meta, "primary_key_creator_cls",
                                                              UlidPrimaryKey)
        if not getattr(new_class._meta, 'index_name', None):
            new_class._meta.index_name = f"{new_class._meta.global_key_prefix}:" \
                                         f"{new_class._meta.model_key_prefix}:index"

        # Not an abstract model class or embedded model, so we should let the
        # Migrator create indexes for it.
        if abc.ABC not in bases and not getattr(new_class._meta, 'embedded', False):
            key = f"{new_class.__module__}.{new_class.__qualname__}"
            model_registry[key] = new_class

        return new_class
class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
pk: Optional[str] = Field(default=None, primary_key=True)
Meta = DefaultMeta
class Config:
orm_mode = True
arbitrary_types_allowed = True
extra = 'allow'
def __init__(__pydantic_self__, **data: Any) -> None:
2021-09-01 01:30:31 +02:00
super().__init__(**data)
__pydantic_self__.validate_primary_key()
def __lt__(self, other):
"""Default sort: compare primary key of models."""
return self.pk < other.pk
2021-09-01 21:56:06 +02:00
@validator("pk", always=True)
def validate_pk(cls, v):
if not v:
v = cls._meta.primary_key_creator_cls().create_pk()
2021-09-01 21:56:06 +02:00
return v
@classmethod
def validate_primary_key(cls):
"""Check for a primary key. We need one (and only one)."""
primary_keys = 0
for name, field in cls.__fields__.items():
2021-09-01 01:30:31 +02:00
if getattr(field.field_info, 'primary_key', None):
primary_keys += 1
if primary_keys == 0:
raise RedisModelError("You must define a primary key for the model")
elif primary_keys > 1:
raise RedisModelError("You must define only one primary key for a model")
@classmethod
2021-08-31 21:03:53 +02:00
def make_key(cls, part: str):
global_prefix = getattr(cls._meta, 'global_key_prefix', '').strip(":")
model_prefix = getattr(cls._meta, 'model_key_prefix', '').strip(":")
2021-09-01 21:56:06 +02:00
return f"{global_prefix}:{model_prefix}:{part}"
2021-08-31 21:03:53 +02:00
@classmethod
2021-09-01 01:30:31 +02:00
def make_primary_key(cls, pk: Any):
2021-08-31 21:03:53 +02:00
"""Return the Redis key for this model."""
return cls.make_key(cls._meta.primary_key_pattern.format(pk=pk))
2021-08-31 21:03:53 +02:00
def key(self):
"""Return the Redis key for this model."""
pk = getattr(self, self._meta.primary_key.field.name)
2021-08-31 21:03:53 +02:00
return self.make_primary_key(pk)
@classmethod
def db(cls):
return cls._meta.database
2021-09-16 21:03:03 +02:00
@classmethod
2021-10-14 02:16:20 +02:00
def find(cls, *expressions: Union[Any, Expression]) -> FindQuery:
return FindQuery(expressions=expressions, model=cls)
@classmethod
def from_redis(cls, res: Any):
# TODO: Parsing logic borrowed from redisearch-py. Evaluate.
import six
from six.moves import xrange, zip as izip
2021-09-16 21:03:03 +02:00
def to_string(s):
if isinstance(s, six.string_types):
return s
elif isinstance(s, six.binary_type):
return s.decode('utf-8', 'ignore')
else:
return s # Not a string we care about
2021-09-16 21:03:03 +02:00
docs = []
step = 2 # Because the result has content
offset = 1 # The first item is the count of total matches.
for i in xrange(1, len(res), step):
fields_offset = offset
fields = dict(
dict(izip(map(to_string, res[i + fields_offset][::2]),
map(to_string, res[i + fields_offset][1::2])))
)
try:
del fields['id']
except KeyError:
pass
try:
fields['json'] = fields['$']
del fields['$']
except KeyError:
pass
if 'json' in fields:
json_fields = json.loads(fields['json'])
doc = cls(**json_fields)
else:
doc = cls(**fields)
docs.append(doc)
return docs
@classmethod
def add(cls, models: Sequence['RedisModel']) -> Sequence['RedisModel']:
return [model.save() for model in models]
@classmethod
def update(cls, **field_values):
"""Update this model instance."""
return cls
@classmethod
def values(cls):
    """Return raw values from Redis instead of model instances.

    Placeholder: currently returns the class itself unchanged.
    """
    return cls
2021-08-31 21:03:53 +02:00
def delete(self):
    """Delete this instance's key and return the database's ``delete()`` result."""
    connection = self.db()
    return connection.delete(self.key())
def save(self, *args, **kwargs) -> 'RedisModel':
    """Persist this model.

    Not implemented here; always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
@classmethod
def redisearch_schema(cls):
    """Return the RediSearch schema for this model.

    Not implemented here; always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
class HashModel(RedisModel, abc.ABC):
    """A ``RedisModel`` that is saved with ``HSET`` and loaded with ``HGETALL``.

    Subclasses may not declare embedded-model, set, list, or mapping fields;
    ``__init_subclass__`` rejects them with ``RedisModelError``.
    """

    def __init_subclass__(cls, **kwargs):
        """Reject subclasses that declare non-flat fields.

        Raises:
            RedisModelError: if a field's type is a ``RedisModel`` subclass,
                or a set, list, or mapping (parameterized or not).
        """
        super().__init_subclass__(**kwargs)

        for name, field in cls.__fields__.items():
            outer_type = field.outer_type_
            # Parameterized generics such as List[str] are not classes, and
            # issubclass() raises TypeError on them. Check their runtime
            # origin (list, dict, ...) so they are rejected with the intended
            # RedisModelError instead.
            checked_type = get_origin(outer_type) or outer_type

            if cls._is_subclass(checked_type, RedisModel):
                raise RedisModelError(f"HashModels cannot have embedded model "
                                      f"fields. Field: {name}")

            for typ in (Set, Mapping, List):
                if cls._is_subclass(checked_type, typ):
                    raise RedisModelError(f"HashModels cannot have set, list,"
                                          f" or mapping fields. Field: {name}")

    @staticmethod
    def _is_subclass(typ: Any, parent: Any) -> bool:
        """Return ``issubclass(typ, parent)``, or False when ``typ`` is not a class."""
        try:
            return issubclass(typ, parent)
        except TypeError:
            # Not a class, probably a type annotation
            return False

    def save(self, *args, **kwargs) -> 'HashModel':
        """Encode this model's fields and write them to its hash with ``hset()``.

        NOTE(review): despite the annotation, this returns whatever ``hset()``
        returns rather than the model. Confirm what callers rely on before
        changing it.
        """
        document = jsonable_encoder(self.dict())
        success = self.db().hset(self.key(), mapping=document)
        return success

    @classmethod
    def get(cls, pk: Any) -> 'HashModel':
        """Load the model stored under primary key ``pk``.

        Raises:
            NotFoundError: if ``hgetall()`` returns nothing for the key.
        """
        document = cls.db().hgetall(cls.make_primary_key(pk))
        if not document:
            raise NotFoundError
        return cls.parse_obj(document)

    @classmethod
    @no_type_check
    def _get_value(cls, *args, **kwargs) -> Any:
        """
        Always send None as an empty string.

        TODO: We do this because redis-py's hset() method requires non-null
        values. Is there a better way?
        """
        val = super()._get_value(*args, **kwargs)
        if val is None:
            return ""
        return val

    @classmethod
    def redisearch_schema(cls):
        """Return the ``ON HASH PREFIX 1 <prefix> SCHEMA ...`` string for this model.

        The prefix is the model's key built from an empty primary key.
        """
        hash_prefix = cls.make_key(cls._meta.primary_key_pattern.format(pk=""))
        schema_prefix = f"ON HASH PREFIX 1 {hash_prefix} SCHEMA"
        schema_parts = [schema_prefix] + cls.schema_for_fields()
        return " ".join(schema_parts)

    @classmethod
    def schema_for_fields(cls):
        """Return the list of schema fragments for this model's fields.

        Primary-key fields and fields with ``index=True`` are always included;
        an indexed field with ``sortable=True`` is followed by ``SORTABLE``.
        List and embedded-model fields are passed to ``schema_for_type``.
        """
        schema_parts = []

        for name, field in cls.__fields__.items():
            # TODO: Merge this code with schema_for_type()?
            _type = field.outer_type_
            if getattr(field.field_info, 'primary_key', None):
                if cls._is_subclass(_type, str):
                    redisearch_field = f"{name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
                else:
                    redisearch_field = cls.schema_for_type(name, _type, field.field_info)
                schema_parts.append(redisearch_field)
            elif getattr(field.field_info, 'index', None) is True:
                schema_parts.append(cls.schema_for_type(name, _type, field.field_info))
                # TODO: Raise error if user embeds a model field or list and makes it
                #  sortable. Instead, the embedded model should mark individual fields
                #  as sortable.
                if getattr(field.field_info, 'sortable', False) is True:
                    schema_parts.append("SORTABLE")
            elif get_origin(_type) == list:
                embedded_cls = get_args(_type)
                if not embedded_cls:
                    # TODO: Test if this can really happen.
                    log.warning("Model %s defined an empty list field: %s", cls, name)
                    continue
                embedded_cls = embedded_cls[0]
                schema_parts.append(cls.schema_for_type(name, embedded_cls,
                                                        field.field_info))
            elif cls._is_subclass(_type, RedisModel):
                schema_parts.append(cls.schema_for_type(name, _type, field.field_info))
        return schema_parts

    @classmethod
    def schema_for_type(cls, name, typ: Any, field_info: PydanticFieldInfo):
        """Return the schema fragment for one field of type ``typ``.

        Lists are resolved to their element type, numeric types become
        ``NUMERIC``, and strings become a ``TAG`` (plus a ``<name>_fts TEXT``
        field when ``full_text_search`` is set). Embedded models are expanded
        field by field with a ``<name>_`` prefix. Anything else, including
        annotations that are not classes, falls back to a ``TAG``.
        """
        if get_origin(typ) == list:
            embedded_cls = get_args(typ)
            if not embedded_cls:
                # TODO: Test if this can really happen.
                log.warning("Model %s defined an empty list field: %s", cls, name)
                return ""
            embedded_cls = embedded_cls[0]
            return cls.schema_for_type(name, embedded_cls, field_info)
        elif any(cls._is_subclass(typ, t) for t in NUMERIC_TYPES):
            return f"{name} NUMERIC"
        elif cls._is_subclass(typ, str):
            if getattr(field_info, 'full_text_search', False) is True:
                return f"{name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR} " \
                       f"{name}_fts TEXT"
            else:
                return f"{name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
        elif cls._is_subclass(typ, RedisModel):
            sub_fields = []
            for embedded_name, field in typ.__fields__.items():
                sub_fields.append(cls.schema_for_type(f"{name}_{embedded_name}", field.outer_type_,
                                                      field.field_info))
            return " ".join(sub_fields)
        else:
            return f"{name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
class JsonModel(RedisModel, abc.ABC):
    """A ``RedisModel`` that is saved with ``JSON.SET`` and loaded with ``JSON.GET``."""

    def save(self, *args, **kwargs) -> 'JsonModel':
        """Write this model's JSON to its key with ``JSON.SET``.

        NOTE(review): despite the annotation, this returns the result of
        ``execute_command()`` rather than the model. Confirm what callers
        rely on before changing it.
        """
        success = self.db().execute_command('JSON.SET', self.key(), ".", self.json())
        return success

    @classmethod
    def get(cls, pk: Any) -> 'JsonModel':
        """Load the model stored under primary key ``pk``.

        Raises:
            NotFoundError: if ``JSON.GET`` returns nothing for the key.
        """
        document = cls.db().execute_command("JSON.GET", cls.make_primary_key(pk))
        if not document:
            raise NotFoundError
        return cls.parse_raw(document)

    @classmethod
    def redisearch_schema(cls):
        """Return the ``ON JSON PREFIX 1 <prefix> SCHEMA ...`` string for this model.

        The prefix is the model's key built from an empty primary key.
        """
        key_prefix = cls.make_key(cls._meta.primary_key_pattern.format(pk=""))
        schema_prefix = f"ON JSON PREFIX 1 {key_prefix} SCHEMA"
        schema_parts = [schema_prefix] + cls.schema_for_fields()
        return " ".join(schema_parts)

    @classmethod
    def schema_for_fields(cls):
        """Return one schema fragment per model field, rooted at JSONPath ``$``.

        Fields that produce no index contribute an empty string.
        """
        schema_parts = []
        json_path = "$"

        for name, field in cls.__fields__.items():
            _type = field.outer_type_
            schema_parts.append(cls.schema_for_type(
                json_path, name, "", _type, field.field_info))
        return schema_parts

    @classmethod
    def schema_for_type(cls, json_path: str, name: str, name_prefix: str, typ: Any,
                        field_info: PydanticFieldInfo,
                        parent_type: Optional[Any] = None) -> str:
        """Return the schema fragment for one field, descending into lists and models.

        Args:
            json_path: JSONPath of the container holding this field.
            name: The field's name.
            name_prefix: Prefix for the index alias; embedded fields are
                aliased as ``<prefix>_<name>``.
            typ: The field's (outer) type.
            field_info: The field's pydantic ``FieldInfo``; its ``index`` and
                ``full_text_search`` attributes are consulted.
            parent_type: ``list`` when ``typ`` is the element type of a list
                field whose ``[*]`` path is already in ``json_path``.

        Returns:
            The schema fragment, or ``""`` when nothing is indexed.
        """
        should_index = getattr(field_info, 'index', False)
        field_type = get_origin(typ)
        try:
            field_is_model = issubclass(typ, RedisModel)
        except TypeError:
            # Not a class, probably a type annotation
            field_is_model = False

        # When we encounter a list or model field, we need to descend
        # into the values of the list or the fields of the model to
        # find any values marked as indexed.
        if field_type == list:
            embedded_cls = get_args(typ)
            if not embedded_cls:
                log.warning("Model %s defined an empty list field: %s", cls, name)
                return ""
            embedded_cls = embedded_cls[0]
            return cls.schema_for_type(f"{json_path}.{name}[*]", name, name_prefix,
                                       embedded_cls, field_info, parent_type=field_type)
        elif field_is_model:
            name_prefix = f"{name_prefix}_{name}" if name_prefix else name
            if parent_type == list:
                # This is a list, so the correct JSONPath expression is to
                # refer directly to attribute names after the list notation,
                # e.g. orders[*].created_date.
                path = json_path
            else:
                # All other fields should use dot notation with both the
                # current field name and "embedded" field name, e.g.,
                # order.address.street_line_1.
                path = f"{json_path}.{name}"
            sub_fields = [
                cls.schema_for_type(path,
                                    embedded_name,
                                    name_prefix,
                                    field.outer_type_,
                                    field.field_info,
                                    parent_type=field_type)
                for embedded_name, field in typ.__fields__.items()
            ]
            return " ".join(filter(None, sub_fields))
        elif should_index:
            index_field_name = f"{name_prefix}_{name}" if name_prefix else name
            if parent_type == list:
                # Elements of a list of scalars: json_path already ends in
                # "<name>[*]", so appending the name again would produce an
                # invalid path such as $.tags[*].tags.
                path = json_path
            else:
                path = f"{json_path}.{name}"

            try:
                is_numeric = any(issubclass(typ, t) for t in NUMERIC_TYPES)
                is_str = issubclass(typ, str)
            except TypeError:
                # Not a class, probably a type annotation: index as a TAG.
                is_numeric = is_str = False

            if is_numeric:
                schema_part = f"{path} AS {index_field_name} NUMERIC"
            elif is_str:
                if getattr(field_info, 'full_text_search', False) is True:
                    schema_part = f"{path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR} " \
                                  f"{path} AS {index_field_name}_fts TEXT"
                else:
                    schema_part = f"{path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
            else:
                schema_part = f"{path} AS {index_field_name} TAG"
            # TODO: GEO field
            # NOTE(review): SORTABLE is appended to every indexed field here,
            # without consulting field_info.sortable -- confirm this is
            # intended.
            schema_part += " SORTABLE"
            return schema_part
        return ""