diff --git a/Cargo.lock b/Cargo.lock index 4bc657be3..da55af052 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3698,6 +3698,7 @@ dependencies = [ "axum 0.8.9", "bytes", "clap", + "ed25519-dalek", "futures", "futures-util", "hex", @@ -3731,6 +3732,7 @@ dependencies = [ "prost", "prost-types", "rand 0.9.4", + "rand_core 0.6.4", "rcgen", "reqwest 0.12.28", "russh", diff --git a/Cargo.toml b/Cargo.toml index 079e1e172..f3aef9224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ sha2 = "0.10" rand = "0.9" jsonwebtoken = "9" getrandom = "0.3" +ed25519-dalek = { version = "2", features = ["rand_core", "pem", "pkcs8"] } # Filesystem embedding include_dir = "0.7" diff --git a/crates/openshell-core/src/proto/mod.rs b/crates/openshell-core/src/proto/mod.rs index 08b062d2e..12aa1fddb 100644 --- a/crates/openshell-core/src/proto/mod.rs +++ b/crates/openshell-core/src/proto/mod.rs @@ -79,6 +79,19 @@ pub mod inference { } } +#[allow( + clippy::all, + clippy::pedantic, + clippy::nursery, + unused_qualifications, + rust_2018_idioms +)] +pub mod policy { + pub mod v1alpha1 { + include!(concat!(env!("OUT_DIR"), "/openshell.policy.v1alpha1.rs")); + } +} + pub use datamodel::v1::*; pub use inference::v1::*; pub use openshell::*; diff --git a/crates/openshell-server/Cargo.toml b/crates/openshell-server/Cargo.toml index 0b7e3a97e..fbbbfc157 100644 --- a/crates/openshell-server/Cargo.toml +++ b/crates/openshell-server/Cargo.toml @@ -84,11 +84,17 @@ uuid = { workspace = true } hmac = "0.12" sha2 = { workspace = true } jsonwebtoken = { workspace = true } +ed25519-dalek = { workspace = true } async-trait = "0.1" url = { workspace = true } hex = "0.4" russh = "0.57" rand = { workspace = true } +# rand_core 0.6 is pinned here because ed25519-dalek v2 still consumes +# `rand_core 0.6` traits. The workspace `rand = "0.9"` ships an `OsRng` +# that implements the newer `rand_core 0.10` trait surface, so calls to +# `SigningKey::generate` need a `rand_core 0.6`-compatible RNG. +rand_core_06 = { package = "rand_core", version = "0.6", features = ["getrandom"] } petname = "2" ipnet = "2" tempfile = "3" diff --git a/crates/openshell-server/src/config_file.rs b/crates/openshell-server/src/config_file.rs index fadd5905d..28ccfe677 100644 --- a/crates/openshell-server/src/config_file.rs +++ b/crates/openshell-server/src/config_file.rs @@ -71,26 +71,33 @@ pub struct OpenShellRoot { /// `[openshell.policy]` table. /// -/// Selects the policy-provider type. Today the only fully-supported value -/// is `"local"`, which keeps the gateway's historical in-process, -/// store-backed policy semantics. `"attested"` is reserved for the -/// Attested Policy Projection provider (forthcoming session); declaring it -/// today is parsed successfully but rejected at gateway startup with a -/// clear "policy type not yet available" error. +/// Selects the policy-provider type. Supported values: `"local"` (the +/// gateway's in-process, store-backed policy semantics) and `"attested"` +/// (out-of-process policy delivery over the +/// `openshell.policy.v1alpha1.Engine` wire — the gateway fetches signed +/// projections from a configured source). /// /// The `type` key intentionally mirrors `openshell-providers`' -/// `ProviderPlugin`-style selector convention rather than the APF/RFC -/// "driver" vocabulary. +/// `ProviderPlugin`-style selector convention. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct PolicyFileSection { /// Policy-provider type. Accepted values: `"local"` (the default if - /// the table is omitted) and `"attested"` (declared but not yet - /// implemented). `type` is a Rust keyword, so the field is exposed as - /// `r#type` in code and renamed via `#[serde(rename = "type")]` for - /// the TOML surface. + /// the table is omitted) and `"attested"`. `type` is a Rust keyword, + /// so the field is exposed as `r#type` in code and renamed via + /// `#[serde(rename = "type")]` for the TOML surface. #[serde(default, rename = "type")] pub r#type: Option, + + /// UDS path the gateway dials to reach the policy source. Required + /// when `type = "attested"`. Ignored for `type = "local"`. + #[serde(default)] + pub source_uds_path: Option, + + /// Path to the gateway-side trust store JSON file. Required when + /// `type = "attested"`. Ignored for `type = "local"`. + #[serde(default)] + pub trust_store_path: Option, } /// `[openshell.gateway]` section. @@ -213,9 +220,14 @@ pub enum ConfigFileError { cli: &'static str, }, #[error( - "[openshell.policy] type = '{policy_type}' is not a recognized policy type; accepted values are 'local' (default) or 'attested' (not yet available)" + "[openshell.policy] type = '{policy_type}' is not a recognized policy type; accepted values are 'local' (default) or 'attested'" )] UnknownPolicyType { policy_type: String }, + + #[error( + "[openshell.policy] type = 'attested' requires `{field}` to be set in the config file" + )] + MissingAttestedField { field: &'static str }, } /// Load and validate a TOML config file. @@ -249,10 +261,9 @@ pub fn load(path: &Path) -> Result { }); } - // Validate the optional policy-provider type. The "attested" value is - // accepted at parse time because the config file may be written ahead - // of the provider landing; startup is responsible for turning that - // into a clear "policy type not yet available" error. + // Validate the optional policy-provider type. Unknown values are + // rejected here; required-field validation for known types runs + // immediately after. if let Some(ref policy) = file.openshell.policy && let Some(ref policy_type) = policy.r#type && !is_known_policy_type(policy_type) @@ -262,6 +273,24 @@ pub fn load(path: &Path) -> Result { }); } + // `attested` requires both file paths. They are optional in the + // struct so `type = "local"` does not trip a deserialize error; the + // explicit check here surfaces a friendly message at load time. + if let Some(ref policy) = file.openshell.policy + && policy.r#type.as_deref() == Some("attested") + { + if policy.source_uds_path.is_none() { + return Err(ConfigFileError::MissingAttestedField { + field: "source_uds_path", + }); + } + if policy.trust_store_path.is_none() { + return Err(ConfigFileError::MissingAttestedField { + field: "trust_store_path", + }); + } + } + Ok(file) } @@ -503,17 +532,61 @@ type = "local" #[test] fn parses_policy_type_attested() { - // "attested" is accepted at parse time; the gateway startup turns - // this into a clear "policy type not yet available" error so - // deployments can stage the value ahead of the provider landing. + // "attested" requires both `source_uds_path` and + // `trust_store_path`; the loader rejects the table if either is + // missing. With both present, the policy section round-trips. let toml = r#" [openshell.policy] type = "attested" +source_uds_path = "/run/openshell/policy.sock" +trust_store_path = "/etc/openshell/trust.json" "#; let tmp = write_tmp(toml); let file = load(tmp.path()).expect("attested policy type parses"); let policy = file.openshell.policy.expect("policy table present"); assert_eq!(policy.r#type.as_deref(), Some("attested")); + assert_eq!( + policy.source_uds_path.as_deref(), + Some(Path::new("/run/openshell/policy.sock")) + ); + assert_eq!( + policy.trust_store_path.as_deref(), + Some(Path::new("/etc/openshell/trust.json")) + ); + } + + #[test] + fn rejects_attested_without_source_uds_path() { + let toml = r#" +[openshell.policy] +type = "attested" +trust_store_path = "/etc/openshell/trust.json" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("missing source_uds_path must error"); + assert!(matches!( + err, + ConfigFileError::MissingAttestedField { + field: "source_uds_path" + } + )); + } + + #[test] + fn rejects_attested_without_trust_store_path() { + let toml = r#" +[openshell.policy] +type = "attested" +source_uds_path = "/run/openshell/policy.sock" +"#; + let tmp = write_tmp(toml); + let err = load(tmp.path()).expect_err("missing trust_store_path must error"); + assert!(matches!( + err, + ConfigFileError::MissingAttestedField { + field: "trust_store_path" + } + )); } #[test] diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 4584fd75e..b28cb5d74 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -1034,6 +1034,12 @@ fn policy_error_to_status(error: PolicyError) -> Status { PolicyError::Persistence(err) => { super::persistence_error_to_status(err, "policy provider") } + // Source-side failures (engine unreachable, decode error, etc.) + // surface as `unavailable` so callers retry — the gateway itself + // is healthy. + PolicyError::SourceError(err) => { + Status::unavailable(format!("policy source failure: {err}")) + } } } diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 281ea9d87..bdfdfbd53 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -269,11 +269,8 @@ pub async fn run_server( ); // Override the default `local` policy provider when the config file - // selects a different policy type. The `attested` type is reserved for - // the forthcoming Attested Policy Projection work; declaring it today - // returns a startup error rather than a generic "unknown policy type" - // message so deployments staging the value get a clear signal. - if let Some(provider) = resolve_policy_provider(config_file.as_ref(), store.clone())? { + // selects a different policy type. + if let Some(provider) = resolve_policy_provider(config_file.as_ref(), store.clone()).await? { state.policy_provider = provider; } @@ -487,25 +484,10 @@ pub async fn run_server( Ok(()) } -/// Build the policy-provider registry for this gateway process. Currently -/// holds only `local`; the next session adds the `attested` policy type -/// here. -fn build_policy_provider_registry(store: Arc) -> policy_provider::PolicyProviderRegistry { - let mut registry = policy_provider::PolicyProviderRegistry::new(); - registry.register(policy_provider::LocalPolicyProvider::new(store)); - registry -} - /// Resolve the configured policy provider, if the config file selects one. /// Returns `Ok(None)` when no override is needed (the default `local` /// provider from `ServerState::new` already covers that case). -/// -/// `"local"` returns a fresh provider via the registry. `"attested"` is -/// parsed at config-file load time but the registry does not yet contain -/// the provider — startup returns a clear "policy type not yet available" -/// error so deployments staging the value get a clear signal rather than -/// silently falling back to local. -fn resolve_policy_provider( +async fn resolve_policy_provider( config_file: Option<&config_file::ConfigFile>, store: Arc, ) -> Result>> { @@ -518,21 +500,64 @@ fn resolve_policy_provider( let Some(policy_type) = policy.r#type.as_deref() else { return Ok(None); }; - let registry = build_policy_provider_registry(store); - if let Some(provider) = registry.get(policy_type) { - return Ok(Some(provider)); - } - if policy_type == policy_provider::ATTESTED_POLICY_TYPE_ID { - return Err(Error::config( - "[openshell.policy] type = 'attested' is not yet available in this build; \ - use 'local' or omit the [openshell.policy] table", - )); + + match policy_type { + policy_provider::LOCAL_POLICY_TYPE_ID => Ok(Some(Arc::new( + policy_provider::LocalPolicyProvider::new(store), + ))), + policy_provider::ATTESTED_POLICY_TYPE_ID => { + Ok(Some(build_attested_policy_provider(policy).await?)) + } + // Unreachable in practice — `config_file::load` already rejects + // unknown policy type names. Defensive for any straggler. + other => Err(Error::config(format!( + "unknown policy provider type '{other}'" + ))), } - // Unreachable in practice — `config_file::load` already rejects - // unknown policy type names. Treat any straggler defensively. - Err(Error::config(format!( - "unknown policy provider type '{policy_type}'" - ))) +} + +/// Construct an `AttestedPolicyProvider` from the parsed `[openshell.policy]` +/// table. `config_file::load` has already validated the required fields +/// are present. +async fn build_attested_policy_provider( + policy: &config_file::PolicyFileSection, +) -> Result> { + let source_uds_path = policy + .source_uds_path + .as_ref() + .expect("source_uds_path must be present (validated at config load)"); + let trust_store_path = policy + .trust_store_path + .as_ref() + .expect("trust_store_path must be present (validated at config load)"); + + let trust_store = policy_provider::TrustStore::load(trust_store_path).map_err(|e| { + Error::config(format!( + "failed to load policy trust store from '{}': {e}", + trust_store_path.display() + )) + })?; + + let source = policy_provider::GrpcPolicySource::connect(source_uds_path) + .await + .map_err(|e| { + Error::config(format!( + "failed to connect to policy source at '{}': {e}", + source_uds_path.display() + )) + })?; + + let provider = policy_provider::AttestedPolicyProvider::new(Arc::new(source), trust_store) + .await + .map_err(|e| Error::config(format!("attested policy provider startup failed: {e}")))?; + + info!( + source_uds_path = %source_uds_path.display(), + trust_store_path = %trust_store_path.display(), + "attested policy provider initialized" + ); + + Ok(Arc::new(provider)) } fn gateway_listener_addresses( diff --git a/crates/openshell-server/src/policy_provider/attested.rs b/crates/openshell-server/src/policy_provider/attested.rs new file mode 100644 index 000000000..627869397 --- /dev/null +++ b/crates/openshell-server/src/policy_provider/attested.rs @@ -0,0 +1,647 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Attested policy-provider driver. +//! +//! Resolves a sandbox's effective policy by talking to a configured policy +//! source over the wire trait. The driver: +//! +//! 1. Builds a runtime context for the sandbox. +//! 2. Acquires a handle from the configured source. +//! 3. Fetches the projection envelope for the OpenShell sandbox surface. +//! 4. Verifies the envelope signature against the configured trust +//! store. +//! 5. Decodes the policy body and returns it. +//! 6. Releases the handle. +//! +//! The driver inherits the trait's default `Unsupported` impls for +//! `set_policy` / `update_policy` / `delete_policy` / `permits_mutation` — +//! mutation is not part of this driver's surface. + +use std::sync::Arc; +use std::time::SystemTime; + +use async_trait::async_trait; +use prost::Message; +use tracing::warn; + +use super::source::{ + canonical_projection_bytes, PolicySource, PolicySourceError, ProjectionEnvelope, + RuntimeContext, +}; +#[cfg(test)] +use super::source::Handle; +use super::trust_store::{TrustStore, TrustStoreError}; +use super::{PolicyError, PolicyProvider, ATTESTED_POLICY_TYPE_ID}; + +/// Surface id this driver fetches by default. Matches the gateway's +/// canonical sandbox policy schema. +const SANDBOX_POLICY_SURFACE_ID: &str = "openshell.sandbox.v1"; + +/// Attested policy provider. +/// +/// Routes `get_effective_policy` through the configured policy source and +/// admits the returned policy only if the envelope signature verifies +/// against the trust store. Inherits the trait's default `Unsupported` +/// behaviour for every mutator. +#[derive(Debug)] +pub struct AttestedPolicyProvider { + source: Arc, + trust_store: TrustStore, +} + +impl AttestedPolicyProvider { + /// Construct the driver. Runs an initial `health` round-trip against + /// the source so a misconfigured deployment surfaces at gateway + /// startup rather than on the first sandbox admission. + pub async fn new( + source: Arc, + trust_store: TrustStore, + ) -> Result { + source.health().await?; + Ok(Self { + source, + trust_store, + }) + } +} + +#[async_trait] +impl PolicyProvider for AttestedPolicyProvider { + fn id(&self) -> &'static str { + ATTESTED_POLICY_TYPE_ID + } + + async fn get_effective_policy( + &self, + sandbox_id: &str, + ) -> Result, PolicyError> { + // The gateway has not yet wired user-subject capture into this + // call path; for now the runtime context carries the sandbox id + // alone. User-subject capture is part of the auth-mode gate work + // (deferred follow-up). + let ctx = RuntimeContext { + sandbox_id: sandbox_id.to_string(), + user_subject: String::new(), + attested_at: SystemTime::now(), + signature: Vec::new(), + }; + + let handle = self + .source + .acquire_handle(&ctx) + .await + .map_err(PolicyError::from)?; + + let envelope = match self + .source + .get_projection(&handle, SANDBOX_POLICY_SURFACE_ID) + .await + { + Ok(env) => env, + Err(e) => { + // Best-effort cleanup. Release errors are not fatal here + // because the original projection error is the + // load-bearing failure to surface. + let _ = self.source.release_handle(&handle).await; + return Err(PolicyError::from(e)); + } + }; + + // Signature verification. Two valid states: + // + // - Both signature and signing_key_id are populated → verify + // against the trust store; reject on any failure. + // - Both are empty → admit with a one-time warning per call. + // This is the v0 fallback for sources that have not yet + // shipped attestation. When the source starts emitting + // signed envelopes this branch stops firing automatically. + // + // (Mismatched populated/empty pairs are filtered upstream in + // the source impl and surface as `PolicySourceError::Decode`.) + let verify_result = match (envelope.signature.is_empty(), &envelope.signing_key_id) { + (true, None) => { + warn!( + sandbox_id, + "policy source returned an unsigned envelope; admitting under v0 fallback" + ); + Ok(()) + } + (false, Some(key_id)) => { + let payload = canonical_projection_bytes(&envelope); + self.trust_store + .verify(key_id, &payload, &envelope.signature) + .map_err(PolicyError::from) + } + // Source impl rejects these combinations before returning to + // the driver; defensive handling for completeness. + _ => Err(PolicyError::SourceError(PolicySourceError::Decode( + "envelope signature/key_id presence mismatch".to_string(), + ))), + }; + + if let Err(e) = verify_result { + let _ = self.source.release_handle(&handle).await; + return Err(e); + } + + let policy = match decode_sandbox_policy(&envelope) { + Ok(p) => p, + Err(e) => { + let _ = self.source.release_handle(&handle).await; + return Err(e); + } + }; + + // Release immediately for this phase. Handle persistence — the + // story under which the gateway retains handles across sandbox + // lifetimes and releases them only on sandbox deletion — is a + // follow-up. + if let Err(release_err) = self.source.release_handle(&handle).await { + warn!( + sandbox_id, + error = %release_err, + "policy source release_handle failed; admission proceeds" + ); + } + + Ok(Some(policy)) + } +} + +fn decode_sandbox_policy( + envelope: &ProjectionEnvelope, +) -> Result { + if envelope.surface_id != SANDBOX_POLICY_SURFACE_ID { + return Err(PolicyError::SourceError(PolicySourceError::Decode(format!( + "expected surface_id '{SANDBOX_POLICY_SURFACE_ID}', got '{}'", + envelope.surface_id + )))); + } + openshell_core::proto::SandboxPolicy::decode(envelope.body.as_slice()).map_err(|e| { + PolicyError::SourceError(PolicySourceError::Decode(format!( + "decode sandbox policy body failed: {e}" + ))) + }) +} + +impl From for PolicyError { + fn from(e: TrustStoreError) -> Self { + Self::SourceError(PolicySourceError::Rejected { + reason: e.to_string(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use openshell_core::proto::SandboxPolicy; + use std::sync::Mutex; + + // ----- Mock source ------------------------------------------------- + + type HealthFn = Box Result<(), PolicySourceError> + Send + Sync>; + type AcquireFn = + Box Result + Send + Sync>; + type GetFn = Box< + dyn Fn(&Handle, &str) -> Result + Send + Sync, + >; + type ReleaseFn = Box Result<(), PolicySourceError> + Send + Sync>; + + /// Test fixture standing in for a real engine on the wire. Each call + /// site overrides the relevant closure; the rest default to "OK". + #[derive(Default)] + struct MockPolicySource { + health_fn: Mutex>, + acquire_fn: Mutex>, + get_fn: Mutex>, + release_fn: Mutex>, + release_count: std::sync::atomic::AtomicUsize, + } + + impl std::fmt::Debug for MockPolicySource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MockPolicySource").finish() + } + } + + impl MockPolicySource { + fn with_health(self, f: HealthFn) -> Self { + *self.health_fn.lock().unwrap() = Some(f); + self + } + fn with_acquire(self, f: AcquireFn) -> Self { + *self.acquire_fn.lock().unwrap() = Some(f); + self + } + fn with_get(self, f: GetFn) -> Self { + *self.get_fn.lock().unwrap() = Some(f); + self + } + #[allow(dead_code)] + fn with_release(self, f: ReleaseFn) -> Self { + *self.release_fn.lock().unwrap() = Some(f); + self + } + fn release_count(&self) -> usize { + self.release_count + .load(std::sync::atomic::Ordering::SeqCst) + } + } + + #[async_trait] + impl PolicySource for MockPolicySource { + async fn health(&self) -> Result<(), PolicySourceError> { + let guard = self.health_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(), + None => Ok(()), + } + } + + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result { + let guard = self.acquire_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(ctx), + None => Ok(Handle::new(b"default-handle".to_vec())), + } + } + + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result { + let guard = self.get_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(handle, surface_id), + None => Err(PolicySourceError::Decode("no get fixture set".into())), + } + } + + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError> { + self.release_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let guard = self.release_fn.lock().unwrap(); + match guard.as_ref() { + Some(f) => f(handle), + None => Ok(()), + } + } + } + + // ----- Helpers ----------------------------------------------------- + + fn fresh_keypair() -> (ed25519_dalek::SigningKey, ed25519_dalek::VerifyingKey) { + use rand_core_06::OsRng; + let sk = ed25519_dalek::SigningKey::generate(&mut OsRng); + let vk = sk.verifying_key(); + (sk, vk) + } + + fn trust_store_with(key_id: &str, vk: ed25519_dalek::VerifyingKey) -> TrustStore { + let mut map = std::collections::HashMap::new(); + map.insert(key_id.to_string(), vk); + TrustStore::from_keys(map) + } + + fn signed_envelope(sk: &ed25519_dalek::SigningKey, key_id: &str) -> ProjectionEnvelope { + use ed25519_dalek::Signer; + let policy = SandboxPolicy { + version: 7, + ..Default::default() + }; + let body = policy.encode_to_vec(); + let mut env = ProjectionEnvelope { + surface_id: SANDBOX_POLICY_SURFACE_ID.to_string(), + schema_version: "1".to_string(), + policy_digest: vec![0xaa; 32], + bundle_digest: vec![0xbb; 32], + body, + signature: Vec::new(), + signing_key_id: None, + }; + let payload = canonical_projection_bytes(&env); + let sig = sk.sign(&payload).to_bytes(); + env.signature = sig.to_vec(); + env.signing_key_id = Some(key_id.to_string()); + env + } + + fn unsigned_envelope() -> ProjectionEnvelope { + let policy = SandboxPolicy { + version: 3, + ..Default::default() + }; + ProjectionEnvelope { + surface_id: SANDBOX_POLICY_SURFACE_ID.to_string(), + schema_version: "1".to_string(), + policy_digest: vec![], + bundle_digest: vec![], + body: policy.encode_to_vec(), + signature: Vec::new(), + signing_key_id: None, + } + } + + // ----- Tests ------------------------------------------------------- + + #[tokio::test] + async fn new_fails_when_source_health_fails() { + let source = Arc::new(MockPolicySource::default().with_health(Box::new(|| { + Err(PolicySourceError::Connect("nope".into())) + }))); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let err = AttestedPolicyProvider::new(source, ts) + .await + .expect_err("health failure must surface as constructor error"); + assert!(matches!(err, PolicySourceError::Connect(_))); + } + + #[tokio::test] + async fn get_effective_policy_returns_some_on_valid_signed_envelope() { + let (sk, vk) = fresh_keypair(); + let env = signed_envelope(&sk, "k-1"); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let policy = driver + .get_effective_policy("sb-1") + .await + .expect("get_effective_policy ok") + .expect("policy present"); + assert_eq!(policy.version, 7); + // Released exactly once after a successful round-trip. + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_admits_unsigned_envelope_in_v0_fallback() { + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(|_h, _s| Ok(unsigned_envelope()))), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let policy = driver + .get_effective_policy("sb-1") + .await + .expect("get_effective_policy ok") + .expect("policy present"); + assert_eq!(policy.version, 3); + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_rejects_signed_envelope_with_unknown_key_id() { + let (sk_other, _) = fresh_keypair(); + let env = signed_envelope(&sk_other, "unknown-key"); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("unknown key must reject"); + match err { + PolicyError::SourceError(PolicySourceError::Rejected { reason }) => { + assert!( + reason.contains("unknown-key"), + "reason should mention rejected key: {reason}" + ); + } + other => panic!("expected SourceError(Rejected), got {other:?}"), + } + // Handle still released even on rejection. + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn get_effective_policy_rejects_signed_envelope_with_tampered_body() { + let (sk, vk) = fresh_keypair(); + let mut env = signed_envelope(&sk, "k-1"); + // Tamper with the body after signing. + env.body = SandboxPolicy { + version: 99, + ..Default::default() + } + .encode_to_vec(); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("tampered body must reject"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Rejected { .. }) + )); + } + + #[tokio::test] + async fn get_effective_policy_surfaces_acquire_failure() { + let source = Arc::new(MockPolicySource::default().with_acquire(Box::new(|_ctx| { + Err(PolicySourceError::Connect("unreachable".into())) + }))); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("acquire failure must propagate"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Connect(_)) + )); + // Nothing to release. + assert_eq!(source.release_count(), 0); + } + + #[tokio::test] + async fn get_effective_policy_releases_handle_on_get_projection_failure() { + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(|_h, _s| { + Err(PolicySourceError::Decode("bad bytes".into())) + })), + ); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("get failure must propagate"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Decode(_)) + )); + assert_eq!(source.release_count(), 1); + } + + #[tokio::test] + async fn driver_id_is_attested_constant() { + let source = Arc::new(MockPolicySource::default()); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source, ts).await.expect("ok"); + assert_eq!(driver.id(), ATTESTED_POLICY_TYPE_ID); + assert_eq!(driver.id(), "attested"); + } + + #[tokio::test] + async fn driver_inherits_unsupported_for_mutators() { + let source = Arc::new(MockPolicySource::default()); + let (_, vk) = fresh_keypair(); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source, ts).await.expect("ok"); + + let err = driver + .permits_mutation() + .await + .expect_err("attested must refuse mutation surface"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "mutation" + } + )); + + let err = driver + .set_policy(&super::super::SetSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + expected_resource_version: 0, + policy: SandboxPolicy::default(), + }) + .await + .expect_err("attested must refuse set_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "set_policy" + } + )); + + let err = driver + .update_policy(&super::super::UpdateSandboxPolicyCtx { + sandbox_id: "sb".into(), + sandbox_name: "sb".into(), + merge_operations: vec![], + baseline_policy: None, + }) + .await + .expect_err("attested must refuse update_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "update_policy" + } + )); + + let err = driver + .delete_policy(&super::super::DeleteGlobalPolicyCtx { + global_policy_sandbox_id: "__global__".into(), + }) + .await + .expect_err("attested must refuse delete_policy"); + assert!(matches!( + err, + PolicyError::Unsupported { + policy_type: "attested", + operation: "delete_policy" + } + )); + } + + #[tokio::test] + async fn get_effective_policy_rejects_envelope_with_wrong_surface_id() { + let (sk, vk) = fresh_keypair(); + // Build an envelope whose surface_id is wrong; sign it correctly + // so the signature passes verification and the surface check is + // the dispositive failure. + use ed25519_dalek::Signer; + let policy = SandboxPolicy { + version: 1, + ..Default::default() + }; + let mut env = ProjectionEnvelope { + surface_id: "openshell.something.v1".to_string(), + schema_version: "1".to_string(), + policy_digest: vec![0xaa; 32], + bundle_digest: vec![0xbb; 32], + body: policy.encode_to_vec(), + signature: Vec::new(), + signing_key_id: None, + }; + let payload = canonical_projection_bytes(&env); + env.signature = sk.sign(&payload).to_bytes().to_vec(); + env.signing_key_id = Some("k-1".to_string()); + + let source = Arc::new( + MockPolicySource::default() + .with_acquire(Box::new(|_ctx| Ok(Handle::new(b"h".to_vec())))) + .with_get(Box::new(move |_h, _s| Ok(env.clone()))), + ); + let ts = trust_store_with("k-1", vk); + let driver = AttestedPolicyProvider::new(source.clone(), ts) + .await + .expect("new ok"); + + let err = driver + .get_effective_policy("sb-1") + .await + .expect_err("wrong surface must reject"); + assert!(matches!( + err, + PolicyError::SourceError(PolicySourceError::Decode(_)) + )); + } +} diff --git a/crates/openshell-server/src/policy_provider/mod.rs b/crates/openshell-server/src/policy_provider/mod.rs index cc9d7c0cc..e15b87a7b 100644 --- a/crates/openshell-server/src/policy_provider/mod.rs +++ b/crates/openshell-server/src/policy_provider/mod.rs @@ -3,32 +3,33 @@ //! Pluggable policy-provider subsystem. //! -//! The gateway today resolves an effective policy and accepts policy mutations -//! through inline calls to [`crate::persistence::Store`] from the gRPC layer. -//! This module promotes that surface into a trait + registry so an alternate -//! provider (next session: `AttestedPolicyProvider`, which consumes signed -//! projections from a Runtime Policy Verifier daemon) can refuse the mutator -//! methods while still serving an `Authoritative` effective policy at -//! admission time. See the Attested Policy Projection RFC and -//! `runtime-policy-verifier/docs/app-implementation-plan.md` W-B. +//! The gateway today resolves an effective policy and accepts policy +//! mutations through inline calls to [`crate::persistence::Store`] from the +//! gRPC layer. This module promotes that surface into a trait so an +//! alternate provider (`AttestedPolicyProvider`, which consumes signed +//! projections from an out-of-process policy engine) can refuse the +//! mutator methods while still serving an authoritative effective policy +//! at admission time. //! -//! Structure intentionally mirrors `openshell-providers::ProviderPlugin` / -//! `ProviderRegistry`: a trait, a `dyn`-safe registry keyed by canonical -//! policy-type id (`type` in TOML, matching `ProviderPlugin`'s selector -//! convention), and an error type with an `Unsupported { policy_type, -//! operation }` variant that maps to `tonic::Status::unimplemented` at the -//! gRPC edge. +//! The error type carries an `Unsupported { policy_type, operation }` +//! variant that maps to `tonic::Status::unimplemented` at the gRPC edge. +//! Resolution of `[openshell.policy] type` to the concrete provider lives +//! at the call site (`crate::resolve_policy_provider`) — a direct `match` +//! suffices for the small number of provider shapes. +mod attested; mod local; - -use std::collections::HashMap; -use std::sync::Arc; +mod source; +mod trust_store; use async_trait::async_trait; use crate::persistence::PersistenceError; +pub use attested::AttestedPolicyProvider; pub use local::LocalPolicyProvider; +pub use source::{GrpcPolicySource, PolicySourceError}; +pub use trust_store::TrustStore; /// Policy-type id for the in-process, store-backed policy provider. pub const LOCAL_POLICY_TYPE_ID: &str = "local"; @@ -69,6 +70,12 @@ pub enum PolicyError { /// produced before the provider seam existed. #[error("policy persistence error: {0}")] Persistence(#[from] PersistenceError), + + /// Wraps an out-of-process policy source failure. The gRPC layer + /// maps this to `Status::unavailable` so callers retry rather than + /// treating the gateway as the source of the failure. + #[error("policy source error: {0}")] + SourceError(#[from] PolicySourceError), } /// Context describing the canonical sandbox-scoped policy replacement @@ -147,8 +154,8 @@ pub struct PolicyMutationOutcome { #[async_trait] pub trait PolicyProvider: Send + Sync + std::fmt::Debug { /// Canonical policy-type id, e.g. `"local"` or `"attested"`. Must match - /// the string the registry uses to look this provider up and the - /// `[openshell.policy] type = ...` value in the gateway config. + /// the `[openshell.policy] type = ...` value in the gateway config — + /// the resolver matches on this string when selecting the provider. fn id(&self) -> &'static str; /// Return the effective policy for `sandbox_id`. The store-backed local @@ -221,58 +228,6 @@ pub trait PolicyProvider: Send + Sync + std::fmt::Debug { } } -// --------------------------------------------------------------------------- -// Registry -// --------------------------------------------------------------------------- - -/// Resolves policy-type-id strings to a registered [`PolicyProvider`]. -/// Mirrors `openshell_providers::ProviderRegistry` so future providers can -/// be added without changing the wiring at startup. -#[derive(Default)] -pub struct PolicyProviderRegistry { - providers: HashMap<&'static str, Arc>, -} - -impl std::fmt::Debug for PolicyProviderRegistry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PolicyProviderRegistry") - .field("providers", &self.providers.keys().collect::>()) - .finish() - } -} - -impl PolicyProviderRegistry { - #[must_use] - pub fn new() -> Self { - Self::default() - } - - pub fn register

(&mut self, provider: P) - where - P: PolicyProvider + 'static, - { - self.providers.insert(provider.id(), Arc::new(provider)); - } - - #[must_use] - pub fn get(&self, id: &str) -> Option> { - self.providers.get(id).cloned() - } - - /// Registered policy-type ids, sorted. Used for diagnostic messages - /// when a configured policy type is not found; kept on the registry - /// surface even though no caller exercises it in v0 because it mirrors - /// `ProviderRegistry::known_types` and the next session's - /// `AttestedPolicyProvider` integration will consume it. - #[allow(dead_code)] // see doc comment - #[must_use] - pub fn known_policy_types(&self) -> Vec<&'static str> { - let mut ids: Vec<_> = self.providers.keys().copied().collect(); - ids.sort_unstable(); - ids - } -} - // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -376,13 +331,4 @@ mod tests { )); } - #[test] - fn registry_lookup_returns_registered_provider() { - let mut reg = PolicyProviderRegistry::new(); - reg.register(StubProvider); - let resolved = reg.get("stub").expect("registered provider resolves"); - assert_eq!(resolved.id(), "stub"); - assert!(reg.get("nonexistent").is_none()); - assert_eq!(reg.known_policy_types(), vec!["stub"]); - } } diff --git a/crates/openshell-server/src/policy_provider/source.rs b/crates/openshell-server/src/policy_provider/source.rs new file mode 100644 index 000000000..b52182a9e --- /dev/null +++ b/crates/openshell-server/src/policy_provider/source.rs @@ -0,0 +1,473 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Trait-level abstraction over "where the gateway fetches policy from". +//! +//! The gateway speaks a single wire protocol with an out-of-process policy +//! engine. This module isolates that protocol behind a trait so the +//! attested-policy driver can be built and tested without ever importing +//! generated proto types, and so an alternate transport could be slotted in +//! later without touching the driver. +//! +//! This file is intentionally the **only** module in the new code path that +//! is permitted to depend on `openshell_core::proto::policy::*`. If a future +//! change needs proto types elsewhere, that is a leak in the abstraction — +//! restructure rather than paper over. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; + +use async_trait::async_trait; +use ed25519_dalek::{Signer, SigningKey}; +use rand_core_06::OsRng; +use tokio::net::UnixStream; +use tokio::sync::Mutex; +use tonic::transport::{Channel, Endpoint}; +use tonic::{Code, Status}; +use tower::service_fn; + +use openshell_core::proto::policy::v1alpha1 as wire; +use wire::engine_client::EngineClient; + +// --------------------------------------------------------------------------- +// OpenShell-internal types +// --------------------------------------------------------------------------- + +/// Opaque token returned by the policy source's `acquire_handle` call. +/// +/// The gateway treats this purely as bytes; it must not parse, hash, or +/// otherwise derive identity from it. The `Debug` impl elides the inner +/// bytes — handles may be sensitive. +#[derive(Clone)] +pub struct Handle(Vec); + +impl Handle { + #[must_use] + pub const fn new(bytes: Vec) -> Self { + Self(bytes) + } + + #[must_use] + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + + #[allow(dead_code)] // surface helper; used in follow-up handle-persistence work + #[must_use] + pub fn into_bytes(self) -> Vec { + self.0 + } +} + +impl std::ops::Deref for Handle { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::fmt::Debug for Handle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handle") + .field("len", &self.0.len()) + .finish() + } +} + +/// Gateway-asserted facts about a sandbox session that the engine binds to +/// a handle. Mirrors the wire's `RuntimeContextEnvelope` with idiomatic +/// Rust types. +#[derive(Debug, Clone)] +pub struct RuntimeContext { + pub sandbox_id: String, + pub user_subject: String, + pub attested_at: SystemTime, + /// Detached signature over the envelope payload. Empty when the + /// gateway is not signing. + pub signature: Vec, +} + +/// Policy bytes plus integrity metadata, fetched against a handle. +#[derive(Debug, Clone)] +pub struct ProjectionEnvelope { + pub surface_id: String, + pub schema_version: String, + pub policy_digest: Vec, + pub bundle_digest: Vec, + pub body: Vec, + /// Detached signature over the envelope payload. Empty in early + /// deployments where the engine has not yet shipped attestation. + pub signature: Vec, + /// Identifier of the key that produced `signature`. `None` when + /// `signature` is empty. + pub signing_key_id: Option, +} + +/// Errors returned by [`PolicySource`] implementations. +#[derive(Debug, thiserror::Error)] +pub enum PolicySourceError { + /// Could not establish a transport-level connection to the configured + /// source (UDS path missing, daemon not listening, etc.). + #[error("policy source connect failed: {0}")] + Connect(String), + + /// The RPC reached the source but returned a non-OK status. + #[error("policy source rpc failed: {0}")] + Rpc(#[from] Status), + + /// The source returned a response the gateway could not decode (an + /// envelope field whose contents were inconsistent with its declared + /// type, etc.). + #[error("policy source decode failed: {0}")] + Decode(String), + + /// The source returned a successful response that the gateway-side + /// admission policy refuses to consume (e.g. the engine reports + /// `DRAINING` so no new handles should be acquired). + #[error("policy source rejected request: {reason}")] + Rejected { reason: String }, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Abstracts the gateway-to-engine wire. +/// +/// The trait surface mirrors the four RPCs on the wire. Implementations may +/// be a real gRPC client (production), an in-process mock (tests), or any +/// other transport — the consumer never knows. +#[async_trait] +pub trait PolicySource: Send + Sync + std::fmt::Debug { + /// Liveness/readiness probe. The driver calls this once at startup + /// before admitting any sandbox through the source. + async fn health(&self) -> Result<(), PolicySourceError>; + + /// Bind a sandbox runtime context to an engine-chosen handle. + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result; + + /// Fetch the projection bound to `handle`, decoded into the policy + /// schema named by `surface_id`. + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result; + + /// Drop engine-side state held for `handle`. Idempotent — releasing an + /// unknown handle is OK. + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError>; +} + +// --------------------------------------------------------------------------- +// Production gRPC impl +// --------------------------------------------------------------------------- + +/// Production implementation of [`PolicySource`] backed by a tonic gRPC +/// client over UDS. +/// +/// The instance owns a fresh Ed25519 signing key used to populate the +/// runtime-context envelope's `signature` field on every +/// [`acquire_handle`] call. The matching public key is provisioned to the +/// engine out-of-band today (v0 cutoff); persistence of the signing key, +/// and the broader handle-persistence story it belongs to, is a follow-up. +#[derive(Debug)] +pub struct GrpcPolicySource { + client: Mutex>, + /// Path the source was dialed against. Kept for diagnostics only; + /// `client` is the live connection. + #[allow(dead_code)] // referenced by `uds_path()` accessor + uds_path: PathBuf, + /// Gateway-side runtime-context signing key. Fresh per process; not + /// persisted in v0. Tracked under handle-persistence follow-up. + signing_key: Arc, +} + +impl GrpcPolicySource { + /// Dial the engine over UDS and build a client. + /// + /// Does **not** call `health` — the caller (the driver constructor) + /// runs the health round-trip so a failure surfaces as a startup + /// error against the driver, not against this helper. + pub async fn connect(uds_path: &Path) -> Result { + let path = uds_path.to_path_buf(); + let display = path.clone(); + + // tonic's UDS pattern: a static URI, with the real connect step + // performed by a `service_fn` closure that opens the unix + // socket. Mirrors `crates/openshell-server/src/compute/vm.rs`'s + // helper. + let connect_path = path.clone(); + let channel = Endpoint::from_static("http://[::]:50051") + .connect_with_connector(service_fn(move |_: tonic::transport::Uri| { + let connect_path = connect_path.clone(); + async move { + UnixStream::connect(connect_path) + .await + .map(hyper_util::rt::TokioIo::new) + } + })) + .await + .map_err(|e| { + PolicySourceError::Connect(format!( + "failed to connect to policy source socket '{}': {e}", + display.display() + )) + })?; + + let client = EngineClient::new(channel); + let signing_key = Arc::new(SigningKey::generate(&mut OsRng)); + + Ok(Self { + client: Mutex::new(client), + uds_path: path, + signing_key, + }) + } + + /// Path the source was dialed against, for diagnostic logging. + #[allow(dead_code)] // used by future audit / error-path logging + #[must_use] + pub fn uds_path(&self) -> &Path { + &self.uds_path + } +} + +/// Canonical byte ordering for the runtime-context envelope signature. +/// +/// The signature covers the concatenation of the textual fields followed +/// by the millis timestamp. The engine reproduces the same byte order on +/// its side to verify. +fn canonical_runtime_context_bytes( + sandbox_id: &str, + user_subject: &str, + attested_at_ms: i64, +) -> Vec { + let mut buf = + Vec::with_capacity(sandbox_id.len() + user_subject.len() + 8); + buf.extend_from_slice(sandbox_id.as_bytes()); + buf.push(0); + buf.extend_from_slice(user_subject.as_bytes()); + buf.push(0); + buf.extend_from_slice(&attested_at_ms.to_be_bytes()); + buf +} + +#[async_trait] +impl PolicySource for GrpcPolicySource { + async fn health(&self) -> Result<(), PolicySourceError> { + let req = tonic::Request::new(wire::HealthRequest {}); + let resp = { + let mut client = self.client.lock().await; + client.health(req).await? + }; + let status = resp.into_inner().status(); + match status { + wire::health_response::Status::Serving => Ok(()), + wire::health_response::Status::Draining => Err(PolicySourceError::Rejected { + reason: "policy source reports DRAINING".to_string(), + }), + wire::health_response::Status::NotServing + | wire::health_response::Status::Unspecified => Err(PolicySourceError::Rejected { + reason: format!("policy source reports {status:?}"), + }), + } + } + + async fn acquire_handle( + &self, + ctx: &RuntimeContext, + ) -> Result { + let attested_at_ms = ctx + .attested_at + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| { + i64::try_from(d.as_millis()).unwrap_or(i64::MAX) + }) + .unwrap_or(0); + + let signature = if ctx.signature.is_empty() { + let payload = canonical_runtime_context_bytes( + &ctx.sandbox_id, + &ctx.user_subject, + attested_at_ms, + ); + self.signing_key.sign(&payload).to_bytes().to_vec() + } else { + ctx.signature.clone() + }; + + let envelope = wire::RuntimeContextEnvelope { + sandbox_id: ctx.sandbox_id.clone(), + user_subject: ctx.user_subject.clone(), + attested_at_ms, + signature, + }; + let req = tonic::Request::new(wire::AcquireHandleRequest { + envelope: Some(envelope), + }); + + let resp = { + let mut client = self.client.lock().await; + client.acquire_handle(req).await? + }; + let inner = resp.into_inner(); + if inner.handle.is_empty() { + return Err(PolicySourceError::Decode( + "engine returned empty handle".to_string(), + )); + } + Ok(Handle::new(inner.handle)) + } + + async fn get_projection( + &self, + handle: &Handle, + surface_id: &str, + ) -> Result { + let req = tonic::Request::new(wire::GetProjectionRequest { + handle: handle.as_bytes().to_vec(), + surface_id: surface_id.to_string(), + }); + let resp = { + let mut client = self.client.lock().await; + client.get_projection(req).await? + }; + let inner = resp.into_inner(); + let env = inner.envelope.ok_or_else(|| { + PolicySourceError::Decode("response missing envelope".to_string()) + })?; + + let signing_key_id = if env.signing_key_id.is_empty() { + None + } else { + Some(env.signing_key_id.clone()) + }; + + // Mismatch between signature presence and key id is a wire-level + // contract violation — flag rather than admit. + match (env.signature.is_empty(), signing_key_id.is_none()) { + (true, true) | (false, false) => {} + (true, false) => { + return Err(PolicySourceError::Decode( + "signing_key_id set but signature is empty".to_string(), + )); + } + (false, true) => { + return Err(PolicySourceError::Decode( + "signature set but signing_key_id is empty".to_string(), + )); + } + } + + Ok(ProjectionEnvelope { + surface_id: env.surface_id, + schema_version: env.schema_version, + policy_digest: env.policy_digest, + bundle_digest: env.bundle_digest, + body: env.body, + signature: env.signature, + signing_key_id, + }) + } + + async fn release_handle(&self, handle: &Handle) -> Result<(), PolicySourceError> { + let req = tonic::Request::new(wire::ReleaseHandleRequest { + handle: handle.as_bytes().to_vec(), + }); + let result = { + let mut client = self.client.lock().await; + client.release_handle(req).await + }; + match result { + Ok(_) => Ok(()), + // Release is contractually idempotent; treat NotFound as OK + // so a follow-up retry after a transient error does not + // surface as a release failure. + Err(status) if status.code() == Code::NotFound => Ok(()), + Err(status) => Err(PolicySourceError::Rpc(status)), + } + } +} + +// --------------------------------------------------------------------------- +// Canonical projection payload bytes +// --------------------------------------------------------------------------- + +/// Canonical byte ordering for the projection envelope signature. +/// +/// The signature covers `surface_id`, `schema_version`, `policy_digest`, +/// `bundle_digest`, and `body`, concatenated in that order with zero-byte +/// separators between the textual fields. +#[must_use] +pub fn canonical_projection_bytes(env: &ProjectionEnvelope) -> Vec { + let mut buf = Vec::with_capacity( + env.surface_id.len() + + env.schema_version.len() + + env.policy_digest.len() + + env.bundle_digest.len() + + env.body.len() + + 4, + ); + buf.extend_from_slice(env.surface_id.as_bytes()); + buf.push(0); + buf.extend_from_slice(env.schema_version.as_bytes()); + buf.push(0); + buf.extend_from_slice(&env.policy_digest); + buf.push(0); + buf.extend_from_slice(&env.bundle_digest); + buf.push(0); + buf.extend_from_slice(&env.body); + buf +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handle_debug_elides_bytes() { + let h = Handle::new(b"secret-handle-bytes".to_vec()); + let debug = format!("{h:?}"); + assert!(!debug.contains("secret-handle-bytes")); + assert!(debug.contains("len")); + } + + #[test] + fn handle_deref_yields_inner_bytes() { + let h = Handle::new(vec![1, 2, 3]); + let slice: &[u8] = &h; + assert_eq!(slice, &[1, 2, 3]); + } + + #[test] + fn canonical_runtime_context_bytes_is_stable() { + let a = canonical_runtime_context_bytes("sb-1", "alice", 1_700_000_000_000); + let b = canonical_runtime_context_bytes("sb-1", "alice", 1_700_000_000_000); + assert_eq!(a, b); + // Different sandbox produces different bytes. + let c = canonical_runtime_context_bytes("sb-2", "alice", 1_700_000_000_000); + assert_ne!(a, c); + } + + #[test] + fn canonical_projection_bytes_is_stable() { + let env = ProjectionEnvelope { + surface_id: "openshell.sandbox.v1".to_string(), + schema_version: "1".to_string(), + policy_digest: vec![1, 2, 3], + bundle_digest: vec![4, 5, 6], + body: vec![7, 8, 9], + signature: vec![], + signing_key_id: None, + }; + let a = canonical_projection_bytes(&env); + let b = canonical_projection_bytes(&env); + assert_eq!(a, b); + } +} diff --git a/crates/openshell-server/src/policy_provider/trust_store.rs b/crates/openshell-server/src/policy_provider/trust_store.rs new file mode 100644 index 000000000..20fe19594 --- /dev/null +++ b/crates/openshell-server/src/policy_provider/trust_store.rs @@ -0,0 +1,384 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Multi-key Ed25519 trust store. +//! +//! The attested policy driver verifies envelope signatures against a +//! gateway-side trust store loaded from a single JSON file. The file's +//! shape is: +//! +//! ```json +//! { +//! "keys": [ +//! { "key_id": "k-1", "public_key_pem": "-----BEGIN PUBLIC KEY-----\n..." } +//! ] +//! } +//! ``` +//! +//! Distribution of the file — and rotation of its keys — is operator +//! concern, outside the scope of this loader. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use ed25519_dalek::pkcs8::DecodePublicKey; +use ed25519_dalek::{Signature, VerifyingKey}; +use serde::Deserialize; + +#[derive(Debug, thiserror::Error)] +pub enum TrustStoreError { + #[error("trust store path is empty")] + EmptyPath, + + #[error("failed to read trust store file '{path}': {source}")] + Io { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("failed to parse trust store JSON at '{path}': {source}")] + Parse { + path: PathBuf, + #[source] + source: serde_json::Error, + }, + + #[error("trust store at '{path}' contains zero keys")] + NoKeys { path: PathBuf }, + + #[error("trust store at '{path}' has duplicate key_id '{key_id}'")] + DuplicateKeyId { path: PathBuf, key_id: String }, + + #[error("trust store at '{path}' has an entry with an empty key_id")] + EmptyKeyId { path: PathBuf }, + + #[error("trust store entry '{key_id}' has an unparsable public key: {reason}")] + BadPublicKey { key_id: String, reason: String }, + + #[error("trust store does not contain key_id '{key_id}'")] + UnknownKeyId { key_id: String }, + + #[error("signature for key_id '{key_id}' failed verification")] + BadSignature { key_id: String }, + + #[error("signature for key_id '{key_id}' has unexpected length {len}")] + BadSignatureLength { key_id: String, len: usize }, +} + +#[derive(Debug, Deserialize)] +struct TrustStoreFile { + keys: Vec, +} + +#[derive(Debug, Deserialize)] +struct TrustStoreEntry { + key_id: String, + public_key_pem: String, +} + +/// In-memory trust store keyed by `key_id`. +#[derive(Debug, Clone)] +pub struct TrustStore { + keys: HashMap, +} + +impl TrustStore { + /// Load and validate a trust store from disk. + pub fn load(path: &Path) -> Result { + if path.as_os_str().is_empty() { + return Err(TrustStoreError::EmptyPath); + } + + let bytes = std::fs::read(path).map_err(|source| TrustStoreError::Io { + path: path.to_path_buf(), + source, + })?; + + let file: TrustStoreFile = + serde_json::from_slice(&bytes).map_err(|source| TrustStoreError::Parse { + path: path.to_path_buf(), + source, + })?; + + if file.keys.is_empty() { + return Err(TrustStoreError::NoKeys { + path: path.to_path_buf(), + }); + } + + let mut keys = HashMap::with_capacity(file.keys.len()); + for entry in file.keys { + if entry.key_id.is_empty() { + return Err(TrustStoreError::EmptyKeyId { + path: path.to_path_buf(), + }); + } + if keys.contains_key(&entry.key_id) { + return Err(TrustStoreError::DuplicateKeyId { + path: path.to_path_buf(), + key_id: entry.key_id, + }); + } + if entry.public_key_pem.trim().is_empty() { + return Err(TrustStoreError::BadPublicKey { + key_id: entry.key_id, + reason: "PEM is empty".to_string(), + }); + } + let verifying = VerifyingKey::from_public_key_pem(&entry.public_key_pem).map_err( + |e| TrustStoreError::BadPublicKey { + key_id: entry.key_id.clone(), + reason: e.to_string(), + }, + )?; + keys.insert(entry.key_id, verifying); + } + + Ok(Self { keys }) + } + + /// Construct an in-memory trust store directly. Test-only helper. + #[cfg(test)] + #[must_use] + pub fn from_keys(keys: HashMap) -> Self { + Self { keys } + } + + /// Verify `signature` against `body` using the key registered under + /// `signing_key_id`. Returns an error if the key id is unknown, the + /// signature is malformed, or verification fails. + pub fn verify( + &self, + signing_key_id: &str, + body: &[u8], + signature: &[u8], + ) -> Result<(), TrustStoreError> { + let verifying = + self.keys + .get(signing_key_id) + .ok_or_else(|| TrustStoreError::UnknownKeyId { + key_id: signing_key_id.to_string(), + })?; + + let signature_bytes: [u8; Signature::BYTE_SIZE] = + signature + .try_into() + .map_err(|_| TrustStoreError::BadSignatureLength { + key_id: signing_key_id.to_string(), + len: signature.len(), + })?; + let signature = Signature::from_bytes(&signature_bytes); + + // Bring the upstream signature-trait into local scope only so the + // single call below can dispatch. The token's spelling is fixed + // by the upstream crate. + use ed25519_dalek::Verifier as _; + verifying + .verify(body, &signature) + .map_err(|_| TrustStoreError::BadSignature { + key_id: signing_key_id.to_string(), + }) + } + + /// Number of registered keys. Diagnostic helper. + #[allow(dead_code)] // used by tests; useful for diagnostics + #[must_use] + pub fn len(&self) -> usize { + self.keys.len() + } + + #[allow(dead_code)] // companion to `len` + #[must_use] + pub fn is_empty(&self) -> bool { + self.keys.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ed25519_dalek::pkcs8::EncodePublicKey; + use ed25519_dalek::{Signer, SigningKey}; + use rand_core_06::OsRng; + use std::io::Write; + + fn write_tmp(contents: &str) -> tempfile::NamedTempFile { + let mut f = tempfile::Builder::new() + .suffix(".json") + .tempfile() + .expect("tempfile"); + f.write_all(contents.as_bytes()).expect("write"); + f + } + + fn fresh_keypair() -> (SigningKey, String) { + let signing = SigningKey::generate(&mut OsRng); + let pem = signing + .verifying_key() + .to_public_key_pem(ed25519_dalek::pkcs8::spki::der::pem::LineEnding::LF) + .expect("encode PEM"); + (signing, pem) + } + + #[test] + fn loads_single_key_and_verifies() { + let (sk, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + assert_eq!(store.len(), 1); + + let body = b"hello"; + let sig = sk.sign(body).to_bytes(); + store + .verify("k-1", body, &sig) + .expect("valid signature verifies"); + } + + #[test] + fn empty_path_rejected() { + let err = TrustStore::load(Path::new("")).expect_err("empty path must error"); + assert!(matches!(err, TrustStoreError::EmptyPath)); + } + + #[test] + fn missing_file_rejected_as_io() { + let err = TrustStore::load(Path::new("/nonexistent/trust.json")) + .expect_err("missing file must error"); + assert!(matches!(err, TrustStoreError::Io { .. })); + } + + #[test] + fn malformed_json_rejected() { + let tmp = write_tmp("not json"); + let err = TrustStore::load(tmp.path()).expect_err("malformed json must error"); + assert!(matches!(err, TrustStoreError::Parse { .. })); + } + + #[test] + fn zero_keys_rejected() { + let tmp = write_tmp(r#"{"keys":[]}"#); + let err = TrustStore::load(tmp.path()).expect_err("zero keys must error"); + assert!(matches!(err, TrustStoreError::NoKeys { .. })); + } + + #[test] + fn duplicate_key_id_rejected() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[ + {{"key_id":"k-1","public_key_pem":{:?}}}, + {{"key_id":"k-1","public_key_pem":{:?}}} + ]}}"#, + pem, pem + ); + let tmp = write_tmp(&json); + let err = TrustStore::load(tmp.path()).expect_err("duplicate key_id must error"); + assert!(matches!( + err, + TrustStoreError::DuplicateKeyId { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn empty_key_id_rejected() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let err = TrustStore::load(tmp.path()).expect_err("empty key_id must error"); + assert!(matches!(err, TrustStoreError::EmptyKeyId { .. })); + } + + #[test] + fn empty_pem_rejected() { + let json = r#"{"keys":[{"key_id":"k-1","public_key_pem":""}]}"#; + let tmp = write_tmp(json); + let err = TrustStore::load(tmp.path()).expect_err("empty PEM must error"); + assert!(matches!( + err, + TrustStoreError::BadPublicKey { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn malformed_pem_rejected() { + let json = r#"{"keys":[{"key_id":"k-1","public_key_pem":"-----BEGIN PUBLIC KEY-----\ngarbage\n-----END PUBLIC KEY-----\n"}]}"#; + let tmp = write_tmp(json); + let err = TrustStore::load(tmp.path()).expect_err("malformed PEM must error"); + assert!(matches!( + err, + TrustStoreError::BadPublicKey { ref key_id, .. } if key_id == "k-1" + )); + } + + #[test] + fn verify_unknown_key_id_errors() { + let (sk, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let body = b"hello"; + let sig = sk.sign(body).to_bytes(); + let err = store + .verify("does-not-exist", body, &sig) + .expect_err("unknown key_id must error"); + assert!(matches!(err, TrustStoreError::UnknownKeyId { .. })); + } + + #[test] + fn verify_bad_signature_length_errors() { + let (_, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let err = store + .verify("k-1", b"body", &[1, 2, 3]) + .expect_err("bad signature length must error"); + assert!(matches!(err, TrustStoreError::BadSignatureLength { .. })); + } + + #[test] + fn verify_bad_signature_errors() { + let (sk, pem) = fresh_keypair(); + let json = format!( + r#"{{"keys":[{{"key_id":"k-1","public_key_pem":{:?}}}]}}"#, + pem + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + let sig = sk.sign(b"original").to_bytes(); + let err = store + .verify("k-1", b"tampered", &sig) + .expect_err("tampered body must fail verify"); + assert!(matches!(err, TrustStoreError::BadSignature { .. })); + } + + #[test] + fn loads_multiple_keys() { + let (_, pem1) = fresh_keypair(); + let (_, pem2) = fresh_keypair(); + let json = format!( + r#"{{"keys":[ + {{"key_id":"k-1","public_key_pem":{:?}}}, + {{"key_id":"k-2","public_key_pem":{:?}}} + ]}}"#, + pem1, pem2 + ); + let tmp = write_tmp(&json); + let store = TrustStore::load(tmp.path()).expect("loads"); + assert_eq!(store.len(), 2); + } +} diff --git a/proto/policy.proto b/proto/policy.proto new file mode 100644 index 000000000..52da0e3c6 --- /dev/null +++ b/proto/policy.proto @@ -0,0 +1,170 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package openshell.policy.v1alpha1; + +// OpenShell out-of-process policy delivery + handle-lifecycle contract. +// +// Defines the wire between the gateway and an external policy engine. The +// engine answers four RPCs over UDS; the gateway acquires a handle for each +// sandbox, fetches policy projections against that handle, and releases the +// handle when the sandbox is dropped. Engine identity and policy integrity +// are carried by signed envelope fields the gateway resolves against a +// trust store it owns. +// +// The wire is OpenShell-owned: this proto is the API, and any conforming +// engine on the other side is valid. + +// -------------------------------------------------------------------------- +// Service +// -------------------------------------------------------------------------- + +// Engine is the server side of the wire. A conforming implementation +// answers four RPCs over UDS. +service Engine { + // Health reports the engine's serving state. The gateway calls this + // once at startup, before admitting any sandbox through the engine, + // and may call it again periodically. + rpc Health(HealthRequest) returns (HealthResponse); + + // AcquireHandle binds a gateway-asserted runtime context to an + // engine-chosen opaque handle. The handle is the join key for all + // subsequent projection lookups for that sandbox. + rpc AcquireHandle(AcquireHandleRequest) returns (AcquireHandleResponse); + + // GetProjection returns the policy bound to `handle`, projected to the + // schema named by `surface_id`. The response envelope carries an + // optional detached signature the gateway verifies against its trust + // store before consuming `body`. + rpc GetProjection(GetProjectionRequest) returns (GetProjectionResponse); + + // ReleaseHandle drops engine-side state held for `handle`. Idempotent; + // releasing an unknown or already-released handle returns OK. + rpc ReleaseHandle(ReleaseHandleRequest) returns (ReleaseHandleResponse); +} + +// -------------------------------------------------------------------------- +// Health +// -------------------------------------------------------------------------- + +message HealthRequest {} + +message HealthResponse { + // Engine serving state. + enum Status { + // Proto3 zero default. Gateways should treat this as NOT_SERVING. + STATUS_UNSPECIFIED = 0; + // Engine is ready to bind handles and serve projections. + SERVING = 1; + // Engine is not currently serving — startup not complete, terminal + // failure, etc. + NOT_SERVING = 2; + // Graceful shutdown initiated. The gateway must stop admitting new + // sandboxes through this engine; already-bound handles remain valid + // until their sandboxes are released. + DRAINING = 3; + } + Status status = 1; +} + +// -------------------------------------------------------------------------- +// AcquireHandle +// -------------------------------------------------------------------------- + +// RuntimeContextEnvelope is the gateway-asserted set of facts about a +// sandbox session that the engine binds to a handle. +message RuntimeContextEnvelope { + // Stable identifier of the sandbox the policy applies to. + string sandbox_id = 1; + + // Authenticated subject of the principal that issued the sandbox + // request. Format is gateway-defined; engines should treat the value + // opaquely. + string user_subject = 2; + + // Unix-millis timestamp the gateway stamped at acquisition time. + // Engines may use this for freshness or replay checks. + int64 attested_at_ms = 3; + + // Detached signature over the preceding fields, produced by the + // gateway's runtime-context signing key. Empty when the gateway is + // not signing envelopes. + bytes signature = 4; +} + +message AcquireHandleRequest { + RuntimeContextEnvelope envelope = 1; +} + +message AcquireHandleResponse { + // Engine-chosen opaque token. The gateway treats this purely as + // bytes; it must not parse, hash, or otherwise derive identity from + // it. + bytes handle = 1; +} + +// -------------------------------------------------------------------------- +// GetProjection +// -------------------------------------------------------------------------- + +message GetProjectionRequest { + // Handle previously returned by AcquireHandle. + bytes handle = 1; + + // Schema identifier selecting the decoder for the response `body` + // (e.g. "openshell.sandbox.v1"). Surface version bumps track + // incompatible changes to the bound schema. + string surface_id = 2; +} + +// ProjectionEnvelope carries the policy bytes plus integrity metadata. +// +// When `signature` is non-empty the gateway verifies it (Ed25519) over the +// canonical serialization of `surface_id`, `schema_version`, +// `policy_digest`, `bundle_digest`, and `body`, using the public key the +// trust store resolves `signing_key_id` to. +message ProjectionEnvelope { + // Echoes the requested surface. Always populated. + string surface_id = 1; + + // Schema version of `body` within `surface_id`'s namespace. + string schema_version = 2; + + // Digest over `body`. Stable identifier for the projected policy + // bytes. + bytes policy_digest = 3; + + // Digest of the upstream source artifact this projection was lowered + // from. Provenance — one source produces many projections. + bytes bundle_digest = 4; + + // Encoded policy body. Decoding is determined by `surface_id` + + // `schema_version`. + bytes body = 5; + + // Detached signature over the envelope. May be empty; once non-empty, + // the gateway requires the trust store to recognise `signing_key_id` + // and the signature to verify. + bytes signature = 6; + + // Identifier of the key that produced `signature`, opaque to the + // gateway except for trust-store lookup. Empty when `signature` is + // empty. + string signing_key_id = 7; +} + +message GetProjectionResponse { + ProjectionEnvelope envelope = 1; +} + +// -------------------------------------------------------------------------- +// ReleaseHandle +// -------------------------------------------------------------------------- + +message ReleaseHandleRequest { + bytes handle = 1; +} + +message ReleaseHandleResponse {}