LakeSync

Dynamic Connectors

Add, remove, and list data sources at runtime via the admin API.

Overview

Connectors let you register external data sources at runtime — no server restart required. Every connector is managed programmatically via the admin REST API: register, list, and unregister connectors with simple HTTP calls. This makes it straightforward to automate connector management from scripts, CI/CD pipelines, or agent workflows.

Each connector maps to a named source adapter in the gateway. Database connectors (Postgres, MySQL, BigQuery) let clients pull data using the source query parameter. API connectors (Jira, Salesforce) run their own pollers that push data into the gateway automatically.

Supported connector types:

| Type       | Kind     | Config Required                                         |
|------------|----------|---------------------------------------------------------|
| postgres   | Database | connectionString                                        |
| mysql      | Database | connectionString                                        |
| bigquery   | Database | projectId, dataset                                      |
| jira       | REST API | domain, email, apiToken                                 |
| salesforce | REST API | instanceUrl, clientId, clientSecret, username, password |

Database connectors can optionally include an ingest configuration that starts a poller to automatically detect changes in the source database and push them through the gateway. See Source Polling Ingest for details on change detection strategies.

API connectors like Jira and Salesforce define their own tables internally and poll automatically — no SQL queries or table configuration needed. Just provide the connection credentials and an optional poll interval.

All connector routes require a JWT with role: "admin".

ConnectorConfig

interface ConnectorConfigBase {
  /** Connector type — determines which adapter to create. */
  type: string;
  /** Unique connector name (used as the source adapter key). */
  name: string;
  /** Optional ingest polling configuration. */
  ingest?: ConnectorIngestConfig;
}

type ConnectorConfig =
  | PostgresConnectorConfigFull   // type: "postgres", postgres: { connectionString }
  | MySQLConnectorConfigFull      // type: "mysql", mysql: { connectionString }
  | BigQueryConnectorConfigFull   // type: "bigquery", bigquery: { projectId, dataset, ... }
  | JiraConnectorConfigFull       // type: "jira", jira: { domain, email, apiToken, ... }
  | SalesforceConnectorConfigFull // type: "salesforce", salesforce: { instanceUrl, ... }
  | (ConnectorConfigBase & Record<string, unknown>);  // extensible catch-all

The open base type allows custom connector types (e.g. cloudwatch, stripe) without modifying @lakesync/core. Typed variants provide full type safety for built-in connector types.
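
As a sketch, here is how a hypothetical custom connector config type-checks through the catch-all variant (the cloudwatch type and its fields are illustrative, and this assumes ConnectorConfig is exported from @lakesync/core):

import type { ConnectorConfig } from "@lakesync/core";

// Hypothetical custom connector: "cloudwatch" is not a built-in type,
// so it is accepted via the catch-all variant rather than a typed one.
const custom: ConnectorConfig = {
  type: "cloudwatch",
  name: "cw-logs",
  cloudwatch: { logGroupName: "/app/prod", region: "us-east-1" },
};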

Built-in connector config fields:

  • postgres { connectionString: string } — required when type is "postgres"
  • mysql { connectionString: string } — required when type is "mysql"
  • bigquery { projectId: string; dataset: string; keyFilename?: string; location?: string } — required when type is "bigquery"
  • jira { domain: string; email: string; apiToken: string; jql?: string; includeComments?: boolean; includeProjects?: boolean } — required when type is "jira"
  • salesforce { instanceUrl: string; clientId: string; clientSecret: string; username: string; password: string; apiVersion?: string; isSandbox?: boolean; soqlFilter?: string; includeAccounts?: boolean; includeContacts?: boolean; includeOpportunities?: boolean; includeLeads?: boolean } — required when type is "salesforce"
  • ingest { tables?: Array<{ table, query, rowIdColumn?, strategy }>; intervalMs?: number; chunkSize?: number; memoryBudgetBytes?: number } — optional ingest polling configuration

Validation is performed server-side via validateConnectorConfig(). Missing or invalid fields return a 400 response with a descriptive error message.

For API connectors (Jira, Salesforce), the ingest.tables field is not required — they define their tables internally. Only intervalMs, chunkSize, and memoryBudgetBytes are used from the ingest config.

API Reference

Register a Connector

POST /admin/connectors/:gatewayId

Registers a new data source. For database connectors, creates a database adapter and adds it as a named source on the gateway. For API connectors like Jira, starts a dedicated poller that pushes data into the gateway automatically.

Headers: Authorization: Bearer <admin JWT>
Body: ConnectorConfig JSON
Response:

{ "registered": true, "name": "analytics-db" }

Example (database connector):

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "analytics-db",
    "type": "postgres",
    "postgres": { "connectionString": "postgres://user:pass@host/analytics" }
  }'

Example (Jira connector):

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": {
      "domain": "mycompany",
      "email": "bot@mycompany.com",
      "apiToken": "ATATT3...",
      "jql": "project = ENG"
    },
    "ingest": { "intervalMs": 30000 }
  }'

Example (Salesforce connector):

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sf-crm",
    "type": "salesforce",
    "salesforce": {
      "instanceUrl": "https://mycompany.salesforce.com",
      "clientId": "3MVG9...",
      "clientSecret": "ABC123...",
      "username": "bot@mycompany.com",
      "password": "mypasswordSECURITYTOKEN"
    },
    "ingest": { "intervalMs": 30000 }
  }'

Registering a connector with a name that already exists returns a 409 Conflict error.

List Connectors

GET /admin/connectors/:gatewayId

Returns all registered connectors. Connection strings and secrets are never included in the response.

Headers: Authorization: Bearer <admin JWT>
Response:

[
  {
    "name": "analytics-db",
    "type": "postgres",
    "hasIngest": false,
    "isPolling": false
  },
  {
    "name": "bq-events",
    "type": "bigquery",
    "hasIngest": true,
    "isPolling": true
  }
]

Example:

curl https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN"

Unregister a Connector

DELETE /admin/connectors/:gatewayId/:name

Removes a connector. If an ingest poller is running, it is stopped. The database connection is closed and the source adapter is removed from the gateway. Clients can no longer pull from this source.

Headers: Authorization: Bearer <admin JWT>
Response:

{ "unregistered": true, "name": "analytics-db" }

Example:

curl -X DELETE https://gateway.example.com/admin/connectors/my-gw/analytics-db \
  -H "Authorization: Bearer $ADMIN_TOKEN"

Unregistering a connector that does not exist returns a 404 Not Found error.

Examples

Adding a PostgreSQL Source

Register a Postgres database, then pull data from it:

# Register the connector
curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "users-db",
    "type": "postgres",
    "postgres": { "connectionString": "postgres://user:pass@host/mydb" }
  }'

# Pull from the registered source
curl "https://gateway.example.com/sync/my-gw/pull?since=0&clientId=app-1&source=users-db" \
  -H "Authorization: Bearer $CLIENT_TOKEN"

BigQuery with Ingest Polling

Register a BigQuery source with automatic change detection that polls every 30 seconds using a cursor strategy:

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "bq-events",
    "type": "bigquery",
    "bigquery": { "projectId": "my-project", "dataset": "events_ds" },
    "ingest": {
      "intervalMs": 30000,
      "tables": [{
        "table": "events",
        "query": "SELECT * FROM events",
        "strategy": { "type": "cursor", "cursorColumn": "updated_at" }
      }]
    }
  }'

Once registered, the poller starts immediately. Detected changes are pushed through the gateway and broadcast to all connected clients via the standard sync flow.

MySQL with Diff Strategy

Register a MySQL source with diff-based change detection for a small reference table:

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "config-db",
    "type": "mysql",
    "mysql": { "connectionString": "mysql://user:pass@host/config" },
    "ingest": {
      "intervalMs": 60000,
      "tables": [{
        "table": "feature_flags",
        "query": "SELECT id, flag_name, enabled FROM feature_flags",
        "strategy": { "type": "diff" }
      }]
    }
  }'

The diff strategy compares the full result set on each poll, detecting inserts, updates, and hard deletes. Use it for small tables where delete detection matters.
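
Conceptually, a diff pass compares the previous snapshot against the current result set, keyed by row ID. A minimal sketch (illustrative, not the library's implementation):

type Row = Record<string, unknown>;

// prev: snapshot from the last poll; next: rows from the current poll, keyed by row ID.
function diffRows(prev: Map<string, Row>, next: Map<string, Row>) {
  const inserts: Row[] = [];
  const updates: Row[] = [];
  const deletes: string[] = [];
  for (const [id, row] of next) {
    const old = prev.get(id);
    if (!old) inserts.push(row);
    else if (JSON.stringify(old) !== JSON.stringify(row)) updates.push(row);
  }
  for (const id of prev.keys()) {
    if (!next.has(id)) deletes.push(id); // hard deletes appear as missing rows
  }
  return { inserts, updates, deletes };
}

Because every poll materializes the full result set, the cost grows with table size, which is why diff is best reserved for small tables.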

Jira Cloud

Register a Jira connector to sync issues, comments, and projects into LakeSync. The Jira connector polls the Jira Cloud REST API — no SQL or table configuration needed.

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": {
      "domain": "mycompany",
      "email": "bot@mycompany.com",
      "apiToken": "ATATT3xFfGF0...",
      "jql": "project = ENG",
      "includeComments": true,
      "includeProjects": true
    },
    "ingest": { "intervalMs": 30000 }
  }'

Once registered, the Jira poller starts immediately and syncs three tables:

| Table         | Row ID                  | Description                                                        |
|---------------|-------------------------|--------------------------------------------------------------------|
| jira_issues   | Issue key (e.g. ENG-42) | Summary, status, priority, assignee, reporter, labels, timestamps  |
| jira_comments | {issueKey}:{commentId}  | Comment body, author, timestamps                                   |
| jira_projects | Project key (e.g. ENG)  | Name, type, lead                                                   |

How it works:

  • Issues use a cursor strategy — on each poll, only issues updated since the last poll are fetched (via JQL updated >= "date")
  • Comments are fetched per-issue and compared against an in-memory snapshot (diff strategy) to detect inserts and updates
  • Projects use a full diff strategy — the complete project list is compared against a snapshot to detect inserts, updates, and deletes

All detected changes are pushed through the gateway and broadcast to connected clients, just like any other connector.

JQL filtering — The jql field scopes which issues are polled. For example, "project = ENG AND status != Done" only syncs active Engineering issues. When omitted, all issues in the Jira instance are polled.
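
To make the combination concrete, here is a sketch of the JQL a poll might issue (buildIssueJql is a hypothetical helper, not part of the connector's public API):

// Hypothetical helper: combine the configured jql filter with the update cursor.
function buildIssueJql(baseJql: string | undefined, cursor: string | undefined): string {
  const clauses: string[] = [];
  if (baseJql) clauses.push(`(${baseJql})`);
  if (cursor) clauses.push(`updated >= "${cursor}"`);
  return clauses.join(" AND ");
}

buildIssueJql("project = ENG AND status != Done", "2024-05-01 09:30");
// => '(project = ENG AND status != Done) AND updated >= "2024-05-01 09:30"'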

Pulling Jira data — Once the Jira connector is polling, clients receive Jira data through the standard sync flow. Apply sync rules to filter which Jira data reaches each client:

{
  "buckets": [{
    "name": "eng-issues",
    "filters": [
      { "column": "project_key", "op": "eq", "value": "ENG" }
    ],
    "tables": ["jira_issues", "jira_comments"]
  }]
}

Salesforce CRM

Register a Salesforce connector to sync accounts, contacts, opportunities, and leads into LakeSync. The Salesforce connector polls the Salesforce REST API via SOQL — no SQL or table configuration needed.

curl -X POST https://gateway.example.com/admin/connectors/my-gw \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sf-crm",
    "type": "salesforce",
    "salesforce": {
      "instanceUrl": "https://mycompany.salesforce.com",
      "clientId": "3MVG9...",
      "clientSecret": "ABC123...",
      "username": "bot@mycompany.com",
      "password": "mypasswordSECURITYTOKEN",
      "includeAccounts": true,
      "includeContacts": true,
      "includeOpportunities": true,
      "includeLeads": true
    },
    "ingest": { "intervalMs": 30000 }
  }'

Once registered, the Salesforce poller starts immediately and syncs four tables:

| Table            | Row ID                         | Description                                                                                    |
|------------------|--------------------------------|------------------------------------------------------------------------------------------------|
| sf_accounts      | Salesforce Id (e.g. 001ABC123) | Name, type, industry, website, phone, billing address, annual revenue, employees, owner        |
| sf_contacts      | Salesforce Id (e.g. 003ABC123) | First/last name, email, phone, title, account, mailing address, owner                          |
| sf_opportunities | Salesforce Id (e.g. 006ABC123) | Name, stage, amount, close date, probability, account, type, lead source, is closed/won, owner |
| sf_leads         | Salesforce Id (e.g. 00QABC123) | First/last name, company, email, phone, title, status, lead source, conversion fields, owner   |

How it works:

  • All four entity types use a cursor strategy via LastModifiedDate — on each poll, only records modified since the last poll are fetched via SOQL
  • First poll fetches all records (no cursor yet), subsequent polls are incremental
  • Authentication uses OAuth 2.0 Username-Password flow — the connector auto-authenticates and re-authenticates on token expiry (401)
  • Rate limiting (503) is handled with automatic retries using the Retry-After header
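
A sketch of the retry behavior described above (fetchWithRetry is illustrative; the connector's internals may differ):

// Wait the number of seconds given by Retry-After, then retry; give up after maxRetries.
async function fetchWithRetry(url: string, init: RequestInit, maxRetries = 3): Promise<Response> {
  for (let attempt = 0; ; attempt++) {
    const res = await fetch(url, init);
    if (res.status !== 503 || attempt >= maxRetries) return res;
    const retryAfterSec = Number(res.headers.get("Retry-After") ?? "1");
    await new Promise((resolve) => setTimeout(resolve, retryAfterSec * 1000));
  }
}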

Authentication setup — The password field must contain the Salesforce password concatenated with the security token (e.g. mypasswordABCDEF123456). Create a Connected App in Salesforce Setup to obtain the clientId and clientSecret. For sandbox environments, set isSandbox: true to authenticate against test.salesforce.com.
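
For reference, the Username-Password token exchange looks roughly like this (a sketch against the standard Salesforce token endpoint; the connector performs this internally):

// OAuth 2.0 Username-Password flow against the Salesforce token endpoint.
async function getAccessToken(cfg: {
  instanceUrl: string;
  clientId: string;
  clientSecret: string;
  username: string;
  password: string; // password + security token, concatenated
  isSandbox?: boolean;
}): Promise<string> {
  const host = cfg.isSandbox
    ? "https://test.salesforce.com"
    : "https://login.salesforce.com";
  const body = new URLSearchParams({
    grant_type: "password",
    client_id: cfg.clientId,
    client_secret: cfg.clientSecret,
    username: cfg.username,
    password: cfg.password,
  });
  const res = await fetch(`${host}/services/oauth2/token`, { method: "POST", body });
  if (!res.ok) throw new Error(`Salesforce auth failed: ${res.status}`);
  const json = (await res.json()) as { access_token: string };
  return json.access_token;
}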

SOQL filtering — The soqlFilter field appends a WHERE clause fragment to all SOQL queries. For example, "Type = 'Customer'" only syncs customer accounts. This applies across all entity types.

Selective polling — Disable entity types you do not need with includeAccounts, includeContacts, includeOpportunities, and includeLeads (all default to true).

Pulling Salesforce data — Once the connector is polling, clients receive CRM data through the standard sync flow. Apply sync rules to filter which data reaches each client:

{
  "buckets": [{
    "name": "sales-pipeline",
    "filters": [
      { "column": "stage_name", "op": "neq", "value": "Closed Won" }
    ],
    "tables": ["sf_opportunities"]
  }, {
    "name": "my-accounts",
    "filters": [
      { "column": "owner_name", "op": "eq", "value": "jwt:name" }
    ],
    "tables": ["sf_accounts", "sf_contacts"]
  }]
}

Programmatic Connector Management

All connector operations are available via the admin REST API, making it straightforward to automate from scripts, CI/CD, or agent workflows. No server restart or configuration file changes are needed.

Register any connector type with a single HTTP call:

# Database connector
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-pg", "type": "postgres", "postgres": { "connectionString": "..." } }'

# API connector (Jira)
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-jira", "type": "jira", "jira": { "domain": "co", "email": "...", "apiToken": "..." } }'

# API connector (Salesforce)
curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "name": "my-sf", "type": "salesforce", "salesforce": { "instanceUrl": "https://co.salesforce.com", "clientId": "...", "clientSecret": "...", "username": "...", "password": "..." } }'

List all registered connectors:

curl $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN"

Remove a connector (stops poller, closes connections):

curl -X DELETE $GATEWAY_URL/admin/connectors/$GW_ID/my-pg \
  -H "Authorization: Bearer $ADMIN_TOKEN"

Configure sync rules (filter which data reaches each client):

curl -X POST $GATEWAY_URL/admin/sync-rules/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "buckets": [{
      "name": "team-data",
      "filters": [{ "column": "project_key", "op": "eq", "value": "jwt:team" }],
      "tables": ["jira_issues", "jira_comments"]
    }, {
      "name": "my-pipeline",
      "filters": [{ "column": "owner_name", "op": "eq", "value": "jwt:name" }],
      "tables": ["sf_opportunities", "sf_accounts"]
    }]
  }'

Register table schemas:

curl -X POST $GATEWAY_URL/admin/schema/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "table": "jira_issues",
    "columns": [
      { "name": "key", "type": "string" },
      { "name": "summary", "type": "string" },
      { "name": "status", "type": "string" }
    ]
  }'

The full admin API surface — connectors, sync rules, schemas, flush — is designed for programmatic use. The gateway can be fully configured at runtime without touching any configuration files.

Gateway Server vs Worker

| Capability                        | Gateway Server (self-hosted)     | Gateway Worker (Cloudflare)                |
|-----------------------------------|----------------------------------|--------------------------------------------|
| Database connections              | Creates live connections         | Stores config only                         |
| API connectors (Jira, Salesforce) | Starts API pollers automatically | Not supported                              |
| Ingest polling                    | Starts pollers automatically     | Not supported (no long-running processes)  |
| Connector storage                 | In-memory (lost on restart)      | Durable Object storage (persistent)        |

Gateway Server (@lakesync/gateway-server) creates live database connections and starts ingest pollers when a connector is registered. For API connectors like Jira and Salesforce, it starts dedicated pollers that communicate with the external API. The server must have network access to the source database or API.

Gateway Worker (gateway-worker on Cloudflare Workers) stores connector configuration in the Durable Object but cannot create live database connections, run pollers, or communicate with external APIs — the Workers runtime does not support long-running processes or arbitrary TCP connections. Use the gateway server for connectors that require ingest polling or API access.

E2E Testing

Each API connector includes an E2E test suite that runs against a real service account. Tests are skipped automatically when credentials are not present, so unit tests always pass in CI without secrets.

Jira Cloud

  1. Create an API token for your Atlassian account.
  2. Copy the .env.example in packages/connector-jira/ to .env and fill in your credentials:
JIRA_DOMAIN=mycompany          # for mycompany.atlassian.net
JIRA_EMAIL=you@company.com
JIRA_API_TOKEN=ATATT3xFfGF0...
# Optional: scope to a specific project
JIRA_JQL=project = ENG
  3. Run the E2E tests:
cd packages/connector-jira
bun vitest run -t "Jira E2E"

The test suite verifies: issue search, comment fetching, project listing, empty-result handling, and a full poll-to-gateway roundtrip.

Salesforce CRM

  1. Create a Connected App in Salesforce Setup (App Manager > New Connected App):
    • Enable OAuth Settings
    • Add scopes: Full access (full)
    • Note the Consumer Key and Consumer Secret
  2. Get your Security Token from Setup > My Personal Information > Reset My Security Token.
  3. Copy the .env.example in packages/connector-salesforce/ to .env and fill in your credentials:
SF_INSTANCE_URL=https://your-dev-ed.develop.my.salesforce.com
SF_CLIENT_ID=your-connected-app-consumer-key
SF_CLIENT_SECRET=your-connected-app-consumer-secret
SF_USERNAME=your-email@example.com
# Password + Security Token concatenated (no separator)
SF_PASSWORD=YourPassword123ABCSecurityToken
SF_IS_SANDBOX=false
  4. Run the E2E tests:
cd packages/connector-salesforce
bun vitest run -t "Salesforce E2E"

The test suite verifies: OAuth authentication, account and contact queries, empty-result handling, and a full poll-to-gateway roundtrip.

How It Works

Both test suites follow the same pattern:

  • Credentials are read from process.env (loaded via vitest's built-in .env support)
  • When any required variable is missing, describe.skipIf skips the entire suite
  • .env files are gitignored — only .env.example is committed
  • Tests exercise the real HTTP client (no mocks) against live API endpoints
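
In vitest terms, the pattern boils down to a guard like this (a minimal sketch using the Jira variables; describe.skipIf is standard vitest):

import { describe, it } from "vitest";

const hasCreds = Boolean(
  process.env.JIRA_DOMAIN && process.env.JIRA_EMAIL && process.env.JIRA_API_TOKEN,
);

// The whole suite is skipped unless every required variable is present.
describe.skipIf(!hasCreds)("Jira E2E", () => {
  it("searches issues against the live API", async () => {
    // exercises the real HTTP client, no mocks
  });
});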

Memory-Managed Ingestion

API connectors (Jira, Salesforce) use a streaming accumulation pipeline that pushes deltas in bounded chunks instead of collecting every delta in a single in-memory array. This keeps memory usage constant regardless of how many records are returned by the external API — critical for Cloudflare Workers with a 128 MB limit.

How It Works

  1. Each record from the external API is converted to a delta and accumulated one at a time
  2. When the accumulator reaches chunkSize (default 500), the chunk is pushed to the gateway
  3. Before each push, the poller checks the gateway's buffer — if it signals backpressure or exceeds the memoryBudgetBytes threshold (at 70%), the poller triggers a flush first
  4. If a push is rejected due to backpressure, the poller flushes and retries once
  5. At the end of each poll cycle, any remaining deltas are flushed
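
A sketch of that loop, under stated assumptions: Delta and toDelta() are hypothetical stand-ins, and the target is typed structurally rather than against the real PushTarget signature:

type Delta = Record<string, unknown>;             // hypothetical delta shape
declare function toDelta(record: unknown): Delta; // hypothetical converter

async function pushInChunks(
  target: {
    push(deltas: Delta[]): Promise<unknown>;
    shouldFlush(): boolean;
    flush(): Promise<unknown>;
  },
  records: Iterable<unknown>,
  chunkSize = 500,
): Promise<void> {
  let chunk: Delta[] = [];
  for (const record of records) {
    chunk.push(toDelta(record)); // accumulate one delta at a time
    if (chunk.length >= chunkSize) {
      if (target.shouldFlush()) await target.flush(); // honor backpressure before pushing
      await target.push(chunk);
      chunk = [];
    }
  }
  if (chunk.length > 0) await target.push(chunk); // remainder at the end of the poll cycle
}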

Configuration

Both Jira and Salesforce connectors accept optional memory tuning in their ingest config:

curl -X POST $GATEWAY_URL/admin/connectors/$GW_ID \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jira-eng",
    "type": "jira",
    "jira": { "domain": "co", "email": "...", "apiToken": "..." },
    "ingest": {
      "intervalMs": 30000,
      "chunkSize": 200,
      "memoryBudgetBytes": 67108864
    }
  }'

| Parameter         | Default  | Description                                                     |
|-------------------|----------|-----------------------------------------------------------------|
| chunkSize         | 500      | Number of deltas per push chunk                                 |
| memoryBudgetBytes | No limit | Triggers a gateway flush when buffer reaches 70% of this value  |

IngestTarget Interface

The streaming pipeline works with any gateway that implements the IngestTarget interface (exported from @lakesync/core):

interface IngestTarget extends PushTarget {
  flush(): Promise<Result<void, FlushError>>;
  shouldFlush(): boolean;
  readonly bufferStats: { logSize: number; indexSize: number; byteSize: number };
}

SyncGateway implements IngestTarget automatically. Plain PushTarget gateways degrade gracefully — chunks are pushed without flush checks.

Use isIngestTarget(target) to check at runtime whether a target supports flush.
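
For example (a sketch, assuming PushTarget is also exported from @lakesync/core):

import { isIngestTarget, type PushTarget } from "@lakesync/core";

// SyncGateway implements IngestTarget; plain PushTargets skip the flush check.
async function maybeFlush(target: PushTarget): Promise<void> {
  if (isIngestTarget(target) && target.shouldFlush()) {
    await target.flush();
  }
}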

Connector Discovery

The gateway exposes a GET /connectors/types endpoint that returns machine-readable metadata for all supported connector types. This enables UIs to dynamically render "Add Connector" forms without hardcoding per-type config shapes.

Endpoint

GET /connectors/types

No authentication required — this returns static metadata, not instance-specific data. Available on both gateway-server and gateway-worker.

Response:

[
  {
    "type": "postgres",
    "displayName": "PostgreSQL",
    "description": "PostgreSQL relational database connector.",
    "category": "database",
    "configSchema": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "connectionString": { "type": "string", "description": "..." }
      },
      "required": ["connectionString"]
    },
    "ingestSchema": { ... },
    "outputTables": null
  },
  {
    "type": "jira",
    "displayName": "Jira",
    "description": "Atlassian Jira Cloud issue tracker connector.",
    "category": "api",
    "configSchema": { ... },
    "ingestSchema": { ... },
    "outputTables": [
      { "table": "jira_issues", "columns": [...] },
      { "table": "jira_comments", "columns": [...] },
      { "table": "jira_projects", "columns": [...] }
    ]
  }
]

ConnectorDescriptor

Each entry in the response is a ConnectorDescriptor:

interface ConnectorDescriptor {
  type: ConnectorType;                          // "postgres" | "jira" | ...
  displayName: string;                          // "PostgreSQL"
  description: string;                          // Short description
  category: "database" | "api";                 // Determines ingest model
  configSchema: Record<string, unknown>;        // JSON Schema draft-07
  ingestSchema: Record<string, unknown>;        // JSON Schema for ingest config
  outputTables: ReadonlyArray<TableSchema> | null; // null for DB connectors
}

  • configSchema — JSON Schema describing the connector-specific config object (e.g. connectionString for Postgres, domain/email/apiToken for Jira). Use this to render form fields dynamically.
  • ingestSchema — JSON Schema for the ingest configuration. Database connectors require a tables array; API connectors only accept intervalMs.
  • outputTables — For API connectors, describes the tables the connector produces. null for database connectors (tables are user-defined via ingest config).
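
As a sketch, a UI can derive its form fields from configSchema like so (assuming ConnectorDescriptor is exported from @lakesync/core and the schema follows the draft-07 shape shown above):

import type { ConnectorDescriptor } from "@lakesync/core";

// Derive form field names and required flags from a descriptor's JSON Schema.
function formFields(descriptor: ConnectorDescriptor): Array<{ name: string; required: boolean }> {
  const schema = descriptor.configSchema as {
    properties?: Record<string, unknown>;
    required?: string[];
  };
  const required = new Set(schema.required ?? []);
  return Object.keys(schema.properties ?? {}).map((name) => ({
    name,
    required: required.has(name),
  }));
}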

React Hook

The useConnectorTypes hook fetches connector types from the gateway and returns them reactively:

import { useConnectorTypes } from "@lakesync/react";

function AddConnectorForm() {
  const { types, isLoading, error, refetch } = useConnectorTypes();

  if (isLoading) return <p>Loading...</p>;
  if (error) return <p>Error: {error.message}</p>;

  return (
    <select>
      {types.map((t) => (
        <option key={t.type} value={t.type}>
          {t.displayName}
        </option>
      ))}
    </select>
  );
}

Custom Connectors

Register custom connector types at module load time using registerConnectorDescriptor:

import { registerConnectorDescriptor, type ConnectorType } from "@lakesync/core";

registerConnectorDescriptor({
  type: "cloudwatch" as ConnectorType,
  displayName: "CloudWatch",
  description: "AWS CloudWatch Logs connector.",
  category: "api",
  configSchema: {
    $schema: "http://json-schema.org/draft-07/schema#",
    type: "object",
    properties: {
      logGroupName: { type: "string" },
      region: { type: "string" },
    },
    required: ["logGroupName", "region"],
  },
  ingestSchema: { ... },
  outputTables: null,
});

Custom descriptors appear in GET /connectors/types and useConnectorTypes() alongside built-in types.

curl Example

curl https://gateway.example.com/connectors/types

Security

  • All connector routes require a JWT with role: "admin".
  • Connection strings and credentials are never returned by the list endpoint.
  • Use environment variables or a secret manager for connection strings in production — avoid hardcoding credentials in requests.
  • Connector names are validated as non-empty strings. Names are used as source adapter keys internally and appear in pull request URLs.