ArdaClient (sync)
Blocking client for notebooks, scripts, and batch jobs—explicit methods for every resource with automatic pagination, polling helpers, and consistent error types.
Python
The AI-native Python client for ARDA. Discover governing equations, causal structures, and conservation laws from scientific data with typed models, automatic retries, and first-class notebook support.
Blocking client for notebooks, scripts, and batch jobs—explicit methods for every resource with automatic pagination, polling helpers, and consistent error types.
Async-first interface for services and high-concurrency pipelines—compose concurrent runs, streaming artifacts, and cancellation that respects server-side state.
Requests and responses map to Pydantic models generated from the OpenAPI spec—IDE autocompletion and type checkers catch mistakes before they reach production.
Automatic retries with exponential backoff for transient failures (429, 5xx), aligned with platform rate headers. Non-retryable errors raise immediately.
The SDK requires Python 3.9+ and is distributed on PyPI. Install with pip or any PEP 517-compatible installer.
# Core SDK
pip install arda-sdk

# With async support (adds httpx[http2])
pip install arda-sdk[async]

# With notebook extras (adds rich display + pandas)
pip install arda-sdk[notebook]

# Everything
pip install arda-sdk[async,notebook]
Python: 3.9, 3.10, 3.11, 3.12, 3.13
Dependencies: httpx, pydantic ≥ 2.0, typing-extensions
Optional: pandas, matplotlib (notebook extras)

From pip install to your first claim in 10 lines. Set the ARDA_API_KEY environment variable or pass it directly to the client.
from arda import ArdaClient

client = ArdaClient(api_key="arda_key_sk_live_7f3a9b2c...")
project = client.projects.create(name="My First Discovery")
dataset = client.datasets.upload(project_id=project.id, file_path="data.parquet")
campaign = client.campaigns.create(project_id=project.id, name="Initial sweep")
run = client.runs.create(project_id=project.id, campaign_id=campaign.id, mode="symbolic", dataset_id=dataset.id)
run = client.runs.wait(run.id, poll_interval=5.0)
for claim in client.claims.list(project_id=project.id, run_id=run.id):
    print(f"[{claim.type}] {claim.summary} (score: {claim.score})")

Both ArdaClient and AsyncArdaClient accept the same configuration options. Environment variables are used as defaults — explicit arguments take precedence.
from arda import ArdaClient
client = ArdaClient(
    api_key="arda_key_sk_live_7f3a9b2c...",     # or ARDA_API_KEY env var
    base_url="https://api.arda.vareon.com/v1",  # or ARDA_BASE_URL env var
    timeout=60.0,                               # request timeout in seconds (default: 30)
    max_retries=5,                              # retries for transient failures (default: 3)
    headers={"X-Custom-Header": "value"},       # additional headers on every request
)

Your ARDA API key. Falls back to ARDA_API_KEY.
API base URL. Defaults to https://api.arda.vareon.com/v1.
Per-request timeout in seconds. Default: 30. Dataset uploads use 5x this value automatically.
Number of retries for 429 and 5xx responses with exponential backoff + jitter. Default: 3.
Additional HTTP headers merged into every request. Useful for tracing or custom metadata.
ARDA_API_KEY, ARDA_BASE_URL, ARDA_TIMEOUT, ARDA_MAX_RETRIES.
# List all projects (returns paginated iterator)
projects = client.projects.list(status="active")
for project in projects:
    print(f"{project.id}: {project.name}")

# Create a new project with governance settings
project = client.projects.create(
    name="Fluid Dynamics Study",
    description="Discover governing equations for turbulent flow",
    settings={
        "default_mode": "neuro_symbolic",
        "governance": {
            "promotion_ceiling": "validate",
            "require_negative_controls": True,
            "truth_dial_threshold": 0.85
        }
    }
)

# Retrieve a specific project
project = client.projects.get("proj_tds_8f2a1b")

# Update project settings
project = client.projects.update(
    "proj_tds_8f2a1b",
    description="Updated description",
    settings={"governance": {"truth_dial_threshold": 0.90}}
)

# Upload a dataset from a local file (parquet, CSV, HDF5)
dataset = client.datasets.upload(
    project_id="proj_tds_8f2a1b",
    file_path="turbulence_re5000.parquet",
    name="Turbulence Re=5000",
    variables=["velocity_x", "velocity_y", "pressure", "time"]
)
print(f"Uploaded: {dataset.id} ({dataset.size_bytes} bytes, {dataset.rows} rows)")

# Get automated profile (statistics, quality, distributions)
profile = client.datasets.profile(
    project_id="proj_tds_8f2a1b",
    dataset_id="ds_turb_001"
)
for col in profile.columns:
    print(f" {col.name}: mean={col.mean:.3f}, std={col.std:.3f}, nulls={col.null_count}")

# List datasets in a project
datasets = client.datasets.list(project_id="proj_tds_8f2a1b", status="ready")

# Create a campaign with budget and governance
campaign = client.campaigns.create(
    project_id="proj_tds_8f2a1b",
    name="Reynolds Number Sweep",
    budget={"compute_units": 2500, "max_runs": 50},
    governance={
        "promotion_ceiling": "validate",
        "require_negative_controls": True,
        "auto_promote_to": "explore"
    }
)

# List campaigns with filtering
campaigns = client.campaigns.list(
    project_id="proj_tds_8f2a1b",
    status="active"
)

# Get campaign details including budget usage
campaign = client.campaigns.get(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a"
)
print(f"Budget: {campaign.budget.compute_units_used}/{campaign.budget.compute_units} CU")

Submit discovery runs in any of ARDA's four modes: symbolic, neural, neuro_symbolic, and cde (Causal mode).
# Symbolic discovery — find closed-form governing equations
run_sym = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="symbolic",
    dataset_id="ds_turb_001",
    parameters={"constraints": ["conservation_of_energy"], "max_complexity": 5}
)

# Neural discovery — learn latent representations
run_neural = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="neural",
    dataset_id="ds_turb_001",
    parameters={"embedding": "transformer", "epochs": 200}
)

# Neuro-Symbolic — combine neural embeddings with symbolic regression
run_ns = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="neuro_symbolic",
    dataset_id="ds_turb_001",
    parameters={
        "neural_backbone": "transformer",
        "symbolic_prior": "polynomial",
        "max_complexity": 5,
        "cross_validate": True
    }
)

# Causal mode — causal dynamics discovery
run_cde = client.runs.create(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    mode="cde",
    dataset_id="ds_turb_001",
    parameters={"conserved_quantities": ["energy", "momentum"], "symmetry_groups": ["translational"]}
)

# Wait for a run to complete (blocks with polling)
completed = client.runs.wait(run_sym.id, poll_interval=5.0, timeout=3600.0)
print(f"Status: {completed.status}, Claims: {completed.claims_count}")

# Check status without blocking
status = client.runs.status(run_ns.id)
print(f"Stage: {status.pipeline.current_stage}, Progress: {status.pipeline.progress}")

# Cancel a queued or running run
client.runs.cancel(run_neural.id)

# List runs in a campaign
runs = client.runs.list(
    project_id="proj_tds_8f2a1b",
    campaign_id="camp_rns_4e8f2a",
    status="completed",
    mode="symbolic"
)

# List claims with filters
claims = client.claims.list(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c",
    type="law",      # "law", "causal", "conservation"
    tier="explore",  # "explore", "validate", "publish"
    min_score=0.8,
    sort="score",
    order="desc"
)
for claim in claims:
    print(f"[{claim.type}] {claim.summary}")
    print(f" Score: {claim.score}, Tier: {claim.tier}")
    print(f" Scope: {claim.scope}")

# Get a specific claim with full detail
claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")
print(f"Expression: {claim.expression}")
print(f"Evidence: r²={claim.evidence.r_squared}, controls={claim.evidence.negative_controls_passed}")

# Promote a claim through Truth Dial tiers
promoted = client.claims.promote(
    project_id="proj_tds_8f2a1b",
    claim_id="clm_law_9d4e5f",
    target_tier="validate",
    rationale="Passed all negative controls and cross-dataset validation."
)
print(f"Promoted to {promoted.tier} (from {promoted.previous_tier})")

# Export claims to JSON or CSV
client.claims.export(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c",
    format="csv",
    output_path="claims_export.csv"
)

# Query ledger entries for a specific claim
entries = client.ledger.query(
    project_id="proj_tds_8f2a1b",
    claim_id="clm_law_9d4e5f",
    event_type="claim_promoted"
)
for entry in entries:
    print(f"[{entry.timestamp}] {entry.event_type} by {entry.actor}")
    print(f" Details: {entry.details}")

# Verify ledger integrity (chain validation)
verification = client.ledger.verify(project_id="proj_tds_8f2a1b")
print(f"Integrity: {verification.status}")  # "valid" or "corrupted"
print(f"Entries verified: {verification.entries_checked}")

# Export full ledger for external auditing
client.ledger.export(
    project_id="proj_tds_8f2a1b",
    format="json",
    output_path="ledger_audit.json"
)

# List artifacts produced by a run
artifacts = client.artifacts.list(
    project_id="proj_tds_8f2a1b",
    run_id="run_ns_7a3b2c"
)
for artifact in artifacts:
    print(f"{artifact.name} ({artifact.type}, {artifact.size_bytes} bytes)")

# Download an artifact to disk
client.artifacts.download(
    project_id="proj_tds_8f2a1b",
    artifact_id="art_model_weights_01",
    output_path="model_weights.pt"
)

# Stream large artifacts without loading into memory
with client.artifacts.stream(
    project_id="proj_tds_8f2a1b",
    artifact_id="art_embeddings_01"
) as stream:
    for chunk in stream:
        process(chunk)

The async client shares the same method signatures but returns awaitables. Built on httpx async transport, it integrates cleanly with asyncio for FastAPI services, Celery workers, and concurrent pipelines.
from arda import AsyncArdaClient
import asyncio

async def parallel_discovery():
    async with AsyncArdaClient() as client:
        # Launch all four modes concurrently
        runs = await asyncio.gather(
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="symbolic", dataset_id="ds_turb_001",
                parameters={"constraints": ["conservation_of_energy"], "max_complexity": 5}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="neural", dataset_id="ds_turb_001",
                parameters={"embedding": "transformer", "epochs": 200}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="neuro_symbolic", dataset_id="ds_turb_001",
                parameters={"neural_backbone": "transformer", "symbolic_prior": "polynomial"}
            ),
            client.runs.create(
                project_id="proj_tds_8f2a1b", campaign_id="camp_rns_4e8f2a",
                mode="cde", dataset_id="ds_turb_001",
                parameters={"conserved_quantities": ["energy", "momentum"]}
            ),
        )

        # Wait for all to complete
        completed = await asyncio.gather(*[client.runs.wait(r.id) for r in runs])

        # Collect and compare claims across modes
        for run in completed:
            claims = await client.claims.list(project_id="proj_tds_8f2a1b", run_id=run.id)
            async for claim in claims:
                print(f"[{run.mode}] {claim.summary} (score: {claim.score})")

        # Stream a large artifact
        async with client.artifacts.stream(
            project_id="proj_tds_8f2a1b", artifact_id="art_embeddings_01"
        ) as stream:
            async for chunk in stream:
                await process(chunk)

asyncio.run(parallel_discovery())

The async with context manager ensures the underlying HTTP connection pool is closed when your workflow completes, even if exceptions occur. You can also instantiate the client without a context manager and call await client.close() manually.
All exceptions inherit from ArdaError. The SDK retries transient failures (429, 5xx) automatically—non-retryable errors raise immediately with the structured error body from the API.
ArdaError
├── AuthenticationError (401)
├── PermissionError (403)
├── PolicyViolationError (403, governance)
├── NotFoundError (404)
├── ValidationError (400, 422)
├── ConflictError (409)
├── RateLimitError (429)
├── BudgetExhaustedError (budget depleted)
├── TimeoutError (request or poll timeout)
└── ServerError (500, 503)
.status_code — HTTP status code from the API
.error_code — Machine-readable error code string
.message — Human-readable error message
.details — Dict with error-specific metadata
.request_id — Request ID for support tickets
.constraint — (PolicyViolationError) the violated constraint
.retry_after — (RateLimitError) seconds to wait
from arda import ArdaClient
from arda.exceptions import (
    ArdaError, PolicyViolationError, NotFoundError,
    RateLimitError, BudgetExhaustedError, ValidationError
)

client = ArdaClient(max_retries=3, timeout=30.0)

try:
    promoted = client.claims.promote(
        project_id="proj_tds_8f2a1b",
        claim_id="clm_law_9d4e5f",
        target_tier="publish",
        rationale="Passed all negative controls and cross-dataset validation."
    )
    print(f"Promoted to {promoted.tier}")
except PolicyViolationError as e:
    print(f"Governance blocked promotion: {e.constraint}")
    print(f"Policy ceiling: {e.details['policy_ceiling']}")
    print(f"Truth Dial score: {e.details['truth_dial_score']}")
except NotFoundError:
    print("Claim not found — it may have been archived.")
except BudgetExhaustedError as e:
    print(f"Campaign budget depleted: {e.details['compute_units_used']}/{e.details['compute_units']}")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
    for field_error in e.details.get("field_errors", []):
        print(f" {field_error['field']}: {field_error['message']}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ArdaError as e:
    print(f"Unexpected error [{e.status_code}]: {e.message}")
    print(f"Request ID for support: {e.request_id}")

The SDK is built for Jupyter. The synchronous client avoids event loop conflicts in notebook kernels. Model objects implement _repr_html_ for rich rendering, and list results convert to pandas DataFrames with a single call.
from arda import ArdaClient
client = ArdaClient()
# Rich display — just evaluate the object in a cell
claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")
claim # renders as a formatted card with type, scope, score, provenance
# Convert claims to a pandas DataFrame
claims = client.claims.list(project_id="proj_tds_8f2a1b", run_id="run_ns_7a3b2c")
df = claims.to_dataframe()
df.groupby("type")["score"].describe()
# Inline plotting of run progress
status = client.runs.status("run_ns_7a3b2c")
status.plot() # renders a pipeline progress bar in the cell
# Export claims for downstream analysis
df.to_csv("claims_analysis.csv", index=False)
The SDK provides a MockArdaClient for unit tests. It records calls, returns configurable responses, and validates request shapes against the same Pydantic models used in production.
from arda.testing import MockArdaClient, mock_claim, mock_run
import pytest
@pytest.fixture
def arda_client():
    client = MockArdaClient()
    client.runs.mock_create(return_value=mock_run(
        id="run_test_001", mode="symbolic", status="completed"
    ))
    client.runs.mock_wait(return_value=mock_run(
        id="run_test_001", mode="symbolic", status="completed", claims_count=2
    ))
    client.claims.mock_list(return_value=[
        mock_claim(id="clm_001", type="law", summary="F = ma", score=0.95),
        mock_claim(id="clm_002", type="causal", summary="Force causes acceleration", score=0.88),
    ])
    return client
def test_discovery_pipeline(arda_client):
    run = arda_client.runs.create(
        project_id="proj_test", campaign_id="camp_test",
        mode="symbolic", dataset_id="ds_test"
    )
    assert run.status == "completed"

    claims = list(arda_client.claims.list(project_id="proj_test", run_id=run.id))
    assert len(claims) == 2
    assert claims[0].type == "law"
    assert claims[0].score > 0.9

    # Verify the client recorded the right calls
    assert arda_client.runs.create_called_with(mode="symbolic")
    assert arda_client.claims.list_called_with(run_id="run_test_001")

Every claim type carries domain-specific fields. Models are generated from ARDA's OpenAPI spec and validated with Pydantic. Type checkers (mypy, pyright) catch structural errors at development time.
.id: str
.type: "law"
.summary: str
.expression: str
.score: float
.tier: Tier
.scope: Scope
.evidence: Evidence
.ledger_entry_id: str
.run_id: str
.created_at: datetime
.id: str
.type: "causal"
.summary: str
.cause: str
.effect: str
.direction: str
.temporal_scope: str
.intervention_markers: list[str]
.score: float
.tier: Tier
.scope: Scope
.evidence: CausalEvidence
.id: str
.type: "conservation"
.summary: str
.conserved_quantity: str
.system_boundaries: list[str]
.symmetry_group: str | None
.flux_terms: list[str]
.score: float
.tier: Tier
.scope: Scope
.evidence: ConservationEvidence
All claim types include Scope (variables, domain, constraints satisfied), Evidence (statistical measures, negative control results), and governance metadata (ledger references, Truth Dial scores). Models support both attribute access and dict-style access.
from arda.types import LawClaim, CausalClaim, ConservationClaim
claim = client.claims.get(project_id="proj_tds_8f2a1b", claim_id="clm_law_9d4e5f")
# Type narrowing
if isinstance(claim, LawClaim):
    print(f"Governing equation: {claim.expression}")
    print(f"R²: {claim.evidence.r_squared}")
elif isinstance(claim, CausalClaim):
    print(f"Cause: {claim.cause} → Effect: {claim.effect}")
    print(f"Granger score: {claim.evidence.granger_score}")
elif isinstance(claim, ConservationClaim):
    print(f"Conserved: {claim.conserved_quantity}")
    print(f"Symmetry: {claim.symmetry_group}")
    print(f"Boundaries: {claim.system_boundaries}")

# All claims share these fields
print(f"Score: {claim.score}, Tier: {claim.tier}")
print(f"Ledger: {claim.ledger_entry_id}")
print(f"Scope: {claim.scope.domain}")

The SDK wraps the same resources available through the REST API and the MCP server. Use the SDK for scheduled jobs, CI validation, and services that coordinate many runs programmatically.
All three interfaces share the same authentication, resource model, and governance stack.