Add find_one() implementation
This commit is contained in:
		
							parent
							
								
									f8c55236c1
								
							
						
					
					
						commit
						85ba111260
					
				
					 2 changed files with 33 additions and 38 deletions
				
			
		| 
						 | 
				
			
			@ -85,6 +85,9 @@ class Expression:
 | 
			
		|||
    def __and__(self, other):
 | 
			
		||||
        return Expression(left=self, op=Operators.AND, right=other)
 | 
			
		||||
 | 
			
		||||
    def __or__(self, other):
 | 
			
		||||
        return Expression(left=self, op=Operators.OR, right=other)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
ExpressionOrNegated = Union[Expression, NegatedExpression]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -109,11 +112,15 @@ class FindQuery:
 | 
			
		|||
    expressions: Sequence[Expression]
 | 
			
		||||
    expression: Expression = dataclasses.field(init=False)
 | 
			
		||||
    query: str = dataclasses.field(init=False)
 | 
			
		||||
    pagination: List[str] = dataclasses.field(init=False)
 | 
			
		||||
    model: Type['RedisModel']
 | 
			
		||||
    limit: Optional[int] = None
 | 
			
		||||
    offset: Optional[int] = None
 | 
			
		||||
 | 
			
		||||
    def __post_init__(self):
 | 
			
		||||
        self.expression = reduce(operator.and_, self.expressions)
 | 
			
		||||
        self.query = self.resolve_redisearch_query(self.expression)
 | 
			
		||||
        self.pagination = self.resolve_redisearch_pagination()
 | 
			
		||||
 | 
			
		||||
    def resolve_field_type(self, field: ModelField) -> RediSearchFieldTypes:
 | 
			
		||||
        if getattr(field.field_info, 'primary_key', None):
 | 
			
		||||
| 
						 | 
				
			
			@ -159,6 +166,14 @@ class FindQuery:
 | 
			
		|||
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    def resolve_redisearch_pagination(self):
 | 
			
		||||
        """Resolve pagination options for a query."""
 | 
			
		||||
        if not self.limit and not self.offset:
 | 
			
		||||
            return []
 | 
			
		||||
        offset = self.offset or 0
 | 
			
		||||
        limit = self.limit or 10
 | 
			
		||||
        return ["LIMIT", offset, limit]
 | 
			
		||||
 | 
			
		||||
    def resolve_redisearch_query(self, expression: ExpressionOrNegated):
 | 
			
		||||
        """Resolve an expression to a string RediSearch query."""
 | 
			
		||||
        field_type = None
 | 
			
		||||
| 
						 | 
				
			
			@ -210,8 +225,11 @@ class FindQuery:
 | 
			
		|||
        return result
 | 
			
		||||
 | 
			
		||||
    def find(self):
 | 
			
		||||
        return self.model.db().execute_command("ft.search", self.model.Meta.index_name,
 | 
			
		||||
                                               self.query)
 | 
			
		||||
        args = ["ft.search", self.model.Meta.index_name, self.query]
 | 
			
		||||
        # TODO: Do we need self.pagination if we're just appending to query anyway?
 | 
			
		||||
        if self.pagination:
 | 
			
		||||
            args.extend(self.pagination)
 | 
			
		||||
        return self.model.db().execute_command(*args)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PrimaryKeyCreator(Protocol):
 | 
			
		||||
| 
						 | 
				
			
			@ -518,8 +536,10 @@ class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
 | 
			
		|||
        return cls.from_redis(raw_result)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def find_one(cls, *expressions: Sequence[Expression]):
 | 
			
		||||
        return cls
 | 
			
		||||
    def find_one(cls, *expressions: Expression):
 | 
			
		||||
        query = FindQuery(expressions=expressions, model=cls, limit=1, offset=0)
 | 
			
		||||
        raw_result = query.find()
 | 
			
		||||
        return cls.from_redis(raw_result)[0]
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def add(cls, models: Sequence['RedisModel']) -> Sequence['RedisModel']:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -162,8 +162,6 @@ def test_exact_match_queries():
 | 
			
		|||
    member2.save()
 | 
			
		||||
    member3.save()
 | 
			
		||||
 | 
			
		||||
    import ipdb; ipdb.set_trace()
 | 
			
		||||
 | 
			
		||||
    # # TODO: How to help IDEs know that last_name is not a str, but a wrapped expression?
 | 
			
		||||
    actual = Member.find(Member.last_name == "Brookins")
 | 
			
		||||
    assert actual == [member2, member1]
 | 
			
		||||
| 
						 | 
				
			
			@ -179,15 +177,14 @@ def test_exact_match_queries():
 | 
			
		|||
    actual = Member.find(Member.last_name != "Brookins")
 | 
			
		||||
    assert actual == [member3]
 | 
			
		||||
 | 
			
		||||
    actual = Member.find(
 | 
			
		||||
        (Member.last_name == "Brookins") & (Member.first_name == "Andrew")
 | 
			
		||||
        | (Member.first_name == "Kim")
 | 
			
		||||
    )
 | 
			
		||||
    assert actual == [member2, member1]
 | 
			
		||||
 | 
			
		||||
# actual = Member.find(
 | 
			
		||||
    #     (Member.last_name == "Brookins") & (Member.first_name == "Andrew")
 | 
			
		||||
    #     | (Member.first_name == "Kim")
 | 
			
		||||
    # )
 | 
			
		||||
    # assert actual == [member1, member2]
 | 
			
		||||
 | 
			
		||||
    # actual = Member.find_one(Member.last_name == "Brookins")
 | 
			
		||||
    # assert actual == member1
 | 
			
		||||
    actual = Member.find_one(Member.last_name == "Brookins")
 | 
			
		||||
    assert actual == member2
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_schema():
 | 
			
		||||
| 
						 | 
				
			
			@ -196,28 +193,6 @@ def test_schema():
 | 
			
		|||
        an_integer: int
 | 
			
		||||
        a_float: float
 | 
			
		||||
 | 
			
		||||
    assert Address.schema() == "SCHEMA pk TAG SORTABLE a_string TEXT an_integer NUMERIC " \
 | 
			
		||||
    # TODO: Fix
 | 
			
		||||
    assert Address.schema() == "ON HASH PREFIX 1 redis-developer:basehashmodel: SCHEMA pk TAG SORTABLE a_string TEXT an_integer NUMERIC " \
 | 
			
		||||
                               "a_float NUMERIC"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# ---
 | 
			
		||||
 | 
			
		||||
from typing import Optional
 | 
			
		||||
 | 
			
		||||
from sqlmodel import Field, Session, SQLModel, create_engine, select
 | 
			
		||||
 | 
			
		||||
class Hero(SQLModel, table=True):
 | 
			
		||||
    id: Optional[int] = Field(default=None, primary_key=True)
 | 
			
		||||
    name: str
 | 
			
		||||
    secret_name: str
 | 
			
		||||
    age: Optional[int] = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
engine = create_engine("sqlite:///database.db")
 | 
			
		||||
 | 
			
		||||
with Session(engine) as session:
 | 
			
		||||
    import ipdb; ipdb.set_trace()
 | 
			
		||||
    statement = select(Hero).where(Hero.name == "Spider-Boy")
 | 
			
		||||
    hero = session.exec(statement).first()
 | 
			
		||||
    print(hero)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue