diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index dcc16258..42234b56 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -28,6 +28,29 @@ jobs:
run: |
make publish -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD -e PYPI_TEST_PASSWORD=$PYPI_TEST_PASSWORD
+ build-slim:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+      - name: 🛎️ Checkout
+ uses: actions/checkout@v4
+ with:
+ ref: ${{ github.head_ref }}
+      - name: 🐍 Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.10'
+      - name: 🦾 Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install ".[dev]"
+      - name: 🚀 Publish roboflow-slim to PyPi
+ env:
+ PYPI_USERNAME: ${{ secrets.PYPI_USERNAME }}
+ PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
+ run: |
+ make publish-slim -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD
+
deploy-docs:
needs: build
runs-on: ubuntu-latest
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index fd99017c..d8af7f47 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -6,6 +6,9 @@ on:
pull_request:
branches: [main]
+permissions:
+ contents: read
+
jobs:
build:
strategy:
@@ -35,3 +38,24 @@ jobs:
make check_code_quality
      - name: 🧪 Run tests
run: "python -m unittest"
+
+ test-slim:
+ runs-on: ubuntu-latest
+ steps:
+      - name: 🛎️ Checkout
+ uses: actions/checkout@v4
+ with:
+ ref: ${{ github.event.pull_request.head.ref }}
+ repository: ${{ github.event.pull_request.head.repo.full_name }}
+      - name: 🐍 Set up Python 3.10
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.10'
+      - name: 🦾 Install slim dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -r requirements-slim.txt
+ pip install -e . --no-deps
+ pip install responses
+      - name: 🧪 Run slim-compatible tests
+ run: "python -m unittest tests.test_slim_compat tests.test_vision_events"
diff --git a/Makefile b/Makefile
index 1d59d41f..a5a30fcb 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-.PHONY: style check_code_quality publish
+.PHONY: style check_code_quality publish publish-slim
export PYTHONPATH = .
check_dirs := roboflow
@@ -16,3 +16,9 @@ publish:
python setup.py sdist bdist_wheel
twine check dist/*
twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose
+
+publish-slim:
+ rm -rf dist/ build/ *.egg-info
+ python setup_slim.py sdist bdist_wheel
+ twine check dist/*
+ twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose
diff --git a/README.md b/README.md
index 22d04177..5e034ff1 100644
--- a/README.md
+++ b/README.md
@@ -51,6 +51,20 @@ pip install "roboflow[desktop]"
```
+
+ Lightweight install (roboflow-slim)
+
+ If you only need vision events, workspace management, and the CLI (no image processing, inference, or training), install the lightweight package:
+
+ ```bash
+ pip install roboflow-slim
+ ```
+
+ This skips heavy dependencies like OpenCV, NumPy, Matplotlib, and Pillow, reducing install size from ~400MB to ~50MB. Useful for embedded devices, CI pipelines, and serverless environments.
+
+ Both packages share the same codebase and version. `pip install roboflow` includes everything.
+
+
Install from source
diff --git a/requirements-slim.txt b/requirements-slim.txt
new file mode 100644
index 00000000..9709e296
--- /dev/null
+++ b/requirements-slim.txt
@@ -0,0 +1,12 @@
+certifi
+idna
+requests
+urllib3>=1.26.6
+tqdm>=4.41.0
+PyYAML>=5.3.1
+requests_toolbelt
+filetype
+typer>=0.12.0
+python-dateutil
+python-dotenv
+six
diff --git a/roboflow/__init__.py b/roboflow/__init__.py
index 03f9acb7..30ed3fd1 100644
--- a/roboflow/__init__.py
+++ b/roboflow/__init__.py
@@ -10,11 +10,17 @@
from roboflow.adapters import rfapi
from roboflow.config import API_URL, APP_URL, DEMO_KEYS, load_roboflow_api_key
-from roboflow.core.project import Project
from roboflow.core.workspace import Workspace
-from roboflow.models import CLIPModel, GazeModel # noqa: F401
from roboflow.util.general import write_line
+try:
+ from roboflow.core.project import Project
+ from roboflow.models import CLIPModel, GazeModel # noqa: F401
+except ImportError:
+ Project = None # type: ignore[assignment,misc]
+ CLIPModel = None # type: ignore[assignment,misc]
+ GazeModel = None # type: ignore[assignment,misc]
+
__version__ = "1.3.1"
@@ -250,6 +256,10 @@ def project(self, project_name, the_workspace=None):
:param the_workspace workspace name
:return project object
"""
+ if Project is None:
+ raise ImportError(
+ "Project requires additional dependencies. Install the full package: pip install roboflow"
+ )
if the_workspace is None:
if "/" in project_name:
diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py
index 2b7da820..5e224c02 100644
--- a/roboflow/adapters/rfapi.py
+++ b/roboflow/adapters/rfapi.py
@@ -8,7 +8,6 @@
from requests_toolbelt.multipart.encoder import MultipartEncoder
from roboflow.config import API_URL, DEFAULT_BATCH_NAME, DEFAULT_JOB_NAME
-from roboflow.util import image_utils
class RoboflowError(Exception):
@@ -294,6 +293,8 @@ def upload_image(
# If image is not a hosted image
if not hosted_image:
+ from roboflow.util import image_utils
+
image_name = os.path.basename(image_path)
imgjpeg = image_utils.file2jpeg(image_path)
diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py
new file mode 100644
index 00000000..358ac5be
--- /dev/null
+++ b/roboflow/adapters/vision_events_api.py
@@ -0,0 +1,260 @@
+import json
+import os
+from typing import Any, Dict, List, Optional
+
+import requests
+from requests_toolbelt.multipart.encoder import MultipartEncoder
+
+from roboflow.adapters.rfapi import RoboflowError
+from roboflow.config import API_URL
+
+_BASE = f"{API_URL}/vision-events"
+
+
+def _auth_headers(api_key: str) -> Dict[str, str]:
+ return {"Authorization": f"Bearer {api_key}"}
+
+
+def write_event(api_key: str, event: Dict[str, Any]) -> dict:
+ """Create a single vision event.
+
+ Args:
+ api_key: Roboflow API key.
+ event: Event payload dict (eventId, eventType, useCaseId, timestamp, etc.).
+
+ Returns:
+ Parsed JSON response with ``eventId`` and ``created``.
+
+ Raises:
+ RoboflowError: On non-201 response status codes.
+ """
+ response = requests.post(_BASE, json=event, headers=_auth_headers(api_key))
+ if response.status_code != 201:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def write_batch(api_key: str, events: List[Dict[str, Any]]) -> dict:
+ """Create multiple vision events in a single request.
+
+ Args:
+ api_key: Roboflow API key.
+ events: List of event payload dicts (max 100 per the server).
+
+ Returns:
+ Parsed JSON response with ``created`` count and ``eventIds``.
+
+ Raises:
+ RoboflowError: On non-201 response status codes.
+ """
+ response = requests.post(
+ f"{_BASE}/batch",
+ json={"events": events},
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 201:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def query(api_key: str, query_params: Dict[str, Any]) -> dict:
+ """Query vision events with filters and pagination.
+
+ Args:
+ api_key: Roboflow API key.
+ query_params: Query payload (useCaseId, eventType, startTime, endTime,
+ cursor, limit, customMetadataFilters, etc.).
+
+ Returns:
+ Parsed JSON response with ``events``, ``nextCursor``, ``hasMore``,
+ and ``lookbackDays``.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ response = requests.post(
+ f"{_BASE}/query",
+ json=query_params,
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def list_use_cases(api_key: str, status: Optional[str] = None) -> dict:
+ """List all use cases for a workspace.
+
+ Args:
+ api_key: Roboflow API key.
+ status: Optional status filter (default server-side: "active").
+
+ Returns:
+ Parsed JSON response with ``useCases`` list and ``lookbackDays``.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ params: Dict[str, str] = {}
+ if status is not None:
+ params["status"] = status
+ response = requests.get(
+ f"{_BASE}/use-cases",
+ params=params,
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def get_custom_metadata_schema(api_key: str, use_case_id: str) -> dict:
+ """Get the custom metadata schema for a use case.
+
+ Args:
+ api_key: Roboflow API key.
+ use_case_id: Use case identifier.
+
+ Returns:
+ Parsed JSON response with ``fields`` mapping field names to their types.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ response = requests.get(
+ f"{_BASE}/custom-metadata-schema/{use_case_id}",
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def create_use_case(api_key: str, name: str) -> dict:
+ """Create a new vision event use case.
+
+ Args:
+ api_key: Roboflow API key.
+ name: Human-readable name for the use case.
+
+ Returns:
+ Parsed JSON response with ``id`` and ``name``.
+
+ Raises:
+ RoboflowError: On non-201 response status codes.
+ """
+ response = requests.post(
+ f"{_BASE}/use-cases",
+ json={"name": name},
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 201:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def rename_use_case(api_key: str, use_case_id: str, name: str) -> dict:
+ """Rename an existing vision event use case.
+
+ Args:
+ api_key: Roboflow API key.
+ use_case_id: Use case identifier.
+ name: New name for the use case.
+
+ Returns:
+ Parsed JSON response with ``id`` and ``name``.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ response = requests.put(
+ f"{_BASE}/use-cases/{use_case_id}",
+ json={"name": name},
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def archive_use_case(api_key: str, use_case_id: str) -> dict:
+ """Archive a vision event use case.
+
+ Args:
+ api_key: Roboflow API key.
+ use_case_id: Use case identifier.
+
+ Returns:
+ Parsed JSON response with ``success``.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ response = requests.post(
+ f"{_BASE}/use-cases/{use_case_id}/archive",
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def unarchive_use_case(api_key: str, use_case_id: str) -> dict:
+ """Unarchive a vision event use case.
+
+ Args:
+ api_key: Roboflow API key.
+ use_case_id: Use case identifier.
+
+ Returns:
+ Parsed JSON response with ``success``.
+
+ Raises:
+ RoboflowError: On non-200 response status codes.
+ """
+ response = requests.post(
+ f"{_BASE}/use-cases/{use_case_id}/unarchive",
+ headers=_auth_headers(api_key),
+ )
+ if response.status_code != 200:
+ raise RoboflowError(response.text)
+ return response.json()
+
+
+def upload_image(
+ api_key: str,
+ image_path: str,
+ name: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+) -> dict:
+ """Upload an image for use in vision events.
+
+ Args:
+ api_key: Roboflow API key.
+ image_path: Local filesystem path to the image file.
+ name: Optional custom image name.
+ metadata: Optional flat dict of metadata to attach.
+
+ Returns:
+ Parsed JSON response with ``sourceId`` (and optionally ``url``).
+
+ Raises:
+ RoboflowError: On non-201 response status codes.
+ """
+ filename = name or os.path.basename(image_path)
+ with open(image_path, "rb") as f:
+ fields: Dict[str, Any] = {
+ "file": (filename, f, "application/octet-stream"),
+ }
+ if name is not None:
+ fields["name"] = name
+ if metadata is not None:
+ fields["metadata"] = json.dumps(metadata)
+ m = MultipartEncoder(fields=fields)
+ headers = _auth_headers(api_key)
+ headers["Content-Type"] = m.content_type
+ response = requests.post(f"{_BASE}/upload", data=m, headers=headers)
+
+ if response.status_code != 201:
+ raise RoboflowError(response.text)
+ return response.json()
diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py
index 3b3d0c42..45d4cce8 100644
--- a/roboflow/cli/__init__.py
+++ b/roboflow/cli/__init__.py
@@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None:
from roboflow.cli.handlers.universe import universe_app # noqa: E402
from roboflow.cli.handlers.version import version_app # noqa: E402
from roboflow.cli.handlers.video import video_app # noqa: E402
+from roboflow.cli.handlers.vision_events import vision_events_app # noqa: E402
from roboflow.cli.handlers.workflow import workflow_app # noqa: E402
from roboflow.cli.handlers.workspace import workspace_app # noqa: E402
@@ -210,6 +211,7 @@ def _walk(group: Any, prefix: str = "") -> None:
app.add_typer(universe_app, name="universe")
app.add_typer(version_app, name="version")
app.add_typer(video_app, name="video")
+app.add_typer(vision_events_app, name="vision-events")
app.add_typer(workflow_app, name="workflow")
app.add_typer(workspace_app, name="workspace")
diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py
new file mode 100644
index 00000000..61a521da
--- /dev/null
+++ b/roboflow/cli/handlers/vision_events.py
@@ -0,0 +1,441 @@
+"""Vision events commands: write, query, list use cases, and upload images."""
+
+from __future__ import annotations
+
+from typing import Annotated, Optional
+
+import typer
+
+from roboflow.cli._compat import SortedGroup, ctx_to_args
+
+vision_events_app = typer.Typer(
+ help="Create, query, and manage vision events.",
+ cls=SortedGroup,
+ no_args_is_help=True,
+)
+
+
+def _resolve(args): # noqa: ANN001
+ """Return api_key or call output_error and return None."""
+ from roboflow.cli._resolver import resolve_ws_and_key
+
+ resolved = resolve_ws_and_key(args)
+ if resolved is None:
+ return None
+ _ws, api_key = resolved
+ return api_key
+
+
+# ---------------------------------------------------------------------------
+# write
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("write")
+def write(
+ ctx: typer.Context,
+ event: Annotated[str, typer.Argument(help="JSON string of the event payload")],
+) -> None:
+ """Create a single vision event."""
+ args = ctx_to_args(ctx, event=event)
+ _write(args)
+
+
+def _write(args) -> None: # noqa: ANN001
+ import json
+
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ event = json.loads(args.event)
+ except (json.JSONDecodeError, TypeError) as exc:
+ output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON string.")
+ return
+
+ try:
+ result = vision_events_api.write_event(api_key, event)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Created event {result.get('eventId', '')}")
+
+
+# ---------------------------------------------------------------------------
+# write-batch
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("write-batch")
+def write_batch(
+ ctx: typer.Context,
+ events: Annotated[str, typer.Argument(help="JSON string of the events array")],
+) -> None:
+ """Create multiple vision events in a single request."""
+ args = ctx_to_args(ctx, events=events)
+ _write_batch(args)
+
+
+def _write_batch(args) -> None: # noqa: ANN001
+ import json
+
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ events = json.loads(args.events)
+ except (json.JSONDecodeError, TypeError) as exc:
+ output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON array string.")
+ return
+
+ try:
+ result = vision_events_api.write_batch(api_key, events)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Created {result.get('created', 0)} event(s)")
+
+
+# ---------------------------------------------------------------------------
+# query
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("query")
+def query(
+ ctx: typer.Context,
+ use_case: Annotated[str, typer.Argument(help="Use case identifier to query")],
+ event_type: Annotated[Optional[str], typer.Option("-t", "--event-type", help="Filter by event type")] = None,
+ start_time: Annotated[Optional[str], typer.Option("--start", help="ISO 8601 start time")] = None,
+ end_time: Annotated[Optional[str], typer.Option("--end", help="ISO 8601 end time")] = None,
+ limit: Annotated[Optional[int], typer.Option("-l", "--limit", help="Max events to return")] = None,
+ cursor: Annotated[Optional[str], typer.Option("--cursor", help="Pagination cursor")] = None,
+) -> None:
+ """Query vision events with filters and pagination."""
+ args = ctx_to_args(
+ ctx,
+ use_case=use_case,
+ event_type=event_type,
+ start_time=start_time,
+ end_time=end_time,
+ limit=limit,
+ cursor=cursor,
+ )
+ _query(args)
+
+
+def _query(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ payload = {"useCaseId": args.use_case}
+ if args.event_type is not None:
+ payload["eventType"] = args.event_type
+ if args.start_time is not None:
+ payload["startTime"] = args.start_time
+ if args.end_time is not None:
+ payload["endTime"] = args.end_time
+ if args.limit is not None:
+ payload["limit"] = args.limit
+ if args.cursor is not None:
+ payload["cursor"] = args.cursor
+
+ try:
+ result = vision_events_api.query(api_key, payload)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ events = result.get("events", [])
+ lines = [f"Found {len(events)} event(s)."]
+ for evt in events:
+ lines.append(f" {evt.get('eventId', '')} [{evt.get('eventType', '')}]")
+ if result.get("nextCursor"):
+ lines.append(f"\nNext page: --cursor {result['nextCursor']}")
+
+ output(args, result, text="\n".join(lines))
+
+
+# ---------------------------------------------------------------------------
+# use-cases
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("use-cases")
+def use_cases(
+ ctx: typer.Context,
+ status: Annotated[Optional[str], typer.Option("-s", "--status", help="Filter by status (active, inactive)")] = None,
+) -> None:
+ """List vision event use cases for the workspace."""
+ args = ctx_to_args(ctx, status=status)
+ _use_cases(args)
+
+
+def _use_cases(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.list_use_cases(api_key, status=args.status)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ items = result.get("useCases") or result.get("solutions", [])
+ lines = [f"{len(items)} use case(s):"]
+ for uc in items:
+ name = uc.get("name", uc.get("id", ""))
+ if uc.get("eventCount") is not None:
+ detail = f" ({uc['eventCount']} events)"
+ elif uc.get("status"):
+ detail = f" [{uc['status']}]"
+ else:
+ detail = ""
+ lines.append(f" {name}{detail}")
+
+ output(args, result, text="\n".join(lines))
+
+
+# ---------------------------------------------------------------------------
+# create-use-case
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("create-use-case")
+def create_use_case(
+ ctx: typer.Context,
+ name: Annotated[str, typer.Argument(help="Name for the new use case")],
+) -> None:
+ """Create a new vision event use case."""
+ args = ctx_to_args(ctx, name=name)
+ _create_use_case(args)
+
+
+def _create_use_case(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.create_use_case(api_key, args.name)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Created use case {result.get('id', '')} ({result.get('name', '')})")
+
+
+# ---------------------------------------------------------------------------
+# rename-use-case
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("rename-use-case")
+def rename_use_case(
+ ctx: typer.Context,
+ use_case: Annotated[str, typer.Argument(help="Use case identifier")],
+ name: Annotated[str, typer.Option("-n", "--name", help="New name for the use case")],
+) -> None:
+ """Rename an existing vision event use case."""
+ args = ctx_to_args(ctx, use_case=use_case, name=name)
+ _rename_use_case(args)
+
+
+def _rename_use_case(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.rename_use_case(api_key, args.use_case, args.name)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Renamed use case {result.get('id', '')} to {result.get('name', '')}")
+
+
+# ---------------------------------------------------------------------------
+# archive-use-case
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("archive-use-case")
+def archive_use_case(
+ ctx: typer.Context,
+ use_case: Annotated[str, typer.Argument(help="Use case identifier")],
+) -> None:
+ """Archive a vision event use case."""
+ args = ctx_to_args(ctx, use_case=use_case)
+ _archive_use_case(args)
+
+
+def _archive_use_case(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.archive_use_case(api_key, args.use_case)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Archived use case {args.use_case}")
+
+
+# ---------------------------------------------------------------------------
+# unarchive-use-case
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("unarchive-use-case")
+def unarchive_use_case(
+ ctx: typer.Context,
+ use_case: Annotated[str, typer.Argument(help="Use case identifier")],
+) -> None:
+ """Unarchive a vision event use case."""
+ args = ctx_to_args(ctx, use_case=use_case)
+ _unarchive_use_case(args)
+
+
+def _unarchive_use_case(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.unarchive_use_case(api_key, args.use_case)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Unarchived use case {args.use_case}")
+
+
+# ---------------------------------------------------------------------------
+# metadata-schema
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("metadata-schema")
+def metadata_schema(
+ ctx: typer.Context,
+ use_case: Annotated[str, typer.Argument(help="Use case identifier")],
+) -> None:
+ """Get the custom metadata schema for a use case."""
+ args = ctx_to_args(ctx, use_case=use_case)
+ _metadata_schema(args)
+
+
+def _metadata_schema(args) -> None: # noqa: ANN001
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ result = vision_events_api.get_custom_metadata_schema(api_key, args.use_case)
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ fields = result.get("fields", {})
+ lines = [f"{len(fields)} field(s):"]
+ for name, info in fields.items():
+ types = ", ".join(info.get("types", []))
+ lines.append(f" {name} ({types})")
+
+ output(args, result, text="\n".join(lines))
+
+
+# ---------------------------------------------------------------------------
+# upload-image
+# ---------------------------------------------------------------------------
+
+
+@vision_events_app.command("upload-image")
+def upload_image(
+ ctx: typer.Context,
+ image: Annotated[str, typer.Argument(help="Path to the image file")],
+ name: Annotated[Optional[str], typer.Option("-n", "--name", help="Custom image name")] = None,
+ metadata: Annotated[
+ Optional[str],
+ typer.Option("-M", "--metadata", help='JSON string of metadata (e.g. \'{"camera_id":"cam001"}\')'),
+ ] = None,
+) -> None:
+ """Upload an image for use in vision events."""
+ args = ctx_to_args(ctx, image=image, name=name, metadata=metadata)
+ _upload_image(args)
+
+
+def _upload_image(args) -> None: # noqa: ANN001
+ import json
+
+ from roboflow.adapters import vision_events_api
+ from roboflow.adapters.rfapi import RoboflowError
+ from roboflow.cli._output import output, output_error
+
+ api_key = _resolve(args)
+ if api_key is None:
+ return
+
+ try:
+ parsed_metadata = json.loads(args.metadata) if args.metadata else None
+ except (json.JSONDecodeError, TypeError) as exc:
+ output_error(args, f"Invalid metadata JSON: {exc}", hint="Pass a valid JSON string.")
+ return
+
+ try:
+ result = vision_events_api.upload_image(
+ api_key,
+ image_path=args.image,
+ name=args.name,
+ metadata=parsed_metadata,
+ )
+ except RoboflowError as exc:
+ output_error(args, str(exc))
+ return
+
+ output(args, result, text=f"Uploaded image: sourceId={result.get('sourceId', '')}")
diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py
index 698aa59d..6790a20b 100644
--- a/roboflow/core/workspace.py
+++ b/roboflow/core/workspace.py
@@ -9,21 +9,12 @@
from typing import Any, Dict, Generator, List, Optional
import requests
-from PIL import Image
from requests.exceptions import HTTPError
from tqdm import tqdm
-from roboflow.adapters import rfapi
+from roboflow.adapters import rfapi, vision_events_api
from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError
-from roboflow.config import API_URL, APP_URL, CLIP_FEATURIZE_URL, DEMO_KEYS
-from roboflow.core.project import Project
-from roboflow.util import folderparser
-from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons
-from roboflow.util.general import extract_zip as _extract_zip
-from roboflow.util.image_utils import load_labelmap
-from roboflow.util.model_processor import process
-from roboflow.util.two_stage_utils import ocr_infer
-from roboflow.util.versions import normalize_yolo_model_type
+from roboflow.config import API_URL, APP_URL, DEMO_KEYS
class Workspace:
@@ -64,6 +55,8 @@ def projects(self):
Returns:
List of Project objects.
"""
+ from roboflow.core.project import Project
+
projects_array = []
for a_project in self.project_list:
proj = Project(self.__api_key, a_project, self.model_format)
@@ -83,6 +76,8 @@ def project(self, project_id):
Returns:
Project Object
"""
+ from roboflow.core.project import Project
+
sys.stdout.write("\r" + "loading Roboflow project...")
sys.stdout.write("\n")
sys.stdout.flush()
@@ -113,6 +108,8 @@ def create_project(self, project_name, project_type, project_license, annotation
Returns:
Project Object
""" # noqa: E501 // docs
+ from roboflow.core.project import Project
+
data = {
"name": project_name,
"type": project_type,
@@ -143,6 +140,9 @@ def clip_compare(self, dir: str = "", image_ext: str = ".png", target_image: str
dict: a key:value mapping of image_name:comparison_score_to_target
""" # noqa: E501 // docs
+ from roboflow.config import CLIP_FEATURIZE_URL
+ from roboflow.util.active_learning_utils import clip_encode
+
# list to store comparison results in
comparisons = []
# grab all images in a given directory with ext type
@@ -176,6 +176,8 @@ def two_stage(
# TODO: fix docs
dict: a json obj containing the results of the second stage detection
""" # noqa: E501 // docs
+ from PIL import Image
+
results = []
# create PIL image for cropping
@@ -245,6 +247,10 @@ def two_stage_ocr(
# TODO: fix docs
dict: a json obj containing the results of the second stage detection
""" # noqa: E501 // docs
+ from PIL import Image
+
+ from roboflow.util.two_stage_utils import ocr_infer
+
results = []
# create PIL image for cropping
@@ -307,6 +313,9 @@ def upload_dataset(
num_retries (int, optional): number of times to retry uploading an image if the upload fails. Defaults to 0.
is_prediction (bool, optional): whether the annotations provided in the dataset are predictions and not ground truth. Defaults to False.
""" # noqa: E501 // docs
+ from roboflow.util import folderparser
+ from roboflow.util.image_utils import load_labelmap
+
if dataset_format != "NOT_USED":
print("Warning: parameter 'dataset_format' is deprecated and will be removed in a future release")
project, created = self._get_or_create_project(
@@ -460,6 +469,9 @@ def active_learning(
use_localhost: (bool) = determines if local http format used or remote endpoint
local_server: (str) = local http address for inference server, use_localhost must be True for this to be used
""" # noqa: E501 // docs
+ from roboflow.config import CLIP_FEATURIZE_URL
+ from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons
+
if inference_endpoint is None:
inference_endpoint = []
if conditionals is None:
@@ -608,6 +620,9 @@ def deploy_model(
filename (str, optional): The name of the weights file. Defaults to "weights/best.pt".
"""
+ from roboflow.util.model_processor import process
+ from roboflow.util.versions import normalize_yolo_model_type
+
if not project_ids:
raise ValueError("At least one project ID must be provided")
@@ -802,6 +817,8 @@ def search_export(
ValueError: If both *dataset* and *annotation_group* are provided.
RoboflowError: On API errors or export timeout.
"""
+ from roboflow.util.general import extract_zip as _extract_zip
+
if dataset is not None and annotation_group is not None:
raise ValueError("dataset and annotation_group are mutually exclusive; provide only one")
@@ -938,6 +955,317 @@ def get_plan(self):
return rfapi.get_plan_info(self.__api_key)
+ # --- Vision Events ---
+
+ def write_vision_event(self, event: Dict[str, Any]) -> dict:
+ """Create a single vision event.
+
+ The event dict is passed directly to the server with no client-side
+ validation, so new event types and fields work without an SDK update.
+
+ Args:
+ event: Event payload containing at minimum ``eventId``,
+ ``eventType``, ``useCaseId``, and ``timestamp``.
+
+ Returns:
+ Dict with ``eventId`` and ``created``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> ws.write_vision_event({
+ ... "eventId": "evt-001",
+ ... "eventType": "quality_check",
+ ... "useCaseId": "manufacturing-qa",
+ ... "timestamp": "2024-01-15T10:30:00.000Z",
+ ... "eventData": {"result": "pass"},
+ ... })
+ """
+ return vision_events_api.write_event(
+ api_key=self.__api_key,
+ event=event,
+ )
+
+ def write_vision_events_batch(self, events: List[Dict[str, Any]]) -> dict:
+ """Create multiple vision events in a single request.
+
+ Args:
+ events: List of event payload dicts (server enforces max 100).
+
+ Returns:
+ Dict with ``created`` count and ``eventIds`` list.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> ws.write_vision_events_batch([
+ ... {"eventId": "e1", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:00:00Z"},
+ ... {"eventId": "e2", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:01:00Z"},
+ ... ])
+ """
+ return vision_events_api.write_batch(
+ api_key=self.__api_key,
+ events=events,
+ )
+
+ def query_vision_events(
+ self,
+ use_case: str,
+ *,
+ event_type: Optional[str] = None,
+ event_types: Optional[List[str]] = None,
+ start_time: Optional[str] = None,
+ end_time: Optional[str] = None,
+ limit: Optional[int] = None,
+ cursor: Optional[str] = None,
+ **filters: Any,
+ ) -> dict:
+ """Query vision events with filters and pagination.
+
+ Common filter kwargs are passed through to the server as-is,
+ supporting ``deviceId``, ``streamId``, ``workflowId``,
+ ``customMetadataFilters``, ``eventFieldFilters``, etc.
+
+ Args:
+ use_case: Use case identifier to query.
+ event_type: Filter by a single event type.
+ event_types: Filter by multiple event types.
+ start_time: ISO 8601 start time filter.
+ end_time: ISO 8601 end time filter.
+ limit: Maximum number of events to return.
+ cursor: Pagination cursor from a previous response.
+ **filters: Additional filter parameters passed to the API.
+
+ Returns:
+ Dict with ``events``, ``nextCursor``, ``hasMore``, and ``lookbackDays``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> page = ws.query_vision_events("manufacturing-qa", event_type="quality_check", limit=50)
+ >>> for evt in page["events"]:
+ ... print(evt["eventId"])
+ """
+ payload: Dict[str, Any] = {"useCaseId": use_case}
+ if event_type is not None:
+ payload["eventType"] = event_type
+ if event_types is not None:
+ payload["eventTypes"] = event_types
+ if start_time is not None:
+ payload["startTime"] = start_time
+ if end_time is not None:
+ payload["endTime"] = end_time
+ if limit is not None:
+ payload["limit"] = limit
+ if cursor is not None:
+ payload["cursor"] = cursor
+ payload.update(filters)
+
+ return vision_events_api.query(
+ api_key=self.__api_key,
+ query_params=payload,
+ )
+
+ def query_all_vision_events(
+ self,
+ use_case: str,
+ *,
+ event_type: Optional[str] = None,
+ event_types: Optional[List[str]] = None,
+ start_time: Optional[str] = None,
+ end_time: Optional[str] = None,
+ limit: Optional[int] = None,
+ **filters: Any,
+ ) -> Generator[List[dict], None, None]:
+ """Paginated query across vision events, yielding one page at a time.
+
+        Automatically follows ``nextCursor``, stopping as soon as a page
+        comes back empty or the server reports ``hasMore`` as false.
+
+ Args:
+ use_case: Use case identifier to query.
+ event_type: Filter by a single event type.
+ event_types: Filter by multiple event types.
+ start_time: ISO 8601 start time filter.
+ end_time: ISO 8601 end time filter.
+ limit: Maximum events per page.
+ **filters: Additional filter parameters passed to the API.
+
+ Yields:
+ A list of event dicts for each page.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> for page in ws.query_all_vision_events("manufacturing-qa"):
+ ... for evt in page:
+ ... print(evt["eventId"])
+ """
+ cursor = None
+ while True:
+ response = self.query_vision_events(
+ use_case,
+ event_type=event_type,
+ event_types=event_types,
+ start_time=start_time,
+ end_time=end_time,
+ limit=limit,
+ cursor=cursor,
+ **filters,
+ )
+ events = response.get("events", [])
+ if not events:
+ break
+ yield events
+ cursor = response.get("nextCursor")
+ if not cursor or not response.get("hasMore", False):
+ break
+
+ def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict:
+ """List all vision event use cases for the workspace.
+
+ Args:
+ status: Optional status filter (e.g. "active", "inactive").
+
+ Returns:
+            Dict with ``useCases`` list and ``lookbackDays`` (legacy ``solutions`` responses are normalized to ``useCases``).
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> result = ws.list_vision_event_use_cases()
+ >>> for uc in result["useCases"]:
+ ... print(uc["name"], uc.get("status"))
+ """
+ result = vision_events_api.list_use_cases(
+ api_key=self.__api_key,
+ status=status,
+ )
+ if "useCases" not in result and "solutions" in result:
+ result["useCases"] = result["solutions"]
+ return result
+
+ def create_vision_event_use_case(self, name: str) -> dict:
+ """Create a new vision event use case.
+
+ Args:
+ name: Human-readable name for the use case.
+
+ Returns:
+ Dict with ``id`` and ``name``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> result = ws.create_vision_event_use_case("manufacturing-qa")
+ >>> use_case_id = result["id"]
+ """
+ return vision_events_api.create_use_case(
+ api_key=self.__api_key,
+ name=name,
+ )
+
+ def rename_vision_event_use_case(self, use_case: str, name: str) -> dict:
+ """Rename an existing vision event use case.
+
+ Args:
+ use_case: Use case identifier.
+ name: New name for the use case.
+
+ Returns:
+ Dict with ``id`` and ``name``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> ws.rename_vision_event_use_case("abc123", "new-name")
+ """
+ return vision_events_api.rename_use_case(
+ api_key=self.__api_key,
+ use_case_id=use_case,
+ name=name,
+ )
+
+ def archive_vision_event_use_case(self, use_case: str) -> dict:
+ """Archive a vision event use case.
+
+ Args:
+ use_case: Use case identifier.
+
+ Returns:
+ Dict with ``success``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> ws.archive_vision_event_use_case("abc123")
+ """
+ return vision_events_api.archive_use_case(
+ api_key=self.__api_key,
+ use_case_id=use_case,
+ )
+
+ def unarchive_vision_event_use_case(self, use_case: str) -> dict:
+ """Unarchive a vision event use case.
+
+ Args:
+ use_case: Use case identifier.
+
+ Returns:
+ Dict with ``success``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> ws.unarchive_vision_event_use_case("abc123")
+ """
+ return vision_events_api.unarchive_use_case(
+ api_key=self.__api_key,
+ use_case_id=use_case,
+ )
+
+ def get_vision_event_metadata_schema(self, use_case: str) -> dict:
+ """Get the custom metadata schema for a vision event use case.
+
+ Returns discovered field names and their types, useful for building
+ queries with ``customMetadataFilters``.
+
+ Args:
+ use_case: Use case identifier.
+
+ Returns:
+ Dict with ``fields`` mapping field names to ``{"types": [...]}``.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> schema = ws.get_vision_event_metadata_schema("manufacturing-qa")
+ >>> for field, info in schema["fields"].items():
+ ... print(field, info["types"])
+ """
+ return vision_events_api.get_custom_metadata_schema(
+ api_key=self.__api_key,
+ use_case_id=use_case,
+ )
+
+ def upload_vision_event_image(
+ self,
+ image_path: str,
+ name: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> dict:
+ """Upload an image for use in vision events.
+
+ Args:
+ image_path: Local path to the image file.
+ name: Optional custom name for the image.
+ metadata: Optional flat dict of metadata to attach.
+
+ Returns:
+ Dict with ``sourceId`` for referencing in events.
+
+ Example:
+ >>> ws = rf.workspace()
+ >>> result = ws.upload_vision_event_image("photo.jpg")
+ >>> source_id = result["sourceId"]
+ """
+ return vision_events_api.upload_image(
+ api_key=self.__api_key,
+ image_path=image_path,
+ name=name,
+ metadata=metadata,
+ )
+
def __str__(self):
projects = self.projects()
json_value = {"name": self.name, "url": self.url, "projects": projects}
diff --git a/setup_slim.py b/setup_slim.py
new file mode 100644
index 00000000..cf93fca9
--- /dev/null
+++ b/setup_slim.py
@@ -0,0 +1,54 @@
+import re
+
+import setuptools
+from setuptools import find_packages
+
+with open("./roboflow/__init__.py") as f:
+ content = f.read()
+_search_version = re.search(r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', content)
+assert _search_version
+version = _search_version.group(1)
+
+
+with open("README.md") as fh:
+ long_description = fh.read()
+
+with open("requirements-slim.txt") as fh:
+ install_requires = fh.read().split("\n")
+
+setuptools.setup(
+ name="roboflow-slim",
+ version=version,
+ author="Roboflow",
+ author_email="support@roboflow.com",
+ description="Lightweight Roboflow SDK for vision events, workspace management, and CLI",
+ long_description=long_description,
+ long_description_content_type="text/markdown",
+ url="https://github.com/roboflow-ai/roboflow-python",
+ install_requires=install_requires,
+ packages=find_packages(exclude=("tests",)),
+ extras_require={
+ "dev": [
+ "mypy",
+ "responses",
+ "ruff",
+ "twine",
+ "types-pyyaml",
+ "types-requests",
+ "types-setuptools",
+ "types-tqdm",
+ "wheel",
+ ],
+ },
+ entry_points={
+ "console_scripts": [
+ "roboflow=roboflow.roboflowpy:main",
+ ],
+ },
+ classifiers=[
+ "Programming Language :: Python :: 3",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: OS Independent",
+ ],
+ python_requires=">=3.10",
+)
diff --git a/tests/test_project.py b/tests/test_project.py
index 4568ed7e..87dbde92 100644
--- a/tests/test_project.py
+++ b/tests/test_project.py
@@ -61,15 +61,15 @@ def _setup_upload_dataset_mocks(
# Create the mock objects
mocks = {
- "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=test_dataset),
- "upload": patch("roboflow.core.workspace.Project.upload_image", side_effect=upload_image_side_effect)
+ "parser": patch("roboflow.util.folderparser.parsefolder", return_value=test_dataset),
+ "upload": patch("roboflow.core.project.Project.upload_image", side_effect=upload_image_side_effect)
if upload_image_side_effect
- else patch("roboflow.core.workspace.Project.upload_image", return_value=image_return),
+ else patch("roboflow.core.project.Project.upload_image", return_value=image_return),
"save_annotation": patch(
- "roboflow.core.workspace.Project.save_annotation", side_effect=save_annotation_side_effect
+ "roboflow.core.project.Project.save_annotation", side_effect=save_annotation_side_effect
)
if save_annotation_side_effect
- else patch("roboflow.core.workspace.Project.save_annotation", return_value=annotation_return),
+ else patch("roboflow.core.project.Project.save_annotation", return_value=annotation_return),
"get_project": patch(
"roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, project_created)
),
@@ -348,7 +348,7 @@ def test_project_upload_dataset(self):
"extra_mocks": [
(
"load_labelmap",
- "roboflow.core.workspace.load_labelmap",
+ "roboflow.util.image_utils.load_labelmap",
{"return_value": {"old_label": "new_label"}},
)
],
@@ -650,13 +650,13 @@ def capture_annotation_calls(annotation_path, **kwargs):
return ({"success": True}, 0.1, 0)
mocks = {
- "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset),
+ "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset),
"upload": patch(
- "roboflow.core.workspace.Project.upload_image",
+ "roboflow.core.project.Project.upload_image",
return_value=({"id": "test-id", "success": True}, 0.1, 0),
),
"save_annotation": patch(
- "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls
+ "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls
),
"get_project": patch(
"roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False)
@@ -737,13 +737,13 @@ def capture_annotation_calls(annotation_path, **kwargs):
return ({"success": True}, 0.1, 0)
mocks = {
- "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset),
+ "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset),
"upload": patch(
- "roboflow.core.workspace.Project.upload_image",
+ "roboflow.core.project.Project.upload_image",
return_value=({"id": "test-id", "success": True}, 0.1, 0),
),
"save_annotation": patch(
- "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls
+ "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls
),
"get_project": patch(
"roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False)
diff --git a/tests/test_slim_compat.py b/tests/test_slim_compat.py
new file mode 100644
index 00000000..972eeacf
--- /dev/null
+++ b/tests/test_slim_compat.py
@@ -0,0 +1,84 @@
+"""Tests for slim install compatibility.
+
+Verifies that the package can be imported and lightweight features work
+even when heavy dependencies (PIL, opencv, numpy, matplotlib) are missing.
+
+In a full install, these tests verify the guards don't break normal behavior.
+In a slim install, they verify graceful degradation.
+"""
+
+import unittest
+
+
+class TestSlimImport(unittest.TestCase):
+ """Verify that importing the package always succeeds."""
+
+ def test_import_roboflow(self):
+ import roboflow
+
+ self.assertIsNotNone(roboflow.__version__)
+
+ def test_import_vision_events_adapter(self):
+ from roboflow.adapters import vision_events_api
+
+ self.assertTrue(callable(vision_events_api.write_event))
+ self.assertTrue(callable(vision_events_api.write_batch))
+ self.assertTrue(callable(vision_events_api.query))
+ self.assertTrue(callable(vision_events_api.list_use_cases))
+ self.assertTrue(callable(vision_events_api.get_custom_metadata_schema))
+ self.assertTrue(callable(vision_events_api.upload_image))
+
+ def test_import_config(self):
+ from roboflow.config import API_URL
+
+ self.assertIsInstance(API_URL, str)
+
+ def test_import_rfapi(self):
+ from roboflow.adapters.rfapi import RoboflowError
+
+ self.assertTrue(issubclass(RoboflowError, Exception))
+
+ def test_import_cli(self):
+ from roboflow.cli import app
+
+ self.assertIsNotNone(app)
+
+
+class TestSlimGracefulDegradation(unittest.TestCase):
+ """Verify that heavy features fail with clear errors when deps are missing.
+
+ These tests only exercise the error path when PIL/opencv are absent.
+ In a full install they verify the guard exists but doesn't fire.
+ """
+
+ def test_workspace_always_available(self):
+ """Workspace imports cleanly even in slim mode."""
+ import roboflow
+
+ self.assertIsNotNone(roboflow.Workspace)
+ self.assertTrue(callable(roboflow.Workspace))
+
+ def test_project_guarded(self):
+ """Project is either a real class (full) or None (slim)."""
+ import roboflow
+
+ self.assertTrue(roboflow.Project is None or callable(roboflow.Project))
+
+ def test_roboflow_project_guard(self):
+ """If Project is None (slim), calling project() raises ImportError."""
+ import roboflow
+
+ if roboflow.Project is not None:
+ self.skipTest("Full install, Project is available")
+
+ rf = roboflow.Roboflow.__new__(roboflow.Roboflow)
+ rf.api_key = "test"
+ rf.current_workspace = "test"
+
+ with self.assertRaises(ImportError) as ctx:
+ rf.project("test-project")
+ self.assertIn("pip install roboflow", str(ctx.exception))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py
new file mode 100644
index 00000000..0ab2bb7c
--- /dev/null
+++ b/tests/test_vision_events.py
@@ -0,0 +1,636 @@
+import json
+import os
+import tempfile
+import unittest
+
+import responses
+
+from roboflow.adapters.rfapi import RoboflowError
+from roboflow.config import API_URL
+
+# The vision events API does not include workspace in the URL.
+# Auth is via Bearer token; workspace is derived server-side from the API key.
+_BASE = f"{API_URL}/vision-events"
+
+
+class TestVisionEvents(unittest.TestCase):
+ API_KEY = "test_key"
+ WORKSPACE = "test-ws"
+
+ def _make_workspace(self):
+ from roboflow.core.workspace import Workspace
+
+ info = {
+ "workspace": {
+ "name": "Test",
+ "url": self.WORKSPACE,
+ "projects": [],
+ "members": [],
+ }
+ }
+ return Workspace(info, api_key=self.API_KEY, default_workspace=self.WORKSPACE, model_format="yolov8")
+
+ def _assert_bearer_auth(self, call_index=0):
+ auth = responses.calls[call_index].request.headers.get("Authorization")
+ self.assertEqual(auth, f"Bearer {self.API_KEY}")
+
+ # --- write_vision_event ---
+
+ @responses.activate
+ def test_write_event(self):
+ responses.add(responses.POST, _BASE, json={"eventId": "evt-001"}, status=201)
+
+ ws = self._make_workspace()
+ event = {
+ "eventId": "evt-001",
+ "eventType": "quality_check",
+ "useCaseId": "uc-1",
+ "timestamp": "2024-01-15T10:00:00Z",
+ "eventData": {"result": "pass"},
+ }
+ result = ws.write_vision_event(event)
+
+ self.assertEqual(result["eventId"], "evt-001")
+ self._assert_bearer_auth()
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["eventId"], "evt-001")
+ self.assertEqual(sent["eventType"], "quality_check")
+ self.assertEqual(sent["useCaseId"], "uc-1")
+ self.assertEqual(sent["eventData"], {"result": "pass"})
+
+ @responses.activate
+ def test_write_event_passthrough(self):
+ """The event dict must be sent to the server unchanged (no filtering or transformation)."""
+ responses.add(responses.POST, _BASE, json={"eventId": "e1"}, status=201)
+
+ ws = self._make_workspace()
+ event = {
+ "eventId": "e1",
+ "eventType": "safety_alert",
+ "useCaseId": "warehouse-safety",
+ "timestamp": "2024-06-01T12:00:00Z",
+ "deviceId": "cam-5",
+ "streamId": "stream-a",
+ "workflowId": "wf-1",
+ "images": [{"sourceId": "src-1", "label": "frame"}],
+ "eventData": {"alertType": "fire", "severity": "high"},
+ "customMetadata": {"zone": "B3", "temperature": 42.5, "active": True},
+ }
+ ws.write_vision_event(event)
+
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent, event)
+
+ @responses.activate
+ def test_write_event_error(self):
+ responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.write_vision_event({"eventId": "x", "eventType": "custom", "useCaseId": "s", "timestamp": "t"})
+
+ # --- write_vision_events_batch ---
+
+ @responses.activate
+ def test_write_batch(self):
+ responses.add(responses.POST, f"{_BASE}/batch", json={"created": 2, "eventIds": ["e1", "e2"]}, status=201)
+
+ ws = self._make_workspace()
+ events = [
+ {"eventId": "e1", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:00:00Z"},
+ {"eventId": "e2", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:01:00Z"},
+ ]
+ result = ws.write_vision_events_batch(events)
+
+ self.assertEqual(result["created"], 2)
+ self.assertEqual(result["eventIds"], ["e1", "e2"])
+ self._assert_bearer_auth()
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(len(sent["events"]), 2)
+
+ @responses.activate
+ def test_write_batch_error(self):
+ responses.add(responses.POST, f"{_BASE}/batch", json={"error": "validation"}, status=400)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.write_vision_events_batch([{"bad": "event"}])
+
+ # --- query_vision_events ---
+
+ @responses.activate
+ def test_query_basic(self):
+ body = {
+ "events": [{"eventId": "e1"}, {"eventId": "e2"}],
+ "nextCursor": None,
+ "hasMore": False,
+ "lookbackDays": 14,
+ }
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ result = ws.query_vision_events("my-use-case")
+
+ self.assertEqual(len(result["events"]), 2)
+ self.assertFalse(result["hasMore"])
+ self._assert_bearer_auth()
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["useCaseId"], "my-use-case")
+
+ @responses.activate
+ def test_query_with_filters(self):
+ body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ ws.query_vision_events(
+ "my-uc",
+ event_type="quality_check",
+ start_time="2024-01-01T00:00:00Z",
+ end_time="2024-02-01T00:00:00Z",
+ limit=10,
+ cursor="abc123",
+ deviceId={"operator": "eq", "value": "cam-01"},
+ )
+
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["useCaseId"], "my-uc")
+ self.assertEqual(sent["eventType"], "quality_check")
+ self.assertEqual(sent["startTime"], "2024-01-01T00:00:00Z")
+ self.assertEqual(sent["endTime"], "2024-02-01T00:00:00Z")
+ self.assertEqual(sent["limit"], 10)
+ self.assertEqual(sent["cursor"], "abc123")
+ self.assertEqual(sent["deviceId"], {"operator": "eq", "value": "cam-01"})
+
+ @responses.activate
+ def test_query_with_event_types_plural(self):
+ body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ ws.query_vision_events("uc", event_types=["quality_check", "safety_alert"])
+
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["eventTypes"], ["quality_check", "safety_alert"])
+ self.assertNotIn("eventType", sent)
+
+ @responses.activate
+ def test_query_omits_none_params(self):
+ """Optional params that are None must not appear in the payload."""
+ body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ ws.query_vision_events("uc")
+
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent, {"useCaseId": "uc"})
+
+ @responses.activate
+ def test_query_error(self):
+ responses.add(responses.POST, f"{_BASE}/query", json={"error": "unauthorized"}, status=401)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.query_vision_events("my-uc")
+
+ # --- query_all_vision_events ---
+
+ @responses.activate
+ def test_query_all_single_page(self):
+ body = {
+ "events": [{"eventId": "e1"}],
+ "nextCursor": None,
+ "hasMore": False,
+ "lookbackDays": 14,
+ }
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ pages = list(ws.query_all_vision_events("my-uc"))
+
+ self.assertEqual(len(pages), 1)
+ self.assertEqual(pages[0][0]["eventId"], "e1")
+
+ @responses.activate
+ def test_query_all_multiple_pages(self):
+ page1 = {"events": [{"eventId": "e1"}], "nextCursor": "cursor2", "hasMore": True, "lookbackDays": 14}
+ page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200)
+ responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200)
+
+ ws = self._make_workspace()
+ pages = list(ws.query_all_vision_events("my-uc"))
+
+ self.assertEqual(len(pages), 2)
+ self.assertEqual(pages[0][0]["eventId"], "e1")
+ self.assertEqual(pages[1][0]["eventId"], "e2")
+
+ # Verify cursor was sent in second request
+ sent2 = json.loads(responses.calls[1].request.body)
+ self.assertEqual(sent2["cursor"], "cursor2")
+
+ @responses.activate
+ def test_query_all_forwards_filters(self):
+ """Filters must be forwarded to every page request, not just the first."""
+ page1 = {"events": [{"eventId": "e1"}], "nextCursor": "c2", "hasMore": True, "lookbackDays": 14}
+ page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200)
+ responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200)
+
+ ws = self._make_workspace()
+ list(ws.query_all_vision_events("uc", event_type="quality_check", limit=1))
+
+ sent1 = json.loads(responses.calls[0].request.body)
+ sent2 = json.loads(responses.calls[1].request.body)
+
+ # Both requests should have the filter
+ self.assertEqual(sent1["eventType"], "quality_check")
+ self.assertEqual(sent2["eventType"], "quality_check")
+ # Second request should also have the cursor
+ self.assertNotIn("cursor", sent1)
+ self.assertEqual(sent2["cursor"], "c2")
+
+ @responses.activate
+ def test_query_all_empty(self):
+ body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+
+ ws = self._make_workspace()
+ pages = list(ws.query_all_vision_events("my-uc"))
+
+ self.assertEqual(len(pages), 0)
+
+ # --- list_vision_event_use_cases ---
+
+ @responses.activate
+ def test_list_use_cases(self):
+ body = {
+ "useCases": [
+ {"id": "uc-1", "name": "QA", "status": "active"},
+ ],
+ "lookbackDays": 14,
+ }
+ responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200)
+
+ ws = self._make_workspace()
+ result = ws.list_vision_event_use_cases()
+
+ self.assertEqual(len(result["useCases"]), 1)
+ self.assertEqual(result["useCases"][0]["name"], "QA")
+ self._assert_bearer_auth()
+
+ @responses.activate
+ def test_list_use_cases_with_status(self):
+ body = {"useCases": [], "lookbackDays": 14}
+ responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200)
+
+ ws = self._make_workspace()
+ result = ws.list_vision_event_use_cases(status="inactive")
+
+ self.assertEqual(len(result["useCases"]), 0)
+ # Verify status was sent as query param
+ self.assertIn("status=inactive", responses.calls[0].request.url)
+
+ @responses.activate
+ def test_list_use_cases_legacy_solutions_response(self):
+ responses.add(
+ responses.GET,
+ f"{_BASE}/use-cases",
+ json={"solutions": [{"id": "uc-legacy", "name": "Legacy"}], "lookbackDays": 14},
+ status=200,
+ )
+
+ ws = self._make_workspace()
+ result = ws.list_vision_event_use_cases()
+ self.assertEqual(result["useCases"][0]["id"], "uc-legacy")
+
+ @responses.activate
+ def test_list_use_cases_error(self):
+ responses.add(responses.GET, f"{_BASE}/use-cases", json={"error": "forbidden"}, status=403)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.list_vision_event_use_cases()
+
+ # --- create_vision_event_use_case ---
+
+ @responses.activate
+ def test_create_use_case(self):
+ responses.add(
+ responses.POST,
+ f"{_BASE}/use-cases",
+ json={"id": "new-uc", "name": "My Use Case"},
+ status=201,
+ )
+
+ ws = self._make_workspace()
+ result = ws.create_vision_event_use_case("My Use Case")
+
+ self.assertEqual(result["id"], "new-uc")
+ self.assertEqual(result["name"], "My Use Case")
+ self._assert_bearer_auth()
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["name"], "My Use Case")
+
+ @responses.activate
+ def test_create_use_case_error(self):
+ responses.add(responses.POST, f"{_BASE}/use-cases", json={"error": "duplicate name"}, status=409)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.create_vision_event_use_case("Existing Name")
+
+ # --- rename_vision_event_use_case ---
+
+ @responses.activate
+ def test_rename_use_case(self):
+ responses.add(
+ responses.PUT,
+ f"{_BASE}/use-cases/uc-1",
+ json={"id": "uc-1", "name": "Renamed"},
+ status=200,
+ )
+
+ ws = self._make_workspace()
+ result = ws.rename_vision_event_use_case("uc-1", "Renamed")
+
+ self.assertEqual(result["id"], "uc-1")
+ self.assertEqual(result["name"], "Renamed")
+ self._assert_bearer_auth()
+ sent = json.loads(responses.calls[0].request.body)
+ self.assertEqual(sent["name"], "Renamed")
+
+ @responses.activate
+ def test_rename_use_case_error(self):
+ responses.add(responses.PUT, f"{_BASE}/use-cases/nonexistent", json={"error": "not found"}, status=404)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.rename_vision_event_use_case("nonexistent", "New Name")
+
+ # --- archive_vision_event_use_case ---
+
+ @responses.activate
+ def test_archive_use_case(self):
+ responses.add(
+ responses.POST,
+ f"{_BASE}/use-cases/uc-1/archive",
+ json={"success": True},
+ status=200,
+ )
+
+ ws = self._make_workspace()
+ result = ws.archive_vision_event_use_case("uc-1")
+
+ self.assertTrue(result["success"])
+ self._assert_bearer_auth()
+
+ @responses.activate
+ def test_archive_use_case_error(self):
+ responses.add(responses.POST, f"{_BASE}/use-cases/nonexistent/archive", json={"error": "not found"}, status=404)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.archive_vision_event_use_case("nonexistent")
+
+ # --- unarchive_vision_event_use_case ---
+
+ @responses.activate
+ def test_unarchive_use_case(self):
+ responses.add(
+ responses.POST,
+ f"{_BASE}/use-cases/uc-1/unarchive",
+ json={"success": True},
+ status=200,
+ )
+
+ ws = self._make_workspace()
+ result = ws.unarchive_vision_event_use_case("uc-1")
+
+ self.assertTrue(result["success"])
+ self._assert_bearer_auth()
+
+ @responses.activate
+ def test_unarchive_use_case_error(self):
+ responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"error": "already active"}, status=400)
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.unarchive_vision_event_use_case("uc-1")
+
+ # --- get_vision_event_metadata_schema ---
+
+ @responses.activate
+ def test_get_metadata_schema(self):
+ body = {
+ "useCaseId": "manufacturing-qa",
+ "fields": {
+ "temperature": {"types": ["number"]},
+ "zone": {"types": ["string"]},
+ "active": {"types": ["boolean"]},
+ },
+ }
+ responses.add(
+ responses.GET,
+ f"{_BASE}/custom-metadata-schema/manufacturing-qa",
+ json=body,
+ status=200,
+ )
+
+ ws = self._make_workspace()
+ result = ws.get_vision_event_metadata_schema("manufacturing-qa")
+
+ self.assertEqual(len(result["fields"]), 3)
+ self.assertEqual(result["fields"]["temperature"]["types"], ["number"])
+ self._assert_bearer_auth()
+
+ @responses.activate
+ def test_get_metadata_schema_error(self):
+ responses.add(
+ responses.GET,
+ f"{_BASE}/custom-metadata-schema/nonexistent",
+ json={"error": "not found"},
+ status=404,
+ )
+
+ ws = self._make_workspace()
+ with self.assertRaises(RoboflowError):
+ ws.get_vision_event_metadata_schema("nonexistent")
+
+ # --- upload_vision_event_image ---
+
+ @responses.activate
+ def test_upload_image(self):
+ responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-123"}, status=201)
+
+ ws = self._make_workspace()
+
+ with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
+ f.write(b"\xff\xd8\xff\xe0fake-jpeg-data")
+ tmp_path = f.name
+
+ try:
+ result = ws.upload_vision_event_image(tmp_path)
+ self.assertEqual(result["sourceId"], "src-123")
+ self._assert_bearer_auth()
+ finally:
+ os.unlink(tmp_path)
+
+ @responses.activate
+ def test_upload_image_uses_basename(self):
+ """When no name is provided, the multipart filename should be the basename of the path."""
+ responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-789"}, status=201)
+
+ ws = self._make_workspace()
+
+ with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, prefix="myimage_") as f:
+ f.write(b"\xff\xd8\xff\xe0fake")
+ tmp_path = f.name
+
+ try:
+ ws.upload_vision_event_image(tmp_path)
+ request_body = responses.calls[0].request.body
+ basename = os.path.basename(tmp_path).encode()
+ if isinstance(request_body, bytes):
+ self.assertIn(basename, request_body)
+ finally:
+ os.unlink(tmp_path)
+
+ @responses.activate
+ def test_upload_image_with_metadata(self):
+ responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-456"}, status=201)
+
+ ws = self._make_workspace()
+
+ with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
+ f.write(b"\x89PNGfake-png-data")
+ tmp_path = f.name
+
+ try:
+ result = ws.upload_vision_event_image(
+ tmp_path,
+ name="custom-name.png",
+ metadata={"camera_id": "cam-01"},
+ )
+ self.assertEqual(result["sourceId"], "src-456")
+
+ request_body = responses.calls[0].request.body
+ # Verify metadata and name were included in the multipart body
+ if isinstance(request_body, bytes):
+ self.assertIn(b"cam-01", request_body)
+ self.assertIn(b"custom-name.png", request_body)
+ finally:
+ os.unlink(tmp_path)
+
+ @responses.activate
+ def test_upload_image_error(self):
+ responses.add(responses.POST, f"{_BASE}/upload", json={"error": "forbidden"}, status=403)
+
+ ws = self._make_workspace()
+
+ with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
+ f.write(b"data")
+ tmp_path = f.name
+
+ try:
+ with self.assertRaises(RoboflowError):
+ ws.upload_vision_event_image(tmp_path)
+ finally:
+ os.unlink(tmp_path)
+
+
+class TestVisionEventsAdapter(unittest.TestCase):
+ """Tests that call the adapter directly (no Workspace). Work in slim installs."""
+
+ API_KEY = "test_key"
+
+ @responses.activate
+ def test_adapter_write_event(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, _BASE, json={"eventId": "e1", "created": True}, status=201)
+ result = vision_events_api.write_event(
+ self.API_KEY, {"eventId": "e1", "eventType": "custom", "useCaseId": "uc"}
+ )
+ self.assertEqual(result["eventId"], "e1")
+ self.assertEqual(responses.calls[0].request.headers["Authorization"], f"Bearer {self.API_KEY}")
+
+ @responses.activate
+ def test_adapter_write_batch(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, f"{_BASE}/batch", json={"created": 1, "eventIds": ["e1"]}, status=201)
+ result = vision_events_api.write_batch(self.API_KEY, [{"eventId": "e1"}])
+ self.assertEqual(result["created"], 1)
+
+ @responses.activate
+ def test_adapter_query(self):
+ from roboflow.adapters import vision_events_api
+
+ body = {"events": [{"eventId": "e1"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14}
+ responses.add(responses.POST, f"{_BASE}/query", json=body, status=200)
+ result = vision_events_api.query(self.API_KEY, {"useCaseId": "uc"})
+ self.assertEqual(len(result["events"]), 1)
+
+ @responses.activate
+ def test_adapter_list_use_cases(self):
+ from roboflow.adapters import vision_events_api
+
+ body = {"useCases": [{"id": "uc-1", "name": "QA"}], "lookbackDays": 14}
+ responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200)
+ result = vision_events_api.list_use_cases(self.API_KEY)
+ self.assertEqual(len(result["useCases"]), 1)
+
+ @responses.activate
+ def test_adapter_get_metadata_schema(self):
+ from roboflow.adapters import vision_events_api
+
+ body = {"useCaseId": "uc-1", "fields": {"temp": {"types": ["number"]}}}
+ responses.add(responses.GET, f"{_BASE}/custom-metadata-schema/uc-1", json=body, status=200)
+ result = vision_events_api.get_custom_metadata_schema(self.API_KEY, "uc-1")
+ self.assertEqual(result["fields"]["temp"]["types"], ["number"])
+
+ @responses.activate
+ def test_adapter_create_use_case(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, f"{_BASE}/use-cases", json={"id": "uc-new", "name": "Test"}, status=201)
+ result = vision_events_api.create_use_case(self.API_KEY, "Test")
+ self.assertEqual(result["id"], "uc-new")
+
+ @responses.activate
+ def test_adapter_rename_use_case(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.PUT, f"{_BASE}/use-cases/uc-1", json={"id": "uc-1", "name": "New"}, status=200)
+ result = vision_events_api.rename_use_case(self.API_KEY, "uc-1", "New")
+ self.assertEqual(result["name"], "New")
+
+ @responses.activate
+ def test_adapter_archive_use_case(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/archive", json={"success": True}, status=200)
+ result = vision_events_api.archive_use_case(self.API_KEY, "uc-1")
+ self.assertTrue(result["success"])
+
+ @responses.activate
+ def test_adapter_unarchive_use_case(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"success": True}, status=200)
+ result = vision_events_api.unarchive_use_case(self.API_KEY, "uc-1")
+ self.assertTrue(result["success"])
+
+ @responses.activate
+ def test_adapter_error_raises_roboflow_error(self):
+ from roboflow.adapters import vision_events_api
+
+ responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403)
+ with self.assertRaises(RoboflowError):
+ vision_events_api.write_event(self.API_KEY, {"eventId": "x"})
+
+
+# Allow running this module directly (e.g. `python tests/test_vision_events.py`).
+if __name__ == "__main__":
+    unittest.main()