Pluggable Subsystems: Canonical Examples
On-demand reference. The rule in CLAUDE.md is: new cross-cutting subsystems follow a protocol + strategy + factory + config discriminator pattern, with safe defaults so the behaviour is opt-in. This page catalogues the canonical implementations.
Pattern recap
- Define a Protocol interface.
- Ship concrete strategies that implement it.
- Register them in a factory keyed by a config discriminator.
- Plumb the active selection through frozen config.
- Ship safe defaults so nothing ever silently regresses.
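The five steps above can be sketched in a few lines. This is a minimal illustration, not code from the repository: AuditSink, NullAuditSink, ListAuditSink, AuditSinkConfig, and build_audit_sink are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Literal, Protocol


class AuditSink(Protocol):
    """Hypothetical subsystem interface (step 1)."""

    def emit(self, event: str) -> None: ...


class NullAuditSink:
    """Safe default (step 5): discards events, so the feature stays opt-in."""

    def emit(self, event: str) -> None:
        return None


@dataclass
class ListAuditSink:
    """A concrete strategy (step 2) that records events in memory."""

    events: list[str] = field(default_factory=list)

    def emit(self, event: str) -> None:
        self.events.append(event)


@dataclass(frozen=True)
class AuditSinkConfig:
    """Frozen config (step 4); `type` is the discriminator."""

    type: Literal["null", "list"] = "null"


# Factory table keyed by the discriminator (step 3).
_FACTORIES = {"null": NullAuditSink, "list": ListAuditSink}


def build_audit_sink(config: AuditSinkConfig) -> AuditSink:
    try:
        factory = _FACTORIES[config.type]
    except KeyError:
        raise ValueError(f"unknown audit sink type: {config.type!r}") from None
    return factory()
```

With no configuration the factory returns the no-op sink, so a deployment that never sets the discriminator sees no behaviour change.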
Registries
Three registry classes replace the hand-rolled if config.type == "...": ... elif ... chains every factory used to carry. Each is immutable after construction (MappingProxyType-backed) and emits structured registry.* events for built / lookup / failure paths.
- synthorg.core.registry.StrategyRegistry[T]: generic strategy dispatch keyed by a config.type discriminator. Used across the codebase by factories for pruning, propagation, identity store, evolution triggers, evolution proposers, execution loops, training source selectors, training curation, procedural capture, notification sinks, secret backends, sandbox backends, per-op rate-limit stores, per-op inflight stores, memory-consolidation strategies (selector + op composite, ADR-0005), risk-tier classifiers (timeout policy seam, REWORK #9), and autonomy change strategies (promotion/downgrade plugin, REWORK #9).
- synthorg.persistence.registry.PersistenceBackendRegistry: domain-specific dispatch keyed by PersistenceConfig.backend; preserves the lazy import of the optional postgres extra.
- synthorg.memory.registry.MemoryBackendRegistry: domain-specific dispatch keyed by CompanyMemoryConfig.backend; the composite-backend child loop reuses a separate "leaf" registry to keep the wiring acyclic.
Each subsystem still owns its config discriminator; the registries replace only the dispatch step that translates the discriminator into a constructor call.
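As a rough illustration of the dispatch step, the sketch below shows an immutable, MappingProxyType-backed registry that turns a discriminator into a constructor call. It is not the real StrategyRegistry API: SketchRegistry, UnknownStrategyError, and the build method are assumptions made for this example, and the structured registry.* events are omitted.

```python
from collections.abc import Callable, Mapping
from types import MappingProxyType
from typing import Generic, TypeVar

T = TypeVar("T")


class UnknownStrategyError(LookupError):
    """Hypothetical error for a discriminator with no registered factory."""


class SketchRegistry(Generic[T]):
    """Hypothetical stand-in for a registry that is immutable after construction."""

    def __init__(self, factories: Mapping[str, Callable[[], T]]) -> None:
        # Copy first, then expose a read-only view so later mutation of the
        # caller's dict (or of the registry itself) cannot change dispatch.
        self._factories: Mapping[str, Callable[[], T]] = MappingProxyType(
            dict(factories)
        )

    def build(self, key: str) -> T:
        try:
            factory = self._factories[key]
        except KeyError:
            known = ", ".join(sorted(self._factories))
            raise UnknownStrategyError(
                f"unknown type {key!r}; known: {known}"
            ) from None
        return factory()


# Usage: this replaces an if/elif chain over the discriminator.
registry: SketchRegistry[object] = SketchRegistry({"in_memory": dict, "ordered": list})
```

The subsystem still decides which key to pass; the registry only performs the lookup and the constructor call.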
Canonical examples
Classification pipeline
- engine/classification/protocol.py: Detector, ScopedContextLoader, ClassificationSink.
- budget/coordination_config.py: dispatcher.
Verification graders
- engine/quality/decomposer_protocol.py: CriteriaDecomposer.
- engine/quality/grader_protocol.py: RubricGrader.
- engine/quality/verification_factory.py + engine/quality/verification_config.py.
Chief of Staff
- meta/chief_of_staff/protocol.py: OutcomeStore, ConfidenceAdjuster, OrgInflectionSink, AlertSink.
- meta/chief_of_staff/config.py: discriminator.
- meta/factory.py::build_confidence_adjuster().
Analytics / telemetry
- meta/telemetry/protocol.py: AnalyticsEmitter, AnalyticsCollector, RecommendationProvider.
- meta/telemetry/config.py: discriminator.
- meta/telemetry/factory.py::build_analytics_emitter().
Rollout strategies
- meta/rollout/roster.py: OrgRoster.
- meta/rollout/group_aggregator.py: GroupSignalAggregator.
- meta/rollout/inverse_dispatch.py: RollbackHandler + 4 mutator protocols.
- meta/factory.py::build_rollout_strategies() + build_rollback_executor().
- All plumbed through frozen SelfImprovementConfig, with safe defaults (SystemClock from synthorg.core.clock, NoOpOrgRoster, null aggregator) so the behaviour is opt-in.
Rollback mutators
Concrete implementations of the four mutator protocols live under meta/rollout/mutators/:
- SettingsServiceConfigMutator (mutators/config_mutator.py): backs ConfigMutator with SettingsService.set. Dotted target ("<namespace>.<key>"); read_only_post_init settings surface as RollbackMutationDeniedError.
- PrincipleOverridePromptMutator (mutators/prompt_mutator.py): backs PromptMutator with PrincipleOverrideRepository. Persists override rows that engine/strategy/principles.py::load_pack consults on principle resolution. Schema: persistence/sqlite/revisions/20260513000002_principle_overrides.sql (and Postgres twin).
- RoutedArchitectureMutator (mutators/architecture_mutator.py): backs ArchitectureMutator with a per-target-type adapter registry. Target format "<type>:<id>" (or "<type>:<id>:<sub_id>"); operators register adapters per type (role, department, workflow, etc.) without touching the executor. Unknown prefixes raise UnknownArchitectureTargetError.
- WorkspaceCodeMutator (mutators/code_mutator.py): backs CodeMutator with atomic filesystem writes (tempfile.mkstemp + Path.replace) inside a workspace bounded by PathValidator, so revert_code cannot escape via traversal.
Domain errors live at meta/errors.py::RollbackMutationDeniedError (409) and UnknownArchitectureTargetError. Wire-up assembles via meta/factory.py::build_rollback_executor(config_mutator=..., prompt_mutator=..., architecture_mutator=..., code_mutator=...) and stays opt-in: operators construct the executor in their own startup path when they want the self-improvement loop active.
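The atomic-write technique named for WorkspaceCodeMutator (tempfile.mkstemp + Path.replace inside a bounded workspace) can be sketched as follows. This is a minimal sketch under stated assumptions, not the repository's implementation: atomic_write_within is a hypothetical helper, and the containment check uses Path.is_relative_to as a stand-in for whatever PathValidator actually does.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_within(root: Path, relative: str, content: str) -> Path:
    """Write `content` to `root/relative` atomically, refusing path traversal."""
    root = root.resolve()
    target = (root / relative).resolve()
    # Assumed containment rule: the resolved target must stay under the root.
    if not target.is_relative_to(root):
        raise ValueError(f"path escapes workspace: {relative!r}")
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so the final rename stays on
    # the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=".tmp-")
    tmp_path = Path(tmp_name)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(content)
        # Readers see either the old file or the new one, never a partial write.
        tmp_path.replace(target)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    return target
```

Writing to a sibling temp file and renaming it over the target is what makes the update all-or-nothing; the containment check runs before any file is created.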
API rate limits
- api/rate_limits/protocol.py: SlidingWindowStore.
- api/rate_limits/in_memory.py.
- api/rate_limits/config.py::PerOpRateLimitConfig: discriminator.
- api/rate_limits/factory.py::build_sliding_window_store().
API per-op concurrency
- api/rate_limits/inflight_protocol.py: InflightStore.
- api/rate_limits/in_memory_inflight.py.
- api/rate_limits/inflight_config.py::PerOpConcurrencyConfig: discriminator.
- api/rate_limits/inflight_factory.py::build_inflight_store().
- api/rate_limits/inflight_middleware.py::PerOpConcurrencyMiddleware (Litestar middleware that reads opt[per_op_concurrency] from each route handler).
Escalation queue
- communication/conflict_resolution/escalation/protocol.py: EscalationQueueStore, DecisionProcessor.
- In-memory / SQLite / Postgres implementations.
- communication/conflict_resolution/escalation/config.py::EscalationQueueConfig: discriminator.
- communication/conflict_resolution/escalation/factory.py::build_escalation_queue_store().
Assignment ranking and pool filtering
- engine/assignment/protocol.py: TaskAssignmentStrategy (the public Protocol; strategies are still selected by the strategy config string).
- engine/assignment/pool_filter_protocol.py: CandidatePoolFilter (pre-scoring narrowing of available_agents; IdentityPoolFilter is the default, HierarchicalPoolFilter narrows to subordinates of the task's delegator).
- engine/assignment/ranker_protocol.py: CandidateRanker (post-scoring ordering: ScoreDescendingRanker, WorkloadAscendingRanker, CostDescendingRanker, AuctionBidRanker).
- engine/assignment/scoring_based.py::ScoringBasedAssignmentStrategy: composes (scorer, pool_filter, ranker). The five logical assignment strategies (role_based, load_balanced, cost_optimized, auction, hierarchical) are all ScoringBasedAssignmentStrategy instances with different filter/ranker pairs.
- engine/assignment/registry.py::build_strategy_map(): the factory; preserves the public string discriminators.
HR pillar scoring
- hr/evaluation/pillar_protocol.py: PillarScoringStrategy (the public per-pillar Protocol).
- hr/evaluation/metric_extractor_protocol.py: MetricExtractor (per-pillar sub-metric extraction). Implementations live under hr/evaluation/extractors/ (one file per pillar: intelligence, efficiency, resilience, governance, experience).
- hr/evaluation/configurable_scorer.py::ConfigurablePillarScorer: composes (pillar, extractor) to satisfy PillarScoringStrategy. Owns the shared "redistribute weights -> weighted-average -> clamp -> confidence -> log -> PillarScore" pipeline so the per-pillar extractors stay focused on data extraction.
- hr/evaluation/evaluator.py::EvaluationService: factory + orchestrator. Each pillar has a _default_<pillar>() method that returns ConfigurablePillarScorer(pillar, <Pillar>MetricExtractor()). Callers can substitute any compatible PillarScoringStrategy per pillar via the constructor's <pillar>_strategy keyword arguments.
Memory injection strategy
- memory/injection.py: MemoryInjectionStrategy Protocol + InjectionStrategy discriminator (CONTEXT, TOOL_BASED, SELF_EDITING).
- Concrete implementations: memory/context_injection.py::ContextInjectionStrategy, memory/tool_based.py::ToolBasedInjectionStrategy, memory/self_editing.py::SelfEditingMemoryStrategy.
- memory/retrieval_config.py::MemoryRetrievalConfig.strategy: discriminator.
- memory/injection_factory.py::build_memory_injection_strategy(): match-based dispatch with assert_never exhaustiveness.
Engine recovery strategy
- engine/recovery.py: RecoveryStrategy Protocol + FailAndReassignStrategy.
- engine/checkpoint/strategy.py::CheckpointRecoveryStrategy: resume-from-checkpoint sibling.
- engine/recovery_config.py::EngineRecoveryConfig.strategy (RecoveryStrategyType): discriminator.
- engine/recovery_factory.py::build_recovery_strategy(): match-based dispatch; RecoveryConfigError surfaces missing checkpoint_repo / checkpoint_config at boot rather than at recovery time.
Memory consolidation strategy (axis split, ADR-0005)
- memory/consolidation/axis.py: EntrySelector + ConsolidationOp Protocols; SelectionGroup / ConsolidationContext / OpResult contracts.
- Selector: memory/consolidation/selectors.py::HighestRelevanceSelector (shared by all three shipped strategies).
- Ops: memory/consolidation/ops.py (ConcatenationOp, DensityRoutingOp, ExtractivePreservationOp, AbstractiveSummarizationOp) + memory/consolidation/llm_op.py::LLMSynthesisOp.
- memory/consolidation/composite.py::CompositeConsolidationStrategy: selector + op aggregator (parallel=True for LLM cross-group TaskGroup fan-out).
- memory/consolidation/config.py::ConsolidationStrategyType (SIMPLE / DUAL_MODE / LLM): discriminator.
- memory/consolidation/factory.py::build_consolidation_strategy(): StrEnum-keyed StrategyRegistry dispatch; MemoryConfigError surfaces missing op-specific deps at construction.
Risk-tier classifier (timeout policy seam, REWORK #9)
- security/timeout/protocol.py::RiskTierClassifier Protocol (classify(action_type) -> ApprovalRiskLevel).
- Impls: risk_tier_classifier.py::DefaultRiskTierClassifier (safe default), workload_adaptive.py::WorkloadAdaptiveRiskClassifier, operator_configurable.py::OperatorConfigurableRiskClassifier, time_based_elevation.py::TimeBasedRiskElevationClassifier.
- risk_classifier_config.py::RiskClassifierType discriminator + frozen RiskClassifierConfig + RiskClassifierDeps (in-flight probe / Clock collaborators).
- risk_classifier_factory.py::build_risk_tier_classifier(): StrEnum-keyed StrategyRegistry dispatch; RiskClassifierConfigError surfaces a missing required dep. Wired at timeout/factory.py::create_timeout_policy (tiered seam); SecOpsService + approval-tool consumers stay on the default pending a SecurityConfig.risk_classifier field.
Autonomy change strategy (promotion/downgrade plugin, REWORK #9)
- security/autonomy/protocol.py::AutonomyChangeStrategy Protocol (request_promotion / auto_downgrade / request_recovery).
- Impls: change_strategy.py::HumanOnlyPromotionStrategy (safe default + override store), performance_gated.py, budget_aware.py, escalation_chain.py (each wraps the base via _base_delegate.py::BaseDelegatingStrategy).
- Signal Protocols: signals.py::PerformanceSignalProvider, RiskBudgetSignalProvider (injected, never concrete hr/ or budget/ imports).
- change_strategy_config.py::AutonomyStrategyType discriminator + frozen AutonomyStrategyConfig + AutonomyStrategyDeps.
- change_strategy_factory.py::build_autonomy_change_strategy(): StrEnum-keyed StrategyRegistry dispatch; AutonomyStrategyConfigError surfaces a missing required signal provider. No production seam wires a non-default strategy yet (surface-only; a follow-up wires it end-to-end).
Conflict detector
- communication/meeting/protocol.py: ConflictDetector Protocol.
- communication/meeting/conflict_detection.py: six concrete implementations (KeywordConflictDetector, StructuredComparisonDetector, LlmJudgeDetector, EmbeddingSimilarityDetector, HybridDetector, AutoDetector).
- communication/meeting/enums.py::ConflictDetectorType: discriminator.
- communication/meeting/factory.py::build_conflict_detector(): StrategyRegistry dispatch.
Trust strategy (conditional instantiation)
- security/trust/protocol.py: TrustStrategy Protocol.
- security/trust/{weighted,per_category,milestone}_strategy.py: three real implementations.
- security/trust/config.py::TrustConfig.strategy (TrustStrategyType with a DISABLED value): discriminator.
- security/trust/factory.py::build_trust_strategy(): registry dispatch that returns None for DISABLED so callers skip TrustService construction entirely (instead of wiring a no-op strategy).
Ontology versioning (inverted backend dependency)
- ontology/versioning.py: pure EntityDefinition snapshot deserializers; carries no backend imports.
- persistence/sqlite/ontology_versioning.py::create_ontology_versioning(): SQLite-side factory.
- persistence/postgres/ontology_versioning.py::create_postgres_ontology_versioning(): Postgres-side factory.
- Each backend's lifecycle helper composes the matching factory at startup, so the dependency arrow points persistence -> ontology, never the reverse.
Backup handler registry (backend-pluggable)
- backup/handlers/protocol.py: ComponentHandler Protocol.
- backup/handlers/sqlite_persistence.py::SQLitePersistenceComponentHandler, backup/handlers/postgres_persistence.py::PostgresPersistenceComponentHandler, backup/handlers/memory.py::MemoryComponentHandler, backup/handlers/config_handler.py::ConfigComponentHandler.
- backup/registry.py::PERSISTENCE_BACKUP_HANDLER_REGISTRY: StrategyRegistry keyed on config.persistence.backend ("sqlite" / "postgres").
- backup/factory.py::build_backup_handlers(): dispatches per BackupComponent and uses the registry for the persistence handler.
Git backend storage strategy
- engine/workspace/git_backend/protocol.py: GitBackend @runtime_checkable Protocol, with ProvisionResult / PushResult / FetchResult frozen result models.
- engine/workspace/git_backend/config.py: GitBackendConfig (frozen) with kind: GitBackendType discriminator and GitBackendDeps (collaborators not safe in frozen config: workspace_base_root, connection_catalog, secret_backend, clock).
- engine/workspace/git_backend/embedded.py::EmbeddedGitBackend (safe default: bare repo self-hosted on the persistent volume, no external dependency).
- engine/workspace/git_backend/local_path.py::LocalPathGitBackend (bring-your-own on-disk git repository; push/fetch are no-ops because the on-disk repo is the durable store).
- engine/workspace/git_backend/external_remote.py::ExternalRemoteGitBackend (GitHub / GitLab / Gitea / Forgejo resolved via the connection catalog; ships protocol + thin clone/push/fetch glue; deep OAuth hardening is a tracked follow-up).
- engine/workspace/git_backend/factory.py::build_git_backend(): StrategyRegistry[GitBackend] keyed on GitBackendType. Missing required deps fail fast at construction with GitBackendConfigError. Wired at boot in api/app.py::_install_runtime_services under the has_persistence gate, alongside ProjectWorkspaceService.
Stakes assessment (model-routing input)
- engine/stakes/protocol.py: StakesAssessor @runtime_checkable Protocol (assess_task(task) / assess_subtask(subtask) returning Stakes).
- engine/stakes/heuristic.py::DefaultStakesAssessor (safe default: deterministic, combines complexity base mapping, high/critical keyword signals, and critical-priority elevation; unknown complexity fails safe upward to HIGH).
- engine/stakes/config.py::StakesAssessmentConfig (frozen) with assessor: NotBlankStr discriminator, the complexity-to-stakes rules, and the keyword sets.
- engine/stakes/factory.py::build_stakes_assessor(): StrategyRegistry[StakesAssessor] keyed on assessor ("heuristic" default). Consumed by DecompositionService (per-subtask) and the work pipeline's LEAF path (parent task).
Stakes-aware model routing
- engine/routing_policy/protocol.py: StakesRoutingStrategy @runtime_checkable Protocol (route(task, identity) returning a frozen StakesRoutingDecision).
- engine/routing_policy/strategies.py::StakesAwareStrategy (safe default: picks the cheapest tier whose benchmark score clears the per-stakes QualityFloors, bumps one tier when coordination metrics are unhealthy, marks high/critical work for the red-team gate, and never downgrades below the agent's configured tier) and FlatStrategy (no-op control / opt-out).
- engine/routing_policy/config.py::StakesRoutingConfig (frozen) with strategy: NotBlankStr discriminator, QualityFloors (validated non-decreasing), red_team_min_stakes, and the coordination-nudge thresholds.
- engine/routing_policy/factory.py::build_stakes_router(): StrategyRegistry[StakesRoutingStrategy] keyed on strategy ("stakes_aware" default; "stakes_aware" requires a benchmark provider, "flat" is dependency-free). Wired at boot in workers/runtime_builder.py::_build_stakes_router_or_none and injected into AgentEngine, which applies routing before the budget auto-downgrade (a hard budget ceiling wins over a stakes upgrade).
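The tier-selection rule described for StakesAwareStrategy can be approximated as below. This is a sketch under loudly stated assumptions: the tier names, the shape of the score and floor mappings, and the fallback to the most capable tier when nothing clears the floor are all invented for illustration. The red-team marking and the budget ceiling are left out.

```python
from collections.abc import Mapping

# Hypothetical tiers, ordered cheapest first.
TIERS: tuple[str, ...] = ("small", "medium", "large")


def pick_tier(
    stakes: str,
    agent_tier: str,
    benchmark_scores: Mapping[str, float],
    quality_floors: Mapping[str, float],
    *,
    coordination_unhealthy: bool = False,
) -> str:
    floor = quality_floors[stakes]
    # Never go below the agent's configured tier: start the scan there.
    start = TIERS.index(agent_tier)
    # Assumed fallback: the most capable tier if no tier clears the floor.
    chosen = TIERS[-1]
    for tier in TIERS[start:]:
        if benchmark_scores[tier] >= floor:
            chosen = tier  # cheapest tier that clears the per-stakes floor
            break
    if coordination_unhealthy:
        # Bump one tier, capped at the top of the ladder.
        chosen = TIERS[min(TIERS.index(chosen) + 1, len(TIERS) - 1)]
    return chosen


SCORES = {"small": 0.5, "medium": 0.7, "large": 0.9}
FLOORS = {"low": 0.4, "high": 0.8}  # non-decreasing with stakes
```

Scanning from the agent's own tier upward is what enforces the no-downgrade rule; the floor only ever pushes the choice higher.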
Per-project environment strategy
- engine/workspace/environment/protocol.py: EnvironmentStrategy @runtime_checkable Protocol (kind / detect / scaffold / declaration_hash / managed_paths / runtime_env_vars / provision), with ProvisionedEnvironment / ScaffoldResult / CommandOutcome frozen result models and the EnvironmentCommandRunner seam (the resolved sandbox backend, adapted, so the subsystem never imports the tool layer).
- engine/workspace/environment/config.py: EnvironmentConfig (frozen) with kind: EnvironmentType discriminator and EnvironmentDeps (collaborators not safe in frozen config: image_builder, clock).
- engine/workspace/environment/manifest.py::ManifestEnvironmentStrategy (safe default: a committed synthorg.env.yaml of lockfiles + ordered setup commands; runs in both sandboxes and emits a stock bootstrap.sh so a fresh clone reproduces with no SynthOrg present).
- engine/workspace/environment/devcontainer.py::DevcontainerEnvironmentStrategy (builds a sealed image from .devcontainer/devcontainer.json via image_builder; Docker backend only, raising EnvironmentBackendUnavailableError on a subprocess-backed project).
- engine/workspace/environment/nix.py::NixEnvironmentStrategy (builds the declared flake.nix dev shell via nix develop; tool-wrapping of subsequent calls is a documented boundary).
- engine/workspace/environment/factory.py::build_environment_strategy(): StrategyRegistry[EnvironmentStrategy] keyed on EnvironmentType; the devcontainer strategy falls back to the default SubprocessImageBuilder (host docker build) when no builder is injected.
- engine/workspace/environment/service.py::EnvironmentService: provisions once per (project_id, declaration_hash) (persisted project_environments row is the durable cache), scaffolds + commits the declaration (GitWorkspaceCommitter), and is fail-loud. Wired at boot in api/app.py::_install_runtime_services alongside ProjectWorkspaceService. The result threads to the agent's sandbox via the ambient tools/sandbox/active_environment.py::ActiveSandboxEnvironment contextvar (image override + env additions), set per task in workers/execution_service.py.
Services are a distinct pattern (not pluggable subsystems)
A service wraps one or more repositories to keep controllers thin and centralise audit logging, and MAY orchestrate multiple repositories (e.g. WorkflowService spans workflow_definitions + workflow_versions; MemoryService spans fine-tune checkpoints + runs + settings).
The Protocol + Strategy + Factory + Config pattern applies only to genuinely cross-cutting subsystems that ship multiple interchangeable implementations selectable at runtime. Services do not need that machinery because there is exactly one service per domain.