The AllSource ecosystem ships first-party SDKs for Rust, Go, Python, and TypeScript. But not every team is on one of those runtimes, and even the ones that are sometimes need to understand what's happening underneath. This post walks through the full wire protocol: how to connect, authenticate, ingest events, run queries, subscribe to the live stream, and drive projections using only HTTP and WebSockets.
Nothing in this article requires an SDK. If you can make an HTTPS call, you can use AllSource.
The mental model
AllSource runs two services. Only one of them is reachable from outside your network.
- Gateway — on our hosted offering, this is https://api.all-source.xyz, the only AllSource endpoint on public DNS. Handles auth, rate limiting, quotas, and billing before forwarding to Core. Self-hosted deployments run the equivalent under their own domain.
- Core (internal, :3900) — the event store itself. Rust, durable via WAL + Parquet, in-memory reads via DashMap. Validates API keys in-process. Reachable only on your internal network (K8s service DNS, VPC, Fly *.internal, etc.). Never publish Core on public DNS — it's trust-the-caller by design.
Both expose the same /api/v1/* surface. Which one you target depends on where your caller runs:
- Caller outside your network (mobile, browser, third-party service, your customer's backend) → gateway URL.
- Caller inside your network (your own worker, projection service, admin tooling) → Core's internal DNS, at full speed.
The companion post on connection paths goes into the decision in depth. For this post, the examples use https://your-allsource as a placeholder — substitute the gateway URL if you're external, the internal Core URL if you're inside the network.
Authentication
Every request carries an API key. The preferred header is Authorization: Bearer (what the SDKs send by default); the legacy X-API-Key header also works. Pick one and stick with it:
```bash
# Preferred
curl -H "Authorization: Bearer ask_your_key_here" \
  https://your-allsource/health

# Legacy but accepted
curl -H "X-API-Key: ask_your_key_here" \
  https://your-allsource/health
```

Keys look like ask_ followed by a 32-character token. You provision them by setting ALLSOURCE_BOOTSTRAP_API_KEY on Core's first boot — the key persists in Core's system WAL and is replayed automatically on restart.
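Whichever header you choose, it's worth centralizing it in one place so the rest of the client never hard-codes a header name. A minimal Python sketch (the helper name is illustrative, not part of any SDK):

```python
# Illustrative helper: build the auth header dict for an AllSource request.

def auth_headers(api_key: str, legacy: bool = False) -> dict:
    if legacy:
        return {"X-API-Key": api_key}  # accepted, but legacy
    return {"Authorization": f"Bearer {api_key}"}  # preferred
```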
Ingesting events
A single event:
```bash
curl -X POST https://your-allsource/api/v1/events \
  -H "Authorization: Bearer ask_..." \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "order.placed",
    "entity_id": "order-42",
    "payload": {"total_usd": 19.99, "customer": "alice"},
    "metadata": {"source": "api"}
  }'
```

Response shape:
```json
{
  "event_id": "01HZ...",
  "timestamp": "2026-04-17T10:30:00.000Z",
  "version": 1
}
```

The version field is per-entity: each subsequent event for order-42 will come back with version: 2, 3, .... This is what powers per-entity ordering and dedup later.
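Because version increases monotonically per entity, a client can skip events it has already processed — useful later when a durable replay overlaps acked events. A sketch with an illustrative function name:

```python
# Illustrative sketch: per-entity version gives you idempotent processing.
# `seen` maps entity_id -> highest version already handled; reuse the same
# dict across reconnects to drop events you have already applied.

def dedup_by_version(events, seen):
    for ev in events:
        eid, ver = ev["entity_id"], ev["version"]
        if ver > seen.get(eid, 0):
            seen[eid] = ver
            yield ev
```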
A batch of events in one round-trip:
```bash
curl -X POST https://your-allsource/api/v1/events/batch \
  -H "Authorization: Bearer ask_..." \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {"event_type": "order.placed", "entity_id": "o-1", "payload": {"total": 10}},
      {"event_type": "order.shipped", "entity_id": "o-1", "payload": {"carrier": "ups"}},
      {"event_type": "order.placed", "entity_id": "o-2", "payload": {"total": 25}}
    ]
  }'
```

Rule of thumb: if you have more than a couple of events to write, batch. One HTTP round-trip instead of N is the single biggest throughput win available to a client.
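Batching is easy to bolt onto any client: accumulate events, then split them into batch-sized POST bodies. A sketch — the 500-per-batch size is an illustrative client-side choice, not a documented server limit:

```python
# Illustrative sketch: turn any event list into request bodies
# for POST /api/v1/events/batch.

def batch_bodies(events, size=500):
    for i in range(0, len(events), size):
        yield {"events": events[i:i + size]}
```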
Querying events
Filter by entity, event type, time range, or any combination:
```bash
# All events for one entity
curl -H "Authorization: Bearer ask_..." \
  "https://your-allsource/api/v1/events/query?entity_id=order-42&limit=100"

# All events of a type since yesterday
curl -H "Authorization: Bearer ask_..." \
  "https://your-allsource/api/v1/events/query?event_type=order.placed&since=2026-04-16T00:00:00Z&limit=1000"

# Prefix match (everything under order.*)
curl -H "Authorization: Bearer ask_..." \
  "https://your-allsource/api/v1/events/query?event_type_prefix=order.&limit=500"
```

Responses wrap events in an object with a total count:
```json
{
  "events": [
    {
      "id": "01HZ...",
      "event_type": "order.placed",
      "entity_id": "order-42",
      "payload": {"total_usd": 19.99},
      "metadata": {"source": "api"},
      "timestamp": "2026-04-17T10:30:00.000Z",
      "version": 1,
      "tenant_id": "your-tenant"
    }
  ],
  "count": 1
}
```

Default limit is 100. For paginated reads, use offset or filter by since/until time windows.
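An offset-based pagination loop is a few lines in any language. A Python sketch that takes the HTTP call as an injected function (it should GET /api/v1/events/query and return the parsed body) so the loop itself has no transport dependency; names are illustrative:

```python
# Illustrative sketch: drain a query with limit/offset paging.

def paginate(fetch, limit=100):
    offset = 0
    while True:
        page = fetch(limit, offset)
        events = page["events"]
        yield from events
        if len(events) < limit:  # short page means we've reached the end
            break
        offset += limit
```

For time-windowed reads, the same loop works with a moving since bound instead of an offset.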
Subscribing to the live stream
WebSocket at /api/v1/events/stream. Two modes.
Fire-and-forget (live only)
```bash
wscat -H "Authorization: Bearer ask_..." \
  -c wss://your-allsource/api/v1/events/stream
> {"type": "subscribe", "filters": ["order.*"]}
```

The server sends each matching event as a bare JSON object. Good for ephemeral dashboards, simple tails, and LLM agents doing single-session observation.
Durable consumer (replay on reconnect)
```bash
wscat -H "Authorization: Bearer ask_..." \
  -c "wss://your-allsource/api/v1/events/stream?consumer_id=my-worker"
> {"type": "subscribe", "filters": ["order.*"]}
```

The ?consumer_id= parameter makes the subscription durable. Core tracks a cursor position for that consumer name and replays all events since the last ack on every reconnect. Frames come in one of four shapes:
```jsonc
// Replay event — has a position you can ack later
{"type": "replay", "position": 42, "event": { ... }}

// Replay done — subsequent events will be live
{"type": "replay_complete", "replayed": 42}

// Live event — no position (live events don't carry WAL offsets on the wire)
{"id": "01HZ...", "event_type": "order.placed", ...}

// Server broadcast lag — you dropped events; reconnect with a known position
{"type": "lagged", "missed": 17}
```

To advance the cursor, POST the last replay position back over HTTP:
```bash
curl -X POST https://your-allsource/api/v1/consumers/my-worker/ack \
  -H "Authorization: Bearer ask_..." \
  -H "Content-Type: application/json" \
  -d '{"position": 42}'
```

That's the whole durable-consumer protocol. Register it once with filters:
```bash
curl -X POST https://your-allsource/api/v1/consumers \
  -H "Authorization: Bearer ask_..." \
  -H "Content-Type: application/json" \
  -d '{"consumer_id": "my-worker", "event_type_filters": ["order.*"]}'
```

...then connect, read, ack. Crash-safe: you resume exactly where you last acked.
Building a projection (read model)
A projection is application-defined state computed from the event stream. AllSource's Rust SDK ships a ProjectionWorker that wraps all the boilerplate; building one by hand in any language takes four steps:
1. Register a durable consumer with the event-type filters you care about.
2. Open the WebSocket with ?consumer_id=<name> and subscribe.
3. Loop: for each event, apply your reducer to in-memory state. Ack every N events (or on a timer) so restarts are cheap.
4. Optional: write the computed state back to Core via POST /api/v1/projections/:name/:entity_id/state so other consumers can read it.
Pseudocode (works in any language):
```
state = {}
filters = ["asset.registered", "asset.updated"]
register_consumer("assets", filters)

ws = connect("wss://host/api/v1/events/stream?consumer_id=assets")
ws.send({"type": "subscribe", "filters": filters})

last_acked_position = 0
events_since_ack = 0

for msg in ws:
    if msg.type == "replay_complete":
        ack("assets", last_acked_position)
        events_since_ack = 0
        continue

    event = msg.event if msg.type == "replay" else msg
    reduce(state, event)

    if msg.type == "replay" and msg.position:
        last_acked_position = msg.position

    events_since_ack += 1
    if events_since_ack >= 100:
        ack("assets", last_acked_position)
        events_since_ack = 0
```
A hundred lines of application code replace what used to require a Kafka cluster, a stream processor, and a state store.
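For concreteness, the pseudocode above translates to a few lines of real Python. In this sketch, frames stands in for the WebSocket iterator and ack for the POST /consumers/:id/ack call, and the reducer is last-payload-wins per entity; all of these are illustrative choices:

```python
# Illustrative sketch: the durable-consumer projection loop with the
# transport injected, so the core logic is plain-function testable.

def run_projection(frames, ack, state=None, ack_every=100):
    state = {} if state is None else state
    last_pos, since_ack = 0, 0
    for msg in frames:
        if msg.get("type") == "replay_complete":
            ack(last_pos)
            since_ack = 0
            continue
        if msg.get("type") == "replay":
            event, last_pos = msg["event"], msg["position"]
        else:
            event = msg  # live events arrive as bare objects
        state[event["entity_id"]] = event["payload"]  # last-payload-wins reducer
        since_ack += 1
        if since_ack >= ack_every:
            ack(last_pos)
            since_ack = 0
    return state
```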
Reading projection state
Projections written via POST .../state are readable by name + entity_id:
```bash
# Single entity
curl -H "Authorization: Bearer ask_..." \
  https://your-allsource/api/v1/projections/assets/BTC/state

# All entities under a projection
curl -H "Authorization: Bearer ask_..." \
  https://your-allsource/api/v1/projections/assets/state

# Bulk fetch by entity_id list
curl -X POST https://your-allsource/api/v1/projections/assets/bulk \
  -H "Authorization: Bearer ask_..." \
  -H "Content-Type: application/json" \
  -d '{"entity_ids": ["BTC", "ETH", "SOL"]}'
```

As of Core 0.19.1, these endpoints fall back to the projection state cache when no projection is registered in Core's projection manager — which means your SDK-managed projections (ones you computed client-side and pushed back via POST .../state) are readable through the same endpoints. Before 0.19.1, only Core-registered projections were readable this way.
Error handling
All endpoints return standard HTTP status codes:
- 200/201 — success
- 400 — malformed request (bad JSON, missing field)
- 401 — missing or invalid API key
- 403 — key is valid but lacks permission for this operation
- 404 — entity/projection not found (the /state endpoints return {"found": false, "state": null} with 200 instead)
- 429 — rate limited (only via the Query Service — Core has no rate limiter)
- 5xx — treat as retryable with backoff
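The 429 and 5xx rows map to a simple retry-with-jittered-backoff policy. A Python sketch; with_retries and the (status, body) call shape are illustrative, not part of the protocol:

```python
import random
import time

# Illustrative retry policy: retry only statuses the server treats as transient.
RETRYABLE = {429, 500, 502, 503, 504}

def with_retries(call, attempts=5, base=0.25, sleep=time.sleep):
    """Retry `call` (returning (status, body)) with full-jitter backoff."""
    for attempt in range(attempts):
        status, body = call()
        if status not in RETRYABLE:
            return status, body
        sleep(random.uniform(0, base * 2 ** attempt))  # full jitter
    return status, body
```

`sleep` is injectable so the policy can be unit-tested without real delays.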
Error bodies follow a consistent shape:
```json
{"error": {"message": "description", "code": "optional-machine-code"}}
```

Performance patterns
A few things that apply regardless of language:
- Reuse HTTP connections. Use your HTTP client's keep-alive and connection pool. Opening a fresh TCP + TLS handshake per request kills throughput, especially when you're hitting the gateway over TLS.
- Batch writes. POST /events/batch is your friend — one round-trip for N events.
- Subscribe, don't poll. The WebSocket delivers events sub-millisecond after commit. A 100ms polling loop means 100× more latency and N× more server load for no benefit.
- Use durable consumers for projections. The cursor-tracking means a restart isn't an O(total-events) replay from scratch; it's O(events-since-last-ack).
- One API key per service, not per process. Keys are cheap to provision but each one has its own quota window in the gateway. Reuse across your worker pool.
- If you're inside the network, skip the gateway for hot paths. Hitting Core's internal URL directly saves the gateway round-trip (~5–15ms) — worth it for anything high-volume like projection workers or batch ingest loops.
What you don't get without an SDK
The wire protocol is complete enough that nothing's strictly missing, but a first-party SDK saves you from writing:
- Event-type normalization (VerificationCreated → verification.created)
- Retry + circuit breaker on transient errors
- Automatic reconnection for WebSocket drops
- Version-based dedup on replay overlap
- Typed structs for payloads, responses, errors
If any of those look like they'd be useful: the Rust SDK is on crates.io (allsource), and TypeScript / Python / Go SDKs are available via our GitHub registry. The source is open, so you can read exactly how each one implements the protocol above and port it to whatever runtime you're on.
Where to go next
- Choosing direct-to-Core vs through Query Service — the architecture decision
- Custom projections end-to-end guide — the design space for read models
- Auth chain reference — what happens inside the auth layer
- Full API reference — every endpoint

