WIP on async - test failure due to closed event loop

This commit is contained in:
Andrew Brookins 2021-10-22 06:33:05 -07:00
parent 0f9f7aa868
commit b2c2dd9f6f
22 changed files with 348 additions and 190 deletions

View file

@ -1,19 +1,18 @@
import random
import pytest
from redis import Redis
from redis_developer.connections import get_redis_connection
from redis_om.connections import get_redis_connection
@pytest.fixture
def redis():
def redis(event_loop):
yield get_redis_connection()
def _delete_test_keys(prefix: str, conn: Redis):
async def _delete_test_keys(prefix: str, conn):
keys = []
for key in conn.scan_iter(f"{prefix}:*"):
async for key in conn.scan_iter(f"{prefix}:*"):
keys.append(key)
if keys:
conn.delete(*keys)
@ -21,11 +20,10 @@ def _delete_test_keys(prefix: str, conn: Redis):
@pytest.fixture
def key_prefix(redis):
key_prefix = f"redis-developer:{random.random()}"
key_prefix = f"redis-om:{random.random()}"
yield key_prefix
_delete_test_keys(key_prefix, redis)
@pytest.fixture(autouse=True)
def delete_test_keys(redis, request, key_prefix):
_delete_test_keys(key_prefix, redis)
async def delete_test_keys(redis, request, key_prefix):
await _delete_test_keys(key_prefix, redis)