Skip to content
2 changes: 1 addition & 1 deletion packages/google-cloud-firestore/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ALL_PYTHON
SYSTEM_TEST_STANDARD_DEPENDENCIES = [
"mock",
"pytest",
"pytest>9.0",
"google-cloud-testutils",
]
SYSTEM_TEST_EXTERNAL_DEPENDENCIES: List[str] = [
Expand Down
43 changes: 43 additions & 0 deletions packages/google-cloud-firestore/tests/system/test__helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import os
import re
import time
import datetime
import contextlib

from test_utils.system import EmulatorCreds, unique_resource_id

from google.cloud.firestore_v1.base_client import _FIRESTORE_EMULATOR_HOST
from google.cloud.firestore import SERVER_TIMESTAMP
from google.api_core.exceptions import AlreadyExists

FIRESTORE_CREDS = os.environ.get("FIRESTORE_APPLICATION_CREDENTIALS")
FIRESTORE_PROJECT = os.environ.get("GCLOUD_PROJECT")
Expand All @@ -20,3 +25,41 @@
# run all tests against default database, and a named database
TEST_DATABASES = [None, FIRESTORE_OTHER_DB]
TEST_DATABASES_W_ENTERPRISE = TEST_DATABASES + [FIRESTORE_ENTERPRISE_DB]


@contextlib.contextmanager
def system_test_lock(client, lock_name="system_test_lock", max_wait_minutes=65):
"""
Acquires a distributed lock using a Firestore document to prevent concurrent system tests.
"""
lock_ref = client.collection("system_tests").document(lock_name)
start_time = time.time()
max_wait_time = max_wait_minutes * 60

while time.time() - start_time < max_wait_time:
try:
lock_ref.create({"created_at": SERVER_TIMESTAMP})
break # Lock acquired
except AlreadyExists:
lock_doc = lock_ref.get()
if lock_doc.exists:
created_at = lock_doc.to_dict().get("created_at")
if created_at:
now = datetime.datetime.now(datetime.timezone.utc)
age = (now - created_at).total_seconds()
if age > 3600:
print(f"Lock is expired (age: {age}s). Stealing lock.")
lock_ref.delete()
continue
else:
print(
f"Waiting for {lock_name}. Lock is {age:.0f}s old. Sleeping for 15s..."
)
time.sleep(15)
else:
raise TimeoutError(f"Timed out waiting for {lock_name}")

try:
yield lock_ref
finally:
lock_ref.delete()
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import yaml
from google.api_core.exceptions import GoogleAPIError
from google.protobuf.json_format import MessageToDict
from test__helpers import FIRESTORE_EMULATOR, FIRESTORE_ENTERPRISE_DB
from test__helpers import FIRESTORE_EMULATOR, FIRESTORE_ENTERPRISE_DB, system_test_lock

from google.cloud.firestore import AsyncClient, Client
from google.cloud.firestore_v1 import pipeline_expressions
Expand Down Expand Up @@ -364,21 +364,23 @@ def client():
client = Client(project=FIRESTORE_PROJECT, database=FIRESTORE_ENTERPRISE_DB)
data = yaml_loader("data", attach_file_name=False)
to_delete = []
try:
# setup data
batch = client.batch()
for collection_name, documents in data.items():
collection_ref = client.collection(collection_name)
for document_id, document_data in documents.items():
document_ref = collection_ref.document(document_id)
to_delete.append(document_ref)
batch.set(document_ref, _parse_yaml_types(document_data))
batch.commit()
yield client
finally:
# clear data
for document_ref in to_delete:
document_ref.delete()

with system_test_lock(client, lock_name="pipeline_e2e_lock"):
try:
# setup data
batch = client.batch()
for collection_name, documents in data.items():
collection_ref = client.collection(collection_name)
for document_id, document_data in documents.items():
document_ref = collection_ref.document(document_id)
to_delete.append(document_ref)
batch.set(document_ref, _parse_yaml_types(document_data))
batch.commit()
yield client
finally:
# clear data
for document_ref in to_delete:
document_ref.delete()


@pytest.fixture(scope="module")
Expand Down
Loading
Loading