diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index dcc16258..42234b56 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -28,6 +28,29 @@ jobs: run: | make publish -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD -e PYPI_TEST_PASSWORD=$PYPI_TEST_PASSWORD + build-slim: + needs: build + runs-on: ubuntu-latest + steps: + - name: ๐Ÿ›Ž๏ธ Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.head_ref }} + - name: ๐Ÿ Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: ๐Ÿฆพ Install dependencies + run: | + python -m pip install --upgrade pip + pip install ".[dev]" + - name: ๐Ÿš€ Publish roboflow-slim to PyPi + env: + PYPI_USERNAME: ${{ secrets.PYPI_USERNAME }} + PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + make publish-slim -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD + deploy-docs: needs: build runs-on: ubuntu-latest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fd99017c..d8af7f47 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,6 +6,9 @@ on: pull_request: branches: [main] +permissions: + contents: read + jobs: build: strategy: @@ -35,3 +38,24 @@ jobs: make check_code_quality - name: ๐Ÿงช Run tests run: "python -m unittest" + + test-slim: + runs-on: ubuntu-latest + steps: + - name: ๐Ÿ›Ž๏ธ Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.ref }} + repository: ${{ github.event.pull_request.head.repo.full_name }} + - name: ๐Ÿ Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: ๐Ÿฆพ Install slim dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-slim.txt + pip install -e . --no-deps + pip install responses + - name: ๐Ÿงช Run slim-compatible tests + run: "python -m unittest tests.test_slim_compat tests.test_vision_events" diff --git a/Makefile b/Makefile index 1d59d41f..a5a30fcb 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: style check_code_quality publish +.PHONY: style check_code_quality publish publish-slim export PYTHONPATH = . check_dirs := roboflow @@ -16,3 +16,9 @@ publish: python setup.py sdist bdist_wheel twine check dist/* twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose + +publish-slim: + rm -rf dist/ build/ *.egg-info + python setup_slim.py sdist bdist_wheel + twine check dist/* + twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose diff --git a/README.md b/README.md index 22d04177..5e034ff1 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,20 @@ pip install "roboflow[desktop]" ``` +
+ Lightweight install (roboflow-slim) + + If you only need vision events, workspace management, and the CLI (no image processing, inference, or training), install the lightweight package: + + ```bash + pip install roboflow-slim + ``` + + This skips heavy dependencies like OpenCV, NumPy, Matplotlib, and Pillow, reducing install size from ~400MB to ~50MB. Useful for embedded devices, CI pipelines, and serverless environments. + + Both packages share the same codebase and version. `pip install roboflow` includes everything. +
+
Install from source diff --git a/requirements-slim.txt b/requirements-slim.txt new file mode 100644 index 00000000..9709e296 --- /dev/null +++ b/requirements-slim.txt @@ -0,0 +1,12 @@ +certifi +idna +requests +urllib3>=1.26.6 +tqdm>=4.41.0 +PyYAML>=5.3.1 +requests_toolbelt +filetype +typer>=0.12.0 +python-dateutil +python-dotenv +six diff --git a/roboflow/__init__.py b/roboflow/__init__.py index 03f9acb7..30ed3fd1 100644 --- a/roboflow/__init__.py +++ b/roboflow/__init__.py @@ -10,11 +10,17 @@ from roboflow.adapters import rfapi from roboflow.config import API_URL, APP_URL, DEMO_KEYS, load_roboflow_api_key -from roboflow.core.project import Project from roboflow.core.workspace import Workspace -from roboflow.models import CLIPModel, GazeModel # noqa: F401 from roboflow.util.general import write_line +try: + from roboflow.core.project import Project + from roboflow.models import CLIPModel, GazeModel # noqa: F401 +except ImportError: + Project = None # type: ignore[assignment,misc] + CLIPModel = None # type: ignore[assignment,misc] + GazeModel = None # type: ignore[assignment,misc] + __version__ = "1.3.1" @@ -250,6 +256,10 @@ def project(self, project_name, the_workspace=None): :param the_workspace workspace name :return project object """ + if Project is None: + raise ImportError( + "Project requires additional dependencies. Install the full package: pip install roboflow" + ) if the_workspace is None: if "/" in project_name: diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py index 2b7da820..5e224c02 100644 --- a/roboflow/adapters/rfapi.py +++ b/roboflow/adapters/rfapi.py @@ -8,7 +8,6 @@ from requests_toolbelt.multipart.encoder import MultipartEncoder from roboflow.config import API_URL, DEFAULT_BATCH_NAME, DEFAULT_JOB_NAME -from roboflow.util import image_utils class RoboflowError(Exception): @@ -294,6 +293,8 @@ def upload_image( # If image is not a hosted image if not hosted_image: + from roboflow.util import image_utils + image_name = os.path.basename(image_path) imgjpeg = image_utils.file2jpeg(image_path) diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py new file mode 100644 index 00000000..358ac5be --- /dev/null +++ b/roboflow/adapters/vision_events_api.py @@ -0,0 +1,260 @@ +import json +import os +from typing import Any, Dict, List, Optional + +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +_BASE = f"{API_URL}/vision-events" + + +def _auth_headers(api_key: str) -> Dict[str, str]: + return {"Authorization": f"Bearer {api_key}"} + + +def write_event(api_key: str, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + Args: + api_key: Roboflow API key. + event: Event payload dict (eventId, eventType, useCaseId, timestamp, etc.). + + Returns: + Parsed JSON response with ``eventId`` and ``created``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post(_BASE, json=event, headers=_auth_headers(api_key)) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def write_batch(api_key: str, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + api_key: Roboflow API key. + events: List of event payload dicts (max 100 per the server). + + Returns: + Parsed JSON response with ``created`` count and ``eventIds``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post( + f"{_BASE}/batch", + json={"events": events}, + headers=_auth_headers(api_key), + ) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def query(api_key: str, query_params: Dict[str, Any]) -> dict: + """Query vision events with filters and pagination. + + Args: + api_key: Roboflow API key. + query_params: Query payload (useCaseId, eventType, startTime, endTime, + cursor, limit, customMetadataFilters, etc.). + + Returns: + Parsed JSON response with ``events``, ``nextCursor``, ``hasMore``, + and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/query", + json=query_params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def list_use_cases(api_key: str, status: Optional[str] = None) -> dict: + """List all use cases for a workspace. + + Args: + api_key: Roboflow API key. + status: Optional status filter (default server-side: "active"). + + Returns: + Parsed JSON response with ``useCases`` list and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + params: Dict[str, str] = {} + if status is not None: + params["status"] = status + response = requests.get( + f"{_BASE}/use-cases", + params=params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_custom_metadata_schema(api_key: str, use_case_id: str) -> dict: + """Get the custom metadata schema for a use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``fields`` mapping field names to their types. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.get( + f"{_BASE}/custom-metadata-schema/{use_case_id}", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def create_use_case(api_key: str, name: str) -> dict: + """Create a new vision event use case. + + Args: + api_key: Roboflow API key. + name: Human-readable name for the use case. + + Returns: + Parsed JSON response with ``id`` and ``name``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases", + json={"name": name}, + headers=_auth_headers(api_key), + ) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def rename_use_case(api_key: str, use_case_id: str, name: str) -> dict: + """Rename an existing vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + name: New name for the use case. + + Returns: + Parsed JSON response with ``id`` and ``name``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.put( + f"{_BASE}/use-cases/{use_case_id}", + json={"name": name}, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def archive_use_case(api_key: str, use_case_id: str) -> dict: + """Archive a vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``success``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases/{use_case_id}/archive", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def unarchive_use_case(api_key: str, use_case_id: str) -> dict: + """Unarchive a vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``success``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases/{use_case_id}/unarchive", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def upload_image( + api_key: str, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> dict: + """Upload an image for use in vision events. + + Args: + api_key: Roboflow API key. + image_path: Local filesystem path to the image file. + name: Optional custom image name. + metadata: Optional flat dict of metadata to attach. + + Returns: + Parsed JSON response with ``sourceId`` (and optionally ``url``). + + Raises: + RoboflowError: On non-201 response status codes. + """ + filename = name or os.path.basename(image_path) + with open(image_path, "rb") as f: + fields: Dict[str, Any] = { + "file": (filename, f, "application/octet-stream"), + } + if name is not None: + fields["name"] = name + if metadata is not None: + fields["metadata"] = json.dumps(metadata) + m = MultipartEncoder(fields=fields) + headers = _auth_headers(api_key) + headers["Content-Type"] = m.content_type + response = requests.post(f"{_BASE}/upload", data=m, headers=headers) + + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py index 3b3d0c42..45d4cce8 100644 --- a/roboflow/cli/__init__.py +++ b/roboflow/cli/__init__.py @@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None: from roboflow.cli.handlers.universe import universe_app # noqa: E402 from roboflow.cli.handlers.version import version_app # noqa: E402 from roboflow.cli.handlers.video import video_app # noqa: E402 +from roboflow.cli.handlers.vision_events import vision_events_app # noqa: E402 from roboflow.cli.handlers.workflow import workflow_app # noqa: E402 from roboflow.cli.handlers.workspace import workspace_app # noqa: E402 @@ -210,6 +211,7 @@ def _walk(group: Any, prefix: str = "") -> None: app.add_typer(universe_app, name="universe") app.add_typer(version_app, name="version") app.add_typer(video_app, name="video") +app.add_typer(vision_events_app, name="vision-events") app.add_typer(workflow_app, name="workflow") app.add_typer(workspace_app, name="workspace") diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py new file mode 100644 index 00000000..61a521da --- /dev/null +++ b/roboflow/cli/handlers/vision_events.py @@ -0,0 +1,441 @@ +"""Vision events commands: write, query, list use cases, and upload images.""" + +from __future__ import annotations + +from typing import Annotated, Optional + +import typer + +from roboflow.cli._compat import SortedGroup, ctx_to_args + +vision_events_app = typer.Typer( + help="Create, query, and manage vision events.", + cls=SortedGroup, + no_args_is_help=True, +) + + +def _resolve(args): # noqa: ANN001 + """Return api_key or call output_error and return None.""" + from roboflow.cli._resolver import resolve_ws_and_key + + resolved = resolve_ws_and_key(args) + if resolved is None: + return None + _ws, api_key = resolved + return api_key + + +# --------------------------------------------------------------------------- +# write +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write") +def write( + ctx: typer.Context, + event: Annotated[str, typer.Argument(help="JSON string of the event payload")], +) -> None: + """Create a single vision event.""" + args = ctx_to_args(ctx, event=event) + _write(args) + + +def _write(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + event = json.loads(args.event) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.write_event(api_key, event) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created event {result.get('eventId', '')}") + + +# --------------------------------------------------------------------------- +# write-batch +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write-batch") +def write_batch( + ctx: typer.Context, + events: Annotated[str, typer.Argument(help="JSON string of the events array")], +) -> None: + """Create multiple vision events in a single request.""" + args = ctx_to_args(ctx, events=events) + _write_batch(args) + + +def _write_batch(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + events = json.loads(args.events) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON array string.") + return + + try: + result = vision_events_api.write_batch(api_key, events) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created {result.get('created', 0)} event(s)") + + +# --------------------------------------------------------------------------- +# query +# --------------------------------------------------------------------------- + + +@vision_events_app.command("query") +def query( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier to query")], + event_type: Annotated[Optional[str], typer.Option("-t", "--event-type", help="Filter by event type")] = None, + start_time: Annotated[Optional[str], typer.Option("--start", help="ISO 8601 start time")] = None, + end_time: Annotated[Optional[str], typer.Option("--end", help="ISO 8601 end time")] = None, + limit: Annotated[Optional[int], typer.Option("-l", "--limit", help="Max events to return")] = None, + cursor: Annotated[Optional[str], typer.Option("--cursor", help="Pagination cursor")] = None, +) -> None: + """Query vision events with filters and pagination.""" + args = ctx_to_args( + ctx, + use_case=use_case, + event_type=event_type, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + ) + _query(args) + + +def _query(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + payload = {"useCaseId": args.use_case} + if args.event_type is not None: + payload["eventType"] = args.event_type + if args.start_time is not None: + payload["startTime"] = args.start_time + if args.end_time is not None: + payload["endTime"] = args.end_time + if args.limit is not None: + payload["limit"] = args.limit + if args.cursor is not None: + payload["cursor"] = args.cursor + + try: + result = vision_events_api.query(api_key, payload) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + events = result.get("events", []) + lines = [f"Found {len(events)} event(s)."] + for evt in events: + lines.append(f" {evt.get('eventId', '')} [{evt.get('eventType', '')}]") + if result.get("nextCursor"): + lines.append(f"\nNext page: --cursor {result['nextCursor']}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# use-cases +# --------------------------------------------------------------------------- + + +@vision_events_app.command("use-cases") +def use_cases( + ctx: typer.Context, + status: Annotated[Optional[str], typer.Option("-s", "--status", help="Filter by status (active, inactive)")] = None, +) -> None: + """List vision event use cases for the workspace.""" + args = ctx_to_args(ctx, status=status) + _use_cases(args) + + +def _use_cases(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.list_use_cases(api_key, status=args.status) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + items = result.get("useCases") or result.get("solutions", []) + lines = [f"{len(items)} use case(s):"] + for uc in items: + name = uc.get("name", uc.get("id", "")) + if uc.get("eventCount") is not None: + detail = f" ({uc['eventCount']} events)" + elif uc.get("status"): + detail = f" [{uc['status']}]" + else: + detail = "" + lines.append(f" {name}{detail}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# create-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("create-use-case") +def create_use_case( + ctx: typer.Context, + name: Annotated[str, typer.Argument(help="Name for the new use case")], +) -> None: + """Create a new vision event use case.""" + args = ctx_to_args(ctx, name=name) + _create_use_case(args) + + +def _create_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.create_use_case(api_key, args.name) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created use case {result.get('id', '')} ({result.get('name', '')})") + + +# --------------------------------------------------------------------------- +# rename-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("rename-use-case") +def rename_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], + name: Annotated[str, typer.Option("-n", "--name", help="New name for the use case")], +) -> None: + """Rename an existing vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case, name=name) + _rename_use_case(args) + + +def _rename_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.rename_use_case(api_key, args.use_case, args.name) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Renamed use case {result.get('id', '')} to {result.get('name', '')}") + + +# --------------------------------------------------------------------------- +# archive-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("archive-use-case") +def archive_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Archive a vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _archive_use_case(args) + + +def _archive_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.archive_use_case(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Archived use case {args.use_case}") + + +# --------------------------------------------------------------------------- +# unarchive-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("unarchive-use-case") +def unarchive_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Unarchive a vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _unarchive_use_case(args) + + +def _unarchive_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.unarchive_use_case(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Unarchived use case {args.use_case}") + + +# --------------------------------------------------------------------------- +# metadata-schema +# --------------------------------------------------------------------------- + + +@vision_events_app.command("metadata-schema") +def metadata_schema( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Get the custom metadata schema for a use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _metadata_schema(args) + + +def _metadata_schema(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.get_custom_metadata_schema(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + fields = result.get("fields", {}) + lines = [f"{len(fields)} field(s):"] + for name, info in fields.items(): + types = ", ".join(info.get("types", [])) + lines.append(f" {name} ({types})") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# upload-image +# --------------------------------------------------------------------------- + + +@vision_events_app.command("upload-image") +def upload_image( + ctx: typer.Context, + image: Annotated[str, typer.Argument(help="Path to the image file")], + name: Annotated[Optional[str], typer.Option("-n", "--name", help="Custom image name")] = None, + metadata: Annotated[ + Optional[str], + typer.Option("-M", "--metadata", help='JSON string of metadata (e.g. \'{"camera_id":"cam001"}\')'), + ] = None, +) -> None: + """Upload an image for use in vision events.""" + args = ctx_to_args(ctx, image=image, name=name, metadata=metadata) + _upload_image(args) + + +def _upload_image(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + parsed_metadata = json.loads(args.metadata) if args.metadata else None + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid metadata JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.upload_image( + api_key, + image_path=args.image, + name=args.name, + metadata=parsed_metadata, + ) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Uploaded image: sourceId={result.get('sourceId', '')}") diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 698aa59d..6790a20b 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -9,21 +9,12 @@ from typing import Any, Dict, Generator, List, Optional import requests -from PIL import Image from requests.exceptions import HTTPError from tqdm import tqdm -from roboflow.adapters import rfapi +from roboflow.adapters import rfapi, vision_events_api from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError -from roboflow.config import API_URL, APP_URL, CLIP_FEATURIZE_URL, DEMO_KEYS -from roboflow.core.project import Project -from roboflow.util import folderparser -from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons -from roboflow.util.general import extract_zip as _extract_zip -from roboflow.util.image_utils import load_labelmap -from roboflow.util.model_processor import process -from roboflow.util.two_stage_utils import ocr_infer -from roboflow.util.versions import normalize_yolo_model_type +from roboflow.config import API_URL, APP_URL, DEMO_KEYS class Workspace: @@ -64,6 +55,8 @@ def projects(self): Returns: List of Project objects. """ + from roboflow.core.project import Project + projects_array = [] for a_project in self.project_list: proj = Project(self.__api_key, a_project, self.model_format) @@ -83,6 +76,8 @@ def project(self, project_id): Returns: Project Object """ + from roboflow.core.project import Project + sys.stdout.write("\r" + "loading Roboflow project...") sys.stdout.write("\n") sys.stdout.flush() @@ -113,6 +108,8 @@ def create_project(self, project_name, project_type, project_license, annotation Returns: Project Object """ # noqa: E501 // docs + from roboflow.core.project import Project + data = { "name": project_name, "type": project_type, @@ -143,6 +140,9 @@ def clip_compare(self, dir: str = "", image_ext: str = ".png", target_image: str dict: a key:value mapping of image_name:comparison_score_to_target """ # noqa: E501 // docs + from roboflow.config import CLIP_FEATURIZE_URL + from roboflow.util.active_learning_utils import clip_encode + # list to store comparison results in comparisons = [] # grab all images in a given directory with ext type @@ -176,6 +176,8 @@ def two_stage( # TODO: fix docs dict: a json obj containing the results of the second stage detection """ # noqa: E501 // docs + from PIL import Image + results = [] # create PIL image for cropping @@ -245,6 +247,10 @@ def two_stage_ocr( # TODO: fix docs dict: a json obj containing the results of the second stage detection """ # noqa: E501 // docs + from PIL import Image + + from roboflow.util.two_stage_utils import ocr_infer + results = [] # create PIL image for cropping @@ -307,6 +313,9 @@ def upload_dataset( num_retries (int, optional): number of times to retry uploading an image if the upload fails. Defaults to 0. is_prediction (bool, optional): whether the annotations provided in the dataset are predictions and not ground truth. Defaults to False. """ # noqa: E501 // docs + from roboflow.util import folderparser + from roboflow.util.image_utils import load_labelmap + if dataset_format != "NOT_USED": print("Warning: parameter 'dataset_format' is deprecated and will be removed in a future release") project, created = self._get_or_create_project( @@ -460,6 +469,9 @@ def active_learning( use_localhost: (bool) = determines if local http format used or remote endpoint local_server: (str) = local http address for inference server, use_localhost must be True for this to be used """ # noqa: E501 // docs + from roboflow.config import CLIP_FEATURIZE_URL + from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons + if inference_endpoint is None: inference_endpoint = [] if conditionals is None: @@ -608,6 +620,9 @@ def deploy_model( filename (str, optional): The name of the weights file. Defaults to "weights/best.pt". """ + from roboflow.util.model_processor import process + from roboflow.util.versions import normalize_yolo_model_type + if not project_ids: raise ValueError("At least one project ID must be provided") @@ -802,6 +817,8 @@ def search_export( ValueError: If both *dataset* and *annotation_group* are provided. RoboflowError: On API errors or export timeout. """ + from roboflow.util.general import extract_zip as _extract_zip + if dataset is not None and annotation_group is not None: raise ValueError("dataset and annotation_group are mutually exclusive; provide only one") @@ -938,6 +955,317 @@ def get_plan(self): return rfapi.get_plan_info(self.__api_key) + # --- Vision Events --- + + def write_vision_event(self, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + The event dict is passed directly to the server with no client-side + validation, so new event types and fields work without an SDK update. + + Args: + event: Event payload containing at minimum ``eventId``, + ``eventType``, ``useCaseId``, and ``timestamp``. + + Returns: + Dict with ``eventId`` and ``created``. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_event({ + ... "eventId": "evt-001", + ... "eventType": "quality_check", + ... "useCaseId": "manufacturing-qa", + ... "timestamp": "2024-01-15T10:30:00.000Z", + ... "eventData": {"result": "pass"}, + ... }) + """ + return vision_events_api.write_event( + api_key=self.__api_key, + event=event, + ) + + def write_vision_events_batch(self, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + events: List of event payload dicts (server enforces max 100). + + Returns: + Dict with ``created`` count and ``eventIds`` list. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_events_batch([ + ... {"eventId": "e1", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:00:00Z"}, + ... {"eventId": "e2", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:01:00Z"}, + ... ]) + """ + return vision_events_api.write_batch( + api_key=self.__api_key, + events=events, + ) + + def query_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + **filters: Any, + ) -> dict: + """Query vision events with filters and pagination. + + Common filter kwargs are passed through to the server as-is, + supporting ``deviceId``, ``streamId``, ``workflowId``, + ``customMetadataFilters``, ``eventFieldFilters``, etc. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum number of events to return. + cursor: Pagination cursor from a previous response. + **filters: Additional filter parameters passed to the API. + + Returns: + Dict with ``events``, ``nextCursor``, ``hasMore``, and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> page = ws.query_vision_events("manufacturing-qa", event_type="quality_check", limit=50) + >>> for evt in page["events"]: + ... print(evt["eventId"]) + """ + payload: Dict[str, Any] = {"useCaseId": use_case} + if event_type is not None: + payload["eventType"] = event_type + if event_types is not None: + payload["eventTypes"] = event_types + if start_time is not None: + payload["startTime"] = start_time + if end_time is not None: + payload["endTime"] = end_time + if limit is not None: + payload["limit"] = limit + if cursor is not None: + payload["cursor"] = cursor + payload.update(filters) + + return vision_events_api.query( + api_key=self.__api_key, + query_params=payload, + ) + + def query_all_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + **filters: Any, + ) -> Generator[List[dict], None, None]: + """Paginated query across vision events, yielding one page at a time. + + Automatically follows ``nextCursor`` until all matching events have + been returned. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum events per page. + **filters: Additional filter parameters passed to the API. + + Yields: + A list of event dicts for each page. + + Example: + >>> ws = rf.workspace() + >>> for page in ws.query_all_vision_events("manufacturing-qa"): + ... for evt in page: + ... print(evt["eventId"]) + """ + cursor = None + while True: + response = self.query_vision_events( + use_case, + event_type=event_type, + event_types=event_types, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + **filters, + ) + events = response.get("events", []) + if not events: + break + yield events + cursor = response.get("nextCursor") + if not cursor or not response.get("hasMore", False): + break + + def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict: + """List all vision event use cases for the workspace. + + Args: + status: Optional status filter (e.g. "active", "inactive"). + + Returns: + Dict with ``useCases`` list and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> result = ws.list_vision_event_use_cases() + >>> for uc in result["useCases"]: + ... print(uc["name"], uc.get("status")) + """ + result = vision_events_api.list_use_cases( + api_key=self.__api_key, + status=status, + ) + if "useCases" not in result and "solutions" in result: + result["useCases"] = result["solutions"] + return result + + def create_vision_event_use_case(self, name: str) -> dict: + """Create a new vision event use case. + + Args: + name: Human-readable name for the use case. + + Returns: + Dict with ``id`` and ``name``. + + Example: + >>> ws = rf.workspace() + >>> result = ws.create_vision_event_use_case("manufacturing-qa") + >>> use_case_id = result["id"] + """ + return vision_events_api.create_use_case( + api_key=self.__api_key, + name=name, + ) + + def rename_vision_event_use_case(self, use_case: str, name: str) -> dict: + """Rename an existing vision event use case. + + Args: + use_case: Use case identifier. + name: New name for the use case. + + Returns: + Dict with ``id`` and ``name``. + + Example: + >>> ws = rf.workspace() + >>> ws.rename_vision_event_use_case("abc123", "new-name") + """ + return vision_events_api.rename_use_case( + api_key=self.__api_key, + use_case_id=use_case, + name=name, + ) + + def archive_vision_event_use_case(self, use_case: str) -> dict: + """Archive a vision event use case. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``success``. + + Example: + >>> ws = rf.workspace() + >>> ws.archive_vision_event_use_case("abc123") + """ + return vision_events_api.archive_use_case( + api_key=self.__api_key, + use_case_id=use_case, + ) + + def unarchive_vision_event_use_case(self, use_case: str) -> dict: + """Unarchive a vision event use case. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``success``. + + Example: + >>> ws = rf.workspace() + >>> ws.unarchive_vision_event_use_case("abc123") + """ + return vision_events_api.unarchive_use_case( + api_key=self.__api_key, + use_case_id=use_case, + ) + + def get_vision_event_metadata_schema(self, use_case: str) -> dict: + """Get the custom metadata schema for a vision event use case. + + Returns discovered field names and their types, useful for building + queries with ``customMetadataFilters``. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``fields`` mapping field names to ``{"types": [...]}``. + + Example: + >>> ws = rf.workspace() + >>> schema = ws.get_vision_event_metadata_schema("manufacturing-qa") + >>> for field, info in schema["fields"].items(): + ... print(field, info["types"]) + """ + return vision_events_api.get_custom_metadata_schema( + api_key=self.__api_key, + use_case_id=use_case, + ) + + def upload_vision_event_image( + self, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> dict: + """Upload an image for use in vision events. + + Args: + image_path: Local path to the image file. + name: Optional custom name for the image. + metadata: Optional flat dict of metadata to attach. + + Returns: + Dict with ``sourceId`` for referencing in events. + + Example: + >>> ws = rf.workspace() + >>> result = ws.upload_vision_event_image("photo.jpg") + >>> source_id = result["sourceId"] + """ + return vision_events_api.upload_image( + api_key=self.__api_key, + image_path=image_path, + name=name, + metadata=metadata, + ) + def __str__(self): projects = self.projects() json_value = {"name": self.name, "url": self.url, "projects": projects} diff --git a/setup_slim.py b/setup_slim.py new file mode 100644 index 00000000..cf93fca9 --- /dev/null +++ b/setup_slim.py @@ -0,0 +1,54 @@ +import re + +import setuptools +from setuptools import find_packages + +with open("./roboflow/__init__.py") as f: + content = f.read() +_search_version = re.search(r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', content) +assert _search_version +version = _search_version.group(1) + + +with open("README.md") as fh: + long_description = fh.read() + +with open("requirements-slim.txt") as fh: + install_requires = fh.read().split("\n") + +setuptools.setup( + name="roboflow-slim", + version=version, + author="Roboflow", + author_email="support@roboflow.com", + description="Lightweight Roboflow SDK for vision events, workspace management, and CLI", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/roboflow-ai/roboflow-python", + install_requires=install_requires, + packages=find_packages(exclude=("tests",)), + extras_require={ + "dev": [ + "mypy", + "responses", + "ruff", + "twine", + "types-pyyaml", + "types-requests", + "types-setuptools", + "types-tqdm", + "wheel", + ], + }, + entry_points={ + "console_scripts": [ + "roboflow=roboflow.roboflowpy:main", + ], + }, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + ], + python_requires=">=3.10", +) diff --git a/tests/test_project.py b/tests/test_project.py index 4568ed7e..87dbde92 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -61,15 +61,15 @@ def _setup_upload_dataset_mocks( # Create the mock objects mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=test_dataset), - "upload": patch("roboflow.core.workspace.Project.upload_image", side_effect=upload_image_side_effect) + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=test_dataset), + "upload": patch("roboflow.core.project.Project.upload_image", side_effect=upload_image_side_effect) if upload_image_side_effect - else patch("roboflow.core.workspace.Project.upload_image", return_value=image_return), + else patch("roboflow.core.project.Project.upload_image", return_value=image_return), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=save_annotation_side_effect + "roboflow.core.project.Project.save_annotation", side_effect=save_annotation_side_effect ) if save_annotation_side_effect - else patch("roboflow.core.workspace.Project.save_annotation", return_value=annotation_return), + else patch("roboflow.core.project.Project.save_annotation", return_value=annotation_return), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, project_created) ), @@ -348,7 +348,7 @@ def test_project_upload_dataset(self): "extra_mocks": [ ( "load_labelmap", - "roboflow.core.workspace.load_labelmap", + "roboflow.util.image_utils.load_labelmap", {"return_value": {"old_label": "new_label"}}, ) ], @@ -650,13 +650,13 @@ def capture_annotation_calls(annotation_path, **kwargs): return ({"success": True}, 0.1, 0) mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset), + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset), "upload": patch( - "roboflow.core.workspace.Project.upload_image", + "roboflow.core.project.Project.upload_image", return_value=({"id": "test-id", "success": True}, 0.1, 0), ), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls + "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls ), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False) @@ -737,13 +737,13 @@ def capture_annotation_calls(annotation_path, **kwargs): return ({"success": True}, 0.1, 0) mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset), + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset), "upload": patch( - "roboflow.core.workspace.Project.upload_image", + "roboflow.core.project.Project.upload_image", return_value=({"id": "test-id", "success": True}, 0.1, 0), ), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls + "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls ), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False) diff --git a/tests/test_slim_compat.py b/tests/test_slim_compat.py new file mode 100644 index 00000000..972eeacf --- /dev/null +++ b/tests/test_slim_compat.py @@ -0,0 +1,84 @@ +"""Tests for slim install compatibility. + +Verifies that the package can be imported and lightweight features work +even when heavy dependencies (PIL, opencv, numpy, matplotlib) are missing. + +In a full install, these tests verify the guards don't break normal behavior. +In a slim install, they verify graceful degradation. +""" + +import unittest + + +class TestSlimImport(unittest.TestCase): + """Verify that importing the package always succeeds.""" + + def test_import_roboflow(self): + import roboflow + + self.assertIsNotNone(roboflow.__version__) + + def test_import_vision_events_adapter(self): + from roboflow.adapters import vision_events_api + + self.assertTrue(callable(vision_events_api.write_event)) + self.assertTrue(callable(vision_events_api.write_batch)) + self.assertTrue(callable(vision_events_api.query)) + self.assertTrue(callable(vision_events_api.list_use_cases)) + self.assertTrue(callable(vision_events_api.get_custom_metadata_schema)) + self.assertTrue(callable(vision_events_api.upload_image)) + + def test_import_config(self): + from roboflow.config import API_URL + + self.assertIsInstance(API_URL, str) + + def test_import_rfapi(self): + from roboflow.adapters.rfapi import RoboflowError + + self.assertTrue(issubclass(RoboflowError, Exception)) + + def test_import_cli(self): + from roboflow.cli import app + + self.assertIsNotNone(app) + + +class TestSlimGracefulDegradation(unittest.TestCase): + """Verify that heavy features fail with clear errors when deps are missing. + + These tests only exercise the error path when PIL/opencv are absent. + In a full install they verify the guard exists but doesn't fire. + """ + + def test_workspace_always_available(self): + """Workspace imports cleanly even in slim mode.""" + import roboflow + + self.assertIsNotNone(roboflow.Workspace) + self.assertTrue(callable(roboflow.Workspace)) + + def test_project_guarded(self): + """Project is either a real class (full) or None (slim).""" + import roboflow + + self.assertTrue(roboflow.Project is None or callable(roboflow.Project)) + + def test_roboflow_project_guard(self): + """If Project is None (slim), calling project() raises ImportError.""" + import roboflow + + if roboflow.Project is not None: + self.skipTest("Full install, Project is available") + + rf = roboflow.Roboflow.__new__(roboflow.Roboflow) + rf.api_key = "test" + rf.current_workspace = "test" + + with self.assertRaises(ImportError) as ctx: + rf.project("test-project") + self.assertIn("pip install roboflow", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py new file mode 100644 index 00000000..0ab2bb7c --- /dev/null +++ b/tests/test_vision_events.py @@ -0,0 +1,636 @@ +import json +import os +import tempfile +import unittest + +import responses + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +# The vision events API does not include workspace in the URL. +# Auth is via Bearer token; workspace is derived server-side from the API key. +_BASE = f"{API_URL}/vision-events" + + +class TestVisionEvents(unittest.TestCase): + API_KEY = "test_key" + WORKSPACE = "test-ws" + + def _make_workspace(self): + from roboflow.core.workspace import Workspace + + info = { + "workspace": { + "name": "Test", + "url": self.WORKSPACE, + "projects": [], + "members": [], + } + } + return Workspace(info, api_key=self.API_KEY, default_workspace=self.WORKSPACE, model_format="yolov8") + + def _assert_bearer_auth(self, call_index=0): + auth = responses.calls[call_index].request.headers.get("Authorization") + self.assertEqual(auth, f"Bearer {self.API_KEY}") + + # --- write_vision_event --- + + @responses.activate + def test_write_event(self): + responses.add(responses.POST, _BASE, json={"eventId": "evt-001"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "evt-001", + "eventType": "quality_check", + "useCaseId": "uc-1", + "timestamp": "2024-01-15T10:00:00Z", + "eventData": {"result": "pass"}, + } + result = ws.write_vision_event(event) + + self.assertEqual(result["eventId"], "evt-001") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventId"], "evt-001") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["useCaseId"], "uc-1") + self.assertEqual(sent["eventData"], {"result": "pass"}) + + @responses.activate + def test_write_event_passthrough(self): + """The event dict must be sent to the server unchanged (no filtering or transformation).""" + responses.add(responses.POST, _BASE, json={"eventId": "e1"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "e1", + "eventType": "safety_alert", + "useCaseId": "warehouse-safety", + "timestamp": "2024-06-01T12:00:00Z", + "deviceId": "cam-5", + "streamId": "stream-a", + "workflowId": "wf-1", + "images": [{"sourceId": "src-1", "label": "frame"}], + "eventData": {"alertType": "fire", "severity": "high"}, + "customMetadata": {"zone": "B3", "temperature": 42.5, "active": True}, + } + ws.write_vision_event(event) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, event) + + @responses.activate + def test_write_event_error(self): + responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_event({"eventId": "x", "eventType": "custom", "useCaseId": "s", "timestamp": "t"}) + + # --- write_vision_events_batch --- + + @responses.activate + def test_write_batch(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"created": 2, "eventIds": ["e1", "e2"]}, status=201) + + ws = self._make_workspace() + events = [ + {"eventId": "e1", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:00:00Z"}, + {"eventId": "e2", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:01:00Z"}, + ] + result = ws.write_vision_events_batch(events) + + self.assertEqual(result["created"], 2) + self.assertEqual(result["eventIds"], ["e1", "e2"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(len(sent["events"]), 2) + + @responses.activate + def test_write_batch_error(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"error": "validation"}, status=400) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_events_batch([{"bad": "event"}]) + + # --- query_vision_events --- + + @responses.activate + def test_query_basic(self): + body = { + "events": [{"eventId": "e1"}, {"eventId": "e2"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + result = ws.query_vision_events("my-use-case") + + self.assertEqual(len(result["events"]), 2) + self.assertFalse(result["hasMore"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-use-case") + + @responses.activate + def test_query_with_filters(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events( + "my-uc", + event_type="quality_check", + start_time="2024-01-01T00:00:00Z", + end_time="2024-02-01T00:00:00Z", + limit=10, + cursor="abc123", + deviceId={"operator": "eq", "value": "cam-01"}, + ) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-uc") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["startTime"], "2024-01-01T00:00:00Z") + self.assertEqual(sent["endTime"], "2024-02-01T00:00:00Z") + self.assertEqual(sent["limit"], 10) + self.assertEqual(sent["cursor"], "abc123") + self.assertEqual(sent["deviceId"], {"operator": "eq", "value": "cam-01"}) + + @responses.activate + def test_query_with_event_types_plural(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc", event_types=["quality_check", "safety_alert"]) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventTypes"], ["quality_check", "safety_alert"]) + self.assertNotIn("eventType", sent) + + @responses.activate + def test_query_omits_none_params(self): + """Optional params that are None must not appear in the payload.""" + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc") + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, {"useCaseId": "uc"}) + + @responses.activate + def test_query_error(self): + responses.add(responses.POST, f"{_BASE}/query", json={"error": "unauthorized"}, status=401) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.query_vision_events("my-uc") + + # --- query_all_vision_events --- + + @responses.activate + def test_query_all_single_page(self): + body = { + "events": [{"eventId": "e1"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 1) + self.assertEqual(pages[0][0]["eventId"], "e1") + + @responses.activate + def test_query_all_multiple_pages(self): + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "cursor2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 2) + self.assertEqual(pages[0][0]["eventId"], "e1") + self.assertEqual(pages[1][0]["eventId"], "e2") + + # Verify cursor was sent in second request + sent2 = json.loads(responses.calls[1].request.body) + self.assertEqual(sent2["cursor"], "cursor2") + + @responses.activate + def test_query_all_forwards_filters(self): + """Filters must be forwarded to every page request, not just the first.""" + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "c2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + list(ws.query_all_vision_events("uc", event_type="quality_check", limit=1)) + + sent1 = json.loads(responses.calls[0].request.body) + sent2 = json.loads(responses.calls[1].request.body) + + # Both requests should have the filter + self.assertEqual(sent1["eventType"], "quality_check") + self.assertEqual(sent2["eventType"], "quality_check") + # Second request should also have the cursor + self.assertNotIn("cursor", sent1) + self.assertEqual(sent2["cursor"], "c2") + + @responses.activate + def test_query_all_empty(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 0) + + # --- list_vision_event_use_cases --- + + @responses.activate + def test_list_use_cases(self): + body = { + "useCases": [ + {"id": "uc-1", "name": "QA", "status": "active"}, + ], + "lookbackDays": 14, + } + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + + self.assertEqual(len(result["useCases"]), 1) + self.assertEqual(result["useCases"][0]["name"], "QA") + self._assert_bearer_auth() + + @responses.activate + def test_list_use_cases_with_status(self): + body = {"useCases": [], "lookbackDays": 14} + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases(status="inactive") + + self.assertEqual(len(result["useCases"]), 0) + # Verify status was sent as query param + self.assertIn("status=inactive", responses.calls[0].request.url) + + @responses.activate + def test_list_use_cases_legacy_solutions_response(self): + responses.add( + responses.GET, + f"{_BASE}/use-cases", + json={"solutions": [{"id": "uc-legacy", "name": "Legacy"}], "lookbackDays": 14}, + status=200, + ) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + self.assertEqual(result["useCases"][0]["id"], "uc-legacy") + + @responses.activate + def test_list_use_cases_error(self): + responses.add(responses.GET, f"{_BASE}/use-cases", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.list_vision_event_use_cases() + + # --- create_vision_event_use_case --- + + @responses.activate + def test_create_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases", + json={"id": "new-uc", "name": "My Use Case"}, + status=201, + ) + + ws = self._make_workspace() + result = ws.create_vision_event_use_case("My Use Case") + + self.assertEqual(result["id"], "new-uc") + self.assertEqual(result["name"], "My Use Case") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["name"], "My Use Case") + + @responses.activate + def test_create_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases", json={"error": "duplicate name"}, status=409) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.create_vision_event_use_case("Existing Name") + + # --- rename_vision_event_use_case --- + + @responses.activate + def test_rename_use_case(self): + responses.add( + responses.PUT, + f"{_BASE}/use-cases/uc-1", + json={"id": "uc-1", "name": "Renamed"}, + status=200, + ) + + ws = self._make_workspace() + result = ws.rename_vision_event_use_case("uc-1", "Renamed") + + self.assertEqual(result["id"], "uc-1") + self.assertEqual(result["name"], "Renamed") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["name"], "Renamed") + + @responses.activate + def test_rename_use_case_error(self): + responses.add(responses.PUT, f"{_BASE}/use-cases/nonexistent", json={"error": "not found"}, status=404) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.rename_vision_event_use_case("nonexistent", "New Name") + + # --- archive_vision_event_use_case --- + + @responses.activate + def test_archive_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases/uc-1/archive", + json={"success": True}, + status=200, + ) + + ws = self._make_workspace() + result = ws.archive_vision_event_use_case("uc-1") + + self.assertTrue(result["success"]) + self._assert_bearer_auth() + + @responses.activate + def test_archive_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases/nonexistent/archive", json={"error": "not found"}, status=404) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.archive_vision_event_use_case("nonexistent") + + # --- unarchive_vision_event_use_case --- + + @responses.activate + def test_unarchive_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases/uc-1/unarchive", + json={"success": True}, + status=200, + ) + + ws = self._make_workspace() + result = ws.unarchive_vision_event_use_case("uc-1") + + self.assertTrue(result["success"]) + self._assert_bearer_auth() + + @responses.activate + def test_unarchive_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"error": "already active"}, status=400) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.unarchive_vision_event_use_case("uc-1") + + # --- get_vision_event_metadata_schema --- + + @responses.activate + def test_get_metadata_schema(self): + body = { + "useCaseId": "manufacturing-qa", + "fields": { + "temperature": {"types": ["number"]}, + "zone": {"types": ["string"]}, + "active": {"types": ["boolean"]}, + }, + } + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/manufacturing-qa", + json=body, + status=200, + ) + + ws = self._make_workspace() + result = ws.get_vision_event_metadata_schema("manufacturing-qa") + + self.assertEqual(len(result["fields"]), 3) + self.assertEqual(result["fields"]["temperature"]["types"], ["number"]) + self._assert_bearer_auth() + + @responses.activate + def test_get_metadata_schema_error(self): + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/nonexistent", + json={"error": "not found"}, + status=404, + ) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.get_vision_event_metadata_schema("nonexistent") + + # --- upload_vision_event_image --- + + @responses.activate + def test_upload_image(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-123"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"\xff\xd8\xff\xe0fake-jpeg-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image(tmp_path) + self.assertEqual(result["sourceId"], "src-123") + self._assert_bearer_auth() + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_uses_basename(self): + """When no name is provided, the multipart filename should be the basename of the path.""" + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-789"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, prefix="myimage_") as f: + f.write(b"\xff\xd8\xff\xe0fake") + tmp_path = f.name + + try: + ws.upload_vision_event_image(tmp_path) + request_body = responses.calls[0].request.body + basename = os.path.basename(tmp_path).encode() + if isinstance(request_body, bytes): + self.assertIn(basename, request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_with_metadata(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-456"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: + f.write(b"\x89PNGfake-png-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image( + tmp_path, + name="custom-name.png", + metadata={"camera_id": "cam-01"}, + ) + self.assertEqual(result["sourceId"], "src-456") + + request_body = responses.calls[0].request.body + # Verify metadata and name were included in the multipart body + if isinstance(request_body, bytes): + self.assertIn(b"cam-01", request_body) + self.assertIn(b"custom-name.png", request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_error(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"data") + tmp_path = f.name + + try: + with self.assertRaises(RoboflowError): + ws.upload_vision_event_image(tmp_path) + finally: + os.unlink(tmp_path) + + +class TestVisionEventsAdapter(unittest.TestCase): + """Tests that call the adapter directly (no Workspace). Work in slim installs.""" + + API_KEY = "test_key" + + @responses.activate + def test_adapter_write_event(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, _BASE, json={"eventId": "e1", "created": True}, status=201) + result = vision_events_api.write_event( + self.API_KEY, {"eventId": "e1", "eventType": "custom", "useCaseId": "uc"} + ) + self.assertEqual(result["eventId"], "e1") + self.assertEqual(responses.calls[0].request.headers["Authorization"], f"Bearer {self.API_KEY}") + + @responses.activate + def test_adapter_write_batch(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/batch", json={"created": 1, "eventIds": ["e1"]}, status=201) + result = vision_events_api.write_batch(self.API_KEY, [{"eventId": "e1"}]) + self.assertEqual(result["created"], 1) + + @responses.activate + def test_adapter_query(self): + from roboflow.adapters import vision_events_api + + body = {"events": [{"eventId": "e1"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + result = vision_events_api.query(self.API_KEY, {"useCaseId": "uc"}) + self.assertEqual(len(result["events"]), 1) + + @responses.activate + def test_adapter_list_use_cases(self): + from roboflow.adapters import vision_events_api + + body = {"useCases": [{"id": "uc-1", "name": "QA"}], "lookbackDays": 14} + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + result = vision_events_api.list_use_cases(self.API_KEY) + self.assertEqual(len(result["useCases"]), 1) + + @responses.activate + def test_adapter_get_metadata_schema(self): + from roboflow.adapters import vision_events_api + + body = {"useCaseId": "uc-1", "fields": {"temp": {"types": ["number"]}}} + responses.add(responses.GET, f"{_BASE}/custom-metadata-schema/uc-1", json=body, status=200) + result = vision_events_api.get_custom_metadata_schema(self.API_KEY, "uc-1") + self.assertEqual(result["fields"]["temp"]["types"], ["number"]) + + @responses.activate + def test_adapter_create_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases", json={"id": "uc-new", "name": "Test"}, status=201) + result = vision_events_api.create_use_case(self.API_KEY, "Test") + self.assertEqual(result["id"], "uc-new") + + @responses.activate + def test_adapter_rename_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.PUT, f"{_BASE}/use-cases/uc-1", json={"id": "uc-1", "name": "New"}, status=200) + result = vision_events_api.rename_use_case(self.API_KEY, "uc-1", "New") + self.assertEqual(result["name"], "New") + + @responses.activate + def test_adapter_archive_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/archive", json={"success": True}, status=200) + result = vision_events_api.archive_use_case(self.API_KEY, "uc-1") + self.assertTrue(result["success"]) + + @responses.activate + def test_adapter_unarchive_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"success": True}, status=200) + result = vision_events_api.unarchive_use_case(self.API_KEY, "uc-1") + self.assertTrue(result["success"]) + + @responses.activate + def test_adapter_error_raises_roboflow_error(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403) + with self.assertRaises(RoboflowError): + vision_events_api.write_event(self.API_KEY, {"eventId": "x"}) + + +if __name__ == "__main__": + unittest.main()