WIP on recursive schema generation - still broken
This commit is contained in:
parent
5d05de95f8
commit
abb1ce6a31
1 changed files with 40 additions and 43 deletions
|
@ -1084,75 +1084,72 @@ class JsonModel(RedisModel, abc.ABC):
|
|||
schema_parts = []
|
||||
json_path = "$"
|
||||
|
||||
if cls.__name__ == "Address":
|
||||
import ipdb; ipdb.set_trace()
|
||||
for name, field in cls.__fields__.items():
|
||||
# TODO: Merge this code with schema_for_type()?
|
||||
_type = field.outer_type_
|
||||
if getattr(field.field_info, 'primary_key', None):
|
||||
if issubclass(_type, str):
|
||||
redisearch_field = f"{json_path}.{name} AS {name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
|
||||
else:
|
||||
redisearch_field = cls.schema_for_type(f"{json_path}.{name}", name, "", _type, field.field_info)
|
||||
schema_parts.append(redisearch_field)
|
||||
elif getattr(field.field_info, 'index', None) is True:
|
||||
schema_parts.append(cls.schema_for_type(f"{json_path}.{name}", name, "", _type, field.field_info))
|
||||
# TODO: Raise error if user embeds a model field or list and makes it
|
||||
# sortable. Instead, the embedded model should mark individual fields
|
||||
# as sortable.
|
||||
if getattr(field.field_info, 'sortable', False) is True:
|
||||
schema_parts.append("SORTABLE")
|
||||
elif get_origin(_type) == list:
|
||||
embedded_cls = get_args(_type)
|
||||
if not embedded_cls:
|
||||
# TODO: Test if this can really happen.
|
||||
log.warning("Model %s defined an empty list field: %s", cls, name)
|
||||
continue
|
||||
embedded_cls = embedded_cls[0]
|
||||
# TODO: Should this have a name prefix?
|
||||
schema_parts.append(cls.schema_for_type(f"{json_path}.{name}[]", name, name,
|
||||
embedded_cls, field.field_info))
|
||||
elif issubclass(_type, RedisModel):
|
||||
schema_parts.append(cls.schema_for_type(f"{json_path}.{name}", name, name, _type,
|
||||
field.field_info))
|
||||
schema_parts.append(cls.schema_for_type(
|
||||
json_path, name, "", _type, field.field_info))
|
||||
return schema_parts
|
||||
|
||||
@classmethod
|
||||
def schema_for_type(cls, json_path: str, name: str, name_prefix: str, typ: Any,
|
||||
field_info: PydanticFieldInfo) -> str:
|
||||
if name == "description":
|
||||
import ipdb; ipdb.set_trace()
|
||||
index_field_name = f"{name_prefix}_{name}"
|
||||
print(json_path, name, name_prefix, typ)
|
||||
should_index = getattr(field_info, 'index', False)
|
||||
field_type = get_origin(typ)
|
||||
try:
|
||||
field_is_model = issubclass(typ, RedisModel)
|
||||
except TypeError:
|
||||
# Not a class, probably a type annotation
|
||||
field_is_model = False
|
||||
|
||||
if get_origin(typ) == list:
|
||||
# When we encounter a list or model field, we need to descend
|
||||
# into the values of the list or the fields of the model to
|
||||
# find any values marked as indexed.
|
||||
if field_type == list:
|
||||
embedded_cls = get_args(typ)
|
||||
if not embedded_cls:
|
||||
# TODO: Test if this can really happen.
|
||||
log.warning("Model %s defined an empty list field: %s", cls, name)
|
||||
return ""
|
||||
embedded_cls = embedded_cls[0]
|
||||
return cls.schema_for_type(f"{json_path}[]", name, f"{name_prefix}{name}",
|
||||
return cls.schema_for_type(f"{json_path}.{name}[]", name, name_prefix,
|
||||
embedded_cls, field_info)
|
||||
elif issubclass(typ, RedisModel):
|
||||
elif field_is_model:
|
||||
name_prefix = f"{name_prefix}_{name}" if name_prefix else name
|
||||
sub_fields = []
|
||||
for embedded_name, field in typ.__fields__.items():
|
||||
sub_fields.append(cls.schema_for_type(f"{json_path}.{embedded_name}",
|
||||
if json_path.endswith("[]"):
|
||||
# This is a list, so the correct JSONPath expression is to
|
||||
# refer directly to attribute names after the list notation,
|
||||
# e.g. orders[].created_date.
|
||||
path = f"{json_path}.{embedded_name}"
|
||||
else:
|
||||
# All other fields should use dot notation with both the
|
||||
# current field name and "embedded" field name, e.g.,
|
||||
# order.address.street_line_1.
|
||||
path = f"{json_path}.{name}.{embedded_name}"
|
||||
print(path)
|
||||
sub_fields.append(cls.schema_for_type(path,
|
||||
embedded_name,
|
||||
f"{name_prefix}_{embedded_name}",
|
||||
name_prefix,
|
||||
field.outer_type_,
|
||||
field.field_info))
|
||||
return " ".join(filter(None, sub_fields))
|
||||
elif should_index:
|
||||
index_field_name = f"{name_prefix}_{name}" if name_prefix else name
|
||||
path = f"{json_path}.{name}"
|
||||
if any(issubclass(typ, t) for t in NUMERIC_TYPES):
|
||||
return f"{json_path} AS {index_field_name} NUMERIC"
|
||||
schema_part = f"{path} AS {index_field_name} NUMERIC"
|
||||
elif issubclass(typ, str):
|
||||
if getattr(field_info, 'full_text_search', False) is True:
|
||||
return f"{json_path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR} " \
|
||||
f"{json_path} AS {index_field_name}_fts TEXT"
|
||||
schema_part = f"{path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR} " \
|
||||
f"{path} AS {index_field_name}_fts TEXT"
|
||||
else:
|
||||
return f"{json_path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
|
||||
schema_part = f"{path} AS {index_field_name} TAG SEPARATOR {SINGLE_VALUE_TAG_FIELD_SEPARATOR}"
|
||||
else:
|
||||
return f"{json_path} AS {index_field_name} TAG"
|
||||
schema_part = f"{path} AS {index_field_name} TAG"
|
||||
# TODO: GEO field
|
||||
if should_index:
|
||||
schema_part += " SORTABLE"
|
||||
return schema_part
|
||||
|
||||
return ""
|
||||
|
|
Loading…
Reference in a new issue