Development Preview · PR #2103 · a19684b · built

API Layer

Litestar REST + WebSocket API: controllers, authentication, guards, and channels.

App

app

Litestar application factory.

Creates and configures the Litestar application with all controllers, middleware, exception handlers, plugins, and lifecycle hooks (startup/shutdown).

create_app

create_app(
    *,
    config=None,
    persistence=None,
    message_bus=None,
    cost_tracker=None,
    approval_store=None,
    auth_service=None,
    task_engine=None,
    coordinator=None,
    work_pipeline=None,
    intake_entry_adapter=None,
    task_board_entry_adapter=None,
    agent_registry=None,
    meeting_orchestrator=None,
    meeting_scheduler=None,
    performance_tracker=None,
    settings_service=None,
    provider_registry=None,
    provider_health_tracker=None,
    tool_invocation_tracker=None,
    delegation_record_store=None,
    artifact_storage=None,
    audit_log=None,
    trust_service=None,
    coordination_metrics_store=None,
    training_service=None,
    event_stream_hub=None,
    interrupt_store=None,
    client_simulation_state=None,
    _skip_lifecycle_shutdown=False,
)

Create and configure the Litestar application.

All parameters are optional for testing -- provide fakes via keyword arguments. Services not explicitly provided are auto-wired from config and environment variables.
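The "injected dependency wins, otherwise auto-wire" rule can be sketched in isolation. This is a minimal illustration of the pattern only, not the SynthOrg implementation: `InMemoryBus`, `DefaultBus`, `Wired`, and `build` are hypothetical names invented for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


class InMemoryBus:
    """Hypothetical fake standing in for a message bus in tests."""

    def __init__(self) -> None:
        self.published: list[str] = []

    def publish(self, message: str) -> None:
        self.published.append(message)


class DefaultBus(InMemoryBus):
    """Hypothetical stand-in for whatever the factory would auto-wire."""


@dataclass
class Wired:
    """Hypothetical container for the resolved dependencies."""

    message_bus: InMemoryBus


def build(*, message_bus: InMemoryBus | None = None) -> Wired:
    # Keyword-only, like create_app. An explicitly injected dependency
    # always wins; the default is only constructed when None is passed.
    if message_bus is None:
        message_bus = DefaultBus()
    return Wired(message_bus=message_bus)


fake = InMemoryBus()
injected = build(message_bus=fake)
auto_wired = build()
```

With the real factory the equivalent call would be `create_app(message_bus=fake_bus)`. The signature starts with `*`, so every dependency has to be passed by keyword.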

Parameters:

Name (Type, default): Description

config (RootConfig | None, default None): Root company configuration.
persistence (PersistenceBackend | None, default None): Persistence backend.
message_bus (MessageBus | None, default None): Internal message bus.
cost_tracker (CostTracker | None, default None): Cost tracking service.
approval_store (ApprovalStoreProtocol | None, default None): Approval queue store.
auth_service (AuthService | None, default None): Pre-built auth service (for testing).
task_engine (TaskEngine | None, default None): Centralized task state engine.
coordinator (MultiAgentCoordinator | None, default None): Multi-agent coordinator.
work_pipeline (WorkPipeline | None, default None): Work pipeline spine (injected double wins over the boot-autowired one).
intake_entry_adapter (WorkEntryAdapter[Any] | None, default None): Real work-entry adapter (injected double wins over the boot-autowired one).
task_board_entry_adapter (TaskBoardEntryAdapter | None, default None): Real task-board work-entry adapter (injected double wins over the boot-autowired one).
agent_registry (AgentRegistryService | None, default None): Agent registry service.
meeting_orchestrator (MeetingOrchestrator | None, default None): Meeting orchestrator.
meeting_scheduler (MeetingScheduler | None, default None): Meeting scheduler.
performance_tracker (PerformanceTracker | None, default None): Performance tracking service.
settings_service (SettingsService | None, default None): Settings service for runtime config.
provider_registry (ProviderRegistry | None, default None): Provider registry.
provider_health_tracker (ProviderHealthTracker | None, default None): Provider health tracking service.
tool_invocation_tracker (ToolInvocationTracker | None, default None): Tool invocation tracking service.
delegation_record_store (DelegationRecordStore | None, default None): Delegation record store.
artifact_storage (ArtifactStorageBackend | None, default None): Artifact storage backend.
audit_log (AuditLog | None, default None): Pre-built audit log (auto-wired if None).
trust_service (TrustService | None, default None): Pre-built trust service.
coordination_metrics_store (CoordinationMetricsStore | None, default None): Pre-built metrics store (auto-wired if None).
training_service (TrainingService | None, default None): Pre-built training service (auto-wired in startup if None and dependencies are available).
event_stream_hub (EventStreamHub | None, default None): Pre-built event stream hub (auto-created if None).
interrupt_store (InterruptStore | None, default None): Pre-built interrupt store (auto-created if None).
client_simulation_state (ClientSimulationState | None, default None): Pre-built client simulation state. Wired before the optional-controller predicate check so the Simulation / Request controllers register correctly on a test app boot.
_skip_lifecycle_shutdown (bool, default False): Test-only flag. When True, the Litestar app is built with an empty on_shutdown list so the lifespan exit is a no-op. Used by the session-scoped test fixture in tests/unit/api/conftest.py to reuse the same app across tests without tearing down the task engine, message bus, and persistence between each one. Never use in production: shutdown hooks perform critical cleanup (task-engine drain, persistence disconnect, health prober stop, etc.).
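The effect of `_skip_lifecycle_shutdown` can be pictured with a small, framework-free sketch. It assumes only what the description states: with the flag set, the shutdown hook list is empty, so nothing runs on lifespan exit. `select_shutdown_hooks`, `run_lifespan_exit`, and the hook names are hypothetical and do not come from the SynthOrg code base.

```python
from __future__ import annotations

from collections.abc import Callable

Hook = Callable[[], None]


def select_shutdown_hooks(
    hooks: list[Hook],
    *,
    skip_shutdown: bool = False,
) -> list[Hook]:
    # With the test-only flag set, hand the framework an empty list so
    # that leaving the lifespan does nothing; otherwise keep every hook.
    return [] if skip_shutdown else list(hooks)


def run_lifespan_exit(hooks: list[Hook]) -> None:
    # Stand-in for the framework invoking each shutdown hook in order.
    for hook in hooks:
        hook()


calls: list[str] = []
production_hooks: list[Hook] = [
    lambda: calls.append("drain task engine"),
    lambda: calls.append("disconnect persistence"),
]

# Shared test app: services stay up between tests.
run_lifespan_exit(select_shutdown_hooks(production_hooks, skip_shutdown=True))
skipped_calls = list(calls)

# Production path: every cleanup hook runs.
run_lifespan_exit(select_shutdown_hooks(production_hooks))
```

The sketch also shows why the flag is unsafe outside tests: when it is set, none of the cleanup steps run.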

Returns:

Litestar: Configured Litestar application.
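When `persistence` is not injected, the source listing for `create_app` reads `SYNTHORG_DATABASE_URL` and `SYNTHORG_DB_PATH`, and lets Postgres win if both are set. The sketch below isolates that precedence rule. The `choose_backend` helper and its string return values are hypothetical; the real code builds backend objects instead.

```python
from __future__ import annotations

from collections.abc import Mapping


def choose_backend(env: Mapping[str, str]) -> str | None:
    # Treat missing, empty, and whitespace-only values the same way.
    db_url = (env.get("SYNTHORG_DATABASE_URL") or "").strip()
    db_path = (env.get("SYNTHORG_DB_PATH") or "").strip()

    # Postgres takes precedence, so a half-converted environment with
    # both variables present does not silently fall back to SQLite.
    if db_url:
        return "postgres"
    if db_path:
        return "sqlite"
    # Neither variable is set: nothing is auto-wired from the environment.
    return None


both = choose_backend(
    {
        "SYNTHORG_DATABASE_URL": "postgresql://user:pass@host:5432/db",
        "SYNTHORG_DB_PATH": "/data/synthorg.db",
    }
)
sqlite_only = choose_backend(
    {"SYNTHORG_DATABASE_URL": "   ", "SYNTHORG_DB_PATH": "/data/synthorg.db"}
)
neither = choose_backend({})
```

Passing the environment in as a mapping keeps the rule testable without touching `os.environ`. In practice the call would be `choose_backend(os.environ)`.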

Source code in src/synthorg/api/app.py
def create_app(  # noqa: PLR0913
    *,
    config: RootConfig | None = None,
    persistence: PersistenceBackend | None = None,
    message_bus: MessageBus | None = None,
    cost_tracker: CostTracker | None = None,
    approval_store: ApprovalStoreProtocol | None = None,
    auth_service: AuthService | None = None,
    task_engine: TaskEngine | None = None,
    coordinator: MultiAgentCoordinator | None = None,
    work_pipeline: WorkPipeline | None = None,
    intake_entry_adapter: WorkEntryAdapter[Any] | None = None,
    task_board_entry_adapter: TaskBoardEntryAdapter | None = None,
    agent_registry: AgentRegistryService | None = None,
    meeting_orchestrator: MeetingOrchestrator | None = None,
    meeting_scheduler: MeetingScheduler | None = None,
    performance_tracker: PerformanceTracker | None = None,
    settings_service: SettingsService | None = None,
    provider_registry: ProviderRegistry | None = None,
    provider_health_tracker: ProviderHealthTracker | None = None,
    tool_invocation_tracker: ToolInvocationTracker | None = None,
    delegation_record_store: DelegationRecordStore | None = None,
    artifact_storage: ArtifactStorageBackend | None = None,
    audit_log: AuditLog | None = None,
    trust_service: TrustService | None = None,
    coordination_metrics_store: CoordinationMetricsStore | None = None,
    training_service: TrainingService | None = None,
    event_stream_hub: EventStreamHub | None = None,
    interrupt_store: InterruptStore | None = None,
    client_simulation_state: ClientSimulationState | None = None,
    _skip_lifecycle_shutdown: bool = False,
) -> Litestar:
    """Create and configure the Litestar application.

    All parameters are optional for testing -- provide fakes via
    keyword arguments.  Services not explicitly provided are
    auto-wired from config and environment variables.

    Args:
        config: Root company configuration.
        persistence: Persistence backend.
        message_bus: Internal message bus.
        cost_tracker: Cost tracking service.
        approval_store: Approval queue store.
        auth_service: Pre-built auth service (for testing).
        task_engine: Centralized task state engine.
        coordinator: Multi-agent coordinator.
        work_pipeline: Work pipeline spine (injected double wins over
            the boot-autowired one).
        intake_entry_adapter: Real work-entry adapter (injected double
            wins over the boot-autowired one).
        task_board_entry_adapter: Real task-board work-entry adapter
            (injected double wins over the boot-autowired one).
        agent_registry: Agent registry service.
        meeting_orchestrator: Meeting orchestrator.
        meeting_scheduler: Meeting scheduler.
        performance_tracker: Performance tracking service.
        settings_service: Settings service for runtime config.
        provider_registry: Provider registry.
        provider_health_tracker: Provider health tracking service.
        tool_invocation_tracker: Tool invocation tracking service.
        delegation_record_store: Delegation record store.
        artifact_storage: Artifact storage backend.
        audit_log: Pre-built audit log (auto-wired if None).
        trust_service: Pre-built trust service.
        coordination_metrics_store: Pre-built metrics store
            (auto-wired if None).
        training_service: Pre-built training service (auto-wired
            in startup if None and dependencies are available).
        event_stream_hub: Pre-built event stream hub (auto-created
            if None).
        interrupt_store: Pre-built interrupt store (auto-created
            if None).
        client_simulation_state: Pre-built client simulation state.
            Wired before the optional-controller predicate check so
            the Simulation / Request controllers register correctly
            on a test app boot.
        _skip_lifecycle_shutdown: Test-only flag.  When ``True``, the
            Litestar app is built with an empty ``on_shutdown`` list so
            the lifespan exit is a no-op.  Used by the session-scoped
            test fixture in ``tests/unit/api/conftest.py`` to reuse the
            same app across tests without tearing down the task engine,
            message bus, and persistence between each one.  Never use
            in production: shutdown hooks perform critical cleanup
            (task-engine drain, persistence disconnect, health prober
            stop, etc.).

    Returns:
        Configured Litestar application.
    """
    effective_config = config or RootConfig(company_name="default")

    # Activate the structured logging pipeline before any
    # other setup so that auto-wiring, persistence, and bus logs all
    # flow through the configured sinks.  Respects SYNTHORG_LOG_DIR
    # env var for Docker log directory override.
    try:
        effective_config = _bootstrap_app_logging(effective_config)
    except Exception as exc:
        print(  # noqa: T201
            f"CRITICAL: Failed to initialise logging pipeline: {safe_error_description(exc)}. "  # noqa: E501
            "Check SYNTHORG_LOG_DIR, SYNTHORG_LOG_LEVEL, and the "
            "'logging' section of your config file.",
            file=sys.stderr,
            flush=True,
        )
        raise

    api_config = effective_config.api

    # Resolve runtime paths for backup service wiring.
    resolved_db_path: Path | None = None
    resolved_config_path_str = (os.environ.get("SYNTHORG_CONFIG_PATH") or "").strip()
    resolved_config_path: Path | None = (
        Path(resolved_config_path_str) if resolved_config_path_str else None
    )

    # Read persistence env vars unconditionally so downstream code
    # (e.g. the secret-backend gate below) can still observe which
    # environment choice won, even when ``persistence`` was injected
    # by the caller rather than auto-wired here.
    db_url = (os.environ.get("SYNTHORG_DATABASE_URL") or "").strip()
    db_path = (os.environ.get("SYNTHORG_DB_PATH") or "").strip()

    # Auto-wire persistence from CLI-provided env vars. The CLI compose
    # template sets ONE of these per init choice:
    #   * SYNTHORG_DATABASE_URL=postgresql://user:pass@host:port/db   (postgres)
    #   * SYNTHORG_DB_PATH=/data/synthorg.db                          (sqlite)
    # Postgres takes precedence so a half-converted state (both env
    # vars present) does not silently fall back to SQLite. The startup
    # lifecycle handles connect() + migrate() + auth service creation.
    if persistence is None:
        if db_url:
            try:
                pg_persistence_config = build_postgres_persistence_config_from_url(
                    db_url,
                    ssl_mode_override=normalize_ssl_mode_value(
                        os.environ.get("SYNTHORG_POSTGRES_SSL_MODE"),
                    ),
                )
                persistence = create_backend(pg_persistence_config)
            except Exception as exc:
                reraise_critical(exc)
                log_exception_redacted(
                    logger,
                    API_APP_STARTUP,
                    exc,
                    note="Postgres persistence creation failed",
                )
                raise
            assert pg_persistence_config.postgres is not None  # noqa: S101
            logger.info(
                API_APP_STARTUP,
                note="Auto-wired Postgres persistence from SYNTHORG_DATABASE_URL",
                host=pg_persistence_config.postgres.host,
                database=pg_persistence_config.postgres.database,
            )
            # Postgres has no on-disk artifact directory tied to the DB
            # path, so default artifact storage to /data (the standard
            # data volume in the CLI compose template) when not set.
            if artifact_storage is None:
                artifact_dir_str = _resolve_artifact_dir_env()
                artifact_storage = build_filesystem_artifact_storage(
                    data_dir=Path(artifact_dir_str),
                )
                logger.info(
                    API_APP_STARTUP,
                    note="Auto-wired filesystem artifact storage (postgres mode)",
                    data_dir=artifact_dir_str,
                )
        elif db_path:
            resolved_db_path = Path(db_path)
            try:
                persistence = create_backend(
                    build_sqlite_persistence_config(path=db_path),
                )
            except Exception as exc:
                reraise_critical(exc)
                log_exception_redacted(
                    logger,
                    API_APP_STARTUP,
                    exc,
                    note="Failed to create persistence backend from env",
                )
                raise
            logger.info(
                API_APP_STARTUP,
                note="Auto-wired SQLite persistence from SYNTHORG_DB_PATH",
                db_name=Path(db_path).name,
            )
            # Auto-wire artifact storage from the same data directory.
            if artifact_storage is None:
                artifact_storage = build_filesystem_artifact_storage(
                    data_dir=resolved_db_path.parent,
                )
                logger.info(
                    API_APP_STARTUP,
                    note="Auto-wired filesystem artifact storage",
                )

    # ── Construction-time auto-wire: services that don't need connected persistence ──
    phase1 = auto_wire_phase1(
        effective_config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        task_engine=task_engine,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
    )
    message_bus = phase1.message_bus
    cost_tracker = phase1.cost_tracker
    task_engine = phase1.task_engine
    provider_registry = phase1.provider_registry
    provider_health_tracker = phase1.provider_health_tracker
    distributed_task_queue = phase1.distributed_task_queue
    distributed_dispatcher = phase1.distributed_dispatcher
    distributed_backend_services = phase1.distributed_backend_services

    # Pre-meetings; versioning wires on startup once persistence.connect() runs.
    if agent_registry is None:
        agent_registry = AgentRegistryService()
        logger.info(API_SERVICE_AUTO_WIRED, service="agent_registry")

    # ── Meeting auto-wire: orchestrator + scheduler (construction-time) ──
    meeting_wire = auto_wire_meetings(
        effective_config=effective_config,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        agent_registry=agent_registry,
        provider_registry=provider_registry,
        persistence=persistence,
    )
    meeting_orchestrator = meeting_wire.meeting_orchestrator
    meeting_scheduler = meeting_wire.meeting_scheduler
    ceremony_scheduler = meeting_wire.ceremony_scheduler

    channels_plugin = create_channels_plugin()
    expire_callback = _make_expire_callback(channels_plugin)
    effective_approval_store: ApprovalStoreProtocol = (
        approval_store
        if approval_store is not None
        else ApprovalStore(on_expire=expire_callback)
    )

    # Wire meeting event publisher to the meetings WS channel.
    if meeting_scheduler is not None and meeting_scheduler._event_publisher is None:  # noqa: SLF001
        meeting_scheduler._event_publisher = _make_meeting_publisher(  # noqa: SLF001
            channels_plugin,
        )

    # Auto-wire performance tracker with composite quality strategy
    # when not explicitly injected (production path).
    if performance_tracker is None:
        performance_tracker = _build_performance_tracker(
            cost_tracker=cost_tracker,
            provider_registry=provider_registry,
            perf_config=effective_config.performance,
        )

    notification_dispatcher = build_notification_dispatcher(
        effective_config.notifications,
    )

    # -- Integration services auto-wire ──────────────────────────────────
    integrations = auto_wire_integrations(
        effective_config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        api_config=api_config,
        ceremony_scheduler=ceremony_scheduler,
        db_url=db_url,
        resolved_db_path=resolved_db_path,
        boot_db_path=db_path,
    )
    connection_catalog = integrations.connection_catalog
    oauth_token_manager = integrations.oauth_token_manager
    health_prober_service = integrations.health_prober_service
    tunnel_provider = integrations.tunnel_provider
    webhook_event_bridge = integrations.webhook_event_bridge
    mcp_catalog_service = integrations.mcp_catalog_service
    mcp_installations_repo = integrations.mcp_installations_repo

    # Auto-wire control-plane services when not injected.
    if audit_log is None:
        audit_log = AuditLog()
    if coordination_metrics_store is None:
        coordination_metrics_store = CoordinationMetricsStore(
            max_entries=_resolve_budget_int("coordination_metrics_max_entries"),
        )
    if trust_service is None:
        trust_service = _build_configured_trust_service(effective_config.trust)
    autonomy_change_strategy = _build_configured_autonomy_change_strategy(
        effective_config.config.autonomy,
    )

    # One boot clock shared between the uptime baseline and AppState so
    # ``app_state.clock`` and ``startup_time`` cannot diverge, and a
    # FakeClock injected via AppState in tests governs both.
    _boot_clock = SystemClock()
    app_state = AppState(
        clock=_boot_clock,
        config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        approval_store=effective_approval_store,
        auth_service=auth_service,
        task_engine=task_engine,
        coordinator=coordinator,
        work_pipeline=work_pipeline,
        intake_entry_adapter=intake_entry_adapter,
        task_board_entry_adapter=task_board_entry_adapter,
        agent_registry=agent_registry,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        ceremony_scheduler=ceremony_scheduler,
        performance_tracker=performance_tracker,
        settings_service=settings_service,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
        tool_invocation_tracker=tool_invocation_tracker,
        delegation_record_store=delegation_record_store,
        artifact_storage=artifact_storage,
        notification_dispatcher=notification_dispatcher,
        audit_log=audit_log,
        trust_service=trust_service,
        autonomy_change_strategy=autonomy_change_strategy,
        coordination_metrics_store=coordination_metrics_store,
        event_stream_hub=event_stream_hub or EventStreamHub(),
        interrupt_store=interrupt_store or InterruptStore(),
        connection_catalog=connection_catalog,
        oauth_token_manager=oauth_token_manager,
        health_prober_service=health_prober_service,
        tunnel_provider=tunnel_provider,
        webhook_event_bridge=webhook_event_bridge,
        mcp_catalog_service=mcp_catalog_service,
        mcp_installations_repo=mcp_installations_repo,
        training_service=training_service,
        startup_time=_boot_clock.monotonic(),
    )
    if distributed_task_queue is not None:
        app_state.set_distributed_task_queue(distributed_task_queue)
    if distributed_dispatcher is not None:
        # Late-bind the live bridge-config provider now that AppState
        # exists (the dispatcher is built in auto_wire_phase1 before
        # AppState). Each publish then reads the current snapshot, so
        # an operator hot-reload of a workers.dispatcher_publish_*
        # setting takes effect without restarting the dispatcher.
        distributed_dispatcher.set_workers_bridge_provider(
            lambda: app_state.workers_bridge_config,
        )
    if distributed_backend_services is not None:
        app_state.set_distributed_backend_services(distributed_backend_services)

    # Opaque pagination cursor HMAC secret.  Loaded from the
    # ``SYNTHORG_PAGINATION_CURSOR_SECRET`` env var; rolling with a
    # random per-process key silently invalidates every client cursor
    # on every restart, which is a correctness defect, not a warning.
    # We refuse to boot unconditionally -- dev, pre-release, and prod
    # share the same posture so this latent failure can never hide
    # behind a "looks fine in dev" code path.
    cursor_secret = CursorSecret.from_config(CursorConfig.from_env())
    app_state.set_cursor_secret(cursor_secret)
    if cursor_secret.is_ephemeral:
        msg = (
            "refusing to start with an ephemeral pagination cursor "
            "secret; set SYNTHORG_PAGINATION_CURSOR_SECRET to a stable "
            "value (>= 16 bytes)."
        )
        # Emit the structured error before raising so centralized log
        # collectors see the refusal reason even if the caller swallows
        # or reformats the exception message.
        logger.error(
            API_APP_STARTUP,
            note="refusing startup: ephemeral pagination cursor secret",
        )
        raise RuntimeError(msg)

    # Human escalation approval queue. Builds the pluggable store +
    # processor + Future registry and attaches them to ``AppState``
    # so the escalations controller and the
    # ``HumanEscalationResolver`` share a single instance.
    escalation_config = effective_config.communication.conflict_resolution.escalation
    _escalation_store = build_escalation_queue_store(
        escalation_config,
        persistence,
    )
    # Communication-domain services (messages + meetings).
    # Both wrap pre-existing infrastructure (bus + persistence,
    # orchestrator) and centralize audit logging so HTTP controllers
    # and MCP handlers route through a single facade per
    # `docs/reference/conventions.md` § Repository CRUD pattern. The
    # MCP handlers in `meta/mcp/handlers/communication.py` already
    # call `app_state.message_service` / `app_state.meeting_service`,
    # so wiring here is what activates them in production.
    if message_bus is not None and persistence is not None:
        from synthorg.communication.messages.service import (  # noqa: PLC0415
            MessageService,
        )

        app_state.set_message_service(
            MessageService(bus=message_bus, persistence=persistence),
        )
    if meeting_orchestrator is not None:
        from synthorg.communication.meetings.service import (  # noqa: PLC0415
            MeetingService,
        )

        app_state.set_meeting_service(
            MeetingService(orchestrator=meeting_orchestrator),
        )

    app_state.set_escalation_store(_escalation_store)
    app_state.set_escalation_processor(build_decision_processor(escalation_config))
    _escalation_registry = PendingFuturesRegistry()
    app_state.set_escalation_registry(_escalation_registry)
    app_state.set_escalation_sweeper(
        EscalationExpirationSweeper(
            _escalation_store,
            interval_seconds=escalation_config.sweeper_interval_seconds,
        ),
    )
    # Cross-instance wake-up subscriber. No-op unless the queue
    # backend is Postgres and ``cross_instance_notify`` is enabled;
    # otherwise the sweeper and per-resolver timeout cover eventual
    # consistency on their own.
    app_state.set_escalation_notify_subscriber(
        build_escalation_notify_subscriber(
            escalation_config,
            _escalation_store,
            _escalation_registry,
            reconnect_delay_seconds=escalation_config.reconnect_delay_seconds,
            config_resolver=app_state.config_resolver
            if app_state.has_config_resolver
            else None,
        ),
    )

    bridge = (
        MessageBusBridge(
            message_bus,
            channels_plugin,
            config_resolver=(
                app_state.config_resolver if app_state.has_config_resolver else None
            ),
        )
        if message_bus is not None
        else None
    )
    backup_service = build_backup_service(
        effective_config,
        resolved_db_path=resolved_db_path,
        resolved_config_path=resolved_config_path,
    )
    # ``_build_settings_dispatcher`` needs the scheduler instance to
    # wire the ``security.timeout_check_interval_seconds`` subscriber,
    # so the scheduler must be in scope before the dispatcher is built.
    approval_timeout_scheduler = _build_default_approval_timeout_scheduler(
        approval_store=effective_approval_store,
    )
    settings_dispatcher = _build_settings_dispatcher(
        message_bus,
        settings_service,
        effective_config,
        app_state,
        backup_service,
        approval_timeout_scheduler,
    )
    plugins: list[ChannelsPlugin] = [channels_plugin]
    rate_limiter_enabled = _resolve_rate_limiter_enabled()
    if not rate_limiter_enabled:
        logger.warning(
            API_APP_STARTUP,
            note=(
                "global rate limiter disabled by api.rate_limiter_enabled;"
                " do not deploy this configuration to production"
            ),
        )
    middleware = _build_middleware(
        api_config,
        a2a_enabled=effective_config.a2a.enabled,
        rate_limiter_enabled=rate_limiter_enabled,
    )

    # Integration controllers add ~20 routes (~0.7s of Litestar
    # registration per create_app). Skip them entirely when the
    # integrations subsystem is disabled, so unit tests that do not
    # exercise integration endpoints pay no registration cost.
    #
    # When enabled, gate each controller by its own collaborators
    # instead of a single boolean. ``MCPCatalogController`` only
    # needs ``mcp_catalog_service``; ``WebhooksController`` needs a
    # bus; ``TunnelController`` needs ``tunnel_provider``. A single
    # global gate either under-exposes controllers that are ready
    # or over-exposes ones whose dependencies failed to auto-wire.
    integration_controllers: tuple[type[Controller], ...] = ()
    if effective_config.integrations.enabled:
        from synthorg.api.controllers.connections import (  # noqa: PLC0415
            ConnectionsController,
        )
        from synthorg.api.controllers.integration_health import (  # noqa: PLC0415
            IntegrationHealthController,
        )
        from synthorg.api.controllers.mcp_catalog import (  # noqa: PLC0415
            MCPCatalogController,
        )
        from synthorg.api.controllers.oauth import OAuthController  # noqa: PLC0415
        from synthorg.api.controllers.tunnel import (  # noqa: PLC0415
            TunnelController,
        )
        from synthorg.api.controllers.webhooks import (  # noqa: PLC0415
            WebhooksController,
        )

        controller_readiness: tuple[
            tuple[type[Controller], tuple[tuple[str, object], ...]], ...
        ] = (
            (
                ConnectionsController,
                (("connection_catalog", connection_catalog),),
            ),
            (
                IntegrationHealthController,
                (("connection_catalog", connection_catalog),),
            ),
            (
                OAuthController,
                (
                    ("connection_catalog", connection_catalog),
                    ("persistence", persistence),
                ),
            ),
            (
                WebhooksController,
                (
                    ("connection_catalog", connection_catalog),
                    ("message_bus", message_bus),
                ),
            ),
            (
                MCPCatalogController,
                (("mcp_catalog_service", mcp_catalog_service),),
            ),
            (
                TunnelController,
                (("tunnel_provider", tunnel_provider),),
            ),
        )
        ready: list[type[Controller]] = []
        for controller_cls, deps in controller_readiness:
            missing = [name for name, value in deps if value is None]
            if missing:
                logger.warning(
                    API_APP_STARTUP,
                    note="skipping integration controller (missing deps)",
                    controller=controller_cls.__name__,
                    missing=missing,
                )
                continue
            ready.append(controller_cls)
        integration_controllers = tuple(ready)

    # ── A2A gateway auto-wire ─────────────────────────────────────
    a2a_controllers: tuple[type[Controller], ...] = ()
    a2a_root_controllers: tuple[type[Controller], ...] = ()
    if effective_config.a2a.enabled:
        # Build every A2A artefact into local variables FIRST so an
        # exception anywhere in the construction chain leaves
        # ``app_state`` untouched. Only commit to ``app_state`` and
        # the controller tuples after every required object is
        # successfully constructed -- otherwise a half-wired surface
        # would survive a non-fatal failure (card builder set but
        # client absent, peer registry registered but gateway
        # controller missing, etc.).
        a2a_card_builder = None
        a2a_root_pending: tuple[type[Controller], ...] = ()
        a2a_pending: tuple[type[Controller], ...] = ()
        a2a_peer_registry = None
        a2a_client_obj = None
        try:
            from synthorg.a2a.agent_card import (  # noqa: PLC0415
                AgentCardBuilder,
            )
            from synthorg.a2a.models import A2AAuthSchemeInfo  # noqa: PLC0415
            from synthorg.a2a.well_known import (  # noqa: PLC0415
                WellKnownAgentCardController,
            )

            auth_schemes = (
                A2AAuthSchemeInfo(
                    scheme=str(
                        effective_config.a2a.auth.inbound_scheme,
                    ),
                ),
            )
            a2a_card_builder = AgentCardBuilder(
                default_auth_schemes=auth_schemes,
            )
            a2a_root_pending = (WellKnownAgentCardController,)

            # Outbound client + JSON-RPC gateway need the connection
            # catalog and integrations enabled.
            if effective_config.integrations.enabled and connection_catalog is not None:
                import httpx  # noqa: PLC0415

                from synthorg.a2a.client import A2AClient  # noqa: PLC0415
                from synthorg.a2a.gateway import (  # noqa: PLC0415
                    A2AGatewayController,
                )
                from synthorg.a2a.peer_registry import (  # noqa: PLC0415
                    PeerRegistry,
                )

                a2a_peer_registry = PeerRegistry()
                a2a_client_timeout = float(
                    resolve_init_value(
                        SettingNamespace.A2A,
                        "client_timeout_seconds",
                        parse=parse_float,
                    ).value
                )
                a2a_http_client = httpx.AsyncClient(timeout=a2a_client_timeout)
                from synthorg.tools.network_validator import (  # noqa: PLC0415
                    NetworkPolicy,
                )

                a2a_network_policy = NetworkPolicy()
                a2a_client_obj = A2AClient(
                    connection_catalog,
                    network_validator=a2a_network_policy,
                    http_client=a2a_http_client,
                    timeout_seconds=a2a_client_timeout,
                )
                a2a_pending = (A2AGatewayController,)
        except Exception as exc:
            reraise_critical(exc)
            logger.warning(
                API_APP_STARTUP,
                note="A2A gateway auto-wire failed (non-fatal)",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
        else:
            # Commit only on full success. Partial failures land in
            # the except branch above with all ``app_state`` slots
            # still empty.
            app_state.set_a2a_card_builder(a2a_card_builder)
            a2a_root_controllers = a2a_root_pending
            if a2a_peer_registry is not None and a2a_client_obj is not None:
                app_state.set_a2a_peer_registry(a2a_peer_registry)
                app_state.set_a2a_client(a2a_client_obj)
                a2a_controllers = a2a_pending
            logger.info(
                API_SERVICE_AUTO_WIRED,
                service="a2a_gateway",
            )

    # Client-simulation runtime. An explicit kwarg always wins (test
    # doubles / bespoke wiring). Otherwise, when a TaskEngine is
    # present, build the live runtime (real IntakeEngine + review
    # pipeline) so ``has_simulation_runtime`` is true and the
    # ``/simulations`` + ``/requests`` controllers register; the
    # default ``direct`` intake strategy makes no LLM calls and works
    # for an empty company. With no TaskEngine the intake engine has
    # nothing to create tasks against, so fall back to a fresh empty
    # ``ClientSimulationState()`` -- the always-registered
    # ``ClientController`` still serves an empty ``/clients`` list
    # instead of 503ing on every dashboard poll. This mirrors the
    # ``review_gate_service`` "build it whenever task_engine exists"
    # gate below.
    if client_simulation_state is None:
        if task_engine is not None:
            from synthorg.client.runtime_builder import (  # noqa: PLC0415
                build_client_simulation_runtime,
            )

            client_simulation_state = build_client_simulation_runtime(app_state)
        else:
            from synthorg.client.simulation_state import (  # noqa: PLC0415
                ClientSimulationState as _ClientSimulationState,
            )

            client_simulation_state = _ClientSimulationState()
    app_state.set_client_simulation_state(client_simulation_state)

    # Optional controllers gated on their primary collaborator service.
    # Routes for unconfigured subsystems are not registered at all so
    # the dashboard receives 404 (route does not exist) instead of the
    # 503 it used to get for every poll cycle.  /capabilities reports
    # which subsystems are wired so the dashboard can skip the polling
    # loops at the source. Fail loudly when a registered predicate name
    # is missing from AppState (typo / rename) -- silently disabling
    # routes via ``getattr(..., False)`` would turn a wiring bug into
    # an unnoticed 404 surface regression.
    _optional: list[type[Controller]] = []
    for controller_cls, predicate_attr in OPTIONAL_CONTROLLERS:
        if not hasattr(app_state, predicate_attr):
            msg = (
                f"Optional controller predicate {predicate_attr!r} is "
                f"missing on AppState (controller={controller_cls.__name__})."
            )
            logger.error(API_APP_STARTUP, note=msg, controller=controller_cls.__name__)
            raise RuntimeError(msg)
        if bool(getattr(app_state, predicate_attr)):
            _optional.append(controller_cls)
    optional_controllers: tuple[type[Controller], ...] = tuple(_optional)

    api_router = Router(
        path=api_config.api_prefix,
        route_handlers=[
            *BASE_CONTROLLERS,
            *integration_controllers,
            *a2a_controllers,
            *optional_controllers,
            ws_handler,
        ],
        guards=[require_password_changed],
    )

    # Startup auto-wiring flag: persistence being non-None is the
    # enabling condition -- SettingsService needs connected persistence
    # and is created in on_startup after _init_persistence().
    _should_auto_wire = settings_service is None and persistence is not None

    # Review gate service -- transitions tasks from IN_REVIEW on approval.
    # Needs ``task_engine`` for self-review enforcement (preflight) and
    # state transitions; ``persistence`` is OPTIONAL and only used for
    # the auditable decisions drop-box.  Construct the service whenever
    # ``task_engine`` exists so the fail-fast self-review / missing-task
    # preflight still runs in task-engine-only deployments; decision
    # recording gracefully degrades to a WARNING-level no-op when
    # persistence is absent.
    if task_engine is not None:
        review_gate_service = ReviewGateService(
            task_engine=task_engine,
            persistence=persistence,
        )
        app_state.set_review_gate_service(review_gate_service)

    # ``approval_timeout_scheduler`` is built above (alongside the
    # backup service and bridge); the lifecycle owns starting it.
    # ``_apply_security_timeout_interval`` in ``lifecycle_helpers.py``
    # resolves the operator-tuned interval from ``ConfigResolver`` after
    # persistence connects and calls ``scheduler.reschedule(...)`` so the
    # configured cadence takes effect on the next loop tick. Default
    # policy is ``WaitForeverPolicy``: the scheduler runs but never
    # auto-decides. Operators swap in DenyOnTimeout / Tiered /
    # EscalationChain via the security.* settings at runtime.

    startup, shutdown = _build_lifecycle(
        persistence,
        message_bus,
        bridge,
        settings_dispatcher,
        task_engine,
        meeting_scheduler,
        backup_service,
        approval_timeout_scheduler,
        app_state,
        should_auto_wire_settings=_should_auto_wire,
        effective_config=effective_config,
    )

    _runtime_services_installed = False

    async def _install_runtime_services() -> None:
        # Installs the worker execution service AND the multi-agent
        # coordinator behind the single provider-present switch, both
        # sharing one boot AgentEngine. Appended first (runs immediately
        # after the core startup hooks that connect persistence and
        # wire SettingsService / ConfigResolver), and before any other
        # appended hook, so the once-only ``set_worker_execution_service``
        # / ``set_coordinator`` cannot lose a race with the
        # worker-service property's lazy lifecycle-only default. With no
        # provider this installs the empty-company backstop and no
        # coordinator (``/coordinate`` honestly 503s); a provider added
        # later swaps both in via ``post_setup_reinit`` (no restart). The
        # closure flag keeps the one-shot ``set_`` calls idempotent
        # across a lifespan re-entry (shared-app test fixtures),
        # mirroring ``_wire_chief_of_staff_chat``.
        nonlocal _runtime_services_installed
        if _runtime_services_installed:
            return
        from synthorg.engine.errors import (  # noqa: PLC0415
            RuntimeServicesBuildError,
        )
        from synthorg.workers.runtime_builder import (  # noqa: PLC0415
            build_runtime_services,
        )

        # Pin the sandbox workspace onto the mounted data volume in an
        # env-driven deployment so agent file/sandbox tools persist with
        # the runtime data, not a process temp dir. For injected/dev apps
        # the resolver returns None and the documented temp fallback stays.
        env_workspace_root = resolve_agent_workspace_root_env()
        if env_workspace_root is not None:
            app_state.set_agent_workspace_root(env_workspace_root)

        _try_wire_cost_dial(app_state)
        _try_wire_cockpit(app_state)

        # Per-project persistent workspace substrate. The git backend is
        # config-selected (embedded default, no external dep);
        # ProjectWorkspaceService provisions one persistent git-backed
        # tree per project under the workspace base. Persistence-less
        # boots (test fixtures, dev apps with no DB) skip wiring -- the
        # service is optional and gates on ``has_project_workspace_service``.
        if app_state.has_persistence and app_state.project_workspace_service is None:
            # Guard against partial-startup retry: this hook fires once
            # the persistence layer is connected, but ``build_runtime_services``
            # below is fallible and a re-entry after its failure would
            # otherwise hit the ``_set_once`` guard inside
            # ``set_project_workspace_service`` and fail with
            # "already configured" instead of cleanly retrying the
            # runtime-services build.
            from synthorg.engine.workspace.git_backend import (  # noqa: PLC0415
                GitBackendConfig,
                GitBackendDeps,
                build_git_backend,
            )
            from synthorg.engine.workspace.project_workspace_service import (  # noqa: PLC0415
                ProjectWorkspaceService,
            )

            git_backend_config = GitBackendConfig()
            git_backend = build_git_backend(
                git_backend_config,
                GitBackendDeps(
                    workspace_base_root=app_state.agent_workspace_root,
                    connection_catalog=connection_catalog,
                    clock=app_state.clock,
                ),
            )
            app_state.set_project_workspace_service(
                ProjectWorkspaceService(
                    base_root=app_state.agent_workspace_root,
                    repo=app_state.persistence.project_workspaces,
                    git_backend=git_backend,
                    config=git_backend_config,
                    clock=app_state.clock,
                ),
            )

        # Per-project reproducible environment substrate (extracted to
        # keep this hook under the cyclomatic-complexity cap).
        _wire_environment_service(app_state)

        try:
            services = await build_runtime_services(
                app_state,
                workspace_root=app_state.agent_workspace_root,
            )
        except Exception as exc:
            reraise_critical(exc)
            log_exception_redacted(
                logger,
                API_APP_STARTUP,
                exc,
                service="runtime_services",
                note="failed to build the runtime services at boot",
                provider_present=app_state.has_active_provider,
            )
            msg = "Runtime services failed to build at boot"
            raise RuntimeServicesBuildError(msg) from exc
        app_state.set_worker_execution_service(
            services.worker_execution_service,
        )
        # An explicitly injected coordinator (``create_app(coordinator=)``
        # in tests / custom DI) wins over the autowired one, matching the
        # injection-over-autowire convention used across ``create_app``.
        # ``set_coordinator_if_absent`` makes the check-and-set atomic in
        # the seam (no boot-time check-then-act): an injected coordinator
        # is kept, and setting the built one becomes a logged no-op.
        if services.coordinator is not None:
            app_state.set_coordinator_if_absent(services.coordinator)
        # Same injection-over-autowire rule for the work pipeline spine:
        # an injected ``create_app(work_pipeline=)`` is kept, and setting
        # the built one becomes a logged no-op.
        if services.work_pipeline is not None:
            app_state.set_work_pipeline_if_absent(services.work_pipeline)
        # Attach the vision verifier gate to the review gate service when
        # the subsystem is enabled. The service was built during app
        # construction (before a provider connected); the gate is built
        # here once the workspace + provider are available.
        if (
            services.vision_gate is not None
            and app_state.review_gate_service is not None
        ):
            app_state.review_gate_service.set_vision_gate(services.vision_gate)
        # Same seam for the adversarial red-team gate: built in the
        # runtime wiring once the boot engine exists, attached here so a
        # review pipeline supplied with red_team_input reaches the live
        # gate. ``None`` when the red-team subsystem is disabled.
        if (
            services.red_team_runtime is not None
            and app_state.review_gate_service is not None
        ):
            app_state.review_gate_service.set_red_team_gate(
                services.red_team_runtime.gate,
            )
        # Bring the real client-request, goal/objective, and
        # task-board work-entry paths online: ensure the configured
        # default projects exist and attach the entry adapters. No-op
        # for an empty company (no pipeline). The task-board adapter
        # follows the same gate but skips the project bootstrap (board
        # filings carry their own project).
        from synthorg.engine.pipeline.entry.boot import (  # noqa: PLC0415
            wire_real_intake_entry,
            wire_real_objective_entry,
            wire_real_task_board_entry,
        )

        await wire_real_intake_entry(app_state)
        await wire_real_objective_entry(app_state)
        await wire_real_task_board_entry(app_state)
        _runtime_services_installed = True

    _docs_engine_installed = False

    async def _wire_docs_engine() -> None:
        # Living-documentation engine. Constructs DocsService and the
        # ProjectAwareMemoryFacade behind the same persistence + project
        # workspace gate used by _install_runtime_services. The facade is
        # held on the engine bundle so the per-agent retrieval pipeline
        # can consult it when an execution context exposes a project_id;
        # the dev / empty-company path with no persistence cleanly skips
        # wiring.
        nonlocal _docs_engine_installed
        if _docs_engine_installed:
            return
        if not app_state.has_persistence:
            return
        if app_state.project_workspace_service is None:
            return
        if app_state.docs_service is not None:
            _docs_engine_installed = True
            return
        from synthorg.docs_engine.factory import (  # noqa: PLC0415
            build_docs_service,
        )
        from synthorg.docs_engine.tool_factory import (  # noqa: PLC0415
            DocsToolFactory,
        )

        if not app_state.has_memory_backend:
            logger.info(
                API_APP_STARTUP,
                service="docs_engine",
                note="memory backend not wired; docs engine wiring skipped",
            )
            return
        runtime = build_docs_service(
            repo=app_state.persistence.project_docs,
            workspace_service=app_state.project_workspace_service,
            git_backend=app_state.project_workspace_service.git_backend,
            memory_backend=app_state.memory_backend,
            clock=app_state.clock,
        )
        app_state.set_docs_service(runtime.docs_service)
        app_state.set_project_doc_memory_facade(runtime.memory_facade)
        app_state.set_docs_tool_factory(
            DocsToolFactory(docs_service=runtime.docs_service)
        )
        _docs_engine_installed = True

    _knowledge_engine_installed = False

    async def _wire_knowledge_engine() -> None:
        # Knowledge + provenance substrate. Constructs the
        # KnowledgeService over the connected persistence repos and the
        # memory backend (the pluggable vector store), behind the same
        # persistence + memory gate as the docs engine. Web ingestion
        # needs a governed HTTP fetcher injected here; until that transport
        # is wired the service ingests PDF + repo sources and rejects WEB.
        nonlocal _knowledge_engine_installed
        if _knowledge_engine_installed:
            return
        if not app_state.has_persistence:
            return
        if app_state.knowledge_service is not None:
            _knowledge_engine_installed = True
            return
        if not app_state.has_memory_backend:
            logger.info(
                API_APP_STARTUP,
                service="knowledge_engine",
                note="memory backend not wired; knowledge engine wiring skipped",
            )
            return
        from synthorg.knowledge.config import KnowledgeConfig  # noqa: PLC0415
        from synthorg.knowledge.factory import (  # noqa: PLC0415
            build_knowledge_service,
        )
        from synthorg.knowledge.tool_factory import (  # noqa: PLC0415
            build_knowledge_tool_factory,
        )

        service = build_knowledge_service(
            memory_backend=app_state.memory_backend,
            persistence=app_state.persistence,
            config=KnowledgeConfig(enabled=True),
            clock=app_state.clock,
        )
        app_state.set_knowledge_service(service)
        app_state.set_knowledge_tool_factory(
            build_knowledge_tool_factory(service=service)
        )
        _knowledge_engine_installed = True

    _research_engine_installed = False

    async def _wire_research_engine() -> None:
        # Research subsystem: builds the ResearchService over the connected
        # persistence repo and the configured completion provider, behind
        # the research.enabled + research.model settings. Best-effort and
        # idempotent (mirrors the cost-dial / knowledge wiring): a missing
        # provider, unset model, or disabled flag logs and skips rather
        # than poisoning startup. Web / academic / code retrieval sources
        # are vendor-agnostic and wire only when a provider is injected, so
        # the boot service fans out to the knowledge substrate alone.
        nonlocal _research_engine_installed
        if _research_engine_installed:
            return
        if not app_state.has_persistence:
            return
        if app_state.research_service is not None:
            _research_engine_installed = True
            return
        if not app_state.has_settings_service or provider_registry is None:
            return
        runtime_settings = app_state.settings_service
        try:
            from synthorg.research.config import ResearchConfig  # noqa: PLC0415
            from synthorg.research.factory import (  # noqa: PLC0415
                build_research_service,
            )
            from synthorg.research.tool_factory import (  # noqa: PLC0415
                build_research_tool_factory,
            )

            enabled = (
                await runtime_settings.get("research", "enabled")
            ).value.strip().lower() == "true"
            model = (await runtime_settings.get("research", "model")).value.strip()
            if not enabled or not model:
                logger.info(
                    API_APP_STARTUP,
                    service="research_engine",
                    note="research disabled or model unset; wiring skipped",
                )
                return
            provider_names = provider_registry.list_providers()
            if not provider_names:
                return
            provider_name = (
                await runtime_settings.get("research", "provider")
            ).value.strip()
            provider = (
                provider_registry.get(provider_name)
                if provider_name and provider_name in provider_registry
                else provider_registry.get(provider_names[0])
            )
            config = ResearchConfig(
                enabled=True,
                query_planner=(
                    await runtime_settings.get("research", "query_planner")
                ).value.strip(),  # type: ignore[arg-type]
                credibility_triage=(
                    await runtime_settings.get("research", "credibility_triage")
                ).value.strip(),  # type: ignore[arg-type]
                deduplicator=(
                    await runtime_settings.get("research", "deduplicator")
                ).value.strip(),  # type: ignore[arg-type]
                synthesizer=(
                    await runtime_settings.get("research", "synthesizer")
                ).value.strip(),  # type: ignore[arg-type]
            )
            service = build_research_service(
                runs_repo=app_state.persistence.research_runs,
                provider=provider,
                model=model,
                config=config,
                knowledge_service=app_state.knowledge_service,
                clock=app_state.clock,
            )
            app_state.set_research_service(service)
            app_state.set_research_tool_factory(
                build_research_tool_factory(service=service, clock=app_state.clock)
            )
            _research_engine_installed = True
        except Exception as exc:
            reraise_critical(exc)
            logger.info(
                API_APP_STARTUP,
                service="research_engine",
                note="research engine wiring unavailable; skipped",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )

    _brownfield_intake_installed = False

    async def _wire_brownfield_intake() -> None:
        # Brownfield codebase intake (the "merger/acquisition" entry mode).
        # Runs AFTER _wire_knowledge_engine so the import service can index
        # the codebase into the knowledge store. Best-effort + idempotent:
        # a missing collaborator (no persistence / workspace / knowledge)
        # leaves the /brownfield controller to 503 rather than poisoning
        # startup.
        nonlocal _brownfield_intake_installed
        if _brownfield_intake_installed:
            return
        from synthorg.engine.pipeline.entry.boot import (  # noqa: PLC0415
            wire_real_brownfield_entry,
        )

        try:
            await wire_real_brownfield_entry(app_state)
            _brownfield_intake_installed = True
        except Exception as exc:
            reraise_critical(exc)
            logger.info(
                API_APP_STARTUP,
                service="brownfield_intake",
                note="brownfield intake wiring unavailable; skipped",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )

    startup = [
        *startup,
        _install_runtime_services,
        _wire_docs_engine,
        _wire_knowledge_engine,
        _wire_research_engine,
        _wire_brownfield_intake,
    ]

    # Project telemetry: build the collector (opt-in via the
    # ``SYNTHORG_TELEMETRY_ENABLED`` env var; disabled by default).
    # Attach it to app_state so the health endpoint can report the
    # state, and hook start()/shutdown() into the
    # Litestar lifespan. Telemetry is SynthOrg-owned and silent on
    # failure: a broken reporter falls back to noop and never affects
    # the app.
    #
    # Shutdown is appended (runs LAST), not prepended: critical
    # infrastructure (task engine drain, persistence disconnect, bus
    # stop) must complete first so the session-summary event emitted
    # by ``telemetry_collector.shutdown`` reflects final state, and so
    # a hanging Logfire flush never blocks cleanup of load-bearing
    # resources.
    telemetry_collector = _build_telemetry_collector(effective_config.telemetry)
    app_state.set_telemetry_collector(telemetry_collector)
    startup = [*startup, telemetry_collector.start]
    shutdown = [*shutdown, telemetry_collector.shutdown]

    # Automated report service: wired from the cost tracker + budget config
    # so the ``POST /api/v1/reports/generate`` endpoint can serve the
    # documented inputs instead of returning 503 unconfigured. Risk and
    # performance trackers are optional; the service degrades to empty
    # per-tracker reports when either is absent (see
    # ``AutomatedReportService.generate_*_report`` for the None-tolerant
    # paths). When ``cost_tracker`` is itself absent (degenerate test
    # configurations) we skip the wire and the controller falls back to
    # 503 ServiceUnavailableError -- which is the honest status code for
    # "feature unavailable", not the AttributeError it used to surface.
    if cost_tracker is not None:
        from synthorg.budget.automated_reports import (  # noqa: PLC0415
            AutomatedReportService,
        )
        from synthorg.budget.reports import ReportGenerator  # noqa: PLC0415

        report_generator = ReportGenerator(
            cost_tracker=cost_tracker,
            budget_config=effective_config.budget,
        )
        report_service = AutomatedReportService(
            report_generator=report_generator,
            cost_tracker=cost_tracker,
            risk_tracker=None,
            performance_tracker=performance_tracker,
        )
        app_state.set_report_service(report_service)

    async def _wire_chief_of_staff_chat() -> None:
        # Wired only when the meta config opts in via
        # ``chief_of_staff.chat_enabled`` AND a provider is registered.
        # When unwired, ``POST /meta/chat`` surfaces 503 rather than the
        # silent placeholder it returned previously.
        # Idempotent: a re-entry of lifespan startup against the same
        # ``AppState`` (e.g. ASGI restart in tests) would otherwise make
        # the one-shot ``set_chief_of_staff_chat`` raise.
        if app_state.has_chief_of_staff_chat:
            return
        if provider_registry is None:
            return
        from synthorg.meta.config import (  # noqa: PLC0415
            load_self_improvement_config,
        )

        meta_self_improvement = await load_self_improvement_config(
            app_state.settings_service if app_state.has_settings_service else None,
        )
        chat_backend = build_chief_of_staff_chat(
            meta_self_improvement.chief_of_staff,
            provider_registry=provider_registry,
            cost_tracker=cost_tracker,
        )
        if chat_backend is not None:
            app_state.set_chief_of_staff_chat(chat_backend)

    startup = [*startup, _wire_chief_of_staff_chat]

    async def _wire_chief_of_staff_proposer() -> None:
        # Wired only when ``chief_of_staff.propose_enabled`` is set AND
        # a provider is registered AND persistence is connected (the
        # conversation/turn/proposal stores are durable). Otherwise
        # ``POST /meta/chat/propose`` honestly surfaces 503.
        # Idempotent for re-entered lifespans (shared-app test fixtures).
        if app_state.has_chief_of_staff_proposer:
            return
        from synthorg.meta.config import (  # noqa: PLC0415
            load_self_improvement_config,
        )
        from synthorg.persistence.conversational_factory import (  # noqa: PLC0415
            build_conversational_repositories,
        )

        # Repo wiring must run before the provider-missing early return:
        # a conversational-intake approval that exists from a previous
        # boot still needs the repo to route approve/reject decisions,
        # even on boots without an LLM provider (proposer absent).
        repositories = build_conversational_repositories(persistence)
        if repositories is not None:
            app_state.set_conversational_proposal_repo(repositories.proposal_repo)
        if provider_registry is None:
            return
        meta_self_improvement = await load_self_improvement_config(
            app_state.settings_service if app_state.has_settings_service else None,
        )
        # Hard-block the unsupported SQLite + persistent ApprovalStore
        # combination at startup: this schema does not admit
        # ``conversational_intake`` approval rows, so proposal writes
        # would fail at runtime. Supported configurations are Postgres
        # or an in-memory ApprovalStore on SQLite.
        store_has_persistent_repo = (
            isinstance(effective_approval_store, ApprovalStore)
            and effective_approval_store.has_persistent_repo
        )
        if (
            meta_self_improvement.chief_of_staff.propose_enabled
            and persistence is not None
            and persistence.backend_name == "sqlite"
            and store_has_persistent_repo
        ):
            msg = (
                "Chief of Staff propose is enabled with a persistent "
                "SQLite ApprovalStore. This combination cannot durably "
                "persist conversational-intake approvals. Switch the "
                "backend to Postgres, or keep ApprovalStore in-memory "
                "on SQLite."
            )
            raise ServiceUnavailableError(msg)
        proposer = build_chief_of_staff_proposer(
            meta_self_improvement.chief_of_staff,
            provider_registry=provider_registry,
            approval_store=effective_approval_store,
            repositories=repositories,
            cost_tracker=cost_tracker,
        )
        if proposer is not None:
            app_state.set_chief_of_staff_proposer(proposer)

    startup = [*startup, _wire_chief_of_staff_proposer]

    async def _wire_charter_engine() -> None:
        # Deep CEO interview to project charter. Wired only when
        # ``meta.charter.interview_enabled`` is set AND a provider is
        # registered AND persistence is connected (the conversation +
        # charter stores are durable). Otherwise the /meta/charters
        # controllers honestly surface 503. Best-effort: a wiring failure
        # never poisons startup. Idempotent for re-entered lifespans.
        if app_state.has_charter_service:
            return
        if (
            provider_registry is None
            or persistence is None
            or not app_state.has_persistence
        ):
            return
        try:
            from synthorg.api.services.project_service import (  # noqa: PLC0415
                ProjectService,
            )
            from synthorg.meta.charter.dispatch import (  # noqa: PLC0415
                CharterDispatcher,
            )
            from synthorg.meta.charter.factory import (  # noqa: PLC0415
                build_charter_interview_strategy,
            )
            from synthorg.meta.charter.service import (  # noqa: PLC0415
                CharterInterviewService,
            )
            from synthorg.meta.config import (  # noqa: PLC0415
                load_self_improvement_config,
            )
            from synthorg.persistence.charter_factory import (  # noqa: PLC0415
                build_charter_repository,
            )
            from synthorg.persistence.conversational_factory import (  # noqa: PLC0415
                build_conversational_repositories,
            )

            si_config = await load_self_improvement_config(
                app_state.settings_service if app_state.has_settings_service else None,
            )
            charter_config = si_config.charter
            if not charter_config.interview_enabled:
                return
            charter_repo = build_charter_repository(persistence)
            conv_repos = build_conversational_repositories(persistence)
            available = provider_registry.list_providers()
            if charter_repo is None or conv_repos is None or not available:
                logger.warning(
                    CHARTER_SUBSTRATE_UNAVAILABLE,
                    note="charter interview enabled but stores/provider unavailable",
                )
                return
            provider = provider_registry.get(available[0])
            strategy = build_charter_interview_strategy(
                charter_config,
                provider=provider,
                cost_tracker=cost_tracker,
            )
            app_state.set_charter_service(
                CharterInterviewService(
                    strategy=strategy,
                    config=charter_config,
                    conversation_repo=conv_repos.conversation_repo,
                    turn_repo=conv_repos.turn_repo,
                    charter_repo=charter_repo,
                )
            )
            # The approval dispatcher additionally needs the work-pipeline
            # spine, the cost-forecast store, and the live budget config.
            # When any is absent the interview still works; only approve
            # 503s.
            forecast_repo = app_state.cost_forecast_repo
            budget_config = app_state.budget_config
            if (
                not app_state.has_work_pipeline
                or forecast_repo is None
                or budget_config is None
            ):
                logger.warning(
                    CHARTER_SUBSTRATE_UNAVAILABLE,
                    note="charter dispatcher deps absent; approve will 503",
                )
                return
            resolved_budget = budget_config
            app_state.set_charter_dispatcher(
                CharterDispatcher(
                    charter_repo=charter_repo,
                    forecast_repo=forecast_repo,
                    project_service=ProjectService(repo=persistence.projects),
                    work_pipeline=app_state.work_pipeline,
                    conversation_repo=conv_repos.conversation_repo,
                    budget_currency=lambda: resolved_budget.currency,
                )
            )
        except Exception as exc:
            reraise_critical(exc)
            # Any other failure (settings load, repo construction,
            # strategy build, ...) must not poison startup; the
            # controllers will keep 503ing until the operator fixes
            # the underlying configuration and reboots.
            logger.warning(
                CHARTER_SUBSTRATE_UNAVAILABLE,
                note="charter wiring raised; charter endpoints stay unavailable",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            return

    startup = [*startup, _wire_charter_engine]

    async def _wire_toolsmith() -> None:
        # Self-extending toolkit. Wired only when
        # ``tool_creation_enabled`` is set AND a provider is registered
        # AND persistence is connected (authored blueprints are durable).
        # Disabled by default, so a normal boot skips this entirely.
        # Idempotent for re-entered lifespans (shared-app fixtures).
        if app_state.toolsmith_service is not None or provider_registry is None:
            return
        if persistence is None or not app_state.has_persistence:
            return
        from synthorg.meta.config import load_self_improvement_config  # noqa: PLC0415

        si_config = await load_self_improvement_config(
            app_state.settings_service if app_state.has_settings_service else None,
        )
        if not si_config.tool_creation_enabled:
            return
        try:
            runtime = _build_toolsmith_runtime(
                si_config=si_config,
                provider_registry=provider_registry,
                persistence=persistence,
                approval_store=effective_approval_store,
                cost_tracker=cost_tracker,
                workspace_root=app_state.agent_workspace_root,
            )
        except Exception as exc:
            reraise_critical(exc)
            logger.warning(
                API_APP_STARTUP,
                service="toolsmith",
                note="toolsmith wiring failed; self-extending toolkit disabled",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            return
        if runtime is None:
            return
        # Install the layered MCP surface BEFORE the once-only AppState
        # mutation. ``set_toolsmith_service`` cannot be replayed on
        # retry, so if the layer install fails after the AppState mutation
        # the runtime is left half-wired (service present, layer missing)
        # with no path back. Installing first means a failure here leaves
        # the toolsmith disabled cleanly, mirroring the upstream try/except.
        from synthorg.meta.mcp.server import (  # noqa: PLC0415
            install_dynamic_tool_layer,
        )

        try:
            install_dynamic_tool_layer(runtime.dynamic_registry)
        except Exception as exc:
            reraise_critical(exc)
            logger.warning(
                API_APP_STARTUP,
                service="toolsmith",
                note="toolsmith dynamic layer install failed; disabled",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            return
        app_state.set_toolsmith_service(runtime.service)
        logger.info(API_APP_STARTUP, service="toolsmith", note="wired")

    startup = [*startup, _wire_toolsmith]

    # Bring up the notification dispatcher's HTTP-bearing sinks
    # (slack/ntfy ``httpx.AsyncClient``) lazily under their lifecycle
    # locks. Stateless sinks (console/email) implement no-op
    # start()/close() so the fan-out treats every adapter uniformly.
    # Shutdown registration lives in ``lifecycle_builder._safe_shutdown``
    # via ``notification_dispatcher.aclose`` so audit-style shutdown
    # notifications can fire during service teardown before sink
    # close() runs.
    startup = [*startup, notification_dispatcher.start]

    async def _resolve_runtime_security_settings() -> None:
        # Each security key resolves independently so a validation
        # failure on an unrelated ``api.*`` field (e.g. a bad
        # ``request_max_body_size_bytes``) does not silently suppress
        # CSP-origin or error-docs overrides. The shared
        # ``ApiBridgeConfig`` validator still runs per key by
        # constructing a one-field model -- defaults satisfy the
        # remaining fields without re-resolving them.
        # Failure branches must actively re-write the module global to
        # ``ApiBridgeConfig()`` defaults, not just log "fallback":
        # a previous app instance (or earlier test on the same worker)
        # may have already mutated the global, in which case skipping
        # the write would silently keep a stale override instead of
        # the documented default.
        from synthorg.settings.bridge_configs import (  # noqa: PLC0415
            ApiBridgeConfig,
        )

        defaults = ApiBridgeConfig()

        if not app_state.has_config_resolver:
            set_docs_csp_origins(defaults.csp_docs_external_origins)
            set_error_docs_base_url(defaults.error_docs_base_url)
            logger.warning(
                API_BRIDGE_CONFIG_RESOLVE_FAILED,
                bridge="api",
                reason="config_resolver_unavailable",
                fallback="module_defaults",
            )
            return
        resolver = app_state.config_resolver

        try:
            origins_raw = await resolver.get_json("api", "csp_docs_external_origins")
            # Pass the raw JSON shape directly so ApiBridgeConfig sees
            # the unmodified payload. ``tuple(...)`` would coerce a
            # mapping to its keys (and raise TypeError on non-iterable
            # shapes), masking the real validation failure. Pydantic
            # returns a ``tuple[str, ...]`` after its own validation
            # runs, so ``set_docs_csp_origins`` still receives the
            # correct shape.
            csp_bridge = ApiBridgeConfig(csp_docs_external_origins=origins_raw)
            set_docs_csp_origins(csp_bridge.csp_docs_external_origins)
        except (
            SettingNotFoundError,
            SettingsEncryptionError,
            ValueError,
            ValidationError,
        ) as exc:
            set_docs_csp_origins(defaults.csp_docs_external_origins)
            logger.warning(
                API_BRIDGE_CONFIG_RESOLVE_FAILED,
                bridge="api",
                key="csp_docs_external_origins",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
                fallback="module_default",
            )

        try:
            url_raw = await resolver.get_str("api", "error_docs_base_url")
            error_bridge = ApiBridgeConfig(error_docs_base_url=url_raw)
            set_error_docs_base_url(error_bridge.error_docs_base_url)
            logger.info(
                SETTINGS_VALUE_RESOLVED,
                namespace="api",
                key="error_docs_base_url",
                value=error_bridge.error_docs_base_url,
            )
        except (
            SettingNotFoundError,
            SettingsEncryptionError,
            ValueError,
            ValidationError,
        ) as exc:
            set_error_docs_base_url(defaults.error_docs_base_url)
            logger.warning(
                API_BRIDGE_CONFIG_RESOLVE_FAILED,
                bridge="api",
                key="error_docs_base_url",
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
                fallback="module_default",
            )

    startup = [*startup, _resolve_runtime_security_settings]

    if _skip_lifecycle_shutdown:
        shutdown = []

    # Per-operation rate limiter.  Layered on top of the global
    # two-tier limiter; read from app state by ``per_op_rate_limit``
    # guards.  The store is built unconditionally so that operators who
    # toggle ``api.per_op_rate_limit_enabled`` at runtime (the setting
    # is marked runtime-editable) do not land on a wired-but-uncapped
    # request path; the config's ``enabled`` flag short-circuits the
    # guard when disabled.  Store construction is cheap (empty dicts +
    # per-key locks materialise lazily on first acquire).
    per_op_rate_limit_store: SlidingWindowStore = build_sliding_window_store(
        api_config.per_op_rate_limit,
    )
    app_state.set_per_op_rate_limit_config(api_config.per_op_rate_limit)
    # Honour ``_skip_lifecycle_shutdown`` so tests that share an
    # app across multiple lifespans do not tear down the store
    # (and its background GC) on the first teardown.
    if not _skip_lifecycle_shutdown:
        shutdown = [*shutdown, per_op_rate_limit_store.close]

    # Per-operation inflight-concurrency limiter.
    # Layered on top of the sliding-window per-op limiter; caps
    # simultaneous long-running requests per (operation, subject).
    # Enforced by ``PerOpConcurrencyMiddleware`` registered in the
    # middleware stack.  Built unconditionally (same rationale as the
    # sliding-window store): runtime toggling of
    # ``api.per_op_concurrency_enabled`` must not encounter a missing
    # store.  The middleware short-circuits when
    # ``config.enabled`` is False without ever touching the store.
    per_op_inflight_store: InflightStore = build_inflight_store(
        api_config.per_op_concurrency,
    )
    app_state.set_per_op_concurrency_config(api_config.per_op_concurrency)
    if not _skip_lifecycle_shutdown:
        shutdown = [*shutdown, per_op_inflight_store.close]

    _trusted_proxies = _resolve_api_str_tuple("trusted_proxies")

    return Litestar(
        route_handlers=[api_router, *a2a_root_controllers],
        # Disable Litestar's built-in logging config to preserve the
        # structlog multi-file-sink pipeline set up by
        # _bootstrap_app_logging() above.  Without this, Litestar calls
        # dictConfig() at startup which triggers _clearExistingHandlers
        # and replaces structlog's file sinks with a stdlib
        # queue_listener, causing all runtime logs to go only to Docker
        # stdout.
        logging_config=None,
        state=State(
            {
                "app_state": app_state,
                "per_op_rate_limit_store": per_op_rate_limit_store,
                "per_op_rate_limit_config": api_config.per_op_rate_limit,
                # Inflight-concurrency state used by
                # ``PerOpConcurrencyMiddleware``; mirrors the
                # sliding-window store's wiring.
                "per_op_inflight_store": per_op_inflight_store,
                "per_op_inflight_config": api_config.per_op_concurrency,
                # Mirrors the global limiter's trusted-proxy set so the
                # per-op guard extracts the same "real" client IP behind
                # reverse proxies instead of bucketing all traffic by
                # the proxy's IP.  The raw frozenset is kept for
                # diagnostic reads; the parsed tuple beside it is what
                # the guards consult per-request.
                "per_op_trusted_proxies": frozenset(_trusted_proxies),
                "per_op_trusted_networks": parse_trusted_networks(
                    frozenset(_trusted_proxies),
                ),
            },
        ),
        cors_config=CORSConfig(
            allow_origins=list(_resolve_api_str_tuple("cors_allowed_origins")),
            allow_methods=list(api_config.cors.allow_methods),  # type: ignore[arg-type]
            allow_headers=list(api_config.cors.allow_headers),
            allow_credentials=api_config.cors.allow_credentials,
        ),
        compression_config=CompressionConfig(
            backend="brotli",
            minimum_size=_resolve_api_int("compression_minimum_size_bytes"),
        ),
        # Must be >= artifact API max payload (50 MB) so endpoint-level
        # validation can enforce exact storage limits.
        request_max_body_size=_resolve_api_int("request_max_body_size_bytes"),
        before_send=[security_headers_hook],
        middleware=middleware,
        plugins=plugins,
        exception_handlers=dict(EXCEPTION_HANDLERS),  # type: ignore[arg-type]
        openapi_config=OpenAPIConfig(
            title="SynthOrg API",
            version=__version__,
            path="/docs",
            render_plugins=[
                ScalarRenderPlugin(path="/api"),
            ],
        ),
        on_startup=startup,
        on_shutdown=shutdown,
    )
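
The wiring hooks in the listing above share one shape: an idempotency guard, a best-effort try/except, and an append to the startup list. The following is a minimal standalone sketch of that shape, not the project's code. FakeAppState, make_hook, and run_startup are hypothetical names, and the real hooks additionally call reraise_critical(exc) before logging, which is omitted here.

```python
import asyncio
import logging

logger = logging.getLogger("startup_sketch")


class FakeAppState:
    """Hypothetical stand-in for AppState with a one-shot setter."""

    def __init__(self) -> None:
        self.feature = None

    def set_feature(self, value: object) -> None:
        # One-shot: a second call raises, which is why hooks guard re-entry.
        if self.feature is not None:
            raise RuntimeError("feature already set")
        self.feature = value


def make_hook(app_state, build):
    """Return an async startup hook that wires ``build()`` at most once."""

    async def _wire_feature() -> None:
        if app_state.feature is not None:  # idempotent re-entry guard
            return
        try:
            app_state.set_feature(build())
        except Exception as exc:  # best-effort: log and keep booting
            logger.warning("feature wiring skipped: %s", type(exc).__name__)

    return _wire_feature


async def run_startup(hooks) -> None:
    # Hooks run in list order, like the composed ``startup`` list.
    for hook in hooks:
        await hook()


def broken_build():
    raise ValueError("missing collaborator")


ok_state, bad_state = FakeAppState(), FakeAppState()
startup = [make_hook(ok_state, lambda: "service")]
startup = [*startup, make_hook(bad_state, broken_build)]

# Run the list twice to simulate a re-entered lifespan.
asyncio.run(run_startup(startup))
asyncio.run(run_startup(startup))
```

In this sketch the second run is a no-op for the wired feature, and the failing hook leaves its state unset without aborting the remaining hooks.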

Config

config

API configuration models.

Frozen Pydantic models for CORS, rate limiting, server, authentication, and the top-level ApiConfig that aggregates them all.

CorsConfig pydantic-model

Bases: BaseModel

CORS configuration for the API.

Attributes:

  • allowed_origins (tuple[str, ...]): Origins permitted to make cross-origin requests.
  • allow_methods (tuple[str, ...]): HTTP methods permitted in cross-origin requests.
  • allow_headers (tuple[str, ...]): Headers permitted in cross-origin requests.
  • allow_credentials (bool): Whether credentials (cookies, auth) are allowed in cross-origin requests.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Validators:

  • _validate_wildcard_credentials

allowed_origins pydantic-field

allowed_origins = ()

Origins permitted to make cross-origin requests

allow_methods pydantic-field

allow_methods = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS')

HTTP methods permitted in cross-origin requests

allow_headers pydantic-field

allow_headers = ('Content-Type', 'Authorization', 'X-CSRF-Token')

Headers permitted in cross-origin requests

allow_credentials pydantic-field

allow_credentials = True

Whether credentials (cookies) are allowed
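
The validator name _validate_wildcard_credentials suggests that a wildcard origin is rejected when credentials are enabled, which matches how browsers treat that CORS combination. The sketch below illustrates that rule under this assumption. CorsSketch is a hypothetical dataclass, not the real Pydantic model, and the exact check and error message in the project may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CorsSketch:
    """Hypothetical stand-in for CorsConfig; not the real model."""

    allowed_origins: tuple[str, ...] = ()
    allow_credentials: bool = True

    def __post_init__(self) -> None:
        # Assumed rule: a wildcard origin cannot be combined with credentials.
        if "*" in self.allowed_origins and self.allow_credentials:
            msg = "wildcard origin is incompatible with allow_credentials=True"
            raise ValueError(msg)


# Explicit origins work with the default allow_credentials=True.
explicit = CorsSketch(allowed_origins=("https://dashboard.example",))

# A wildcard is accepted only once credentials are switched off.
public = CorsSketch(allowed_origins=("*",), allow_credentials=False)

try:
    CorsSketch(allowed_origins=("*",))
    rejected = False
except ValueError:
    rejected = True
```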

RateLimitTimeUnit

Bases: StrEnum

Valid time windows for rate limiting.

RateLimitConfig pydantic-model

Bases: BaseModel

API rate limiting configuration.

Three tiers stacked around the auth middleware:

  • IP floor (outermost, un-gated): keyed by client IP, applies to every request -- including ones the auth middleware rejects with 401. Guards against flood attacks that burn auth-validation cycles on protected endpoints with forged tokens.
  • Unauthenticated (middle, only when scope["user"] is None): keyed by client IP, aggressive cap on brute-force against login/setup/logout.
  • Authenticated (innermost, only when scope["user"] is set): keyed by user ID, generous cap for normal dashboard use.

Keying authenticated limits by user ID instead of IP prevents multi-user deployments behind a shared gateway or NAT from collectively exhausting a single per-IP budget.
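
The tier selection described above can be sketched as follows. This is a simplified single-window counter, not the project's middleware: applicable_tiers and allow are hypothetical helpers, the limits are the documented defaults, and window expiry is left out entirely.

```python
from __future__ import annotations

from collections import Counter

# Documented defaults for the three tiers.
LIMITS = {"floor": 10000, "unauth": 20, "auth": 6000}


def applicable_tiers(client_ip: str, user_id: str | None) -> list[tuple[str, str]]:
    """Return (tier, key) pairs, outermost first."""
    tiers = [("floor", client_ip)]  # un-gated: applies to every request
    if user_id is None:
        tiers.append(("unauth", client_ip))  # keyed by client IP
    else:
        tiers.append(("auth", user_id))  # keyed by user ID
    return tiers


def allow(counters: Counter, client_ip: str, user_id: str | None) -> bool:
    """Count one request; outer tiers are charged even if an inner tier rejects."""
    for tier, key in applicable_tiers(client_ip, user_id):
        if counters[(tier, key)] >= LIMITS[tier]:
            return False
        counters[(tier, key)] += 1
    return True


counters: Counter = Counter()
shared_ip = "203.0.113.7"

# 25 anonymous requests from one IP: the unauth tier stops admitting after 20.
anon = [allow(counters, shared_ip, None) for _ in range(25)]

# Two users behind the same IP each draw on their own per-user budget.
alice = allow(counters, shared_ip, "alice")
bob = allow(counters, shared_ip, "bob")
```

Because the authenticated tier is keyed by user ID, the exhausted anonymous budget on the shared IP does not affect either user in this sketch.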

Attributes:

  • floor_max_requests (int): Maximum total requests per time window (by IP) across the whole API. Catches traffic that auth_middleware rejects before the unauth tier sees it.
  • unauth_max_requests (int): Maximum unauthenticated requests per time window (by IP).
  • auth_max_requests (int): Maximum authenticated requests per time window (by user ID).
  • time_unit (RateLimitTimeUnit): Time window (second, minute, hour, day).
  • exclude_paths (tuple[str, ...]): Paths excluded from rate limiting.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Validators:

  • _validate_floor_above_user_tiers
  • _apply_mirrors
  • _reject_legacy_max_requests

floor_max_requests pydantic-field

floor_max_requests = 10000

Maximum total requests per time window (by IP) across the whole API, including requests rejected by the auth middleware. Defense-in-depth against floods of invalid auth attempts on protected endpoints. The floor wraps both user-gated tiers in the middleware stack, so it must be >= auth_max_requests AND >= unauth_max_requests. A lower floor would silently cap either the authenticated per-user budget or the unauthenticated per-IP budget below its documented value (especially behind a shared NAT where many users share one IP). Enforced by _validate_floor_above_user_tiers.
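
The floor constraint can be illustrated with a small check. RateLimitSketch is a hypothetical dataclass standing in for the Pydantic model; it only reproduces the documented rule that the floor must be at least as large as both user-gated tiers, using the documented defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RateLimitSketch:
    """Hypothetical stand-in for RateLimitConfig; not the real model."""

    floor_max_requests: int = 10000
    unauth_max_requests: int = 20
    auth_max_requests: int = 6000

    def __post_init__(self) -> None:
        # The floor wraps both inner tiers, so it must not undercut either.
        highest_inner = max(self.unauth_max_requests, self.auth_max_requests)
        if self.floor_max_requests < highest_inner:
            msg = (
                f"floor_max_requests ({self.floor_max_requests}) must be >= "
                f"{highest_inner}, the larger of the two user-gated tiers"
            )
            raise ValueError(msg)


defaults = RateLimitSketch()  # 10000 >= 6000 and 10000 >= 20

try:
    RateLimitSketch(floor_max_requests=5000)  # below auth_max_requests
    low_floor_rejected = False
except ValueError:
    low_floor_rejected = True
```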

unauth_max_requests pydantic-field

unauth_max_requests = 20

Maximum unauthenticated requests per time window (by IP)

auth_max_requests pydantic-field

auth_max_requests = 6000

Maximum authenticated requests per time window (by user ID)

time_unit pydantic-field

time_unit = MINUTE

Time window (second, minute, hour, day)

exclude_paths pydantic-field

exclude_paths = ('/api/v1/healthz', '/api/v1/readyz')

Paths excluded from rate limiting

max_rpm_default pydantic-field

max_rpm_default = 60

Fallback requests-per-minute applied to per-connection coordinators when the catalog does not provide a limiter (mirrors the api.max_rpm_default setting; restart required)

ServerConfig pydantic-model

Bases: BaseModel

Uvicorn server configuration.

Host, port, TLS paths, trusted-proxy list, and the compression / request-size limits are resolved at boot via synthorg.settings.bootstrap_resolver.resolve_init_value against the api.* registry entries rather than carried on this model. Only the worker-process / auto-reload / WebSocket-ping knobs that uvicorn needs at construction time live here.

Attributes:

  • reload (bool): Enable auto-reload for development.
  • workers (int): Number of worker processes.
  • ws_ping_interval (float): WebSocket ping interval in seconds (0 to disable).
  • ws_ping_timeout (float): WebSocket pong timeout in seconds.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

reload pydantic-field

reload = False

Enable auto-reload for development

workers pydantic-field

workers = 1

Number of worker processes

ws_ping_interval pydantic-field

ws_ping_interval = 20.0

WebSocket ping interval in seconds (0 to disable)

ws_ping_timeout pydantic-field

ws_ping_timeout = 20.0

WebSocket pong timeout in seconds

ApiConfig pydantic-model

Bases: BaseModel

Top-level API configuration aggregating all sub-configs.

Attributes:

  • cors (CorsConfig): CORS configuration.
  • rate_limit (RateLimitConfig): Global three-tier rate limiting configuration (IP floor un-gated, unauthenticated by IP, authenticated by user ID).
  • rate_limiter_enabled (bool): Master kill switch for the three-tier global rate limiter. Mirrors the api.rate_limiter_enabled registry entry (read_only_post_init=True): the boot-time resolver in api/app.py reads SYNTHORG_API_RATE_LIMITER_ENABLED and falls through to the registered default (env > code default per the Cat-2 precedence model).
  • per_op_rate_limit (PerOpRateLimitConfig): Per-operation throttling configuration (layered on top of the global three-tier limiter).
  • per_op_concurrency (PerOpConcurrencyConfig): Per-operation inflight concurrency capping (layered on top of the sliding-window per-op limiter; caps simultaneous long-running requests per operation per subject).
  • server (ServerConfig): Uvicorn server configuration.
  • auth (AuthConfig): Authentication configuration.
  • api_prefix (NotBlankStr): URL prefix for all API routes.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Validators:

  • _apply_mirrors

cors pydantic-field

cors

CORS configuration

rate_limit pydantic-field

rate_limit

Global three-tier rate limiting configuration: un-gated IP floor, unauthenticated by IP, authenticated by user ID

rate_limiter_enabled pydantic-field

rate_limiter_enabled = True

Master kill switch for the three-tier global rate limiter. Mirrors the api.rate_limiter_enabled registry entry (read_only_post_init=True): the boot-time resolver in api/app.py reads SYNTHORG_API_RATE_LIMITER_ENABLED and falls through to the registered default (env > code default per the Cat-2 precedence model).
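
The "env > code default" precedence can be sketched as below. This is only an illustration: resolve_bool is a hypothetical helper, the accepted true/false spellings are an assumption, and the real lookup goes through the project's boot-time resolver, whose signature is not shown on this page.

```python
import os

# Assumed spellings; the real resolver may accept a different set.
_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def resolve_bool(env_name: str, default: bool, environ=None) -> bool:
    """Return the env override when present, else the code default."""
    environ = os.environ if environ is None else environ
    raw = environ.get(env_name)
    if raw is None:
        return default  # no override: fall through to the registered default
    value = raw.strip().lower()
    if value in _TRUE:
        return True
    if value in _FALSE:
        return False
    msg = f"{env_name} must be a boolean-like value, got {raw!r}"
    raise ValueError(msg)


NAME = "SYNTHORG_API_RATE_LIMITER_ENABLED"

# Unset variable: the documented default (True) wins.
unset = resolve_bool(NAME, default=True, environ={})

# Set variable: the environment overrides the default.
disabled = resolve_bool(NAME, default=True, environ={NAME: "false"})
```

Passing an explicit mapping for environ keeps the sketch testable without mutating the process environment.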

per_op_rate_limit pydantic-field

per_op_rate_limit

Per-operation throttling (layered on the global limiter)

per_op_concurrency pydantic-field

per_op_concurrency

Per-operation inflight concurrency capping (layered on the sliding-window per-op limiter; caps simultaneous long-running requests per (operation, subject))
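
As a rough illustration of capping simultaneous requests per (operation, subject), the sketch below keeps one counter per key. InflightSketch is a hypothetical in-memory class, not the project's InflightStore, and it ignores locking, expiry, and the enabled flag.

```python
from collections import defaultdict


class InflightSketch:
    """Hypothetical per-(operation, subject) inflight counter."""

    def __init__(self, max_inflight: int) -> None:
        self._max = max_inflight
        self._counts: dict[tuple[str, str], int] = defaultdict(int)

    def try_acquire(self, operation: str, subject: str) -> bool:
        """Reserve a slot; False means the cap for this key is reached."""
        key = (operation, subject)
        if self._counts[key] >= self._max:
            return False
        self._counts[key] += 1
        return True

    def release(self, operation: str, subject: str) -> None:
        """Free a slot when the long-running request finishes."""
        key = (operation, subject)
        if self._counts[key] > 0:
            self._counts[key] -= 1


store = InflightSketch(max_inflight=2)

first = store.try_acquire("reports.generate", "alice")
second = store.try_acquire("reports.generate", "alice")
third = store.try_acquire("reports.generate", "alice")  # over the cap

# A different subject has an independent budget for the same operation.
other_subject = store.try_acquire("reports.generate", "bob")

# Releasing one slot lets the next request through.
store.release("reports.generate", "alice")
after_release = store.try_acquire("reports.generate", "alice")
```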

server pydantic-field

server

Uvicorn server configuration

auth pydantic-field

auth

Authentication configuration

api_prefix pydantic-field

api_prefix = '/api/v1'

URL prefix for all API routes

DTOs

dto

Request/response DTOs and envelope models.

Response envelopes wrap all API responses in a consistent structure. Request DTOs define write-operation payloads (separate from domain models because they omit server-generated fields).

ErrorDetail pydantic-model

Bases: BaseModel

Structured error metadata (RFC 9457).

Self-contained so agents can parse it without referencing the parent envelope.

Attributes:

  • detail (NotBlankStr): Human-readable occurrence-specific explanation.
  • error_code (ErrorCode): Machine-readable error code (by convention, 4-digit category-grouped; see ErrorCode).
  • error_category (ErrorCategory): High-level error category.
  • retryable (bool): Whether the client should retry the request.
  • retry_after (int | None): Seconds to wait before retrying (None when not applicable).
  • instance (NotBlankStr): Request correlation ID for log tracing.
  • title (NotBlankStr): Static per-category title (e.g. "Authentication Error").
  • type (NotBlankStr): Documentation URI for the error category.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).

ProblemDetail pydantic-model

Bases: BaseModel

Bare RFC 9457 application/problem+json response body.

Returned when the client sends Accept: application/problem+json.

Attributes:

  • type (NotBlankStr): Documentation URI for the error category.
  • title (NotBlankStr): Static per-category title.
  • status (int): HTTP status code.
  • detail (NotBlankStr): Human-readable occurrence-specific explanation.
  • instance (NotBlankStr): Request correlation ID for log tracing.
  • error_code (ErrorCode): Machine-readable 4-digit error code.
  • error_category (ErrorCategory): High-level error category.
  • retryable (bool): Whether the client should retry the request.
  • retry_after (int | None): Seconds to wait before retrying (None when not applicable).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).
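The server's actual content negotiation is not shown on this page. As a minimal sketch of the idea, assuming a simple media-type match that ignores q-values, a check for the problem+json media type could look like this (wants_problem_json is a hypothetical name):

```python
def wants_problem_json(accept_header: str) -> bool:
    """Return True when application/problem+json is among the accepted media types.

    Simplified sketch: parameters such as ``q=0.5`` are stripped and ignored.
    """
    media_types = (
        part.split(";", 1)[0].strip().lower()
        for part in accept_header.split(",")
    )
    return "application/problem+json" in media_types
```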

ApiResponse pydantic-model

Bases: BaseModel

Standard API response envelope.

Attributes:

Name Type Description
data T | None

Response payload (None on error).

error str | None

Operator-facing error message (None on success). Must be either a static, generic string that carries no secrets or the output of safe_error_description(exc) (from synthorg.observability import safe_error_description). Never use raw str(exc): the value is serialised over HTTP, and exception messages from httpx.HTTPStatusError, psycopg.Error, or OAuth providers can leak credential material.

error_detail ErrorDetail | None

Structured error metadata (None on success).

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

  • data (T | None)
  • error (str | None)
  • error_detail (ErrorDetail | None)

Validators:

  • _validate_error_detail_consistency

success property

success

Whether the request succeeded (derived from error).

Returns:

Type Description
bool

True when error is None, False otherwise.
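The relationship between error and success can be sketched with a stdlib dataclass. This is a stand-in, not the real pydantic model: EnvelopeSketch is a hypothetical name, and the sketch assumes that "computed from error" means success is True exactly when error is None. The error_detail consistency validator is not reproduced because its rule is not spelled out here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class EnvelopeSketch(Generic[T]):
    """Stdlib stand-in for the documented envelope (not the real model)."""

    data: T | None = None
    error: str | None = None

    @property
    def success(self) -> bool:
        # Assumption: success is derived purely from the absence of an error.
        return self.error is None
```

Because success is derived rather than stored, a caller cannot construct an envelope whose success flag contradicts its error field.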

PaginationMeta pydantic-model

Bases: BaseModel

Pagination metadata for list responses.

Cursor-based: clients receive an opaque next_cursor and walk forward until has_more is False.

Attributes:

Name Type Description
limit int

Maximum items per page.

next_cursor str | None

Opaque cursor for the next page (None on the final page).

has_more bool

Whether more items follow the current page.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_cursor_consistency

limit pydantic-field

limit

Maximum items per page

next_cursor pydantic-field

next_cursor = None

Opaque cursor for the next page (null on final page)

has_more pydantic-field

has_more = False

Whether more items follow the current page
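A client-side sketch of walking a cursor-paginated listing until has_more is False. The fetch_page callable and the in-memory FAKE_PAGES table are hypothetical stand-ins for an HTTP call; only the field names limit, next_cursor, and has_more come from the model above.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator

Page = dict[str, Any]


def iter_items(fetch_page: Callable[[str | None], Page]) -> Iterator[Any]:
    """Yield items from every page, following next_cursor until has_more is False."""
    cursor: str | None = None
    while True:
        page = fetch_page(cursor)
        yield from page["data"]
        meta = page["pagination"]
        if not meta["has_more"]:
            return
        cursor = meta["next_cursor"]  # opaque: pass it back unchanged


# Hypothetical two-page listing keyed by the cursor that requests each page.
FAKE_PAGES = {
    None: {"data": [1, 2], "pagination": {"limit": 2, "next_cursor": "c1", "has_more": True}},
    "c1": {"data": [3], "pagination": {"limit": 2, "next_cursor": None, "has_more": False}},
}
```

Calling list(iter_items(FAKE_PAGES.__getitem__)) collects the items of both fake pages in order.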

PaginatedResponse pydantic-model

Bases: BaseModel

Paginated API response envelope.

Attributes:

Name Type Description
data tuple[T, ...]

Page of items.

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

pagination PaginationMeta

Pagination metadata.

degraded_sources tuple[NotBlankStr, ...]

Data sources that failed gracefully, resulting in partial data. Empty when all sources responded normally.

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_error_detail_consistency

degraded_sources pydantic-field

degraded_sources = ()

Data sources that failed gracefully (partial data)

success property

success

Whether the request succeeded (derived from error).

Returns:

Type Description
bool

True when error is None, False otherwise.

CreateArtifactRequest pydantic-model

Bases: BaseModel

Payload for creating a new artifact.

Attributes:

Name Type Description
type ArtifactType

Artifact type (code, tests, documentation).

path NotBlankStr

Logical file/directory path of the artifact.

task_id NotBlankStr

ID of the originating task.

created_by NotBlankStr

Agent ID of the creator.

description str

Human-readable description.

content_type str

MIME content type (empty if no content stored).

project_id NotBlankStr | None

Optional project ID to link the artifact to.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

type pydantic-field

type

Artifact category (code, tests, documentation).

path pydantic-field

path

File path or artifact identifier within the workspace.

task_id pydantic-field

task_id

Originating task identifier.

created_by pydantic-field

created_by

Agent identifier of the artifact creator.

description pydantic-field

description = ''

Human-readable artifact description.

content_type pydantic-field

content_type = ''

MIME type of the artifact content (empty when no content is stored).

project_id pydantic-field

project_id = None

Optional project identifier to link the artifact to.

CreateProjectRequest pydantic-model

Bases: BaseModel

Payload for creating a new project.

Attributes:

Name Type Description
name NotBlankStr

Project display name.

description str

Detailed project description.

team tuple[NotBlankStr, ...]

Agent IDs assigned to the project.

lead NotBlankStr | None

Agent ID of the project lead.

deadline str | None

Optional deadline (ISO 8601 string).

budget float

Total budget in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_request

CreateTaskRequest pydantic-model

Bases: BaseModel

Payload for creating a new task.

Attributes:

Name Type Description
title NotBlankStr

Short task title.

description NotBlankStr

Detailed task description.

type TaskType

Task work type.

priority Priority

Task priority level.

project NotBlankStr

Project ID.

created_by NotBlankStr

Agent name of the creator.

assigned_to NotBlankStr | None

Optional assignee agent ID.

estimated_complexity Complexity

Complexity estimate.

budget_limit float

Maximum spend in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

UpdateTaskRequest pydantic-model

Bases: BaseModel

Payload for updating task fields.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
title NotBlankStr | None

New title.

description NotBlankStr | None

New description.

priority Priority | None

New priority.

assigned_to NotBlankStr | None

New assignee.

budget_limit float | None

New budget limit.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard
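The server-side handling of expected_version is not shown here. The sketch below illustrates the general optimistic-concurrency pattern under stated assumptions: the stored record carries an integer version that increments on every update, a mismatch is rejected, and None skips the check. TaskRecord, VersionConflict, and apply_update are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


class VersionConflict(Exception):
    """Hypothetical error raised when expected_version does not match."""


@dataclass(frozen=True)
class TaskRecord:
    title: str
    version: int = 1


def apply_update(
    current: TaskRecord,
    *,
    title: str | None = None,
    expected_version: int | None = None,
) -> TaskRecord:
    """Apply a partial update, honouring the optimistic concurrency guard."""
    if expected_version is not None and expected_version != current.version:
        msg = f"expected version {expected_version}, found {current.version}"
        raise VersionConflict(msg)
    # None means "leave this field unchanged", matching the partial-update contract.
    new_title = current.title if title is None else title
    return replace(current, title=new_title, version=current.version + 1)
```

A client that read version 1 and sends expected_version=1 succeeds only if nobody else updated the task in between; otherwise it should re-read and retry.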

TransitionTaskRequest pydantic-model

Bases: BaseModel

Payload for a task status transition.

Attributes:

Name Type Description
target_status TaskStatus

The desired target status.

assigned_to NotBlankStr | None

Optional assignee override for the transition.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

target_status pydantic-field

target_status

Desired target status

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

RegisterExperimentVariantRequest pydantic-model

Bases: BaseModel

Payload for registering an A/B experiment variant.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

variant pydantic-field

variant

Variant name within the experiment

weight pydantic-field

weight

Relative selection weight

description pydantic-field

description = ''

Operator notes

AssignExperimentRequest pydantic-model

Bases: BaseModel

Payload for requesting a deterministic variant assignment.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

subject_id pydantic-field

subject_id

Subject identifier (agent id, user id, project id, ...)

ExecuteTaskRequest pydantic-model

Bases: BaseModel

Payload for the worker-callable POST /tasks/{id}/execute endpoint.

Mirrors the TaskClaim envelope fields the worker carries so the backend's WorkerExecutionService has the same provenance the dispatcher captured when it built the claim. The endpoint only needs the status pair and the dedup key; the task body is read server-side via the task repository.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

previous_status pydantic-field

previous_status = None

Task status before the triggering transition

new_status pydantic-field

new_status

Task status that triggered the dispatch (typically 'assigned' or 'ready')

idempotency_key pydantic-field

idempotency_key

Per-dispatch idempotency key; backend dedups duplicate executions
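How the backend deduplicates is not described beyond the field summary. A minimal in-memory sketch of the idea, assuming the first result for a key is stored and replayed, might look like the following (ExecutionDeduper is a hypothetical name, and a real service would need durable storage and expiry):

```python
from __future__ import annotations

from typing import Callable


class ExecutionDeduper:
    """Hypothetical in-memory dedup keyed by idempotency_key."""

    def __init__(self) -> None:
        self._results: dict[str, str] = {}

    def execute_once(self, idempotency_key: str, run: Callable[[], str]) -> tuple[str, bool]:
        """Return (result, executed).

        A repeated key returns the stored result without calling ``run`` again.
        """
        if idempotency_key in self._results:
            return self._results[idempotency_key], False
        result = run()
        self._results[idempotency_key] = result
        return result, True
```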

TaskBoardSubmissionResponse pydantic-model

Bases: BaseModel

Acknowledgement envelope for POST /tasks (HTTP 202 Accepted).

The board hands the filing to the work-entry adapter; the adapter drives the pipeline spine in a detached background coroutine. The spine creates the task during its intake phase, so this response carries the correlation id rather than a task id: the board UI correlates the eventual task.created WS event by this id.

Attributes:

Name Type Description
correlation_id NotBlankStr

End-to-end trace id stamped onto the work item.

title NotBlankStr

Title submitted by the user (echoed for UX confirmation).

project NotBlankStr

Project the task was filed into.

status Literal['submitted']

Always "submitted" at this point; included so the UI can switch on a single enum field rather than inferring state from the HTTP status.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

correlation_id pydantic-field

correlation_id

End-to-end trace id stamped onto the work item

title pydantic-field

title

Title submitted by the user

project pydantic-field

project

Project the task was filed into

status pydantic-field

status = 'submitted'

Always 'submitted' for the 202 ack
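A client-side sketch of correlating the 202 acknowledgement with the later task.created event. The event shape used here (a dict with "type" and "correlation_id" keys) is an assumption for illustration, as is the PendingSubmissions class; the actual WebSocket payload is not documented in this section.

```python
from __future__ import annotations

from typing import Any


class PendingSubmissions:
    """Tracks 202 acknowledgements until the matching task.created event arrives."""

    def __init__(self) -> None:
        self._pending: dict[str, str] = {}  # correlation_id -> submitted title

    def on_ack(self, ack: dict[str, Any]) -> None:
        """Remember an acknowledgement returned by POST /tasks."""
        self._pending[ack["correlation_id"]] = ack["title"]

    def on_event(self, event: dict[str, Any]) -> str | None:
        """Return the submitted title when the event resolves a pending ack."""
        # Assumed event shape: {"type": "task.created", "correlation_id": ...}
        if event.get("type") != "task.created":
            return None
        return self._pending.pop(event.get("correlation_id", ""), None)
```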

CancelTaskRequest pydantic-model

Bases: BaseModel

Payload for cancelling a task.

Attributes:

Name Type Description
reason NotBlankStr

Reason for cancellation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

reason pydantic-field

reason

Reason for cancellation

CreateApprovalRequest pydantic-model

Bases: BaseModel

Payload for creating a new approval item.

Attributes:

Name Type Description
action_type NotBlankStr

Kind of action requiring approval (category:action format).

title NotBlankStr

Short summary.

description NotBlankStr

Detailed explanation.

risk_level ApprovalRiskLevel

Assessed risk level.

ttl_seconds int | None

Optional time-to-live in seconds (min 60, max 604 800 = 7 days).

task_id NotBlankStr | None

Optional associated task.

metadata dict[str, str]

Additional key-value pairs.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_action_type_format → action_type
  • _validate_metadata_bounds

action_type pydantic-field

action_type

Kind of action requiring approval in category:action format.

title pydantic-field

title

Short human-readable summary of the approval.

description pydantic-field

description

Detailed explanation of the action and why it requires approval.

risk_level pydantic-field

risk_level

Assessed risk level for the action.

ttl_seconds pydantic-field

ttl_seconds = None

Optional time-to-live in seconds before the approval auto-expires (minimum 60, maximum 604800 = 7 days).

task_id pydantic-field

task_id = None

Optional associated task identifier.
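The documented constraints on action_type and ttl_seconds can be approximated client-side before submitting. This is a sketch under assumptions: the exact pattern the server enforces for category:action is not given, so the regular expression below (two non-empty, colon-free segments) is a guess, and check_approval_fields is a hypothetical helper. The TTL bounds are the ones stated above.

```python
from __future__ import annotations

import re

MIN_TTL_SECONDS = 60
MAX_TTL_SECONDS = 604_800  # 7 days

# Assumed pattern: two non-empty segments without colons or whitespace.
_ACTION_TYPE_RE = re.compile(r"[^:\s]+:[^:\s]+")


def check_approval_fields(action_type: str, ttl_seconds: int | None = None) -> list[str]:
    """Return a list of problems (empty when the fields look valid)."""
    problems: list[str] = []
    if not _ACTION_TYPE_RE.fullmatch(action_type):
        problems.append("action_type must use category:action format")
    if ttl_seconds is not None and not MIN_TTL_SECONDS <= ttl_seconds <= MAX_TTL_SECONDS:
        problems.append("ttl_seconds must be between 60 and 604800")
    return problems
```

The value "deploy:production" used when trying this out is illustrative only; the set of real action types is defined elsewhere.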

ApproveRequest pydantic-model

Bases: BaseModel

Payload for approving an approval item.

Attributes:

Name Type Description
comment NotBlankStr | None

Optional comment explaining the approval.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

RejectRequest pydantic-model

Bases: BaseModel

Payload for rejecting an approval item.

Attributes:

Name Type Description
reason NotBlankStr

Mandatory reason for rejection.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

CoordinateTaskRequest pydantic-model

Bases: BaseModel

Payload for triggering multi-agent coordination on a task.

Attributes:

Name Type Description
agent_names tuple[NotBlankStr, ...] | None

Agent names to coordinate with (None = all active). When provided, must be non-empty and unique.

max_subtasks int

Maximum subtasks for decomposition.

max_concurrency_per_wave int | None

Override for max concurrency per wave.

fail_fast bool | None

Override for fail-fast behaviour (None = use section config default).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

  • agent_names (tuple[NotBlankStr, ...] | None)
  • max_subtasks (int)
  • max_concurrency_per_wave (int | None)
  • fail_fast (bool | None)

Validators:

  • _validate_unique_agent_names

agent_names pydantic-field

agent_names = None

Agent names to coordinate with (None = all active)
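The agent_names rule (None means all active agents; otherwise non-empty and unique) can be sketched as a plain function. Whether the real validator compares names case-sensitively is not stated, so exact string equality is an assumption here, and check_agent_names is a hypothetical name.

```python
from __future__ import annotations


def check_agent_names(agent_names: tuple[str, ...] | None) -> tuple[str, ...] | None:
    """Validate the documented rule for agent_names and return the value unchanged."""
    if agent_names is None:
        return None  # None selects all active agents
    if not agent_names:
        msg = "agent_names must be non-empty when provided"
        raise ValueError(msg)
    # Assumption: uniqueness is checked by exact string equality.
    if len(set(agent_names)) != len(agent_names):
        msg = "agent_names must be unique"
        raise ValueError(msg)
    return agent_names
```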

CoordinationPhaseResponse pydantic-model

Bases: BaseModel

Response model for a single coordination phase.

Attributes:

Name Type Description
phase NotBlankStr

Phase name.

success bool

Whether the phase completed successfully.

duration_seconds float

Wall-clock duration of the phase.

error NotBlankStr | None

Error description if the phase failed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_success_error_consistency

CoordinationResultResponse pydantic-model

Bases: BaseModel

Response model for a complete coordination run.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

topology NotBlankStr

Resolved coordination topology.

total_duration_seconds float

Total wall-clock duration.

total_cost float

Total cost across all waves.

phases tuple[CoordinationPhaseResponse, ...]

Phase results in execution order.

wave_count int

Number of execution waves.

is_success bool

Whether all phases succeeded (computed).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

currency pydantic-field

currency = DEFAULT_CURRENCY

ISO 4217 currency code

is_success property

is_success

True when every phase completed successfully.

Returns:

Type Description
bool

True when every phase succeeded, False otherwise.
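The computed is_success flag can be illustrated with stdlib dataclasses. PhaseSketch and ResultSketch are hypothetical stand-ins for the response models, and the phase names used when trying it out are made up. Note that with this formulation an empty phases tuple counts as success; whether the real model behaves the same way is not stated.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PhaseSketch:
    phase: str
    success: bool


@dataclass(frozen=True)
class ResultSketch:
    phases: tuple[PhaseSketch, ...]

    @property
    def is_success(self) -> bool:
        # True when every phase completed successfully.
        return all(p.success for p in self.phases)
```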

RollbackAgentIdentityRequest pydantic-model

Bases: BaseModel

Request body for rolling back an agent identity to a previous version.

Attributes:

Name Type Description
target_version int

Snapshot version number to restore content from (monotonic counter in the agent_identity_versions table).

reason NotBlankStr | None

Optional human-readable justification recorded alongside the evolution event for audit purposes.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

target_version pydantic-field

target_version

Snapshot version to rollback to

reason pydantic-field

reason = None

Optional rollback justification for the audit trail

Errors

The error taxonomy and exception classes live in synthorg.core:

  • synthorg.core.error_taxonomy -- ErrorCategory, ErrorCode, RFC 9457 helpers
  • synthorg.core.domain_errors -- DomainError base + concrete subclasses (NotFoundError, ConflictError, ValidationError, ...)
  • synthorg.core.persistence_errors -- PersistenceError hierarchy

Guards

guards

Route guards for access control.

Guards read the authenticated user identity from connection.user (populated by the auth middleware) and check role-based permissions.

The require_roles factory creates guards for arbitrary role sets. Pre-built constants cover common patterns:

require_ceo              -- CEO only
require_ceo_or_manager   -- CEO or Manager
require_approval_roles   -- CEO, Manager, or Board Member

require_ceo module-attribute

require_ceo = require_roles(CEO)

Guard allowing only the CEO role.

require_ceo_or_manager module-attribute

require_ceo_or_manager = require_roles(CEO, MANAGER)

Guard allowing CEO or Manager roles.

require_approval_roles module-attribute

require_approval_roles = require_roles(CEO, MANAGER, BOARD_MEMBER)

Guard allowing roles that can approve or reject actions.

has_write_role

has_write_role(role)

Return True if the role grants write access.

Use this for inline role checks instead of importing _WRITE_ROLES directly. The write set includes CEO, Manager, and Pair Programmer.

Returns:

Type Description
bool

True if the role is in the write set (CEO, Manager, or Pair Programmer), False otherwise.

Source code in src/synthorg/api/guards.py
def has_write_role(role: HumanRole) -> bool:
    """Return True if the role grants write access.

    Use this for inline role checks instead of importing ``_WRITE_ROLES``
    directly.  The write set includes CEO, Manager, and Pair Programmer.

    Returns:
        ``True`` or ``False`` reflecting the condition.
    """
    return role in _WRITE_ROLES

require_write_access

require_write_access(connection, _)

Guard that allows only write-capable human roles.

Checks connection.user.role for ceo, manager, or pair_programmer. Board members are excluded (they may only observe and approve). The system role is intentionally excluded -- use require_roles() with the desired roles for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_write_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that allows only write-capable human roles.

    Checks ``connection.user.role`` for ``ceo``, ``manager``,
    or ``pair_programmer``.  Board members are excluded (they
    may only observe and approve).  The ``system`` role is
    intentionally excluded -- use ``require_roles()`` with the
    desired roles for endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role not in _WRITE_ROLES:
        logger.warning(
            API_GUARD_DENIED,
            guard="require_write_access",
            role=role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Write access denied")

require_read_access

require_read_access(connection, _)

Guard that allows all human roles (excludes SYSTEM).

Checks connection.user.role for any human role including observer and board_member. The internal system role is excluded -- use require_roles() for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_read_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that allows all human roles (excludes SYSTEM).

    Checks ``connection.user.role`` for any human role
    including ``observer`` and ``board_member``.  The internal
    ``system`` role is excluded -- use ``require_roles()`` for
    endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role not in _READ_ROLES:
        logger.warning(
            API_GUARD_DENIED,
            guard="require_read_access",
            role=role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Read access denied")

require_roles

require_roles(*roles)

Create a guard that allows only the specified roles.

Parameters:

Name Type Description Default
*roles HumanRole

One or more HumanRole members to permit.

()

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
ValueError

If no roles are provided.

Source code in src/synthorg/api/guards.py
def require_roles(
    *roles: HumanRole,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Create a guard that allows only the specified roles.

    Args:
        *roles: One or more ``HumanRole`` members to permit.

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        ValueError: If no roles are provided.
    """
    if not roles:
        msg = "require_roles() requires at least one role"
        raise ValueError(msg)

    allowed = frozenset(roles)
    label = ",".join(sorted(r.value for r in allowed))

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        """Deny the connection unless its role is in the allowed set.

        Raises:
            PermissionDeniedException: If the role is not in ``allowed``.
        """
        role = _get_role(connection)
        if role not in allowed:
            logger.warning(
                API_GUARD_DENIED,
                guard=f"require_roles({label})",
                role=role,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(detail="Access denied")

    guard.__name__ = f"require_roles({label})"
    guard.__qualname__ = f"require_roles({label})"
    return guard

require_org_mutation

require_org_mutation(department_param=None)

Guard factory for org config mutations.

Access is granted if the user has one of:

  • OrgRole.OWNER -- always allowed
  • OrgRole.EDITOR -- always allowed
  • OrgRole.DEPARTMENT_ADMIN -- allowed only when the target department (read from the path parameter named department_param) is in the user's scoped_departments

If the user has no org_roles (empty tuple), falls back to the existing HumanRole write-access check so legacy installations without organisation-level roles still resolve.

Parameters:

Name Type Description Default
department_param str | None

Path parameter name containing the target department (e.g. "name"). None skips department scope checking (company-level endpoints).

None

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
PermissionDeniedException

Raised by the returned guard (not by the factory itself) when the user lacks a sufficient org role or department scope.

Source code in src/synthorg/api/guards.py
def require_org_mutation(
    department_param: str | None = None,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Guard factory for org config mutations.

    Access is granted if the user has one of:

    - ``OrgRole.OWNER`` -- always allowed
    - ``OrgRole.EDITOR`` -- always allowed
    - ``OrgRole.DEPARTMENT_ADMIN`` -- allowed only when the
      target department (read from the path parameter named
      *department_param*) is in the user's ``scoped_departments``

    If the user has no ``org_roles`` (empty tuple), falls back to
    the existing ``HumanRole`` write-access check so legacy
    installations without organisation-level roles still resolve.

    Args:
        department_param: Path parameter name containing the target
            department (e.g. ``"name"``).  ``None`` skips department
            scope checking (company-level endpoints).

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        PermissionDeniedException: Raised by the returned guard, not by
            the factory itself, when the user lacks a sufficient org role
            or department scope.
    """

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        """Enforce the org-mutation policy for a single connection.

        Raises:
            PermissionDeniedException: If the user lacks a sufficient org
                role or the target department is outside their scope.
        """
        org_roles = _get_org_roles(connection)

        # Backward compat: if no org_roles set, fall back to HumanRole
        if not org_roles:
            role = _get_role(connection)
            if role in _WRITE_ROLES:
                return
            logger.warning(
                API_GUARD_DENIED,
                guard="require_org_mutation(fallback)",
                role=role,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(detail="Write access denied")

        # Owner and editor always allowed
        if _ORG_ROLE_OWNER in org_roles or _ORG_ROLE_EDITOR in org_roles:
            return

        # Department admin: check scope
        if _ORG_ROLE_DEPARTMENT_ADMIN in org_roles:
            if department_param is None:
                # Company-level endpoint -- dept_admin cannot modify
                logger.warning(
                    API_GUARD_DENIED,
                    guard="require_org_mutation(dept_admin_no_scope)",
                    path=str(connection.url.path),
                )
                raise PermissionDeniedException(
                    detail="Department admins cannot modify company-level settings",
                )
            target_dept = connection.path_params.get(department_param, "")
            scoped = _get_scoped_departments(connection)
            if target_dept.lower() in (d.lower() for d in scoped):
                return
            logger.warning(
                API_GUARD_DENIED,
                guard="require_org_mutation(dept_admin_out_of_scope)",
                target_department=target_dept,
                scoped_departments=scoped,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(
                detail=f"Department admin access denied for {target_dept!r}",
            )

        # Viewer or unrecognised role
        logger.warning(
            API_GUARD_DENIED,
            guard="require_org_mutation(insufficient_org_role)",
            org_roles=org_roles,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Org mutation access denied")

    guard.__name__ = "require_org_mutation"
    guard.__qualname__ = "require_org_mutation"
    return guard
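The decision order of the guard above can be summarised as a pure function, which is convenient for reasoning about the cases without a live connection. This is a sketch, not the project's code: the string constants are hypothetical stand-ins for the OrgRole members, and may_mutate_org takes the legacy write-role result and the target department as plain arguments instead of reading them from the connection.

```python
from __future__ import annotations

# Hypothetical string values; the real guard compares against OrgRole members.
OWNER, EDITOR, DEPARTMENT_ADMIN = "owner", "editor", "department_admin"


def may_mutate_org(
    org_roles: tuple[str, ...],
    *,
    legacy_write_role: bool = False,
    target_department: str | None = None,
    scoped_departments: tuple[str, ...] = (),
) -> bool:
    """Mirror the guard's decision order as a pure function."""
    if not org_roles:
        return legacy_write_role  # backward-compatible HumanRole fallback
    if OWNER in org_roles or EDITOR in org_roles:
        return True
    if DEPARTMENT_ADMIN in org_roles:
        if target_department is None:
            return False  # company-level endpoint: department admins are denied
        scoped = {d.lower() for d in scoped_departments}
        return target_department.lower() in scoped  # case-insensitive match
    return False  # viewer or unrecognised role
```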

Middleware

middleware

Request middleware and before-send hooks.

Provides ASGI middleware for request logging, and a before_send hook that injects security headers (CSP, CORP, HSTS, Cache-Control, etc.) into every HTTP response -- including exception-handler and unmatched-route (404/405) responses.

Why before_send instead of ASGI middleware? Litestar's before_send hook wraps the ASGI send callback at the outermost layer (before the middleware stack), so it fires for all responses. By contrast, user-defined ASGI middleware only runs for matched routes -- 404 and 405 responses from the router bypass it.
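The general idea of wrapping the ASGI send callable can be sketched in plain ASGI terms, without Litestar. This is not Litestar's implementation or the project's hook: wrap_send, demo, and EXTRA_HEADERS are hypothetical names, and the single header shown is illustrative rather than the project's actual header set.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

Message = dict[str, Any]
Send = Callable[[Message], Awaitable[None]]

# Illustrative header only; the real hook injects CSP, CORP, HSTS, Cache-Control, etc.
EXTRA_HEADERS = [(b"x-content-type-options", b"nosniff")]


def wrap_send(send: Send) -> Send:
    """Return a send callable that appends EXTRA_HEADERS to every response start."""

    async def wrapped(message: Message) -> None:
        if message.get("type") == "http.response.start":
            headers = list(message.get("headers", []))
            headers.extend(EXTRA_HEADERS)
            message = {**message, "headers": headers}
        await send(message)

    return wrapped


def demo() -> list[Message]:
    """Drive the wrapper with a fake 404 response and return what was sent."""
    sent: list[Message] = []

    async def fake_send(message: Message) -> None:
        sent.append(message)

    async def run() -> None:
        send = wrap_send(fake_send)
        await send({"type": "http.response.start", "status": 404, "headers": []})
        await send({"type": "http.response.body", "body": b"Not Found"})

    asyncio.run(run())
    return sent
```

Because the wrapper sits around send itself, the header is added regardless of which component produced the response, which is the property the paragraph above relies on for 404 and 405 responses.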

RequestLoggingMiddleware

RequestLoggingMiddleware(app)

ASGI middleware that logs request start and completion.

Uses time.perf_counter() for high-resolution duration measurement. Only logs HTTP requests (non-HTTP scopes like WebSocket and lifespan are passed through without logging).

Each HTTP request is also wrapped in an OpenTelemetry span (http.request) carrying OTel-semconv attributes (http.request.method, http.route, http.response.status_code) plus the synthorg.correlation_id so distributed traces line up with the structured-log stream. When no tracer provider is configured (default), get_tracer returns a no-op tracer and the span is essentially free.

Source code in src/synthorg/api/middleware.py
def __init__(self, app: ASGIApp) -> None:
    self.app = app

__call__ async

__call__(scope, receive, send)

Process an ASGI request, logging start and completion.

Source code in src/synthorg/api/middleware.py
async def __call__(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
) -> None:
    """Process an ASGI request, logging start and completion."""
    if scope["type"] != ScopeType.HTTP:
        await self.app(scope, receive, send)
        return

    request: Request[Any, Any, Any] = Request(scope)
    method = request.method
    path = str(request.url.path)

    correlation_id = generate_correlation_id()
    bind_correlation_id(request_id=correlation_id)
    _log_request_started(method, path)
    start = time.perf_counter()

    status_code: int | None = None
    original_send = send

    async def capture_send(message: Any) -> None:
        """Record the response status code, then forward the message."""
        nonlocal status_code
        if (
            isinstance(message, dict)
            and message.get("type") == "http.response.start"
        ):
            raw_status = message.get("status")
            if raw_status is None:
                logger.warning(
                    API_ASGI_MISSING_STATUS,
                    type=message.get("type"),
                )
                status_code = 500
            else:
                status_code = raw_status
        await original_send(message)  # pyright: ignore[reportArgumentType]

    with _tracer.start_as_current_span(
        "http.request",
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        span.set_attribute("http.request.method", method)
        span.set_attribute("synthorg.correlation_id", correlation_id)
        try:
            await self.app(scope, receive, capture_send)
        except Exception as exc:
            reraise_critical(exc)
            # OTel's ``record_exception`` would serialise the full
            # traceback (including frame locals) into the span,
            # bypassing the structlog secret-log redaction the
            # rest of the codebase relies on. To keep the OTLP
            # transport on the same redaction posture as the
            # structlog sink, set OTel-semconv exception
            # attributes directly using the scrubbed description
            # and skip the traceback emission. See
            # ``docs/reference/sec-prompt-safety.md`` for the
            # transport-level redaction policy.
            span.set_attribute("exception.type", type(exc).__name__)
            span.set_attribute(
                "exception.message",
                safe_error_description(exc),
            )
            span.set_status(Status(StatusCode.ERROR, type(exc).__name__))
            raise
        finally:
            span.set_attribute("http.route", _resolve_route_template(scope))
            if status_code is not None:
                span.set_attribute("http.response.status_code", status_code)
                if status_code >= 500:  # noqa: PLR2004
                    span.set_status(Status(StatusCode.ERROR))
            elapsed_sec = time.perf_counter() - start
            duration_ms = round(elapsed_sec * 1000, 2)
            _log_request_completion(method, path, status_code, duration_ms)
            _record_request_metric(scope, method, status_code, elapsed_sec)
            clear_correlation_ids()

build_docs_csp

build_docs_csp(origins)

Build the relaxed Scalar UI CSP from a list of trusted origins.

Origins are applied uniformly to script-src, style-src, img-src, font-src and connect-src so operators can swap the public Scalar hosts for an internally-mirrored CDN with a single configuration change.

An empty origins list raises ValueError rather than emitting a malformed CSP with trailing whitespace before each ;. CSP parsers tolerate the trailing space, but operators reading the header back would see an obviously broken policy. The ApiBridgeConfig validator is the right place to enforce non-emptiness (it currently validates only the per-entry pattern), so callers pass through the bridge-config-validated tuple.

Parameters:

Name Type Description Default
origins Sequence[str]

Origin URLs that Scalar UI assets and proxy requests may target. Must be non-empty. Each entry must already be a valid origin (scheme + host); ApiBridgeConfig performs the per-entry validation.

required

Returns:

Type Description
str

A CSP header value safe to assign to Content-Security-Policy for /docs/ responses.

Raises:

Type Description
ValueError

If origins is empty.
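
To make the guard concrete, the sketch below formats a single directive the way the f-strings in build_docs_csp do, but without the empty-list check. The helper name format_directive and the example origin are illustrative assumptions, not part of the module.

```python
from collections.abc import Sequence


def format_directive(name: str, origins: Sequence[str]) -> str:
    """Format one CSP directive without the empty-origins guard (illustration only)."""
    joined = " ".join(origins)
    return f"{name} 'self' {joined}; "


# With at least one origin the directive is well formed.
ok = format_directive("connect-src", ["https://cdn.example.com"])

# With no origins the join yields "", leaving a stray space before the ";".
broken = format_directive("connect-src", [])

print(repr(ok))
print(repr(broken))
```

The second value is the shape of policy that the ValueError prevents from ever reaching a response header.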

Source code in src/synthorg/api/middleware.py
def build_docs_csp(origins: Sequence[str]) -> str:
    """Build the relaxed Scalar UI CSP from a list of trusted origins.

    Origins are applied uniformly to ``script-src``, ``style-src``,
    ``img-src``, ``font-src`` and ``connect-src`` so operators can
    swap the public Scalar hosts for an internally-mirrored CDN with
    a single configuration change.

    An empty *origins* list raises ``ValueError`` rather than emit a
    malformed CSP with trailing whitespace before each ``;``. CSP
    parsers tolerate the trailing space but operators reading the
    header back would see an obviously broken policy; the
    ``ApiBridgeConfig`` validator is the right place to enforce
    non-empty (currently only validates pattern), so callers pass
    through the bridge-config-validated tuple.

    Args:
        origins: Origin URLs that Scalar UI assets and proxy requests
            may target. Must be non-empty. Each entry must already be
            a valid origin (scheme + host); ``ApiBridgeConfig``
            performs the per-entry validation.

    Returns:
        A CSP header value safe to assign to
        ``Content-Security-Policy`` for ``/docs/`` responses.

    Raises:
        ValueError: If *origins* is empty.
    """
    if not origins:
        msg = "build_docs_csp requires at least one trusted origin"
        raise ValueError(msg)
    joined = " ".join(origins)
    return (
        f"default-src 'self'; "
        f"script-src 'self' 'unsafe-inline' {joined}; "
        f"style-src 'self' 'unsafe-inline' {joined}; "
        f"img-src 'self' data: {joined}; "
        f"font-src 'self' data: {joined}; "
        f"connect-src 'self' {joined}; "
        f"object-src 'none'; "
        f"base-uri 'self'; "
        f"frame-ancestors 'none'"
    )

set_docs_csp_origins

set_docs_csp_origins(origins)

Replace the docs CSP value with one built from origins.

Called once at app startup after resolving api.csp_docs_external_origins through the settings service. For test isolation, reset the value by calling this setter again with _DOCS_CSP_DEFAULT_ORIGINS.

Calling this outside startup creates a brief eventual-consistency window for in-flight HTTP responses, since the docs before_send hook reads the global at request time. The api.csp_docs_external_origins setting is marked restart_required=True precisely to keep this single-writer.

Source code in src/synthorg/api/middleware.py
def set_docs_csp_origins(origins: Sequence[str]) -> None:
    """Replace the docs CSP value with one built from *origins*.

    Called once at app startup after resolving
    ``api.csp_docs_external_origins`` through the settings service.
    Reset to the default list with ``_DOCS_CSP_DEFAULT_ORIGINS`` for
    test isolation.

    Calling this outside startup creates a brief eventual-consistency
    window for in-flight HTTP responses, since the docs ``before_send``
    hook reads the global at request time. The
    ``api.csp_docs_external_origins`` setting is marked
    ``restart_required=True`` precisely to keep this single-writer.
    """
    global _DOCS_CSP  # noqa: PLW0603 -- single-writer startup hook; tests reset via the same setter
    _DOCS_CSP = build_docs_csp(origins)
    logger.info(
        SETTINGS_VALUE_RESOLVED,
        namespace="api",
        key="csp_docs_external_origins",
        origins_count=len(origins),
    )

security_headers_hook async

security_headers_hook(message, scope)

Inject security headers into every HTTP response.

Registered as a Litestar before_send hook so it fires for all HTTP responses -- successful, exception-handler, and router-level 404/405.

Adds static security headers (CORP, HSTS, X-Content-Type-Options, etc.) and path-aware Content-Security-Policy (strict for API, relaxed for /docs/ to allow Scalar UI resources) and Cache-Control (no-store for API, public, max-age=300 for /docs/ since it serves public, non-user-specific content).

Uses __setitem__ (not add) so that if any handler or middleware already set a header, the known-good value overwrites it rather than creating a duplicate.
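
The difference between overwriting and appending can be sketched on a raw ASGI header list (a list of name/value byte pairs). The helpers append_header and set_header are hypothetical stand-ins written for this illustration; they are not the MutableScopeHeaders implementation.

```python
Header = tuple[bytes, bytes]


def append_header(headers: list[Header], name: bytes, value: bytes) -> list[Header]:
    """Append unconditionally, the way an add-style call would."""
    return [*headers, (name.lower(), value)]


def set_header(headers: list[Header], name: bytes, value: bytes) -> list[Header]:
    """Drop any existing entries for the name, then add the new value."""
    lowered = name.lower()
    kept = [(k, v) for k, v in headers if k.lower() != lowered]
    return [*kept, (lowered, value)]


# Pretend an upstream handler already set a permissive Cache-Control.
upstream = [(b"content-type", b"application/json"), (b"cache-control", b"public")]

appended = append_header(upstream, b"Cache-Control", b"no-store")
overwritten = set_header(upstream, b"Cache-Control", b"no-store")

# Appending leaves two conflicting values; overwriting leaves exactly one.
print([v for k, v in appended if k == b"cache-control"])
print([v for k, v in overwritten if k == b"cache-control"])
```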

Parameters:

Name Type Description Default
message Message

ASGI message dict (only http.response.start is processed).

required
scope Scope

ASGI connection scope.

required
Source code in src/synthorg/api/middleware.py
async def security_headers_hook(message: Message, scope: Scope) -> None:
    """Inject security headers into every HTTP response.

    Registered as a Litestar ``before_send`` hook so it fires for
    **all** HTTP responses -- successful, exception-handler, and
    router-level 404/405.

    Adds static security headers (CORP, HSTS, X-Content-Type-Options,
    etc.) and path-aware Content-Security-Policy (strict for API,
    relaxed for ``/docs/`` to allow Scalar UI resources) and
    Cache-Control (``no-store`` for API, ``public, max-age=300``
    for ``/docs/`` since it serves public, non-user-specific content).

    Uses ``__setitem__`` (not ``add``) so that if any handler or
    middleware already set a header, the known-good value overwrites
    it rather than creating a duplicate.

    Args:
        message: ASGI message dict (only ``http.response.start``
            is processed).
        scope: ASGI connection scope.
    """
    if scope.get("type") != ScopeType.HTTP:
        return
    if message.get("type") != "http.response.start":
        return

    headers = MutableScopeHeaders.from_message(message)

    # Static security headers -- overwrite to prevent duplicates.
    for name, value in _SECURITY_HEADERS.items():
        headers[name] = value

    # Path-aware headers
    path: str = scope.get("path", "")
    is_docs = path == "/docs" or path.startswith("/docs/")
    headers["Content-Security-Policy"] = _DOCS_CSP if is_docs else _API_CSP

    # Relax COOP for /docs -- Scalar UI may open cross-origin popups
    # for OAuth/API proxy features via proxy.scalar.com.
    # same-origin-allow-popups: allows the page to open popups but
    # blocks cross-origin pages from retaining an opener reference,
    # preventing XS-Leak side-channel attacks via window.opener.
    # Allow brief caching for docs -- public, non-user-specific content.
    if is_docs:
        headers["Cross-Origin-Opener-Policy"] = "same-origin-allow-popups"
        headers["Cache-Control"] = _DOCS_CACHE_CONTROL
        # Defense-in-depth: even if an upstream layer set Pragma we
        # actively clear it on /docs so the no-cache hint can never
        # leak onto cacheable assets.
        with suppress(KeyError):
            del headers["Pragma"]
    else:
        headers["Pragma"] = _API_PRAGMA

Pagination

pagination

Cursor-based pagination helpers.

In-memory helper paginate_cursor slices a tuple and produces a signed cursor, so controllers backed by in-memory collections (config lists, bus channel names, approval-store filtered views) can return the same envelope shape as repo-backed endpoints.

Today the cursor is an opaque encoding of an offset. Repositories that need seek-based paging (append-only tables) decode the opaque cursor into a composite (created_at, id) seek tuple internally -- the wire format stays the same.
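
The real encode_cursor / decode_cursor wire format is not shown on this page. As a rough sketch of what "signed, opaque offset" can mean, the following assumes a payload of the decimal offset plus an HMAC-SHA256 tag, base64url-encoded together. The names sign_offset, verify_offset and BadCursor are hypothetical.

```python
import base64
import hashlib
import hmac


class BadCursor(ValueError):
    """Hypothetical stand-in for InvalidCursorError."""


def sign_offset(offset: int, secret: bytes) -> str:
    """Encode an offset and its HMAC tag into one opaque token."""
    payload = str(offset).encode()
    tag = hmac.new(secret, payload, hashlib.sha256).digest()
    return base64.urlsafe_b64encode(payload + b"." + tag).decode()


def verify_offset(cursor: str, secret: bytes) -> int:
    """Decode a token, rejecting malformed or wrongly signed input."""
    try:
        raw = base64.urlsafe_b64decode(cursor.encode())
        # The payload is digits only, so the first "." is the separator.
        payload, tag = raw.split(b".", 1)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise BadCursor("malformed cursor") from exc
    expected = hmac.new(secret, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise BadCursor("signature mismatch")
    return int(payload)


token = sign_offset(40, b"server-secret")
print(verify_offset(token, b"server-secret"))
```

Because the tag is checked with a constant-time comparison before the payload is trusted, a client cannot move the offset without knowing the secret.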

CursorLimit module-attribute

CursorLimit = Annotated[
    int,
    QueryParameter(
        ge=1,
        le=MAX_LIMIT,
        description=f"Page size (default {DEFAULT_LIMIT}, max {MAX_LIMIT})",
    ),
]

Query-parameter type for the page size (1-MAX_LIMIT).

HTTP-boundary only: the bounds are enforced by Litestar's QueryParameter metadata at request parsing. Do not reuse this alias for in-process validation, where the constraint would silently not apply.

CursorParam module-attribute

CursorParam = Annotated[
    str | None,
    QueryParameter(
        max_length=512,
        description="Opaque pagination cursor returned by the previous page",
    ),
]

Query-parameter type for the opaque cursor (max 512 chars).

HTTP-boundary only: the max_length is enforced by Litestar's QueryParameter metadata at request parsing, not by the type itself. Do not reuse this alias for in-process validation.

InvalidCursorError

InvalidCursorError(message=None)

Bases: ValidationError

Raised when a cursor token is malformed, tampered, or unsigned.

Renders as HTTP 422 Unprocessable Entity with a structured ErrorDetail (error_category=validation, error_code=VALIDATION_ERROR) via the centralised RFC 9457 dispatch.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

paginate_cursor

paginate_cursor(items, *, limit, cursor, secret)

Slice a tuple and produce cursor-based pagination metadata.

Clamps limit to [1, MAX_LIMIT]. A missing cursor starts at offset 0. Invalid or tampered cursors raise InvalidCursorError, which controllers should surface as HTTP 400.

Parameters:

Name Type Description Default
items tuple[T, ...]

Full collection to paginate (must be already ordered).

required
limit int

Maximum items to return on this page.

required
cursor str | None

Opaque cursor from the previous page, or None for the first page.

required
secret CursorSecret

HMAC secret used to sign / verify cursors.

required

Returns:

Type Description
tuple[tuple[T, ...], PaginationMeta]

Tuple of (page_items, pagination_meta).

Raises:

Type Description
InvalidCursorError

If cursor is malformed, tampered, or signed by a different secret.

Source code in src/synthorg/api/pagination.py
def paginate_cursor[T](
    items: tuple[T, ...],
    *,
    limit: int,
    cursor: str | None,
    secret: CursorSecret,
) -> tuple[tuple[T, ...], PaginationMeta]:
    """Slice a tuple and produce cursor-based pagination metadata.

    Clamps ``limit`` to ``[1, MAX_LIMIT]``. A missing cursor starts at
    offset 0. Invalid / tampered cursors raise :class:`InvalidCursorError`
    which controllers should surface as HTTP 400.

    Args:
        items: Full collection to paginate (must be already ordered).
        limit: Maximum items to return on this page.
        cursor: Opaque cursor from the previous page, or ``None`` for
            the first page.
        secret: HMAC secret used to sign / verify cursors.

    Returns:
        Tuple of (page_items, pagination_meta).

    Raises:
        InvalidCursorError: If ``cursor`` is malformed, tampered, or
            signed by a different secret.
    """
    if cursor is None:
        offset = 0
    else:
        try:
            offset = decode_cursor(cursor, secret=secret)
        except InvalidCursorError:
            # Malformed / tampered / foreign-secret cursors raise here;
            # log before re-raising so 400s from decode failures are
            # observable in production alongside the truncation branch
            # below.  The cursor itself is NOT logged -- it's attacker-
            # controlled input and may carry secret fragments from
            # tampering attempts.
            logger.warning(
                API_CURSOR_INVALID,
                reason="cursor_decode_failed",
            )
            raise
    effective_limit = max(1, min(limit, MAX_LIMIT))
    # Out-of-bounds cursors are rejected explicitly.  The cursor is
    # HMAC-signed so a client cannot forge one past the true end;
    # reaching this branch means the collection shrunk between
    # issuing the cursor and walking it (e.g. deletions) -- returning
    # an empty page would silently hide the truncation from callers
    # that rely on ``has_more`` progressing consistently.  The
    # comparison is ``>=`` because ``has_more`` is False whenever
    # ``next_offset == len(items)``, so no valid cursor is ever issued
    # pointing exactly at the collection end -- reaching that position
    # is the unambiguous truncation signal.
    if offset and offset >= len(items):
        # Truncation is an operator-visible event: the collection
        # shrank between cursor issuance and replay, and silently
        # returning an empty page would hide that from monitoring.
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_past_end",
            offset=offset,
            collection_length=len(items),
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)
    page = items[offset : offset + effective_limit]
    next_offset = offset + effective_limit
    has_more = next_offset < len(items)
    next_cursor = encode_cursor(next_offset, secret=secret) if has_more else None
    meta = PaginationMeta(
        limit=effective_limit,
        next_cursor=next_cursor,
        has_more=has_more,
    )
    return page, meta

encode_repo_seek_meta

encode_repo_seek_meta(
    *, offset, page_len, total, limit, secret, reject_stale_cursor=True
)

Build PaginationMeta for controllers that push limit+offset into the repo.

Centralizes the has_more snapshot-drift guard so the next pagination bug cannot regress across every version-history controller one at a time. An empty or short page (page_len == 0 or offset + page_len == offset) cannot advance the cursor past the current offset, so the guard refuses to emit a cursor that would loop the client on the same page when count_versions disagrees with list_versions.

Parameters:

Name Type Description Default
offset int

The decoded cursor offset the current page started at.

required
page_len int

The number of repo rows consumed (len(repo_rows), not the filtered slice) -- the cursor must advance by consumed rows so filtered pages do not replay already-read rows on the next request.

required
total int

The repo's reported total row count. Drives the has_more check.

required
limit int

The page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required
reject_stale_cursor bool

When True (the default), a decoded offset == total raises InvalidCursorError (mirroring the paginate_cursor helper). Set to False only when the caller genuinely tolerates a cursor landing exactly on the current end of an append-only repo. offset > total is ALWAYS rejected regardless of this flag -- a cursor past the repo end is never legitimate (the HMAC signature would have come from a larger snapshot) and silently returning a terminal page would hide the truncation from monitoring.

True

Returns:

Type Description
PaginationMeta

PaginationMeta with the has_more / next_cursor fields filled in, safe to wrap in PaginatedResponse.

Raises:

Type Description
InvalidCursorError

When the cursor's decoded offset is past the repo end. offset > total always raises; offset == total (with offset > 0) raises unless reject_stale_cursor=False.

Source code in src/synthorg/api/pagination.py
def encode_repo_seek_meta(  # noqa: PLR0913 -- every arg tracks a distinct pagination input
    *,
    offset: int,
    page_len: int,
    total: int,
    limit: int,
    secret: CursorSecret,
    reject_stale_cursor: bool = True,
) -> PaginationMeta:
    """Build ``PaginationMeta`` for controllers that push limit+offset into the repo.

    Centralizes the ``has_more`` snapshot-drift guard so the next
    pagination bug cannot regress across every version-history
    controller one at a time.  An empty or short page (``page_len ==
    0`` or ``offset + page_len == offset``) cannot advance the cursor
    past the current offset, so the guard refuses to emit a cursor
    that would loop the client on the same page when
    ``count_versions`` disagrees with ``list_versions``.

    Args:
        offset: The decoded cursor offset the current page started at.
        page_len: The number of repo rows consumed (``len(repo_rows)``,
            *not* the filtered slice) -- the cursor must advance by
            consumed rows so filtered pages do not replay already-read
            rows on the next request.
        total: The repo's reported total row count.  Drives the
            ``has_more`` check.
        limit: The page size requested.
        secret: HMAC secret used to sign the ``next_cursor``.
        reject_stale_cursor: When ``True`` (the default), a decoded
            ``offset == total`` raises :class:`InvalidCursorError`
            (mirrors the ``paginate_cursor`` helper).  Set to
            ``False`` only when the caller genuinely tolerates a
            cursor landing exactly on the current end of an
            append-only repo.  ``offset > total`` is ALWAYS rejected
            regardless of this flag -- a cursor past the repo end is
            never legitimate (the HMAC signature would have come from
            a larger snapshot) and silently returning a terminal
            page would hide the truncation from monitoring.

    Returns:
        ``PaginationMeta`` with the ``has_more`` / ``next_cursor``
        fields filled in, safe to wrap in ``PaginatedResponse``.

    Raises:
        InvalidCursorError: When the cursor's decoded offset is past
            the repo end.  ``offset > total`` always raises;
            ``offset == total`` (with offset > 0) raises unless
            ``reject_stale_cursor=False``.
    """
    # Out-of-bounds cursors signal the repo shrank between cursor
    # issuance and replay (deletions, filters).  Silently reporting
    # ``has_more=False`` would hide the truncation from monitoring
    # and strand clients on an empty page they cannot recover from;
    # raise so callers surface the state change as HTTP 400.  Split
    # the boundary (``offset == total``) from the past-end case
    # (``offset > total``) so ``reject_stale_cursor=False`` can relax
    # the boundary alone without opening a loophole for clearly
    # invalid cursors.
    if offset and offset > total:
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_past_end",
            offset=offset,
            total=total,
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)
    if reject_stale_cursor and offset and offset == total:
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_at_end",
            offset=offset,
            total=total,
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)
    next_offset = offset + page_len
    has_more = page_len > 0 and next_offset > offset and next_offset < total
    next_cursor = encode_cursor(next_offset, secret=secret) if has_more else None
    return PaginationMeta(
        limit=limit,
        next_cursor=next_cursor,
        has_more=has_more,
    )

encode_countless_seek_meta

encode_countless_seek_meta(*, offset, fetched_rows, limit, secret)

Build PaginationMeta for repos that skip the COUNT(*) round-trip.

Counterpart to encode_repo_seek_meta for endpoints that use the "fetch limit + 1, detect overflow" pattern instead of issuing a separate count query. The caller fetches up to limit + 1 rows from the backing store; this helper uses the overflow to drive has_more and ensures PaginationMeta.total stays None so clients know the count is unknown (and must derive display counts from data.length per the frontend contract in web/CLAUDE.md).

Parameters:

Name Type Description Default
offset int

The decoded cursor offset the current page started at.

required
fetched_rows int

The number of rows the repo returned when asked for limit + 1 (cap inclusive; the caller is responsible for slicing the excess before handing to PaginatedResponse).

required
limit int

The page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required

Returns:

Type Description
PaginationMeta

PaginationMeta with total=None and the has_more / next_cursor fields derived from overflow.

Raises:

Type Description
InvalidCursorError

When offset > 0 and fetched_rows == 0. Under the limit + 1 contract a server-issued cursor always points at a row that existed when the previous page responded, so an empty follow-up page signals truncation (rows disappeared between requests); silently returning a terminal page would hide that from monitoring.

Source code in src/synthorg/api/pagination.py
def encode_countless_seek_meta(
    *,
    offset: int,
    fetched_rows: int,
    limit: int,
    secret: CursorSecret,
) -> PaginationMeta:
    """Build ``PaginationMeta`` for repos that skip the COUNT(*) round-trip.

    Counterpart to :func:`encode_repo_seek_meta` for endpoints that
    use the ``fetch limit+1, detect overflow`` pattern instead of
    issuing a separate count query.  The caller fetches up to
    ``limit + 1`` rows from the backing store; this helper uses the
    overflow to drive ``has_more`` and ensures ``PaginationMeta.total``
    stays ``None`` so clients know the count is unknown (and must
    derive display counts from ``data.length`` per the frontend
    contract in ``web/CLAUDE.md``).

    Args:
        offset: The decoded cursor offset the current page started at.
        fetched_rows: The number of rows the repo returned when asked
            for ``limit + 1`` (cap inclusive; the caller is
            responsible for slicing the excess before handing to
            ``PaginatedResponse``).
        limit: The page size requested.
        secret: HMAC secret used to sign the ``next_cursor``.

    Returns:
        ``PaginationMeta`` with ``total=None`` and the
        ``has_more`` / ``next_cursor`` fields derived from overflow.

    Raises:
        InvalidCursorError: When ``offset > 0`` and
            ``fetched_rows == 0``.  Under the ``limit + 1`` contract
            a server-issued cursor always points at a row that
            existed when the previous page responded, so an empty
            follow-up page signals truncation (rows disappeared
            between requests); silently returning a terminal page
            would hide that from monitoring.
    """
    if offset > 0 and fetched_rows == 0:
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_past_end",
            offset=offset,
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)
    has_more = fetched_rows > limit
    next_cursor = encode_cursor(offset + limit, secret=secret) if has_more else None
    return PaginationMeta(
        limit=limit,
        next_cursor=next_cursor,
        has_more=has_more,
    )

encode_keyset_meta

encode_keyset_meta(*, next_after_key, has_more, limit, secret)

Build PaginationMeta for a keyset-paginated read.

Keyset pagination is stable under concurrent inserts and deletes: the cursor encodes the sort key of the last row returned, and the next page reads WHERE sort_key > after_key. Out-of-bounds cursors degrade gracefully -- a cursor pointing past the current end of the collection just returns an empty page (rather than the offset-pagination InvalidCursorError for offset > total) because keyset reads cannot tell whether a cursor is "stale" or just pointing at a row that has been deleted.

Parameters:

Name Type Description Default
next_after_key str | None

Sort key of the last row on the page that was just returned, or None when there is no next page. has_more=True requires a non-None key.

required
has_more bool

Whether the caller observed an overflow row when fetching limit + 1 rows. Drives next_cursor emission.

required
limit int

Page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required

Returns:

Type Description
PaginationMeta

PaginationMeta ready to wrap in PaginatedResponse.

Raises:

Type Description
ValueError

If has_more=True but next_after_key is None.

Source code in src/synthorg/api/pagination.py
def encode_keyset_meta(
    *,
    next_after_key: str | None,
    has_more: bool,
    limit: int,
    secret: CursorSecret,
) -> PaginationMeta:
    """Build ``PaginationMeta`` for a keyset-paginated read.

    Keyset pagination is stable under concurrent inserts and deletes:
    the cursor encodes the sort key of the last row returned, and the
    next page reads ``WHERE sort_key > after_key``. Out-of-bounds
    cursors degrade gracefully -- a cursor pointing past the current
    end of the collection just returns an empty page (rather than the
    offset-pagination ``InvalidCursorError`` for ``offset > total``)
    because keyset reads cannot tell whether a cursor is "stale" or
    just pointing at a row that has been deleted.

    Args:
        next_after_key: Sort key of the last row on the page that was
            just returned, or ``None`` when there is no next page.
            ``has_more=True`` requires a non-``None`` key.
        has_more: Whether the caller observed an overflow row when
            fetching ``limit + 1`` rows. Drives ``next_cursor``
            emission.
        limit: Page size requested.
        secret: HMAC secret used to sign the ``next_cursor``.

    Returns:
        ``PaginationMeta`` ready to wrap in ``PaginatedResponse``.

    Raises:
        ValueError: If ``has_more=True`` but ``next_after_key`` is
            ``None``.
    """
    if has_more and not next_after_key:
        msg = "keyset pagination: has_more=True requires next_after_key"
        raise ValueError(msg)
    next_cursor = (
        encode_keyset_cursor(next_after_key, secret=secret)
        if has_more and next_after_key is not None
        else None
    )
    return PaginationMeta(
        limit=limit,
        next_cursor=next_cursor,
        has_more=has_more,
    )

WebSocket Models

ws_models

WebSocket event models for real-time feeds.

Defines event types and the WsEvent payload that is serialised to JSON and pushed to WebSocket subscribers.

WsEventType

Bases: StrEnum

Types of real-time WebSocket events.

WsEvent pydantic-model

Bases: BaseModel

A real-time event pushed over WebSocket.

Callers must not mutate the payload dict after construction; the dict is a mutable reference inside a frozen model.
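
The caveat is a general property of frozen containers holding mutable values. The sketch below uses a frozen standard-library dataclass as a stand-in for the Pydantic model (FrozenEvent is a hypothetical name): reassigning an attribute is blocked, but mutating the dict in place is not.

```python
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class FrozenEvent:
    """Stand-in for a frozen model that carries a mutable payload."""

    channel: str
    payload: dict[str, object] = field(default_factory=dict)


event = FrozenEvent(channel="tasks", payload={"id": 1})

try:
    event.channel = "other"  # type: ignore[misc]
    reassigned = True
except FrozenInstanceError:
    reassigned = False

# Freezing guards attribute assignment only; the dict itself stays mutable.
event.payload["id"] = 2

print(reassigned, event.payload)
```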

Attributes:

Name Type Description
version int

Wire-protocol version. Clients MUST ignore events whose version they do not understand. Bump only when introducing a breaking change to WsEvent -- coordinate with the WS_PROTOCOL_VERSION constant in web/src/utils/constants.ts.

event_type WsEventType

Classification of the event.

channel NotBlankStr

Target channel name.

timestamp AwareDatetime

When the event occurred.

payload dict[str, object]

Event-specific data.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

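Fields:

  • version (int)
  • event_type (WsEventType)
  • channel (NotBlankStr)
  • timestamp (AwareDatetime)
  • payload (dict[str, object])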

Validators:

  • _deep_copy_payload
  • _validate_payload_shape

version pydantic-field

version = WS_PROTOCOL_VERSION

WS wire-protocol version (clients ignore unknown)

event_type pydantic-field

event_type

Event classification

channel pydantic-field

channel

Target channel name

timestamp pydantic-field

timestamp

When the event occurred

payload pydantic-field

payload

Event-specific data

Auth

The auth domain types (AuthConfig, User, ApiKey, AuthenticatedUser, OrgRole, HumanRole, Session, RefreshRecord) live under synthorg.core.auth; the HTTP-coupled service, middleware, and request-scoped user binding live in synthorg.api.auth.

AuthContextMiddleware (in synthorg.api.auth.context) runs immediately after ApiAuthMiddleware and binds the authenticated user into a per-asyncio-Task ContextVar, so controllers and audit helpers read the user via no-argument accessors (get_authenticated_user_id, get_authenticated_user, audit_actor_from_context) without threading a Request.

service

Authentication service -- password hashing, JWT ops, API key hashing.

SecretNotConfiguredError

SecretNotConfiguredError(message=None)

Bases: ServiceUnavailableError

Raised when the JWT secret is required but not configured.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

RefreshRotation pydantic-model

Bases: BaseModel

Result of a successful refresh-token rotation.

The controller turns this into the session/csrf/refresh cookies and emits the post-persistence SECURITY_AUTH_REFRESH_CONSUMED audit event. session_id is the original session id (the access token rotated in place), not a freshly minted one.

Config:

  • frozen: True
  • extra: forbid

Fields:

  • token (str)
  • expires_in (int)
  • session_id (str)
  • user (User)

AuthService

AuthService(config)

Immutable authentication operations.

Owns the cryptographic primitives behind login: Argon2id password hashing and verification, JWT mint and decode, HMAC-SHA256 API key hashing, secure API key generation, and refresh-token persistence through the auth-domain boundary.

Parameters:

Name Type Description Default
config AuthConfig

Authentication configuration (carries JWT secret).

required

Async vs sync. Methods follow a single rule: an operation is declared async only when it touches an event-loop boundary -- either offloading CPU-bound work via asyncio.to_thread, or awaiting a repository write. Everything else stays sync.

  • hash_password_async and verify_password_async are async because Argon2id is CPU-bound (3 time-cost iterations over 64 MiB of memory by default); asyncio.to_thread keeps a single login from stalling every concurrent request waiting on the loop.
  • persist_refresh_token is async because it awaits a repository write through the auth-domain boundary.
  • create_token, decode_token, hash_api_key, and generate_api_key are sync: each is either pure CPU with bounded sub-millisecond cost (HMAC, secrets.token_urlsafe) or an in-process JWT codec call with no I/O.
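
The offloading rule can be sketched with the standard library alone. PBKDF2 is used here purely as a CPU-bound stand-in for Argon2id, and the names slow_hash, hash_off_loop and main are hypothetical.

```python
import asyncio
import hashlib


def slow_hash(password: str, salt: bytes) -> bytes:
    """CPU-bound stand-in for Argon2id (PBKDF2 from the standard library)."""
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 50_000)


async def hash_off_loop(password: str, salt: bytes) -> bytes:
    """Run the blocking hash in the default thread pool, off the event loop."""
    return await asyncio.to_thread(slow_hash, password, salt)


async def main() -> bytes:
    salt = b"\x00" * 16  # fixed salt only to keep the example deterministic
    # The sleep stands in for other requests that keep running meanwhile.
    digest, _ = await asyncio.gather(hash_off_loop("hunter2", salt), asyncio.sleep(0))
    return digest


digest = asyncio.run(main())
print(len(digest))
```

Calling slow_hash directly inside a coroutine would return the same bytes but hold the loop for the full duration of the hash.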

Thread-safety. Instances are safe to share across the request-handler pool without external locking. After __init__, the only state held is _config: AuthConfig -- itself a Pydantic frozen=True model. The module-global argon2.PasswordHasher is configured once at import and treated as a deployment-wide concern (Argon2 parameter selection is not per-request); the underlying argon2 and jwt libraries are stateless and thread-safe.

Out of scope. This service does not implement token revocation (the auth middleware enforces that by checking pwd_sig on every request), session storage (handled by the refresh-token repository), or SYSTEM-role token minting (rejected by create_token; SYSTEM tokens are minted by the Go CLI with SYSTEM_ISSUER / SYSTEM_AUDIENCE).

Source code in src/synthorg/api/auth/service.py
def __init__(self, config: AuthConfig) -> None:
    self._config = config

hash_password_async async

hash_password_async(password)

Hash a password with Argon2id off the event loop.

Argon2id is CPU-bound; asyncio.to_thread defers the work to the default thread pool so a single login request cannot stall every concurrent request waiting on the loop.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
async def hash_password_async(self, password: str) -> str:
    """Hash a password with Argon2id off the event loop.

    Argon2id is CPU-bound; ``asyncio.to_thread`` defers the work
    to the default thread pool so a single login request cannot
    stall every concurrent request waiting on the loop.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    return await asyncio.to_thread(_hasher.hash, password)

verify_password_async async

verify_password_async(password, password_hash)

Verify a password against an Argon2id hash off the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Raises:

Type Description
VerificationError

On non-mismatch verification failures (e.g. unsupported parameters).

InvalidHashError

If the stored hash is corrupted or malformed (data integrity issue).

Source code in src/synthorg/api/auth/service.py
async def verify_password_async(
    self,
    password: str,
    password_hash: str,
) -> bool:
    """Verify a password against an Argon2id hash off the event loop.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches.

    Raises:
        argon2.exceptions.VerificationError: On non-mismatch
            verification failures (e.g. unsupported parameters).
        argon2.exceptions.InvalidHashError: If the stored hash
            is corrupted or malformed (data integrity issue).
    """
    try:
        return await asyncio.to_thread(_hasher.verify, password_hash, password)
    except argon2.exceptions.VerifyMismatchError:
        return False
    except argon2.exceptions.VerificationError as exc:
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="hash_verification_error",
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise
    except argon2.exceptions.InvalidHashError as exc:
        log_exception_redacted(
            logger, SECURITY_AUTH_FAILED, exc, reason="invalid_hash_data_corruption"
        )
        raise
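
The distinction drawn above (a wrong password returns `False`, while a corrupt stored hash raises) can be illustrated with a small stand-in. Everything here is hypothetical: the `salt$digest` storage format, `MalformedHashError`, `make_hash`, and `verify` are invented for the sketch, and PBKDF2 replaces Argon2id so the example needs only the standard library.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


class MalformedHashError(ValueError):
    """Hypothetical stand-in for a corrupt-stored-hash error."""


def make_hash(password: str) -> str:
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify(password: str, stored: str) -> bool:
    try:
        salt_hex, digest_hex = stored.split("$")
        salt, expected = bytes.fromhex(salt_hex), bytes.fromhex(digest_hex)
    except ValueError as exc:
        # Corrupt stored data is an integrity problem, not a bad password,
        # so it is raised instead of being folded into ``False``.
        raise MalformedHashError("stored hash is malformed") from exc
    actual = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    # Constant-time comparison; a mismatch is the ordinary "wrong password" case.
    return hmac.compare_digest(actual, expected)
```

Keeping the two outcomes separate lets a login handler answer a mismatch with a normal authentication failure while surfacing data corruption to operators.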

create_token

create_token(user, *, session_id=None)

Create a JWT for the given human user.

The token includes a pwd_sig claim -- a 16-character truncated SHA-256 of the stored password hash. This is plain SHA-256, not HMAC -- the password hash is already a high-entropy Argon2id output, and the claim is protected by the JWT signature. The auth middleware validates this claim on every request so that tokens issued before a password change are automatically rejected.

A jti (JWT ID) claim is included for per-token session tracking and revocation.

SYSTEM-role tokens are minted by the Go CLI with SYSTEM_ISSUER / SYSTEM_AUDIENCE -- never by this method. Calling create_token with a SYSTEM user would mint a token bearing USER_ISSUER / USER_AUDIENCE, which the middleware's _resolve_jwt_user immediately rejects (per-role iss/aud enforcement). We fail fast with ValueError here so a future caller that accidentally passes a SYSTEM user surfaces the problem at mint time, not at the next request.

The claim shape is built through JwtClaims so the encode-side payload is statically typed and the decode-side boundary helper validates against the same model.

Parameters:

Name Type Description Default
user User

Authenticated human user.

required
session_id str | None

Reuse this session id (jti) instead of minting a fresh one. Refresh-token rotation passes the consumed record's session id so the access token rotates within the existing session rather than spawning a new one (which would orphan the old session and saturate max_concurrent_sessions).

None

Returns:

Type Description
tuple[str, int, str]

Tuple of (encoded JWT, expiry seconds, session ID).

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

ValueError

If user has the SYSTEM role -- mint via the CLI's system-token path instead.

Source code in src/synthorg/api/auth/service.py
def create_token(
    self,
    user: User,
    *,
    session_id: str | None = None,
) -> tuple[str, int, str]:
    """Create a JWT for the given **human** user.

    The token includes a ``pwd_sig`` claim -- a 16-character
    truncated SHA-256 of the stored password hash.  This is
    plain SHA-256, not HMAC -- the password hash is already a
    high-entropy Argon2id output, and the claim is protected
    by the JWT signature.  The auth middleware validates this
    claim on every request so that tokens issued before a
    password change are automatically rejected.

    A ``jti`` (JWT ID) claim is included for per-token session
    tracking and revocation.

    SYSTEM-role tokens are minted by the Go CLI with
    :data:`SYSTEM_ISSUER` / :data:`SYSTEM_AUDIENCE` -- never by
    this method. Calling ``create_token`` with a SYSTEM user
    would mint a token bearing :data:`USER_ISSUER` /
    :data:`USER_AUDIENCE`, which the middleware's
    ``_resolve_jwt_user`` immediately rejects (per-role iss/aud
    enforcement). We fail-fast with ``ValueError`` here so a
    future caller that accidentally passes a SYSTEM user
    surfaces the problem at mint time, not at the next request.

    The claim shape is built through :class:`JwtClaims` so the
    encode-side payload is statically typed and the decode-side
    boundary helper validates against the same model.

    Args:
        user: Authenticated human user.
        session_id: Reuse this session id (``jti``) instead of
            minting a fresh one. Refresh-token rotation passes the
            consumed record's session id so the access token
            rotates *within* the existing session rather than
            spawning a new one (which would orphan the old
            session and saturate ``max_concurrent_sessions``).

    Returns:
        Tuple of (encoded JWT, expiry seconds, session ID).

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        ValueError: If *user* has the SYSTEM role -- mint via
            the CLI's system-token path instead.
    """
    if user.role is HumanRole.SYSTEM:
        msg = (
            "create_token cannot mint SYSTEM-role tokens; "
            "system tokens are issued by the CLI with "
            "SYSTEM_ISSUER / SYSTEM_AUDIENCE"
        )
        raise ValueError(msg)
    secret = self._require_secret("create_token")
    now = datetime.now(UTC)
    expiry_seconds = self._config.jwt_expiry_minutes * 60
    session_id = session_id if session_id is not None else uuid.uuid4().hex
    pwd_sig = hashlib.sha256(
        user.password_hash.encode(),
    ).hexdigest()[:16]
    claims = JwtClaims(
        iss=USER_ISSUER,
        aud=USER_AUDIENCE,
        sub=user.id,
        jti=session_id,
        iat=int(now.timestamp()),
        exp=int((now + timedelta(seconds=expiry_seconds)).timestamp()),
        username=user.username,
        role=user.role,
        must_change_password=user.must_change_password,
        pwd_sig=pwd_sig,
    )
    token = jwt.encode(
        claims.model_dump(mode="json"),
        secret,
        algorithm=self._config.jwt_algorithm,
    )
    return token, expiry_seconds, session_id
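
The `pwd_sig` derivation shown in the listing is small enough to try in isolation. The sketch below reuses the same SHA-256 truncation; `token_still_valid` is a hypothetical helper illustrating how a middleware-side comparison might look, and the two hash strings are placeholders, not real Argon2id output.

```python
import hashlib
import hmac


def pwd_sig(password_hash: str) -> str:
    """First 16 hex characters of SHA-256 over the stored password hash."""
    return hashlib.sha256(password_hash.encode()).hexdigest()[:16]


def token_still_valid(claim_sig: str, current_password_hash: str) -> bool:
    # Hypothetical check: recompute the signature from the hash stored
    # *now* and compare it with the value frozen into the token.
    return hmac.compare_digest(claim_sig, pwd_sig(current_password_hash))


# Placeholder strings standing in for stored Argon2id hashes.
OLD_HASH = "$argon2id$placeholder-old"
NEW_HASH = "$argon2id$placeholder-new"

issued_sig = pwd_sig(OLD_HASH)  # value embedded in the token at mint time
```

With these values, `token_still_valid(issued_sig, OLD_HASH)` holds, while the same token checked against `NEW_HASH` fails, which is the mechanism by which a password change invalidates earlier tokens.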

decode_token

decode_token(token)

Decode and validate a JWT into a typed claim set.

Issuer (iss) and audience (aud) verification is intentionally deferred to the auth middleware's _resolve_jwt_user: the canonical pair differs by role (synthorg-cli / synthorg-backend for CLI-minted SYSTEM tokens vs. synthorg-api / synthorg-api for API-minted user tokens), and the middleware loads the user record before deciding which pair to enforce. Both claims are require-listed here so a missing claim fails decode rather than reaching the middleware as None.

After PyJWT validates the signature and required claims, the raw payload is routed through synthorg.api.boundary.parse_typed so a malformed claim set (extra keys, type mismatch, iat >= exp) is rejected at the boundary with a structured api.boundary.validation_failed log instead of slipping through and surprising a downstream attribute access.

Parameters:

Name Type Description Default
token str

Encoded JWT string.

required

Returns:

Type Description
JwtClaims

Validated JwtClaims instance.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

InvalidTokenError

If the token signature, expiry, or required claim set is invalid.

ValidationError

If the decoded claim set does not conform to JwtClaims (extra keys, wrong types, or violated invariants).

Source code in src/synthorg/api/auth/service.py
def decode_token(self, token: str) -> JwtClaims:
    """Decode and validate a JWT into a typed claim set.

    Issuer (``iss``) and audience (``aud``) verification is
    intentionally deferred to the auth middleware's
    ``_resolve_jwt_user``: the canonical pair differs by role
    (``synthorg-cli`` / ``synthorg-backend`` for CLI-minted
    SYSTEM tokens vs. ``synthorg-api`` / ``synthorg-api`` for
    API-minted user tokens), and the middleware loads the user
    record before deciding which pair to enforce. Both claims are
    ``require``-listed here so a missing claim fails decode rather
    than reaching the middleware as ``None``.

    After PyJWT validates the signature and required claims, the
    raw payload is routed through
    :func:`synthorg.api.boundary.parse_typed` so a malformed claim
    set (extra keys, type mismatch, ``iat >= exp``) is rejected at
    the boundary with a structured ``api.boundary.validation_failed``
    log instead of slipping through and surprising a downstream
    attribute access.

    Args:
        token: Encoded JWT string.

    Returns:
        Validated :class:`JwtClaims` instance.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        jwt.InvalidTokenError: If the token signature, expiry,
            or required claim set is invalid.
        ValidationError: If the decoded claim set does not
            conform to :class:`JwtClaims` (extra keys, wrong
            types, or violated invariants).
    """
    secret = self._require_secret("decode_token")
    raw_claims = jwt.decode(
        token,
        secret,
        algorithms=[self._config.jwt_algorithm],
        options={
            "require": ["exp", "iat", "sub", "jti", "iss", "aud"],
            "verify_aud": False,
            "verify_iss": False,
        },
    )
    return parse_typed("jwt", raw_claims, JwtClaims)
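
The split described above, where `iss` and `aud` must be present but are not compared at this layer, can be sketched on an already-decoded payload. This is an illustration only: `check_claims` is a hypothetical function, it performs no signature verification, and it does not reproduce PyJWT or the `JwtClaims` model.

```python
REQUIRED = ("exp", "iat", "sub", "jti", "iss", "aud")


def check_claims(payload: dict[str, object]) -> dict[str, object]:
    """Presence and invariant checks on a decoded claim set (sketch)."""
    missing = [name for name in REQUIRED if name not in payload]
    if missing:
        # A missing claim fails here instead of surfacing later as None.
        raise ValueError(f"missing required claims: {missing}")
    iat, exp = payload["iat"], payload["exp"]
    if not (isinstance(iat, int) and isinstance(exp, int)):
        raise TypeError("iat and exp must be integer timestamps")
    if iat >= exp:
        raise ValueError("iat must be earlier than exp")
    # iss / aud are required above but deliberately not compared here:
    # the expected pair depends on the user's role, known only later.
    return payload
```

A later layer that has loaded the user record can then compare `payload["iss"]` and `payload["aud"]` against the pair expected for that role.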

persist_refresh_token async

persist_refresh_token(store, *, token_hash, session_id, user_id, expires_at)

Persist a refresh token through the auth-domain boundary.

Centralises the refresh-store write + audit log so callers (notably make_session_cookies) do not reach into app_state._refresh_store directly. The repo handle is passed in rather than held by the service so this stays compatible with the existing AuthService construction (no constructor change required).

Parameters:

Name Type Description Default
store object

The RefreshTokenRepository instance to write through. Typed as object to keep this module free of persistence-layer imports.

required
token_hash str

HMAC-SHA256 hex digest of the raw refresh token.

required
session_id str

Session identifier.

required
user_id str

User identifier.

required
expires_at datetime

Refresh token expiry (UTC).

required

Raises:

Type Description
QueryError

If the underlying repo write fails.

Source code in src/synthorg/api/auth/service.py
async def persist_refresh_token(
    self,
    store: object,
    *,
    token_hash: str,
    session_id: str,
    user_id: str,
    expires_at: datetime,
) -> None:
    """Persist a refresh token through the auth-domain boundary.

    Centralises the refresh-store write + audit log so callers
    (notably ``make_session_cookies``) do not reach into
    ``app_state._refresh_store`` directly.  The repo handle is
    passed in rather than held by the service so this stays
    compatible with the existing AuthService construction (no
    constructor change required).

    Args:
        store: The :class:`RefreshTokenRepository` instance to
            write through.  Typed as ``object`` to keep this
            module free of persistence-layer imports.
        token_hash: HMAC-SHA256 hex digest of the raw refresh token.
        session_id: Session identifier.
        user_id: User identifier.
        expires_at: Refresh token expiry (UTC).

    Raises:
        QueryError: If the underlying repo write fails.
    """
    await store.create(  # type: ignore[attr-defined]
        token_hash=token_hash,
        session_id=session_id,
        user_id=user_id,
        expires_at=expires_at,
    )
    logger.info(
        SECURITY_AUTH_REFRESH_CREATED,
        session_id=session_id,
        user_id=user_id,
    )

rotate_refresh_token async

rotate_refresh_token(*, raw_refresh_token, refresh_store, users, is_session_revoked)

Single-use refresh rotation: consume, validate, re-mint.

The reject matrix lives here (not the controller) so it is unit-testable without the full app: a missing / replayed / expired refresh token or a revoked session emits SECURITY_AUTH_REFRESH_REJECTED (typed reason) and raises RefreshTokenInvalidError (HTTP 401, code 1005). The success path re-mints the access token within the consumed record's session so rotation does not orphan the session or saturate max_concurrent_sessions.

SECURITY_AUTH_REFRESH_CONSUMED is emitted by the caller AFTER the rotated refresh row is persisted (state-transition events log after the write), so it is intentionally not emitted here.

Parameters:

Name Type Description Default
raw_refresh_token str

The opaque refresh cookie value.

required
refresh_store RefreshTokenRepository

Repository providing single-use consume (CAS + replay + session-revocation).

required
users UserRepository

User repository for the post-consume owner lookup.

required
is_session_revoked Callable[[str], bool] | None

Predicate passed into consume so a revoked session rejects rotation.

required

Returns:

Type Description
RefreshRotation

A RefreshRotation with the new access token and the preserved session id.

Raises:

Type Description
RefreshTokenInvalidError

For any reject path (missing cookie, consume rejection, or owner deleted between issuance and rotation).

Source code in src/synthorg/api/auth/service.py
async def rotate_refresh_token(
    self,
    *,
    raw_refresh_token: str,
    refresh_store: RefreshTokenRepository,
    users: UserRepository,
    is_session_revoked: Callable[[str], bool] | None,
) -> RefreshRotation:
    """Single-use refresh rotation: consume, validate, re-mint.

    The reject matrix lives here (not the controller) so it is
    unit-testable without the full app: a missing / replayed /
    expired refresh token or a revoked session emits
    ``SECURITY_AUTH_REFRESH_REJECTED`` (typed reason) and raises
    :class:`RefreshTokenInvalidError` (HTTP 401, code 1005). The
    success path re-mints the access token *within the consumed
    record's session* so rotation does not orphan the session or
    saturate ``max_concurrent_sessions``.

    ``SECURITY_AUTH_REFRESH_CONSUMED`` is emitted by the caller
    AFTER the rotated refresh row is persisted (state-transition
    events log after the write), so it is intentionally not
    emitted here.

    Args:
        raw_refresh_token: The opaque refresh cookie value.
        refresh_store: Repository providing single-use
            ``consume`` (CAS + replay + session-revocation).
        users: User repository for the post-consume owner lookup.
        is_session_revoked: Predicate passed into ``consume`` so
            a revoked session rejects rotation.

    Returns:
        A :class:`RefreshRotation` with the new access token and
        the preserved session id.

    Raises:
        RefreshTokenInvalidError: For any reject path (missing
            cookie, consume rejection, or owner deleted between
            issuance and rotation).
    """
    if not raw_refresh_token:
        logger.warning(SECURITY_AUTH_REFRESH_REJECTED, reason="cookie_missing")
        raise RefreshTokenInvalidError

    token_hash = self.hash_api_key(raw_refresh_token)
    outcome = await refresh_store.consume(
        token_hash,
        is_session_revoked=is_session_revoked,
    )
    if outcome.reject_reason is not None:
        logger.warning(
            SECURITY_AUTH_REFRESH_REJECTED,
            reason=outcome.reject_reason.value,
        )
        raise RefreshTokenInvalidError

    record = outcome.record
    if record is None:
        # RefreshConsumeOutcome's validator guarantees exactly one
        # of record / reject_reason is set and reject_reason was
        # None above, so this is unreachable in practice. Handle it
        # explicitly anyway (not `assert`, which `python -O`
        # strips) so the security path fails closed if the
        # invariant is ever violated by a future change.
        logger.warning(
            SECURITY_AUTH_REFRESH_REJECTED,
            reason="consume_outcome_invariant_violation",
        )
        raise RefreshTokenInvalidError

    user = await users.get(record.user_id)
    if user is None:
        # The token row is already marked used; it cannot be
        # un-consumed. Reject so a deleted owner cannot rotate.
        logger.warning(
            SECURITY_AUTH_REFRESH_REJECTED,
            reason="user_not_found_after_consume",
            user_id=record.user_id,
        )
        raise RefreshTokenInvalidError

    token, expires_in, session_id = self.create_token(
        user,
        session_id=record.session_id,
    )
    return RefreshRotation(
        token=token,
        expires_in=expires_in,
        session_id=session_id,
        user=user,
    )

hash_api_key

hash_api_key(raw_key)

Compute HMAC-SHA256 hex digest of a raw API key.

Uses the server-side JWT secret as the HMAC key so that an attacker with read access to stored hashes cannot brute-force API keys offline.

Parameters:

Name Type Description Default
raw_key str

The plaintext API key.

required

Returns:

Type Description
str

Lowercase hex digest.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def hash_api_key(self, raw_key: str) -> str:
    """Compute HMAC-SHA256 hex digest of a raw API key.

    Uses the server-side JWT secret as the HMAC key so that
    an attacker with read access to stored hashes cannot
    brute-force API keys offline.

    Args:
        raw_key: The plaintext API key.

    Returns:
        Lowercase hex digest.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    secret = self._require_secret("hash_api_key")
    return hmac.digest(
        secret.encode(),
        raw_key.encode(),
        "sha256",
    ).hex()
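
A short sketch of the keyed-digest idea, using the same `hmac.digest` call as the listing. `keyed_digest` and `matches` are hypothetical helper names, and the secrets used in any demonstration are placeholders.

```python
import hmac


def keyed_digest(secret: str, raw_key: str) -> str:
    """HMAC-SHA256 of ``raw_key`` under ``secret``, as lowercase hex."""
    return hmac.digest(secret.encode(), raw_key.encode(), "sha256").hex()


def matches(secret: str, raw_key: str, stored_digest: str) -> bool:
    # Constant-time comparison against the digest kept in storage.
    return hmac.compare_digest(keyed_digest(secret, raw_key), stored_digest)
```

Because the digest depends on the server-side secret, the same raw key hashed under a different secret gives an unrelated value; someone holding only the stored digests cannot test candidate keys without also knowing the secret.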

generate_api_key staticmethod

generate_api_key()

Generate a cryptographically secure API key.

Returns:

Type Description
str

URL-safe base64 string sized by security.auth_token_bytes (default 32 bytes / 43 base64 chars).

Source code in src/synthorg/api/auth/service.py
@staticmethod
def generate_api_key() -> str:
    """Generate a cryptographically secure API key.

    Returns:
        URL-safe base64 string sized by ``security.auth_token_bytes``
        (default 32 bytes / 43 base64 chars).
    """
    return secrets.token_urlsafe(get_auth_token_bytes())
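
The "32 bytes / 43 base64 chars" figure can be checked directly with `secrets.token_urlsafe`. In this sketch `new_key` and `URLSAFE_ALPHABET` are hypothetical names, and the 32-byte default is hard-coded instead of being read from configuration.

```python
import secrets
import string

# Characters that unpadded URL-safe base64 can produce.
URLSAFE_ALPHABET = set(string.ascii_letters + string.digits + "-_")


def new_key(nbytes: int = 32) -> str:
    """Random URL-safe token built from ``nbytes`` bytes of entropy."""
    return secrets.token_urlsafe(nbytes)


key = new_key()
print(len(key))                       # 43 for the 32-byte default
print(set(key) <= URLSAFE_ALPHABET)   # True
```

One consequence worth noting: the URL-safe alphabet has no `.` character, so a key generated this way can never be mistaken for a dotted JWT by a check that routes on the presence of a dot.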

middleware

JWT + API key authentication middleware.

ApiAuthMiddleware

Bases: AbstractAuthenticationMiddleware

Authenticate requests via cookie, JWT header, or API key.

Authentication priority:

  1. Session cookie: HttpOnly cookie set by login/setup. Primary auth path for browser sessions.
  2. Authorization header: Bearer <token>. Tokens with dots are JWTs (system user CLI tokens). Tokens without dots are API keys (HMAC-SHA256 lookup).

Requires auth_service and a persistence backend on app.state["app_state"].

authenticate_request async

authenticate_request(connection)

Validate the session cookie or Authorization header.

Tries the session cookie first. Falls back to the Authorization header for API keys and system user JWTs.

Parameters:

Name Type Description Default
connection ASGIConnection[Any, Any, Any, Any]

Incoming ASGI connection.

required

Returns:

Type Description
AuthenticationResult

AuthenticationResult with AuthenticatedUser.

Raises:

Type Description
NotAuthorizedException

If authentication fails.

Source code in src/synthorg/api/auth/middleware.py
async def authenticate_request(
    self,
    connection: ASGIConnection[Any, Any, Any, Any],
) -> AuthenticationResult:
    """Validate the session cookie or Authorization header.

    Tries the session cookie first.  Falls back to the
    Authorization header for API keys and system user JWTs.

    Args:
        connection: Incoming ASGI connection.

    Returns:
        AuthenticationResult with AuthenticatedUser.

    Raises:
        NotAuthorizedException: If authentication fails.
    """
    app_state = connection.app.state["app_state"]
    auth_service: AuthService = app_state.auth_service
    path = str(connection.url.path)

    # 1. Try session cookie (primary path for browser sessions)
    cookie_name = _get_cookie_name(app_state)
    session_cookie = connection.cookies.get(cookie_name)
    if session_cookie and "." in session_cookie:
        user = await _try_jwt_auth(
            session_cookie,
            auth_service,
            app_state,
            path,
        )
        if user is not None:
            logger.debug(
                API_AUTH_COOKIE_USED,
                user_id=user.user_id,
                path=path,
            )
            return AuthenticationResult(user=user, auth=session_cookie)

    if session_cookie:
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="cookie_jwt_invalid",
            path=path,
        )

    # 2. Fall back to Authorization header (API keys, system user)
    auth_header = connection.headers.get("authorization")
    if not auth_header:
        if session_cookie:
            # Cookie was present but invalid
            raise NotAuthorizedException(
                detail="Invalid session cookie",
            )
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="missing_authentication",
            path=path,
        )
        raise NotAuthorizedException(
            detail="Missing authentication",
        )

    token = extract_bearer_token(auth_header)
    if token is None:
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="invalid_scheme",
            path=path,
        )
        raise NotAuthorizedException(
            detail="Invalid authorization scheme",
        )

    if "." in token:
        user = await _try_jwt_auth(
            token,
            auth_service,
            app_state,
            path,
        )
        if user is not None:
            return AuthenticationResult(user=user, auth=token)
        raise NotAuthorizedException(detail="Invalid JWT token")

    user = await _try_api_key_auth(
        token,
        auth_service,
        app_state,
        path,
    )
    if user is not None:
        return AuthenticationResult(user=user, auth=token)
    raise NotAuthorizedException(detail="Invalid credentials")

create_auth_middleware_class

create_auth_middleware_class(auth_config)

Create a middleware class with excluded paths baked in.

Litestar's AbstractAuthenticationMiddleware.__init__ takes exclude as a parameter (default None). We create a subclass whose __init__ forwards the configured exclude list to super().__init__.

The middleware is restricted to ScopeType.HTTP only; WebSocket connections use ticket-based auth handled entirely inside the WS handler (see controllers/ws.py).

Parameters:

Name Type Description Default
auth_config AuthConfig

Auth configuration with exclude_paths.

required

Returns:

Type Description
type[ApiAuthMiddleware]

Middleware class ready for use in the Litestar middleware stack.

Source code in src/synthorg/api/auth/middleware.py
def create_auth_middleware_class(
    auth_config: AuthConfig,
) -> type[ApiAuthMiddleware]:
    """Create a middleware class with excluded paths baked in.

    Litestar's ``AbstractAuthenticationMiddleware.__init__`` takes
    ``exclude`` as a parameter (default ``None``).  We create a
    subclass whose ``__init__`` forwards the configured exclude
    list to ``super().__init__``.

    The middleware is restricted to ``ScopeType.HTTP`` only;
    WebSocket connections use ticket-based auth handled entirely
    inside the WS handler (see ``controllers/ws.py``).

    Args:
        auth_config: Auth configuration with exclude_paths.

    Returns:
        Middleware class ready for use in the Litestar middleware stack.
    """
    exclude_paths = (
        list(auth_config.exclude_paths) if auth_config.exclude_paths else None
    )

    class ConfiguredAuthMiddleware(ApiAuthMiddleware):
        """Auth middleware with pre-configured exclude paths."""

        def __init__(self, app: Any) -> None:
            super().__init__(
                app,
                exclude=exclude_paths,
                scopes={ScopeType.HTTP},
            )

    return ConfiguredAuthMiddleware

context

Request-scoped binding for the authenticated user.

The auth middleware (synthorg.api.auth.middleware.ApiAuthMiddleware) populates connection.scope["user"] with an AuthenticatedUser (synthorg.core.auth.models) after authentication. AuthContextMiddleware runs immediately after auth and binds that user into the per-asyncio.Task ContextVar defined here. Controllers and request-coupled helpers then read the authenticated user via get_authenticated_user_id / get_authenticated_user without threading a Request argument.

Reading the var while no user is bound raises AuthContextMissingError (a 500): this surfaces middleware misconfiguration loudly instead of masking it as "api".

WebSocket scopes use ticket-based authentication (synthorg.api.controllers.ws) and are not handled by this module; AuthContextMiddleware is restricted to HTTP scopes.

AuthContextMissingError

AuthContextMissingError(message=None)

Bases: DomainError

Read attempted on the auth ContextVar with no user bound.

Surfacing this as a 500 is intentional: the auth middleware runs before any controller, so by the time a controller (or helper invoked from one) calls get_authenticated_user_id the var must be set. An unset read is therefore a server bug -- exclude_paths misconfiguration, a helper invoked outside the request lifecycle, or AuthContextMiddleware missing from the middleware stack -- not a client error.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

AuthContextMiddleware

Bases: ASGIMiddleware

Bind scope["user"] into the per-task ContextVar.

Runs immediately after ApiAuthMiddleware so authenticated handlers, downstream middleware, and helpers can read the user via get_authenticated_user_id without threading a Request. Excluded paths (where ApiAuthMiddleware skipped) leave the var at its default None; helpers reading it raise AuthContextMissingError, which is the desired behaviour for endpoints that should never have reached a user-coupled helper without authentication.

HTTP-only: WebSocket scopes use ticket-based authentication and are bypassed by the scopes filter on the base class.

handle async

handle(scope, receive, send, next_app)

Bind scope["user"] for the duration of the inner dispatch.

Source code in src/synthorg/api/auth/context.py
async def handle(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
    next_app: ASGIApp,
) -> None:
    """Bind ``scope["user"]`` for the duration of the inner dispatch."""
    scope_user: Any = scope.get("user")
    bound_user: AuthenticatedUser | None
    if isinstance(scope_user, AuthenticatedUser):
        bound_user = scope_user
        logger.debug(
            API_AUTH_CONTEXT_BOUND,
            user_id=scope_user.user_id,
            path=scope.get("path", ""),
        )
    else:
        # Excluded paths legitimately have no scope.user; a present
        # value of any other type means a downstream middleware
        # mutated it or auth was reordered, which is a wiring bug
        # the operator must see.
        bound_user = None
        if scope_user is not None:
            logger.warning(
                API_AUTH_CONTEXT_SKIPPED,
                scope_user_type=type(scope_user).__name__,
                path=scope.get("path", ""),
            )
            # Normalise the request scope to match the bound
            # ContextVar so downstream layers reading scope["user"]
            # directly (rate-limit identifiers, anonymous-tier
            # gate, etc.) see the same unauthenticated state as
            # get_authenticated_user*(). Without this, a foreign
            # principal would be visible to gates while the
            # accessors raise AuthContextMissingError.
            scope["user"] = None
    # Always bind a token (None on the skipped path) so a context
    # inherited from an outer task cannot leak a stale principal
    # into helpers reading the var; reset unconditionally restores
    # the prior binding.
    token = _authenticated_user.set(bound_user)
    try:
        if bound_user is not None:
            # Bind actor identity so decision leaves resolve
            # ``decided_by`` via ``current_actor()`` instead of
            # every caller threading it. ``actor_id`` is the
            # immutable user id; ``label`` is the human-readable
            # username recorded in audit rows.
            actor = ActorIdentity(
                actor_id=bound_user.user_id,
                kind=ActorKind.HUMAN,
                label=bound_user.username,
            )
            with actor_scope(actor):
                await next_app(scope, receive, send)
        else:
            # No principal resolved: clear any actor inherited
            # from an outer context so decision leaves don't
            # mis-attribute ``decided_by`` to a stale identity.
            with actor_scope_cleared():
                await next_app(scope, receive, send)
    finally:
        _authenticated_user.reset(token)

get_authenticated_user

get_authenticated_user()

Return the user bound to the active request's ContextVar.

Raises:

Type Description
AuthContextMissingError

When called outside an authenticated request scope.

Returns:

Type Description
AuthenticatedUser

AuthenticatedUser instance.

Source code in src/synthorg/api/auth/context.py
def get_authenticated_user() -> AuthenticatedUser:
    """Return the user bound to the active request's ContextVar.

    Raises:
        AuthContextMissingError: When called outside an authenticated
            request scope.

    Returns:
        ``AuthenticatedUser`` instance.
    """
    user = _authenticated_user.get()
    if user is None:
        # An unset read is a server-side wiring bug (excluded path
        # misrouted to a request-coupled helper, AuthContextMiddleware
        # missing from the stack, helper invoked outside a request).
        # Operators see only the 500 envelope without this breadcrumb,
        # so emit a structured event before raising.
        logger.warning(API_AUTH_CONTEXT_MISSING, caller="get_authenticated_user")
        raise AuthContextMissingError
    return user

get_authenticated_user_id

get_authenticated_user_id()

Return the user_id of the user bound to the current request.

Raises:

Type Description
AuthContextMissingError

When called outside an authenticated request scope.

Returns:

Type Description
str

The authenticated user's user_id.

Source code in src/synthorg/api/auth/context.py
def get_authenticated_user_id() -> str:
    """Return the ``user_id`` of the user bound to the current request.

    Raises:
        AuthContextMissingError: When called outside an authenticated
            request scope.

    Returns:
        The authenticated user's ``user_id``.
    """
    return get_authenticated_user().user_id

authenticated_user_scope async

authenticated_user_scope(user)

Bind user to the auth ContextVar for the duration of the block.

Production binding is performed by AuthContextMiddleware. This helper exists for tests, background tasks, and any caller that needs to invoke a request-coupled helper outside the HTTP request path. Mirrors synthorg.providers.cost_recording.cost_recording_scope -- token-based reset for exception safety, restoring whatever was active before.

Example (background task that calls a request-coupled helper):

async def _background_audit(user: AuthenticatedUser) -> None:
    async with authenticated_user_scope(user):
        # audit_actor_from_context() now returns this user's
        # ProviderAuditActor without raising.
        actor = audit_actor_from_context()
        ...
Source code in src/synthorg/api/auth/context.py
@asynccontextmanager
async def authenticated_user_scope(
    user: AuthenticatedUser,
) -> AsyncIterator[None]:
    """Bind ``user`` to the auth ContextVar for the duration of the block.

    Production binding is performed by :class:`AuthContextMiddleware`.
    This helper exists for tests, background tasks, and any caller that
    needs to invoke a request-coupled helper outside the HTTP request
    path. Mirrors :func:`synthorg.providers.cost_recording.cost_recording_scope`
    -- token-based reset for exception safety, restoring whatever was
    active before.

    Example (background task that calls a request-coupled helper)::

        async def _background_audit(user: AuthenticatedUser) -> None:
            async with authenticated_user_scope(user):
                # audit_actor_from_context() now returns this user's
                # ProviderAuditActor without raising.
                actor = audit_actor_from_context()
                ...
    """
    token = _authenticated_user.set(user)
    try:
        yield
    finally:
        _authenticated_user.reset(token)