redis-om-python/tests/conftest.py

48 lines
1 KiB
Python
Raw Normal View History

2021-11-10 00:59:10 +01:00
import asyncio
2021-10-21 08:24:31 +02:00
import random
import pytest
2021-11-10 00:59:10 +01:00
from aredis_om import get_redis_connection
2021-11-12 16:56:47 +01:00
TEST_PREFIX = "redis-om:testing"
2021-11-10 00:59:10 +01:00
@pytest.fixture(scope="session")
def event_loop(request):
    """Provide one asyncio event loop for the whole test session.

    A new loop is created from the current event-loop policy, handed to
    the tests, and closed once the session-scoped fixture is torn down.
    """
    # NOTE(review): presumably this overrides a plugin-provided
    # ``event_loop`` fixture with narrower scope -- confirm.
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    yield session_loop
    session_loop.close()
@pytest.fixture(scope="session")
def redis():
    """Yield the connection returned by ``get_redis_connection()``.

    Session-scoped, so the same connection object is shared by every
    test that requests this fixture.
    """
    connection = get_redis_connection()
    yield connection
2021-11-12 16:56:47 +01:00
def _delete_test_keys(prefix: str, conn):
    """Delete every key that matches ``"{prefix}:*"``.

    Args:
        prefix: Key namespace to clear; the glob ``":*"`` is appended to it.
        conn: Connection-like object exposing ``scan_iter(pattern)`` and
            ``delete(*keys)``.

    The matching keys are collected first and then removed with a single
    ``delete`` call. When nothing matches, ``delete`` is not called at
    all, because it would otherwise be invoked with no arguments.
    """
    keys = list(conn.scan_iter(f"{prefix}:*"))
    if keys:
        conn.delete(*keys)
2021-10-21 08:24:31 +02:00
@pytest.fixture
def key_prefix(request, redis):
    """Yield a per-test key prefix located under ``TEST_PREFIX``.

    The prefix is ``TEST_PREFIX`` followed by ``":"`` and a fresh
    ``random.random()`` value, so each test gets its own namespace.
    """
    unique_prefix = f"{TEST_PREFIX}:{random.random()}"
    yield unique_prefix
2021-10-21 08:31:11 +02:00
2021-11-12 16:56:47 +01:00
@pytest.fixture(scope="session", autouse=True)
def cleanup_keys(request):
    """Register a session finalizer that deletes all keys under TEST_PREFIX.

    The fixture is autouse and session-scoped, so the finalizer is
    registered without any test having to request it.
    """

    def _purge_test_keys():
        # The finalizer always uses the synchronous Redis connection.
        # An async finalizer should be possible, but how to set one up
        # has not been worked out yet.
        from redis_om.connections import get_redis_connection as get_sync_redis

        sync_conn = get_sync_redis()
        _delete_test_keys(TEST_PREFIX, sync_conn)

    request.addfinalizer(_purge_test_keys)