Add delete_many to support for bulk deletes (#305)

* Add support for bulk deletes

* linters

* linters

* fix review comments

* update more-itertools version

* poetry fix - maybe?

* merge main & add more-itertools 8.14.0

* update poetry.lock

* linters

* fix test

Co-authored-by: Chayim I. Kirshen <c@kirshen.com>
This commit is contained in:
dvora-h 2022-08-10 16:22:27 +03:00 committed by GitHub
parent 4661459ddd
commit a00a68b414
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 171 additions and 395 deletions

View file

@ -24,6 +24,7 @@ from typing import (
no_type_check,
)
from more_itertools import ichunked
from pydantic import BaseModel, validator
from pydantic.fields import FieldInfo as PydanticFieldInfo
from pydantic.fields import ModelField, Undefined, UndefinedType
@ -1117,9 +1118,17 @@ class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
return self.make_primary_key(pk)
@classmethod
async def delete(cls, pk: Any) -> int:
async def _delete(cls, db, *pks):
return await db.delete(*pks)
@classmethod
async def delete(
cls, pk: Any, pipeline: Optional[redis.client.Pipeline] = None
) -> int:
"""Delete data at this key."""
return await cls.db().delete(cls.make_primary_key(pk))
db = cls._get_db(pipeline)
return await cls._delete(db, cls.make_primary_key(pk))
@classmethod
async def get(cls, pk: Any) -> "RedisModel":
@ -1137,10 +1146,7 @@ class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
async def expire(
self, num_seconds: int, pipeline: Optional[redis.client.Pipeline] = None
):
if pipeline is None:
db = self.db()
else:
db = pipeline
db = self._get_db(pipeline)
# TODO: Wrap any Redis response errors in a custom exception?
await db.expire(self.make_primary_key(self.pk), num_seconds)
@ -1232,16 +1238,7 @@ class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
pipeline: Optional[redis.client.Pipeline] = None,
pipeline_verifier: Callable[..., Any] = verify_pipeline_response,
) -> Sequence["RedisModel"]:
if pipeline is None:
# By default, send commands in a pipeline. Saving each model will
# be atomic, but Redis may process other commands in between
# these saves.
db = cls.db().pipeline(transaction=False)
else:
# If the user gave us a pipeline, add our commands to that. The user
# will be responsible for executing the pipeline after they've accumulated
# the commands they want to send.
db = pipeline
db = cls._get_db(pipeline, bulk=True)
for model in models:
# save() just returns the model, we don't need that here.
@ -1255,6 +1252,31 @@ class RedisModel(BaseModel, abc.ABC, metaclass=ModelMeta):
return models
@classmethod
def _get_db(
self, pipeline: Optional[redis.client.Pipeline] = None, bulk: bool = False
):
if pipeline is not None:
return pipeline
elif bulk:
return self.db().pipeline(transaction=False)
else:
return self.db()
@classmethod
async def delete_many(
cls,
models: Sequence["RedisModel"],
pipeline: Optional[redis.client.Pipeline] = None,
) -> int:
db = cls._get_db(pipeline)
for chunk in ichunked(models, 100):
pks = [cls.make_primary_key(model.pk) for model in chunk]
await cls._delete(db, *pks)
return len(models)
@classmethod
def redisearch_schema(cls):
raise NotImplementedError
@ -1293,10 +1315,8 @@ class HashModel(RedisModel, abc.ABC):
self, pipeline: Optional[redis.client.Pipeline] = None
) -> "HashModel":
self.check()
if pipeline is None:
db = self.db()
else:
db = pipeline
db = self._get_db(pipeline)
document = jsonable_encoder(self.dict())
# TODO: Wrap any Redis response errors in a custom exception?
await db.hset(self.key(), mapping=document)
@ -1467,10 +1487,8 @@ class JsonModel(RedisModel, abc.ABC):
self, pipeline: Optional[redis.client.Pipeline] = None
) -> "JsonModel":
self.check()
if pipeline is None:
db = self.db()
else:
db = pipeline
db = self._get_db(pipeline)
# TODO: Wrap response errors in a custom exception?
await db.execute_command("JSON.SET", self.key(), ".", self.json())
return self