Merged
2 changes: 2 additions & 0 deletions docs/hooks.md
@@ -600,6 +600,8 @@ Code reference: [`scheduler_register`][dp3.common.callback_registrar.CallbackReg

Most user-facing hooks return `list[DataPointTask]`.
Whenever that happens, the returned tasks are fed back into the main ingestion system.
Each returned task must do useful work: it must contain at least one datapoint, carry non-empty TTL tokens, or be a delete task.
Empty `DataPointTask` objects are rejected during validation because they would be queued and processed without changing DP3 state.
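
The rule can be pictured with a small stand-alone sketch. `SketchTask` and `is_rejected` are hypothetical names built on a plain dataclass, not DP3's `DataPointTask`; only the field names `data_points`, `ttl_tokens`, and `delete` are taken from the validator added in this change.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task; not the real DataPointTask."""

    etype: str
    eid: str
    data_points: list = field(default_factory=list)
    ttl_tokens: dict = field(default_factory=dict)
    delete: bool = False

    def __post_init__(self):
        # Reject tasks that would be queued without changing any state.
        if not self.data_points and not self.ttl_tokens and not self.delete:
            raise ValueError(
                "task must contain at least one datapoint, non-empty ttl_tokens, "
                "or be a delete task"
            )


def is_rejected(**kwargs) -> bool:
    """Return True when constructing the sketch task fails validation."""
    try:
        SketchTask(etype="ip", eid="192.0.2.1", **kwargs)
    except ValueError:
        return True
    return False
```

Under these assumptions, `is_rejected()` is true for a bare task, while supplying any one of a datapoint, a TTL token, or `delete=True` lets the task through.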

This creates a feedback loop:

163 changes: 163 additions & 0 deletions docs/howto/test-module.md
@@ -0,0 +1,163 @@
# Test a secondary module

DP3 includes helpers for writing focused unit tests for secondary modules without running a full
worker, database, message broker, or snapshot scheduler.

Use [`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase] when you want to instantiate a
module with the application's real `db_entities` model and then call registered hooks directly.
The test registrar captures callbacks during module initialization and exposes runners for the
common hook families.

The config directory is read from the `DP3_CONFIG_DIR` environment variable unless a test class
sets `config_dir` explicitly. By default, module configuration is read from `modules.<module_name>`
in that config, where `<module_name>` is inferred from the module class's Python module name.

```bash
DP3_CONFIG_DIR=config python -m unittest discover -s tests -v
```
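
The lookup order described above can be sketched as follows. Both helper functions are hypothetical illustrations, not part of `dp3.testing`, and the assumption that the module name is the last component of the class's dotted module path is a guess based on the description.

```python
import os


def resolve_config_dir(explicit_config_dir=None, environ=None):
    """Prefer an explicit directory, else fall back to DP3_CONFIG_DIR."""
    environ = os.environ if environ is None else environ
    if explicit_config_dir is not None:
        return explicit_config_dir
    try:
        return environ["DP3_CONFIG_DIR"]
    except KeyError:
        raise RuntimeError("set config_dir or DP3_CONFIG_DIR") from None


def infer_module_name(module_class):
    """Assumption: use the last component of the defining module's dotted path."""
    return module_class.__module__.rsplit(".", 1)[-1]


class IPExposureProfile:
    """Placeholder class used only to demonstrate the name inference."""


# Pretend the class was defined in modules/ip_exposure_profile.py.
IPExposureProfile.__module__ = "modules.ip_exposure_profile"

config_key = f"modules.{infer_module_name(IPExposureProfile)}"
```

With this sketch, a class-level `config_dir` always wins over the environment variable, and the placeholder class maps to the `modules.ip_exposure_profile` config key.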

## Basic pattern

```python
from unittest.mock import patch

from dp3.testing import DP3ModuleTestCase
from modules.ip_exposure_profile import IPExposureProfile


class TestIPExposureProfile(DP3ModuleTestCase):
    module_class = IPExposureProfile

    def test_open_port_creates_service_and_link(self):
        dp = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)

        tasks = self.run_on_new_attr("ip", "open_ports", "192.0.2.1", dp)

        self.assertDatapoint(tasks, etype="service", eid="192.0.2.1:443", attr="guessed_type")
        self.assertDatapoint(tasks, etype="ip", eid="192.0.2.1", attr="services")

    def test_updater_uses_mocked_external_lookup(self):
        with patch.object(self.module, "_fetch_service_intel", return_value={"risk": "high"}):
            tasks = self.run_periodic_update(
                "service",
                "192.0.2.1:443",
                {"eid": "192.0.2.1:443", "guessed_type": {"v": "https"}},
                hook_id="service_intel",
            )

        self.assertDatapoint(tasks, attr="external_risk", v="high")
```

## What the helper provides

`DP3ModuleTestCase`:

- loads `db_entities` from `DP3_CONFIG_DIR` or `config_dir` and builds a real `ModelSpec`,
- creates a minimal `PlatformConfig`,
- instantiates `module_class` with a test registrar,
- creates validated `DataPointTask` and plain, observation, or timeseries datapoint objects using
the loaded model,
- calls registered hooks directly,
- provides partial-match assertions for emitted tasks, datapoints, and mutated records.

The helper is intended for module-level unit tests. It does not run a database, task queues,
worker processes, recursive task ingestion, or full linked snapshot loading.

## Datapoint helpers

Use the datapoint helpers to build values accepted by the loaded model specification:

```python
plain = self.make_plain_datapoint("ip", "192.0.2.1", "hostname", "host.example")
observation = self.make_observation_datapoint("ip", "192.0.2.1", "open_ports", 443)
timeseries = self.make_timeseries_datapoint(
    "ip",
    "192.0.2.1",
    "traffic",
    {"packets": [1, 2, 3], "bytes": [100, 200, 300]},
)
```

For regular timeseries attributes, `make_timeseries_datapoint()` infers `t2` from `t1`, the
configured `time_step`, and the number of samples when `t2` is not supplied.
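
As a rough illustration of that inference, the sketch below assumes the rule is `t2 = t1 + number_of_samples * time_step`; the exact formula used by the helper is not stated here, and `infer_t2` is a hypothetical function name.

```python
from datetime import datetime, timedelta


def infer_t2(t1: datetime, time_step: timedelta, series: dict) -> datetime:
    """Assumed rule: t2 = t1 + number_of_samples * time_step."""
    lengths = {len(values) for values in series.values()}
    if len(lengths) != 1:
        raise ValueError("all series must have the same number of samples")
    return t1 + lengths.pop() * time_step


t1 = datetime(2024, 1, 1, 12, 0, 0)
t2 = infer_t2(t1, timedelta(minutes=5), {"packets": [1, 2, 3], "bytes": [100, 200, 300]})
```

Under that assumption, three samples at a five-minute step starting at 12:00 give a `t2` of 12:15.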

## Hook runners

Common runners are available on the test case:

- `run_allow_entity_creation(entity, eid, task=None)`
- `run_on_entity_creation(entity, eid, task=None)`
- `run_on_new_attr(entity, attr, eid, dp)`
- `run_correlation_hooks(entity_type, record, master_record=None)`
- `run_periodic_update(entity_type, eid, master_record, hook_id=None)`
- `run_periodic_eid_update(entity_type, eid, hook_id=None)`
- `run_scheduler_job(job_id_or_func)`

Correlation tests pass the snapshot `record` explicitly. The record must contain `eid`.
Creation and attribute hooks registered with `refresh` are also available through
`run_correlation_hooks()`, and their refresh flags are unset by `run_snapshot_finalize_hooks()`.
Periodic record update tests pass the raw persisted `master_record`; plain attributes are stored
as dictionaries such as `{"v": ...}`.
Scheduler jobs can be selected by the id returned from `scheduler_register()`, by the callable, or
by the callable's name.

Hook runners call module hooks directly and intentionally propagate hook exceptions to the test.
The DP3 runtime is more resilient: worker, snapshot, and updater processing logs module errors and
continues so one faulty secondary module hook does not stop processing. Unit tests use stricter
behavior so failures are visible at assertion time and test authors can verify exceptional paths
without searching runtime logs.
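
The difference between the two behaviors can be sketched with two tiny runners. `run_strict` and `run_resilient` are hypothetical illustrations of the idea, not DP3 functions.

```python
import logging

log = logging.getLogger("sketch")
log.addHandler(logging.NullHandler())  # keep the example quiet


def run_strict(hooks, *args):
    """Test-style runner: the first hook exception reaches the caller."""
    results = []
    for hook in hooks:
        results.extend(hook(*args))
    return results


def run_resilient(hooks, *args):
    """Runtime-style runner: log the failure and continue with the next hook."""
    results = []
    for hook in hooks:
        try:
            results.extend(hook(*args))
        except Exception:
            log.exception("hook %s failed", getattr(hook, "__name__", hook))
    return results


def good_hook(eid):
    return [f"task-for-{eid}"]


def faulty_hook(eid):
    raise RuntimeError("lookup failed")
```

In a test, wrapping the strict call in `self.assertRaises(RuntimeError)` checks the exceptional path directly, whereas the resilient runner would only leave a log entry.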

## Assertions

Assertions use partial matching: only fields supplied in the expected values are checked.

```python
self.assertDatapoint(tasks, etype="ip", attr="hostname", v="example.test")
self.assertTaskEmitted(tasks, etype="ip", eid="192.0.2.1")
self.assertNoTasks(tasks)
self.assertNoDatapoints(tasks)
self.assertRecordContains(record, exposure_score=10)
self.assertRecordAttr(record, "exposure_score", 10)
self.assertRecordUnchanged(before, after)
```
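
Partial matching as described above might be implemented along these lines. The helpers `matches` and `find_matching` are hypothetical and operate on plain dictionaries rather than DP3 datapoint objects.

```python
def matches(actual: dict, **expected) -> bool:
    """True when every expected field is present in `actual` with an equal value."""
    missing = object()  # sentinel so that an expected None is still compared
    return all(actual.get(key, missing) == value for key, value in expected.items())


def find_matching(items: list, **expected) -> list:
    """Return the items that satisfy all expected fields."""
    return [item for item in items if matches(item, **expected)]


datapoints = [
    {"etype": "ip", "eid": "192.0.2.1", "attr": "hostname", "v": "example.test"},
    {"etype": "ip", "eid": "192.0.2.1", "attr": "open_ports", "v": 443},
]
```

Fields that are not named in the call, such as `eid` in `find_matching(datapoints, attr="hostname")`, play no part in the comparison.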

Snake-case aliases are also available: `assert_datapoint`, `assert_task_emitted`,
`assert_no_tasks`, `assert_no_datapoints`, `assert_record_contains`, `assert_record_attr`, and
`assert_record_unchanged`.

## Registration assertions

Use registration assertions when a test needs to verify callback coverage or dynamic hook
registration.

```python
self.assert_registered("on_new_attr", entity="ip", attr="hostname")
self.assert_registered_once("correlation", entity_type="service")
self.assert_registered_attrs("service", expected_service_attrs)
self.assert_scheduler_registered(func="reload_ip_groups", minute="*/10")
```

`assert_scheduler_registered()` accepts scheduler fields such as `minute`, `hour`, and `second`,
along with `func` for matching the registered callable by object or function name.
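
Matching a callable "by object or function name" could look like the following sketch. The job records are plain dictionaries invented for illustration, and `func_matches` and `find_jobs` are hypothetical helpers, not the test registrar's API.

```python
def func_matches(registered, wanted) -> bool:
    """Match a registered callable by equality or by its function name."""
    if callable(wanted):
        return registered == wanted
    return getattr(registered, "__name__", None) == wanted


def find_jobs(jobs: list, func=None, **fields) -> list:
    """Filter recorded jobs by callable and by scheduler fields such as minute."""
    return [
        job
        for job in jobs
        if (func is None or func_matches(job["func"], func))
        and all(job.get(name) == value for name, value in fields.items())
    ]


def reload_ip_groups():
    """Placeholder scheduled callback."""


recorded = [{"func": reload_ip_groups, "minute": "*/10"}]
```

Here both `find_jobs(recorded, func="reload_ip_groups")` and `find_jobs(recorded, func=reload_ip_groups)` select the same job, while a mismatched `minute` filters it out.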

## Mocking external dependencies

Patch external constructors or functions before module instantiation when the dependency is created
in `__init__` or `load_config`:

```python
class TestDNSModule(DP3ModuleTestCase):
    module_class = DNSModule

    def setUp(self):
        self.resolver_patcher = patch("modules.dns_module.Resolver", FakeResolver)
        self.resolver_patcher.start()
        self.addCleanup(self.resolver_patcher.stop)
        super().setUp()
```

If patching is not convenient, use a test subclass as `module_class` and override the module's
initialization or dependency construction while keeping the hook methods under test unchanged.
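
The subclass approach can be sketched without DP3 at all. Every class below is a hypothetical placeholder, including the `_build_resolver` method, which stands for whatever dependency construction the real module performs.

```python
class DNSModule:
    """Hypothetical module; stands in for a real secondary module."""

    def __init__(self):
        self.resolver = self._build_resolver()

    def _build_resolver(self):
        raise RuntimeError("would contact a real DNS server")

    def on_new_hostname(self, hostname):
        return self.resolver.resolve(hostname)


class FakeResolver:
    """Test double returning a fixed documentation-range address."""

    def resolve(self, hostname):
        return ["192.0.2.53"]


class DNSModuleForTests(DNSModule):
    """Overrides only dependency construction; hook methods stay untouched."""

    def _build_resolver(self):
        return FakeResolver()


module = DNSModuleForTests()
```

In a real test case the subclass would be assigned to `module_class`, so the hooks under test are still the ones defined on the original module class.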

Deprecated registrar methods (`register_entity_hook` and `register_attr_hook`) are supported by the
test registrar and emit `DeprecationWarning`. Prefer the modern registration methods in new module
code and tests.
10 changes: 10 additions & 0 deletions docs/modules.md
@@ -172,6 +172,16 @@ and configuration, see the [updater configuration](configuration/updater.md) pag
- [`scheduler_register(...)`](hooks.md#scheduler_register) — CRON-style module-level
scheduled callback for maintenance, polling, housekeeping, or shared-state reloads.

## Testing modules

Secondary modules can be unit-tested without running a full DP3 worker by using
[`DP3ModuleTestCase`][dp3.testing.DP3ModuleTestCase]. The helper loads an application's
real `db_entities` model from `DP3_CONFIG_DIR` or an explicit test fixture path, instantiates a
module with a test callback registrar, and lets tests call registered hooks directly with validated
`DataPointTask` and datapoint objects.

See [Test a secondary module](howto/test-module.md) for examples and supported hook runners.

## Running module code in a separate thread

The module is free to run its own code in separate threads or processes.
16 changes: 7 additions & 9 deletions dp3/common/callback_registrar.py
@@ -9,9 +9,10 @@
 from dp3.common.config import ModelSpec, PlatformConfig, read_config_dir
 from dp3.common.datapoint import DataPointType
 from dp3.common.datatype import AnyEidT
+from dp3.common.hook_types import ATTR_TYPE_TO_ON_NEW_HOOK
 from dp3.common.scheduler import Scheduler
 from dp3.common.state import SharedFlag
-from dp3.common.task import DataPointTask
+from dp3.common.task import DataPointTask, task_context
 from dp3.common.types import ParsedTimedelta
 from dp3.core.updater import Updater
 from dp3.snapshots.snapshooter import SnapShooter
@@ -57,7 +58,8 @@ def on_entity_creation_in_snapshots(
     if not run_flag.isset():
         return []
     eid = record["eid"]
-    mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
+    with task_context(model_spec, allow_empty_data_point_task=True):
+        mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
     tasks = original_hook(eid, mock_task)
     write_datapoints_into_record(model_spec, tasks, record)
     return tasks
@@ -74,7 +76,8 @@ def on_attr_change_in_snapshots(
     if not run_flag.isset():
         return []
     eid = record["eid"]
-    mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
+    with task_context(model_spec, allow_empty_data_point_task=True):
+        mock_task = DataPointTask(etype=etype, eid=eid, data_points=[])
     tasks = original_hook(eid, mock_task)
     if isinstance(tasks, list):
         write_datapoints_into_record(model_spec, tasks, record)
@@ -131,11 +134,6 @@ def __init__(

         self.log = logging.getLogger(self.__class__.__name__)
         self.model_spec = task_executor.model_spec
-        self.attr_spec_t_to_on_attr = {
-            AttrType.PLAIN: "on_new_plain",
-            AttrType.OBSERVATIONS: "on_new_observation",
-            AttrType.TIMESERIES: "on_new_ts_chunk",
-        }

     def scheduler_register(
         self,
@@ -296,7 +294,7 @@ def register_on_new_attr_hook(
             ValueError: If entity and attr do not specify a valid attribute, a ValueError is raised.
         """
         try:
-            hook_type = self.attr_spec_t_to_on_attr[self.model_spec.attributes[entity, attr].t]
+            hook_type = ATTR_TYPE_TO_ON_NEW_HOOK[self.model_spec.attributes[entity, attr].t]
         except KeyError as e:
             raise ValueError(
                 f"Cannot register hook for attribute {entity}/{attr}, are you sure it exists?"
9 changes: 9 additions & 0 deletions dp3/common/hook_types.py
@@ -0,0 +1,9 @@
+"""Shared hook type constants."""
+
+from dp3.common.attrspec import AttrType
+
+ATTR_TYPE_TO_ON_NEW_HOOK = {
+    AttrType.PLAIN: "on_new_plain",
+    AttrType.OBSERVATIONS: "on_new_observation",
+    AttrType.TIMESERIES: "on_new_ts_chunk",
+}
25 changes: 23 additions & 2 deletions dp3/common/task.py
@@ -17,7 +17,9 @@
     Tag,
     TypeAdapter,
     ValidationError,
+    ValidationInfo,
     field_validator,
+    model_validator,
 )
 from pydantic_core.core_schema import FieldValidationInfo

@@ -40,9 +42,16 @@ def HASH(key: str) -> int:


 @contextmanager
-def task_context(model_spec: ModelSpec) -> Iterator[None]:
+def task_context(
+    model_spec: ModelSpec, *, allow_empty_data_point_task: bool = False
+) -> Iterator[None]:
     """Context manager for setting the `model_spec` context variable."""
-    token = _init_context_var.set({"model_spec": model_spec})
+    token = _init_context_var.set(
+        {
+            "model_spec": model_spec,
+            "allow_empty_data_point_task": allow_empty_data_point_task,
+        }
+    )
     try:
         yield
     finally:
@@ -182,6 +191,18 @@ def validate_eid(cls, v, info: FieldValidationInfo):
         else:
             raise AssertionError("Missing `model_spec` in context")

+    @model_validator(mode="after")
+    def validate_not_empty(self, info: ValidationInfo):
+        context = info.context
+        if context and context.get("allow_empty_data_point_task"):
+            return self
+        if not self.data_points and not self.ttl_tokens and not self.delete:
+            raise ValueError(
+                "DataPointTask must contain at least one datapoint, non-empty ttl_tokens, "
+                "or be a delete task."
+            )
+        return self
+

 def parse_data_point_task(task: str, model_spec: ModelSpec) -> DataPointTask:
     with task_context(model_spec):
35 changes: 26 additions & 9 deletions dp3/core/updater.py
@@ -162,6 +162,29 @@ def _setup_cache_indexes(self):
         self._cache.create_index("t_created", background=True)


+UpdateThreadId = tuple[float, str, bool]
+
+
+def validate_update_period(period: float, update_batch_period: float) -> None:
+    """Validate that an update hook period can be processed by updater batches."""
+    if period < update_batch_period:
+        raise ValueError(
+            f"The total period ({period}s) must be greater than or equal to "
+            f"the update batch period ({update_batch_period}s)."
+        )
+
+
+def get_update_thread_hooks(
+    update_thread_hooks: dict[UpdateThreadId, dict], hook_id: str, thread_id: UpdateThreadId
+) -> dict:
+    """Return the updater thread hook mapping after checking hook ID uniqueness."""
+    hooks = update_thread_hooks[thread_id]
+    _period, entity_type, _eid_only = thread_id
+    if hook_id in hooks:
+        raise ValueError(f"Hook ID {hook_id} already registered for {entity_type}.")
+    return hooks
+
+
 class Updater:
     """Executes periodic update callbacks."""

@@ -228,16 +251,10 @@ def _register_hook(self, hook, hook_id: str, entity_type: str, period: float, ei
             return

         update_period_secs = self.config.update_batch_period.total_seconds()
-        if period < update_period_secs:
-            raise ValueError(
-                f"The total period {period}s is must be greater or equal than "
-                f"the update batch period {update_period_secs}s."
-            )
+        validate_update_period(period, update_period_secs)

-        thread_id = (period, entity_type, eid_only)
-        hooks = self.update_thread_hooks[thread_id]
-        if hook_id in hooks:
-            raise ValueError(f"Hook ID {hook_id} already registered for {entity_type}.")
+        thread_id: UpdateThreadId = (period, entity_type, eid_only)
+        hooks = get_update_thread_hooks(self.update_thread_hooks, hook_id, thread_id)
         self.log.info(
             "Registered hook '%s' to thread processing entity '%s' over %.1fs, eid_only = %s",
             hook_id,
6 changes: 3 additions & 3 deletions dp3/task_processing/task_executor.py
@@ -193,10 +193,10 @@ def refresh_on_entity_creation(
         for master_record in self.db.get_worker_master_records(
             worker_id, worker_cnt, etype, projection=projection
         ):
-            with task_context(self.model_spec):
+            with task_context(self.model_spec, allow_empty_data_point_task=True):
                 task = DataPointTask(etype=etype, eid=master_record["_id"])
-                self.log.debug(f"Refreshing {etype}/{task.eid}")
-                new_tasks += self._task_entity_hooks[task.etype].run_on_creation(task.eid, task)
+            self.log.debug(f"Refreshing {etype}/{task.eid}")
+            new_tasks += self._task_entity_hooks[task.etype].run_on_creation(task.eid, task)

         return new_tasks

13 changes: 5 additions & 8 deletions dp3/task_processing/task_hooks.py
@@ -5,6 +5,7 @@
 from dp3.common.config import ModelSpec
 from dp3.common.datapoint import DataPointBase
 from dp3.common.datatype import AnyEidT
+from dp3.common.hook_types import ATTR_TYPE_TO_ON_NEW_HOOK
 from dp3.common.task import DataPointTask, task_context
 from dp3.common.types import EventGroupType
 from dp3.common.utils import get_func_name
@@ -130,14 +131,10 @@ def __init__(
         self.elog = elog
         self.model_spec = model_spec

-        if attr_type == AttrType.PLAIN:
-            self.on_new_hook_type = "on_new_plain"
-        elif attr_type == AttrType.OBSERVATIONS:
-            self.on_new_hook_type = "on_new_observation"
-        elif attr_type == AttrType.TIMESERIES:
-            self.on_new_hook_type = "on_new_ts_chunk"
-        else:
-            raise ValueError(f"Invalid attribute type '{attr_type}'")
+        try:
+            self.on_new_hook_type = ATTR_TYPE_TO_ON_NEW_HOOK[attr_type]
+        except KeyError as e:
+            raise ValueError(f"Invalid attribute type '{attr_type}'") from e

         self._on_new = []
