Dynamic Connectors
Add, remove, and list data sources at runtime via the admin API.
Overview
Connectors let you register external data sources at runtime — no server restart required. Every connector is managed programmatically via the admin REST API: register, list, and unregister connectors with simple HTTP calls. This makes it straightforward to automate connector management from scripts, CI/CD pipelines, or agent workflows.
Each connector maps to a named source adapter in the gateway. Database connectors (Postgres, MySQL, BigQuery) let clients pull data using the `source` query parameter. API connectors (Jira, Salesforce) run their own pollers that push data into the gateway automatically.
Supported connector types:
| Type | Kind | Config Required |
|---|---|---|
| `postgres` | Database | `connectionString` |
| `mysql` | Database | `connectionString` |
| `bigquery` | Database | `projectId`, `dataset` |
| `jira` | REST API | `domain`, `email`, `apiToken` |
| `salesforce` | REST API | `instanceUrl`, `clientId`, `clientSecret`, `username`, `password` |
Database connectors can optionally include an ingest configuration that starts a poller to automatically detect changes in the source database and push them through the gateway. See Source Polling Ingest for details on change detection strategies.
API connectors like Jira and Salesforce define their own tables internally and poll automatically — no SQL queries or table configuration needed. Just provide the connection credentials and an optional poll interval.
All connector routes require a JWT with `role: "admin"`.
ConnectorConfig
```typescript
interface ConnectorConfigBase {
  /** Connector type — determines which adapter to create. */
  type: string;
  /** Unique connector name (used as the source adapter key). */
  name: string;
  /** Optional ingest polling configuration. */
  ingest?: ConnectorIngestConfig;
}
```
```typescript
type ConnectorConfig =
  | PostgresConnectorConfigFull   // type: "postgres", postgres: { connectionString }
  | MySQLConnectorConfigFull      // type: "mysql", mysql: { connectionString }
  | BigQueryConnectorConfigFull   // type: "bigquery", bigquery: { projectId, dataset, ... }
  | JiraConnectorConfigFull       // type: "jira", jira: { domain, email, apiToken, ... }
  | SalesforceConnectorConfigFull // type: "salesforce", salesforce: { instanceUrl, ... }
  | (ConnectorConfigBase & Record<string, unknown>); // extensible catch-all
```

The open base type allows custom connector types (e.g. `cloudwatch`, `stripe`) without modifying `@lakesync/core`. Typed variants provide full type safety for built-in connector types.
Built-in connector config fields:
- `postgres` — `{ connectionString: string }` — required when type is `"postgres"`
- `mysql` — `{ connectionString: string }` — required when type is `"mysql"`
- `bigquery` — `{ projectId: string; dataset: string; keyFilename?: string; location?: string }` — required when type is `"bigquery"`
- `jira` — `{ domain: string; email: string; apiToken: string; jql?: string; includeComments?: boolean; includeProjects?: boolean }` — required when type is `"jira"`
- `salesforce` — `{ instanceUrl: string; clientId: string; clientSecret: string; username: string; password: string; apiVersion?: string; isSandbox?: boolean; soqlFilter?: string; includeAccounts?: boolean; includeContacts?: boolean; includeOpportunities?: boolean; includeLeads?: boolean }` — required when type is `"salesforce"`
- `ingest` — `{ tables?: Array<{ table, query, rowIdColumn?, strategy }>; intervalMs?: number }` — optional ingest polling configuration
Validation is performed server-side via `validateConnectorConfig()`. Missing or invalid fields return a 400 response with a descriptive error message.
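Since validation fails fast on missing fields, a client can pre-check a config before sending it. A minimal sketch, assuming the required-field lists from the table above (`REQUIRED_FIELDS` and `missingFields` are illustrative helpers, not part of `@lakesync/core`):

```typescript
// Hypothetical client-side pre-check mirroring the server's required-field
// rules from the connector table above; not part of @lakesync/core.
const REQUIRED_FIELDS: Record<string, string[]> = {
  postgres: ["connectionString"],
  mysql: ["connectionString"],
  bigquery: ["projectId", "dataset"],
  jira: ["domain", "email", "apiToken"],
  salesforce: ["instanceUrl", "clientId", "clientSecret", "username", "password"],
};

function missingFields(
  config: { type: string; name: string } & Record<string, unknown>,
): string[] {
  const required = REQUIRED_FIELDS[config.type] ?? [];
  // The type-specific config lives under a key matching the type name.
  const section = (config[config.type] ?? {}) as Record<string, unknown>;
  return required.filter((f) => typeof section[f] !== "string" || section[f] === "");
}
```

Running `missingFields` before the POST avoids a round-trip for the most common 400 cause; the server remains the source of truth.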
For API connectors (Jira, Salesforce), the `ingest.tables` field is not required — they define their tables internally. Only `intervalMs`, `chunkSize`, and `memoryBudgetBytes` are used from the ingest config.
API Reference
Register a Connector
`POST /admin/connectors/:gatewayId`

Registers a new data source. For database connectors, this creates a database adapter and adds it as a named source on the gateway. For API connectors like Jira, it starts a dedicated poller that pushes data into the gateway automatically.
Headers: Authorization: Bearer <admin JWT>
Body: ConnectorConfig JSON
Response:
```json
{ "registered": true, "name": "analytics-db" }
```

Example (database connector):

```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "analytics-db",
    "type": "postgres",
    "postgres": { "connectionString": "postgres://user:pass@host/analytics" }
  }'
```

Example (Jira connector):
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": {
      "domain": "mycompany",
      "email": "bot@mycompany.com",
      "apiToken": "ATATT3...",
      "jql": "project = ENG"
    },
    "ingest": { "intervalMs": 30000 }
  }'
```

Example (Salesforce connector):
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sf-crm",
    "type": "salesforce",
    "salesforce": {
      "instanceUrl": "https://mycompany.salesforce.com",
      "clientId": "3MVG9...",
      "clientSecret": "ABC123...",
      "username": "bot@mycompany.com",
      "password": "mypasswordSECURITYTOKEN"
    },
    "ingest": { "intervalMs": 30000 }
  }'
```

Registering a connector with a name that already exists returns a 409 Conflict error.
List Connectors
`GET /admin/connectors/:gatewayId`

Returns all registered connectors. Connection strings and secrets are never included in the response.
Headers: Authorization: Bearer <admin JWT>
Response:
```json
[
  {
    "name": "analytics-db",
    "type": "postgres",
    "hasIngest": false,
    "isPolling": false
  },
  {
    "name": "bq-events",
    "type": "bigquery",
    "hasIngest": true,
    "isPolling": true
  }
]
```

Example:

```bash
curl https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN"
```

Unregister a Connector
`DELETE /admin/connectors/:gatewayId/:name`

Removes a connector. If an ingest poller is running, it is stopped. The database connection is closed and the source adapter is removed from the gateway. Clients can no longer pull from this source.
Headers: Authorization: Bearer <admin JWT>
Response:
```json
{ "unregistered": true, "name": "analytics-db" }
```

Example:

```bash
curl -X DELETE https://gateway.example.com/admin/connectors/my-gw/analytics-db \
  -H "Authorization: Bearer $ADMIN_TOKEN"
```

Unregistering a connector that does not exist returns a 404 Not Found error.
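The three routes above compose naturally into a small admin client. A sketch over `fetch`, using only the paths documented on this page (`ConnectorAdmin` is a hypothetical wrapper, not an official SDK; the fetch function is injectable so it runs anywhere):

```typescript
// Hypothetical admin client for the connector routes documented above.
type FetchLike = (
  url: string,
  init?: { method?: string; headers?: Record<string, string>; body?: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

class ConnectorAdmin {
  constructor(
    private baseUrl: string,
    private gatewayId: string,
    private adminToken: string,
    private fetchFn: FetchLike = fetch as unknown as FetchLike,
  ) {}

  private async request(method: string, path: string, body?: unknown) {
    const res = await this.fetchFn(`${this.baseUrl}${path}`, {
      method,
      headers: {
        Authorization: `Bearer ${this.adminToken}`,
        ...(body !== undefined ? { "Content-Type": "application/json" } : {}),
      },
      body: body !== undefined ? JSON.stringify(body) : undefined,
    });
    if (!res.ok) throw new Error(`${method} ${path} failed with ${res.status}`);
    return res.json();
  }

  /** POST /admin/connectors/:gatewayId */
  register(config: Record<string, unknown>) {
    return this.request("POST", `/admin/connectors/${this.gatewayId}`, config);
  }
  /** GET /admin/connectors/:gatewayId */
  list() {
    return this.request("GET", `/admin/connectors/${this.gatewayId}`);
  }
  /** DELETE /admin/connectors/:gatewayId/:name */
  unregister(name: string) {
    return this.request("DELETE", `/admin/connectors/${this.gatewayId}/${name}`);
  }
}
```

A 409 from `register` or 404 from `unregister` surfaces as the thrown error, matching the conflict and not-found semantics above.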
Examples
Adding a PostgreSQL Source
Register a Postgres database, then pull data from it:
```bash
# Register the connector
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "users-db",
    "type": "postgres",
    "postgres": { "connectionString": "postgres://user:pass@host/mydb" }
  }'

# Pull from the registered source
curl "https://gateway.example.com/sync/my-gw/pull?since=0&clientId=app-1&source=users-db" \
  -H "Authorization: Bearer $CLIENT_TOKEN"
```

BigQuery with Ingest Polling
Register a BigQuery source with automatic change detection that polls every 30 seconds using a cursor strategy:
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "bq-events",
    "type": "bigquery",
    "bigquery": { "projectId": "my-project", "dataset": "events_ds" },
    "ingest": {
      "intervalMs": 30000,
      "tables": [{
        "table": "events",
        "query": "SELECT * FROM events",
        "strategy": { "type": "cursor", "cursorColumn": "updated_at" }
      }]
    }
  }'
```

Once registered, the poller starts immediately. Detected changes are pushed through the gateway and broadcast to all connected clients via the standard sync flow.
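The cursor strategy's behavior can be modeled in a few lines: remember the highest cursor value seen, and on each poll take only rows past it. A simplified sketch (`advanceCursor` is illustrative, not the poller's internal API; ISO-8601 timestamps compare correctly as strings):

```typescript
// Simplified cursor-strategy model: only rows whose cursor column exceeds the
// last observed value are treated as changes on the next poll.
type CursorRow = { updated_at: string } & Record<string, unknown>;

function advanceCursor(rows: CursorRow[], lastCursor: string | null) {
  // Rows the source would return for: WHERE updated_at > lastCursor
  const fresh =
    lastCursor === null ? rows : rows.filter((r) => r.updated_at > lastCursor);
  const maxSeen = fresh.reduce(
    (max, r) => (r.updated_at > max ? r.updated_at : max),
    lastCursor ?? "",
  );
  return { fresh, nextCursor: fresh.length > 0 ? maxSeen : lastCursor };
}
```

The first poll (no cursor yet) yields everything; subsequent polls are incremental, which is why cursor strategies suit large append-heavy tables.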
MySQL with Diff Strategy
Register a MySQL source with diff-based change detection for a small reference table:
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "config-db",
    "type": "mysql",
    "mysql": { "connectionString": "mysql://user:pass@host/config" },
    "ingest": {
      "intervalMs": 60000,
      "tables": [{
        "table": "feature_flags",
        "query": "SELECT id, flag_name, enabled FROM feature_flags",
        "strategy": { "type": "diff" }
      }]
    }
  }'
```

The diff strategy compares the full result set on each poll, detecting inserts, updates, and hard deletes. Use it for small tables where delete detection matters.
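The comparison the diff strategy performs can be modeled as a pure function over two snapshots keyed by row ID. A simplified sketch (not the gateway's actual implementation; JSON serialization stands in for whatever row-equality check is used internally):

```typescript
// Simplified diff-strategy model: compare the previous snapshot against the
// current result set, keyed by row ID, to classify changes.
type Row = Record<string, unknown>;
type Snapshot = Map<string, Row>;

function diffSnapshots(prev: Snapshot, curr: Snapshot) {
  const inserts: string[] = [];
  const updates: string[] = [];
  const deletes: string[] = [];
  for (const [id, row] of curr) {
    const old = prev.get(id);
    if (!old) inserts.push(id);
    else if (JSON.stringify(old) !== JSON.stringify(row)) updates.push(id);
  }
  for (const id of prev.keys()) {
    if (!curr.has(id)) deletes.push(id); // hard deletes are detectable
  }
  return { inserts, updates, deletes };
}
```

Because the full result set is held per poll, memory scales with table size, which is why diff suits small reference tables like `feature_flags`.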
Jira Cloud
Register a Jira connector to sync issues, comments, and projects into LakeSync. The Jira connector polls the Jira Cloud REST API — no SQL or table configuration needed.
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": {
      "domain": "mycompany",
      "email": "bot@mycompany.com",
      "apiToken": "ATATT3xFfGF0...",
      "jql": "project = ENG",
      "includeComments": true,
      "includeProjects": true
    },
    "ingest": { "intervalMs": 30000 }
  }'
```

Once registered, the Jira poller starts immediately and syncs three tables:
| Table | Row ID | Description |
|---|---|---|
| `jira_issues` | Issue key (e.g. `ENG-42`) | Summary, status, priority, assignee, reporter, labels, timestamps |
| `jira_comments` | `{issueKey}:{commentId}` | Comment body, author, timestamps |
| `jira_projects` | Project key (e.g. `ENG`) | Name, type, lead |
How it works:
- Issues use a cursor strategy — on each poll, only issues updated since the last poll are fetched (via JQL `updated >= "date"`)
- Comments are fetched per-issue and compared against an in-memory snapshot (diff strategy) to detect inserts and updates
- Projects use a full diff strategy — the complete project list is compared against a snapshot to detect inserts, updates, and deletes
All detected changes are pushed through the gateway and broadcast to connected clients, just like any other connector.
JQL filtering — The `jql` field scopes which issues are polled. For example, `"project = ENG AND status != Done"` only syncs active Engineering issues. When omitted, all issues in the Jira instance are polled.
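Combining the user-supplied JQL scope with the incremental cursor described above can be sketched as a small query builder (illustrative only; the connector's real JQL construction may differ — Jira JQL accepts `yyyy-MM-dd HH:mm` timestamps):

```typescript
// Sketch: compose the user's JQL scope with the incremental "updated" cursor.
// Hypothetical helper; not the connector's actual query builder.
function buildIssueJql(userJql: string | undefined, lastPoll: Date | null): string {
  const clauses: string[] = [];
  if (userJql) clauses.push(`(${userJql})`);
  if (lastPoll) {
    // Format as "yyyy-MM-dd HH:mm", the precision Jira JQL supports.
    const ts = lastPoll.toISOString().slice(0, 16).replace("T", " ");
    clauses.push(`updated >= "${ts}"`);
  }
  return clauses.join(" AND ");
}
```

On the first poll there is no cursor, so only the user scope applies; afterwards both clauses AND together, keeping each poll incremental.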
Pulling Jira data — Once the Jira connector is polling, clients receive Jira data through the standard sync flow. Apply sync rules to filter which Jira data reaches each client:
```json
{
  "buckets": [{
    "name": "eng-issues",
    "filters": [
      { "column": "project_key", "op": "eq", "value": "ENG" }
    ],
    "tables": ["jira_issues", "jira_comments"]
  }]
}
```

Salesforce CRM
Register a Salesforce connector to sync accounts, contacts, opportunities, and leads into LakeSync. The Salesforce connector polls the Salesforce REST API via SOQL — no SQL or table configuration needed.
```bash
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sf-crm",
    "type": "salesforce",
    "salesforce": {
      "instanceUrl": "https://mycompany.salesforce.com",
      "clientId": "3MVG9...",
      "clientSecret": "ABC123...",
      "username": "bot@mycompany.com",
      "password": "mypasswordSECURITYTOKEN",
      "includeAccounts": true,
      "includeContacts": true,
      "includeOpportunities": true,
      "includeLeads": true
    },
    "ingest": { "intervalMs": 30000 }
  }'
```

Once registered, the Salesforce poller starts immediately and syncs four tables:
| Table | Row ID | Description |
|---|---|---|
| `sf_accounts` | Salesforce Id (e.g. `001ABC123`) | Name, type, industry, website, phone, billing address, annual revenue, employees, owner |
| `sf_contacts` | Salesforce Id (e.g. `003ABC123`) | First/last name, email, phone, title, account, mailing address, owner |
| `sf_opportunities` | Salesforce Id (e.g. `006ABC123`) | Name, stage, amount, close date, probability, account, type, lead source, is closed/won, owner |
| `sf_leads` | Salesforce Id (e.g. `00QABC123`) | First/last name, company, email, phone, title, status, lead source, conversion fields, owner |
How it works:
- All four entity types use a cursor strategy via `LastModifiedDate` — on each poll, only records modified since the last poll are fetched via SOQL
- The first poll fetches all records (no cursor yet); subsequent polls are incremental
- Authentication uses the OAuth 2.0 Username-Password flow — the connector auto-authenticates and re-authenticates on token expiry (401)
- Rate limiting (503) is handled with automatic retries using the `Retry-After` header
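The 503 handling above follows a standard retry pattern. A sketch with injectable fetch and sleep functions (illustrative; the connector's internal retry logic may differ, and `Retry-After` is read as a delay in seconds):

```typescript
// Sketch of Retry-After-driven retry for rate-limited (503) responses.
// doFetch and sleep are injected so the loop is testable without a network.
async function fetchWithRetry(
  doFetch: () => Promise<{ status: number; headers: { get(h: string): string | null } }>,
  sleep: (ms: number) => Promise<void>,
  maxRetries = 3,
) {
  for (let attempt = 0; ; attempt++) {
    const res = await doFetch();
    // Only 503 triggers a retry; give up after maxRetries attempts.
    if (res.status !== 503 || attempt >= maxRetries) return res;
    const retryAfterSeconds = Number(res.headers.get("Retry-After") ?? "1");
    await sleep(retryAfterSeconds * 1000);
  }
}
```

Honoring the server's suggested delay instead of a fixed backoff keeps the poller within Salesforce's API limits.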
Authentication setup — The password field must contain the Salesforce password concatenated with the security token (e.g. mypasswordABCDEF123456). Create a Connected App in Salesforce Setup to obtain the clientId and clientSecret. For sandbox environments, set isSandbox: true to authenticate against test.salesforce.com.
SOQL filtering — The soqlFilter field appends a WHERE clause fragment to all SOQL queries. For example, "Type = 'Customer'" only syncs customer accounts. This applies across all entity types.
Selective polling — Disable entity types you do not need with includeAccounts, includeContacts, includeOpportunities, and includeLeads (all default to true).
Pulling Salesforce data — Once the connector is polling, clients receive CRM data through the standard sync flow. Apply sync rules to filter which data reaches each client:
```json
{
  "buckets": [{
    "name": "sales-pipeline",
    "filters": [
      { "column": "stage_name", "op": "neq", "value": "Closed Won" }
    ],
    "tables": ["sf_opportunities"]
  }, {
    "name": "my-accounts",
    "filters": [
      { "column": "owner_name", "op": "eq", "value": "jwt:name" }
    ],
    "tables": ["sf_accounts", "sf_contacts"]
  }]
}
```

Programmatic Connector Management
All connector operations are available via the admin REST API, making it straightforward to automate from scripts, CI/CD, or agent workflows. No server restart or configuration file changes are needed.
Register any connector type with a single HTTP call:
```bash
# Database connector
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-pg", "type": "postgres", "postgres": { "connectionString": "..." } }'

# API connector (Jira)
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-jira", "type": "jira", "jira": { "domain": "co", "email": "...", "apiToken": "..." } }'

# API connector (Salesforce)
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-sf", "type": "salesforce", "salesforce": { "instanceUrl": "https://co.salesforce.com", "clientId": "...", "clientSecret": "...", "username": "...", "password": "..." } }'
```

List all registered connectors:
```bash
curl $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN"
```

Remove a connector (stops poller, closes connections):
```bash
curl -X DELETE $GATEWAY_URL/admin/connectors/$GW_ID/my-pg \
  -H "Authorization: Bearer $ADMIN_TOKEN"
```

Configure sync rules (filter which data reaches each client):
```bash
curl -X POST $GATEWAY_URL/admin/sync-rules/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "buckets": [{
      "name": "team-data",
      "filters": [{ "column": "project_key", "op": "eq", "value": "jwt:team" }],
      "tables": ["jira_issues", "jira_comments"]
    }, {
      "name": "my-pipeline",
      "filters": [{ "column": "owner_name", "op": "eq", "value": "jwt:name" }],
      "tables": ["sf_opportunities", "sf_accounts"]
    }]
  }'
```

Register table schemas:
```bash
curl -X POST $GATEWAY_URL/admin/schema/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "table": "jira_issues",
    "columns": [
      { "name": "key", "type": "string" },
      { "name": "summary", "type": "string" },
      { "name": "status", "type": "string" }
    ]
  }'
```

The full admin API surface — connectors, sync rules, schemas, flush — is designed for programmatic use. The gateway can be fully configured at runtime without touching any configuration files.
Gateway Server vs Worker
| Capability | Gateway Server (self-hosted) | Gateway Worker (Cloudflare) |
|---|---|---|
| Database connections | Creates live connections | Stores config only |
| API connectors (Jira, Salesforce) | Starts API pollers automatically | Not supported |
| Ingest polling | Starts pollers automatically | Not supported (no long-running processes) |
| Connector storage | In-memory (lost on restart) | Durable Object storage (persistent) |
Gateway Server (@lakesync/gateway-server) creates live database connections and starts ingest pollers when a connector is registered. For API connectors like Jira and Salesforce, it starts dedicated pollers that communicate with the external API. The server must have network access to the source database or API.
Gateway Worker (gateway-worker on Cloudflare Workers) stores connector configuration in the Durable Object but cannot create live database connections, run pollers, or communicate with external APIs — the Workers runtime does not support long-running processes or arbitrary TCP connections. Use the gateway server for connectors that require ingest polling or API access.
E2E Testing
Each API connector includes an E2E test suite that runs against a real service account. Tests are skipped automatically when credentials are not present, so unit tests always pass in CI without secrets.
Jira Cloud
- Create an API token for your Atlassian account.
- Copy the `.env.example` in `packages/connector-jira/` to `.env` and fill in your credentials:

```bash
JIRA_DOMAIN=mycompany        # for mycompany.atlassian.net
JIRA_EMAIL=you@company.com
JIRA_API_TOKEN=ATATT3xFfGF0...
# Optional: scope to a specific project
JIRA_JQL=project = ENG
```

- Run the E2E tests:

```bash
cd packages/connector-jira
bun vitest run -t "Jira E2E"
```

The test suite verifies: issue search, comment fetching, project listing, empty-result handling, and a full poll-to-gateway roundtrip.
Salesforce CRM
- Create a Connected App in Salesforce Setup (App Manager > New Connected App):
  - Enable OAuth Settings
  - Add scopes: Full access (`full`)
  - Note the Consumer Key and Consumer Secret
- Get your Security Token from Setup > My Personal Information > Reset My Security Token.
- Copy the `.env.example` in `packages/connector-salesforce/` to `.env` and fill in your credentials:

```bash
SF_INSTANCE_URL=https://your-dev-ed.develop.my.salesforce.com
SF_CLIENT_ID=your-connected-app-consumer-key
SF_CLIENT_SECRET=your-connected-app-consumer-secret
SF_USERNAME=your-email@example.com
# Password + Security Token concatenated (no separator)
SF_PASSWORD=YourPassword123ABCSecurityToken
SF_IS_SANDBOX=false
```

- Run the E2E tests:

```bash
cd packages/connector-salesforce
bun vitest run -t "Salesforce E2E"
```

The test suite verifies: OAuth authentication, account and contact queries, empty-result handling, and a full poll-to-gateway roundtrip.
How It Works
Both test suites follow the same pattern:
- Credentials are read from `process.env` (loaded via vitest's built-in `.env` support)
- When any required variable is missing, `describe.skipIf` skips the entire suite
- `.env` files are gitignored — only `.env.example` is committed
- Tests exercise the real HTTP client (no mocks) against live API endpoints
Memory-Managed Ingestion
API connectors (Jira, Salesforce) use a streaming accumulation pipeline that pushes deltas in bounded chunks instead of collecting every delta in a single in-memory array. This keeps memory usage constant regardless of how many records are returned by the external API — critical for Cloudflare Workers with a 128 MB limit.
How It Works
- Each record from the external API is converted to a delta and accumulated one at a time
- When the accumulator reaches `chunkSize` (default 500), the chunk is pushed to the gateway
- Before each push, the poller checks the gateway's buffer — if it signals backpressure or exceeds 70% of the `memoryBudgetBytes` threshold, the poller triggers a flush first
- If a push is rejected due to backpressure, the poller flushes and retries once
- At the end of each poll cycle, any remaining deltas are flushed
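The steps above can be sketched as a chunked accumulation loop (a simplified model; `ChunkTarget` and `ingestRecords` are illustrative names, not the connector's internals):

```typescript
// Simplified model of the streaming accumulation pipeline described above.
interface ChunkTarget {
  push(deltas: unknown[]): Promise<boolean>; // false signals backpressure
  flush(): Promise<void>;
  shouldFlush(): boolean; // true when the buffer is near its memory budget
}

async function ingestRecords(
  records: Iterable<unknown>,
  toDelta: (record: unknown) => unknown,
  target: ChunkTarget,
  chunkSize = 500,
) {
  let chunk: unknown[] = [];
  const pushChunk = async () => {
    if (chunk.length === 0) return;
    if (target.shouldFlush()) await target.flush(); // pre-emptive flush
    if (!(await target.push(chunk))) {
      await target.flush(); // backpressure: flush, then retry once
      await target.push(chunk);
    }
    chunk = [];
  };
  for (const record of records) {
    chunk.push(toDelta(record));
    if (chunk.length >= chunkSize) await pushChunk();
  }
  await pushChunk(); // flush remaining deltas at end of the poll cycle
}
```

Because at most `chunkSize` deltas are held at once, memory stays bounded no matter how many records the external API returns.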
Configuration
Both Jira and Salesforce connectors accept optional memory tuning in their ingest config:
```bash
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": { "domain": "co", "email": "...", "apiToken": "..." },
    "ingest": {
      "intervalMs": 30000,
      "chunkSize": 200,
      "memoryBudgetBytes": 67108864
    }
  }'
```

| Parameter | Default | Description |
|---|---|---|
| `chunkSize` | 500 | Number of deltas per push chunk |
| `memoryBudgetBytes` | No limit | Triggers a gateway flush when the buffer reaches 70% of this value |
IngestTarget Interface
The streaming pipeline works with any gateway that implements the `IngestTarget` interface (exported from `@lakesync/core`):
```typescript
interface IngestTarget extends PushTarget {
  flush(): Promise<Result<void, FlushError>>;
  shouldFlush(): boolean;
  readonly bufferStats: { logSize: number; indexSize: number; byteSize: number };
}
```

`SyncGateway` implements `IngestTarget` automatically. Plain `PushTarget` gateways degrade gracefully — chunks are pushed without flush checks.

Use `isIngestTarget(target)` to check at runtime whether a target supports flush.
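A structurally equivalent runtime check might look like the following sketch; in real code, prefer the `isIngestTarget` helper exported by `@lakesync/core`:

```typescript
// Structural sketch of an IngestTarget capability check. Illustrative only;
// use the isIngestTarget helper from @lakesync/core in real code.
function looksLikeIngestTarget(target: unknown): boolean {
  const t = target as {
    flush?: unknown;
    shouldFlush?: unknown;
    bufferStats?: unknown;
  };
  return (
    typeof t?.flush === "function" &&
    typeof t?.shouldFlush === "function" &&
    typeof t?.bufferStats === "object" &&
    t?.bufferStats !== null
  );
}
```

When the check fails, the pipeline falls back to plain pushes with no flush coordination, matching the graceful degradation described above.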
Connector Discovery
The gateway exposes a `GET /connectors/types` endpoint that returns machine-readable metadata for all supported connector types. This enables UIs to dynamically render "Add Connector" forms without hardcoding per-type config shapes.
Endpoint
`GET /connectors/types`

No authentication required — this returns static metadata, not instance-specific data. Available on both gateway-server and gateway-worker.
Response:
```json
[
  {
    "type": "postgres",
    "displayName": "PostgreSQL",
    "description": "PostgreSQL relational database connector.",
    "category": "database",
    "configSchema": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "connectionString": { "type": "string", "description": "..." }
      },
      "required": ["connectionString"]
    },
    "ingestSchema": { ... },
    "outputTables": null
  },
  {
    "type": "jira",
    "displayName": "Jira",
    "description": "Atlassian Jira Cloud issue tracker connector.",
    "category": "api",
    "configSchema": { ... },
    "ingestSchema": { ... },
    "outputTables": [
      { "table": "jira_issues", "columns": [...] },
      { "table": "jira_comments", "columns": [...] },
      { "table": "jira_projects", "columns": [...] }
    ]
  }
]
```

ConnectorDescriptor
Each entry in the response is a ConnectorDescriptor:
```typescript
interface ConnectorDescriptor {
  type: ConnectorType;                   // "postgres" | "jira" | ...
  displayName: string;                   // "PostgreSQL"
  description: string;                   // Short description
  category: "database" | "api";          // Determines ingest model
  configSchema: Record<string, unknown>; // JSON Schema draft-07
  ingestSchema: Record<string, unknown>; // JSON Schema for ingest config
  outputTables: ReadonlyArray<TableSchema> | null; // null for DB connectors
}
```

- `configSchema` — JSON Schema describing the connector-specific config object (e.g. `connectionString` for Postgres, `domain`/`email`/`apiToken` for Jira). Use this to render form fields dynamically.
- `ingestSchema` — JSON Schema for the ingest configuration. Database connectors require a `tables` array; API connectors only accept `intervalMs`.
- `outputTables` — For API connectors, describes the tables the connector produces. `null` for database connectors (tables are user-defined via ingest config).
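Given a descriptor's `configSchema`, a UI can derive form fields mechanically. A sketch that assumes only the JSON Schema shape shown in the response example above (`fieldsFromSchema` and `FieldSpec` are illustrative, not exported helpers):

```typescript
// Sketch: derive form-field descriptors from a connector's configSchema
// (JSON Schema draft-07), as a UI might before rendering inputs.
interface FieldSpec {
  name: string;
  required: boolean;
  description?: string;
}

function fieldsFromSchema(schema: {
  properties?: Record<string, { description?: string }>;
  required?: string[];
}): FieldSpec[] {
  const required = new Set(schema.required ?? []);
  return Object.entries(schema.properties ?? {}).map(([name, prop]) => ({
    name,
    required: required.has(name),
    description: prop.description,
  }));
}
```

Driving the form from the schema means a new connector type needs no UI changes: it appears in the dropdown and its fields render automatically.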
React Hook
The useConnectorTypes hook fetches connector types from the gateway and returns them reactively:
```tsx
import { useConnectorTypes } from "@lakesync/react";

function AddConnectorForm() {
  const { types, isLoading, error, refetch } = useConnectorTypes();
  if (isLoading) return <p>Loading...</p>;
  if (error) return <p>Error: {error.message}</p>;
  return (
    <select>
      {types.map((t) => (
        <option key={t.type} value={t.type}>
          {t.displayName}
        </option>
      ))}
    </select>
  );
}
```

Custom Connectors
Register custom connector types at module load time using registerConnectorDescriptor:
```typescript
import { registerConnectorDescriptor } from "@lakesync/core";

registerConnectorDescriptor({
  type: "cloudwatch" as ConnectorType,
  displayName: "CloudWatch",
  description: "AWS CloudWatch Logs connector.",
  category: "api",
  configSchema: {
    $schema: "http://json-schema.org/draft-07/schema#",
    type: "object",
    properties: {
      logGroupName: { type: "string" },
      region: { type: "string" },
    },
    required: ["logGroupName", "region"],
  },
  ingestSchema: { ... },
  outputTables: null,
});
```

Custom descriptors appear in `GET /connectors/types` and `useConnectorTypes()` alongside built-in types.
curl Example
```bash
curl https://gateway.example.com/connectors/types
```

Security
- All connector routes require a JWT with `role: "admin"`.
- Connection strings and credentials are never returned by the list endpoint.
- Use environment variables or a secret manager for connection strings in production — avoid hardcoding credentials in requests.
- Connector names are validated as non-empty strings. Names are used as source adapter keys internally and appear in pull request URLs.