Generate a seemingly correct schema for JSON models
This commit is contained in:
		
							parent
							
								
									9a47d1046d
								
							
						
					
					
						commit
						976fdf3ad4
					
				
					 3 changed files with 384 additions and 121 deletions
				
			
		| 
						 | 
				
			
			@ -1,6 +1,7 @@
 | 
			
		|||
import abc
 | 
			
		||||
import dataclasses
 | 
			
		||||
import decimal
 | 
			
		||||
import logging
 | 
			
		||||
import operator
 | 
			
		||||
import re
 | 
			
		||||
from copy import deepcopy
 | 
			
		||||
| 
						 | 
				
			
			@ -22,7 +23,7 @@ from typing import (
 | 
			
		|||
    Protocol,
 | 
			
		||||
    List,
 | 
			
		||||
    Type,
 | 
			
		||||
    Pattern
 | 
			
		||||
    Pattern, get_origin, get_args
 | 
			
		||||
)
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -37,8 +38,8 @@ from pydantic.utils import Representation
 | 
			
		|||
from .encoders import jsonable_encoder
 | 
			
		||||
 | 
			
		||||
model_registry = {}
 | 
			
		||||
 | 
			
		||||
_T = TypeVar("_T")
 | 
			
		||||
log = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TokenEscaper:
 | 
			
		||||
| 
						 | 
				
			
			@ -338,7 +339,6 @@ class FindQuery:
 | 
			
		|||
            field_type = cls.resolve_field_type(expression.left)
 | 
			
		||||
            field_name = expression.left.name
 | 
			
		||||
        else:
 | 
			
		||||
            import ipdb; ipdb.set_trace()
 | 
			
		||||
            raise QueryNotSupportedError(f"A query expression should start with either a field "
 | 
			
		||||
                                         f"or an expression enclosed in parenthesis. See docs: "
 | 
			
		||||
                                         f"TODO")
 | 
			
		||||
| 
						 | 
				
			
			@ -831,24 +831,19 @@ class HashModel(RedisModel, abc.ABC):
 | 
			
		|||
            return ""
 | 
			
		||||
        return val
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def schema_for_type(cls, name, typ: Type, field_info: FieldInfo):
 | 
			
		||||
        if any(issubclass(typ, t) for t in NUMERIC_TYPES):
 | 
			
		||||
            return f"{name} NUMERIC"
 | 
			
		||||
        elif issubclass(typ, str):
 | 
			
		||||
            if getattr(field_info, 'full_text_search', False) is True:
 | 
			
		||||
                return f"{name} TAG {name}_fts TEXT"
 | 
			
		||||
            else:
 | 
			
		||||
                return f"{name} TAG"
 | 
			
		||||
        else:
 | 
			
		||||
            return f"{name} TAG"
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def redisearch_schema(cls):
 | 
			
		||||
        hash_prefix = cls.make_key(cls._meta.primary_key_pattern.format(pk=""))
 | 
			
		||||
        schema_prefix = f"ON HASH PREFIX 1 {hash_prefix} SCHEMA"
 | 
			
		||||
        schema_parts = [schema_prefix]
 | 
			
		||||
        schema_parts = [schema_prefix] + cls.schema_for_fields()
 | 
			
		||||
        return " ".join(schema_parts)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def schema_for_fields(cls):
 | 
			
		||||
        schema_parts = []
 | 
			
		||||
 | 
			
		||||
        for name, field in cls.__fields__.items():
 | 
			
		||||
            # TODO: Merge this code with schema_for_type()
 | 
			
		||||
            _type = field.outer_type_
 | 
			
		||||
            if getattr(field.field_info, 'primary_key', None):
 | 
			
		||||
                if issubclass(_type, str):
 | 
			
		||||
| 
						 | 
				
			
			@ -858,9 +853,49 @@ class HashModel(RedisModel, abc.ABC):
 | 
			
		|||
                schema_parts.append(redisearch_field)
 | 
			
		||||
            elif getattr(field.field_info, 'index', None) is True:
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(name, _type, field.field_info))
 | 
			
		||||
                # TODO: Raise error if user embeds a model field or list and makes it
 | 
			
		||||
                #  sortable. Instead, the embedded model should mark individual fields
 | 
			
		||||
                #  as sortable.
 | 
			
		||||
                if getattr(field.field_info, 'sortable', False) is True:
 | 
			
		||||
                    schema_parts.append("SORTABLE")
 | 
			
		||||
        return " ".join(schema_parts)
 | 
			
		||||
            elif get_origin(_type) == list:
 | 
			
		||||
                embedded_cls = get_args(_type)
 | 
			
		||||
                if not embedded_cls:
 | 
			
		||||
                    # TODO: Test if this can really happen.
 | 
			
		||||
                    log.warning("Model %s defined an empty list field: %s", cls, name)
 | 
			
		||||
                    continue
 | 
			
		||||
                embedded_cls = embedded_cls[0]
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(name, embedded_cls,
 | 
			
		||||
                                                        field.field_info))
 | 
			
		||||
            elif issubclass(_type, RedisModel):
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(name, _type, field.field_info))
 | 
			
		||||
        return schema_parts
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def schema_for_type(cls, name, typ: Type, field_info: FieldInfo):
 | 
			
		||||
        if get_origin(typ) == list:
 | 
			
		||||
            embedded_cls = get_args(typ)
 | 
			
		||||
            if not embedded_cls:
 | 
			
		||||
                # TODO: Test if this can really happen.
 | 
			
		||||
                log.warning("Model %s defined an empty list field: %s", cls, name)
 | 
			
		||||
                return ""
 | 
			
		||||
            embedded_cls = embedded_cls[0]
 | 
			
		||||
            return cls.schema_for_type(name, embedded_cls, field_info)
 | 
			
		||||
        elif any(issubclass(typ, t) for t in NUMERIC_TYPES):
 | 
			
		||||
            return f"{name} NUMERIC"
 | 
			
		||||
        elif issubclass(typ, str):
 | 
			
		||||
            if getattr(field_info, 'full_text_search', False) is True:
 | 
			
		||||
                return f"{name} TAG {name}_fts TEXT"
 | 
			
		||||
            else:
 | 
			
		||||
                return f"{name} TAG"
 | 
			
		||||
        elif issubclass(typ, RedisModel):
 | 
			
		||||
            sub_fields = []
 | 
			
		||||
            for embedded_name, field in typ.__fields__.items():
 | 
			
		||||
                sub_fields.append(cls.schema_for_type(f"{name}_{embedded_name}", field.outer_type_,
 | 
			
		||||
                                                      field.field_info))
 | 
			
		||||
            return " ".join(sub_fields)
 | 
			
		||||
        else:
 | 
			
		||||
            return f"{name} TAG"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class JsonModel(RedisModel, abc.ABC):
 | 
			
		||||
| 
						 | 
				
			
			@ -874,3 +909,88 @@ class JsonModel(RedisModel, abc.ABC):
 | 
			
		|||
        if not document:
 | 
			
		||||
            raise NotFoundError
 | 
			
		||||
        return cls.parse_raw(document)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def redisearch_schema(cls):
 | 
			
		||||
        key_prefix = cls.make_key(cls._meta.primary_key_pattern.format(pk=""))
 | 
			
		||||
        schema_prefix = f"ON JSON PREFIX 1 {key_prefix} SCHEMA"
 | 
			
		||||
        schema_parts = [schema_prefix] + cls.schema_for_fields()
 | 
			
		||||
        return " ".join(schema_parts)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def schema_for_fields(cls):
 | 
			
		||||
        schema_parts = []
 | 
			
		||||
        json_path = "$"
 | 
			
		||||
 | 
			
		||||
        for name, field in cls.__fields__.items():
 | 
			
		||||
            # TODO: Merge this code with schema_for_type()?
 | 
			
		||||
            _type = field.outer_type_
 | 
			
		||||
            if getattr(field.field_info, 'primary_key', None):
 | 
			
		||||
                if issubclass(_type, str):
 | 
			
		||||
                    redisearch_field = f"{json_path}.{name} AS {name} TAG"
 | 
			
		||||
                else:
 | 
			
		||||
                    redisearch_field = cls.schema_for_type(f"{json_path}.{name}", name, "", _type, field.field_info)
 | 
			
		||||
                schema_parts.append(redisearch_field)
 | 
			
		||||
            elif getattr(field.field_info, 'index', None) is True:
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(f"{json_path}.{name}", name, "", _type, field.field_info))
 | 
			
		||||
                # TODO: Raise error if user embeds a model field or list and makes it
 | 
			
		||||
                #  sortable. Instead, the embedded model should mark individual fields
 | 
			
		||||
                #  as sortable.
 | 
			
		||||
                if getattr(field.field_info, 'sortable', False) is True:
 | 
			
		||||
                    schema_parts.append("SORTABLE")
 | 
			
		||||
            elif get_origin(_type) == list:
 | 
			
		||||
                embedded_cls = get_args(_type)
 | 
			
		||||
                if not embedded_cls:
 | 
			
		||||
                    # TODO: Test if this can really happen.
 | 
			
		||||
                    log.warning("Model %s defined an empty list field: %s", cls, name)
 | 
			
		||||
                    continue
 | 
			
		||||
                embedded_cls = embedded_cls[0]
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(f"{json_path}.{name}[]", name, f"{name}",
 | 
			
		||||
                                                        embedded_cls, field.field_info))
 | 
			
		||||
            elif issubclass(_type, RedisModel):
 | 
			
		||||
                schema_parts.append(cls.schema_for_type(f"{json_path}.{name}", name, f"{name}", _type,
 | 
			
		||||
                                                        field.field_info))
 | 
			
		||||
        return schema_parts
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    # TODO: We need both the "name" of the field (address_line_1) as we'll
 | 
			
		||||
    #  find it in the JSON document, AND the name of the field as it should
 | 
			
		||||
    #  be in the redisearch schema (address_address_line_1). Maybe both "name"
 | 
			
		||||
    #  and "name_prefix"?
 | 
			
		||||
    def schema_for_type(cls, json_path: str, name: str, name_prefix: str, typ: Type,
 | 
			
		||||
                        field_info: FieldInfo) -> str:
 | 
			
		||||
        index_field_name = f"{name_prefix}{name}"
 | 
			
		||||
        should_index = getattr(field_info, 'index', False)
 | 
			
		||||
 | 
			
		||||
        if get_origin(typ) == list:
 | 
			
		||||
            embedded_cls = get_args(typ)
 | 
			
		||||
            if not embedded_cls:
 | 
			
		||||
                # TODO: Test if this can really happen.
 | 
			
		||||
                log.warning("Model %s defined an empty list field: %s", cls, name)
 | 
			
		||||
                return ""
 | 
			
		||||
            embedded_cls = embedded_cls[0]
 | 
			
		||||
            # TODO: We need to pass the "JSON Path so far" which should include the
 | 
			
		||||
            #  correct syntax for an array.
 | 
			
		||||
            return cls.schema_for_type(f"{json_path}[]", name, f"{name_prefix}{name}", embedded_cls, field_info)
 | 
			
		||||
        elif issubclass(typ, RedisModel):
 | 
			
		||||
            sub_fields = []
 | 
			
		||||
            for embedded_name, field in typ.__fields__.items():
 | 
			
		||||
                sub_fields.append(cls.schema_for_type(f"{json_path}.{embedded_name}",
 | 
			
		||||
                                                      embedded_name,
 | 
			
		||||
                                                      f"{name_prefix}_",
 | 
			
		||||
                                                      field.outer_type_,
 | 
			
		||||
                                                      field.field_info))
 | 
			
		||||
            return " ".join(filter(None, sub_fields))
 | 
			
		||||
        elif should_index:
 | 
			
		||||
            if any(issubclass(typ, t) for t in NUMERIC_TYPES):
 | 
			
		||||
                return f"{json_path} AS {index_field_name} NUMERIC"
 | 
			
		||||
            elif issubclass(typ, str):
 | 
			
		||||
                if getattr(field_info, 'full_text_search', False) is True:
 | 
			
		||||
                    return f"{json_path} AS {index_field_name} TAG " \
 | 
			
		||||
                           f"{json_path} AS {index_field_name}_fts TEXT"
 | 
			
		||||
                else:
 | 
			
		||||
                    return f"{json_path} AS {index_field_name} TAG"
 | 
			
		||||
            else:
 | 
			
		||||
                return f"{json_path} AS {index_field_name} TAG"
 | 
			
		||||
 | 
			
		||||
        return ""
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue