Core API
API reference for @lakesync/core — HLC, Delta, Result, conflict resolution, sync rules, connector types, and validation.
The @lakesync/core package provides the foundational types and functions used across the LakeSync ecosystem.
HLC
Hybrid Logical Clock for causal ordering of events.
HLC
class HLC {
constructor(wallClock?: () => number);
now(): HLCTimestamp;
recv(remote: HLCTimestamp): Result<HLCTimestamp, ClockDriftError>;
static encode(wall: number, counter: number): HLCTimestamp;
static decode(ts: HLCTimestamp): { wall: number; counter: number };
static compare(a: HLCTimestamp, b: HLCTimestamp): -1 | 0 | 1;
static readonly MAX_DRIFT_MS: 5_000;
static readonly MAX_COUNTER: 0xffff;
}now()— Generate a new monotonically increasing timestamp.recv(remote)— Merge a remote timestamp, advancing the local clock. ReturnsErr(ClockDriftError)if drift exceeds 5 seconds.encode(wall, counter)— Pack wall clock ms and counter into anHLCTimestamp.decode(ts)— Unpack anHLCTimestampinto its wall and counter components.compare(a, b)— Compare two timestamps. Returns-1,0, or1.
HLCTimestamp
A branded bigint encoding 48 bits of wall-clock milliseconds and 16 bits of a monotonic counter.
type HLCTimestamp = bigint & { readonly __brand: "HLCTimestamp" };Delta
RowDelta
interface RowDelta {
op: DeltaOp;
table: string;
rowId: string;
clientId: string;
columns: ColumnDelta[];
hlc: HLCTimestamp;
deltaId: string;
}
type DeltaOp = "INSERT" | "UPDATE" | "DELETE";
interface ColumnDelta {
column: string;
value: unknown;
}TableSchema
interface TableSchema {
table: string;
sourceTable?: string;
columns: Array<{ name: string; type: "string" | "number" | "boolean" | "json" | "null" }>;
primaryKey?: string[];
softDelete?: boolean;
externalIdColumn?: string;
}| Field | Default | Description |
|---|---|---|
table | required | Destination table name |
sourceTable | table | Delta table name to match against (for renaming destination tables) |
columns | required | Column definitions with type hints |
primaryKey | ["row_id"] | Composite primary key columns. Each entry must be "row_id" or exist in columns. |
softDelete | true | When true, tombstoned rows get deleted_at set instead of being hard-deleted |
externalIdColumn | — | Column for upsert conflict resolution (adds a UNIQUE constraint). Must exist in columns. |
SyncPush / SyncPull / SyncResponse
interface SyncPush {
clientId: string;
deltas: RowDelta[];
lastSeenHlc: HLCTimestamp;
}
interface SyncPull {
clientId: string;
sinceHlc: HLCTimestamp;
maxDeltas: number;
source?: string;
}
interface SyncResponse {
deltas: RowDelta[];
serverHlc: HLCTimestamp;
hasMore: boolean;
}extractDelta(before, after, opts)
function extractDelta(
before: Record<string, unknown> | null | undefined,
after: Record<string, unknown> | null | undefined,
opts: {
table: string;
rowId: string;
clientId: string;
hlc: HLCTimestamp;
schema?: TableSchema;
},
): Promise<RowDelta | null>;Extract a column-level delta between two row states. Returns null if nothing changed. Generates a deterministic deltaId via SHA-256 of the stable-stringified payload.
nullbefore + present after = INSERT- present before +
nullafter = DELETE - both present = UPDATE (only changed columns)
applyDelta(row, delta)
function applyDelta(
row: Record<string, unknown> | null,
delta: RowDelta,
): Record<string, unknown> | null;Apply a delta to an existing row, returning the merged result. DELETE returns null, INSERT creates a new row, UPDATE merges columns onto the existing row (immutable).
Result
Type-safe error handling without exceptions.
Types
type Result<T, E = LakeSyncError> = { ok: true; value: T } | { ok: false; error: E };Constructors
function Ok<T>(value: T): Result<T, never>;
function Err<E>(error: E): Result<never, E>;Utilities
function mapResult<T, U, E>(result: Result<T, E>, fn: (value: T) => U): Result<U, E>;
function flatMapResult<T, U, E>(result: Result<T, E>, fn: (value: T) => Result<U, E>): Result<U, E>;
function unwrapOrThrow<T, E>(result: Result<T, E>): T;
function fromPromise<T>(promise: Promise<T>): Promise<Result<T, Error>>;Error Classes
class LakeSyncError extends Error {
readonly code: string;
override readonly cause?: Error;
constructor(message: string, code: string, cause?: Error);
}
class ClockDriftError extends LakeSyncError {} // code: "CLOCK_DRIFT"
class ConflictError extends LakeSyncError {} // code: "CONFLICT"
class FlushError extends LakeSyncError {} // code: "FLUSH_FAILED"
class SchemaError extends LakeSyncError {} // code: "SCHEMA_MISMATCH"
class AdapterError extends LakeSyncError {} // code: "ADAPTER_ERROR"
class AdapterNotFoundError extends LakeSyncError {} // code: "ADAPTER_NOT_FOUND"
class BackpressureError extends LakeSyncError {} // code: "BACKPRESSURE"Conflict Resolution
resolveLWW(local, remote)
function resolveLWW(
local: RowDelta,
remote: RowDelta,
): Result<RowDelta, ConflictError>;Column-level Last-Write-Wins merge. For each column present in both deltas, the one with the higher HLC wins. When HLC timestamps are equal, the lexicographically higher clientId wins (deterministic tiebreak).
LWWResolver
class LWWResolver implements ConflictResolver {
resolve(local: RowDelta, remote: RowDelta): Result<RowDelta, ConflictError>;
}
interface ConflictResolver {
resolve(local: RowDelta, remote: RowDelta): Result<RowDelta, ConflictError>;
}Sync Rules
Types
interface SyncRulesConfig {
version: number;
buckets: BucketDefinition[];
}
interface BucketDefinition {
name: string;
tables: string[];
filters: SyncRuleFilter[];
}
interface SyncRuleFilter {
column: string;
op: SyncRuleOp;
value: string;
}
type SyncRuleOp = "eq" | "in" | "neq" | "gt" | "lt" | "gte" | "lte";
interface SyncRulesContext {
claims: ResolvedClaims;
rules: SyncRulesConfig;
}
type ResolvedClaims = Record<string, string | string[]>;filterDeltas(deltas, context)
function filterDeltas(
deltas: RowDelta[],
context: SyncRulesContext,
): RowDelta[];Pure function that returns the union of deltas matching any bucket's filters. JWT claim references (prefixed with jwt:) are resolved from the context's claims object.
validateSyncRules(config)
function validateSyncRules(
config: unknown,
): Result<void, SyncRuleError>;Validates structural correctness of a sync rules configuration: version, bucket names, filter operators, and uniqueness.
resolveClientBuckets(rules, claims)
function resolveClientBuckets(
rules: SyncRulesConfig,
claims: ResolvedClaims,
): string[];Returns the names of buckets a client matches based on their JWT claims.
Validation
isValidIdentifier(name)
function isValidIdentifier(name: string): boolean;Check whether a string is a valid SQL identifier (starts with letter or underscore, alphanumeric + underscore, max 64 chars).
assertValidIdentifier(name)
function assertValidIdentifier(name: string): Result<void, SchemaError>;Returns Ok if valid, Err(SchemaError) if invalid.
quoteIdentifier(name)
function quoteIdentifier(name: string): string;Double-quote a SQL identifier with proper escaping.
Actions
Types for imperative actions against external systems. See Actions for the full guide.
Action Types
interface Action {
actionId: string;
clientId: string;
hlc: HLCTimestamp;
connector: string;
actionType: string;
params: Record<string, unknown>;
idempotencyKey?: string;
}
interface ActionPush {
clientId: string;
actions: Action[];
}
interface ActionResponse {
results: Array<ActionResult | ActionErrorResult>;
serverHlc: HLCTimestamp;
}
interface ActionResult {
actionId: string;
data: Record<string, unknown>;
serverHlc: HLCTimestamp;
}
interface ActionErrorResult {
actionId: string;
code: string;
message: string;
retryable: boolean;
}Action Discovery
interface ActionDescriptor {
actionType: string;
description: string;
paramsSchema?: Record<string, unknown>;
}
interface ActionDiscovery {
connectors: Record<string, ActionDescriptor[]>;
}ActionHandler
interface ActionHandler {
readonly supportedActions: ActionDescriptor[];
executeAction(
action: Action,
context?: AuthContext,
): Promise<Result<ActionResult, ActionExecutionError | ActionNotSupportedError>>;
}
function isActionHandler(obj: unknown): obj is ActionHandler;Action Errors
class ActionValidationError extends LakeSyncError {} // code: "ACTION_VALIDATION_ERROR"
class ActionExecutionError extends LakeSyncError { // code: "ACTION_EXECUTION_ERROR"
readonly retryable: boolean;
}
class ActionNotSupportedError extends LakeSyncError {} // code: "ACTION_NOT_SUPPORTED"Utilities
function validateAction(action: Action): Result<void, ActionValidationError>;
function generateActionId(action: Omit<Action, "actionId">): Promise<string>;
function isActionError(result: ActionResult | ActionErrorResult): result is ActionErrorResult;Connector Types
Types for dynamic connector registration. See Dynamic Connectors for the full API.
type ConnectorType = "postgres" | "mysql" | "bigquery" | "jira" | "salesforce";
interface ConnectorConfigBase {
type: string;
name: string;
ingest?: ConnectorIngestConfig;
}
type ConnectorConfig =
| PostgresConnectorConfigFull
| MySQLConnectorConfigFull
| BigQueryConnectorConfigFull
| JiraConnectorConfigFull
| SalesforceConnectorConfigFull
| (ConnectorConfigBase & Record<string, unknown>);
function validateConnectorConfig(
input: unknown,
): Result<ConnectorConfig, ConnectorValidationError>;The open catch-all variant allows custom connector types to pass through without modifying core. Existing switch statements still work for known types.
Adapter Interfaces
Core adapter types and type guards. These interfaces are defined in @lakesync/core and re-exported by @lakesync/adapter for backward compatibility.
LakeAdapter
interface LakeAdapter {
putObject(path: string, data: Uint8Array, contentType?: string): Promise<Result<void, AdapterError>>;
getObject(path: string): Promise<Result<Uint8Array, AdapterError>>;
headObject(path: string): Promise<Result<{ size: number; lastModified: Date }, AdapterError>>;
listObjects(prefix: string): Promise<Result<ObjectInfo[], AdapterError>>;
deleteObject(path: string): Promise<Result<void, AdapterError>>;
deleteObjects(paths: string[]): Promise<Result<void, AdapterError>>;
}
interface ObjectInfo {
key: string;
size: number;
lastModified: Date;
}DatabaseAdapter
interface DatabaseAdapter {
insertDeltas(deltas: RowDelta[]): Promise<Result<void, AdapterError>>;
queryDeltasSince(hlc: HLCTimestamp, tables?: string[]): Promise<Result<RowDelta[], AdapterError>>;
getLatestState(table: string, rowId: string): Promise<Result<Record<string, unknown> | null, AdapterError>>;
ensureSchema(schema: TableSchema): Promise<Result<void, AdapterError>>;
close(): Promise<void>;
}
function isDatabaseAdapter(adapter: unknown): adapter is DatabaseAdapter;Materialisable
interface Materialisable {
materialise(deltas: RowDelta[], schemas: ReadonlyArray<TableSchema>): Promise<Result<void, AdapterError>>;
}
function isMaterialisable(adapter: unknown): adapter is Materialisable;Opt-in capability for adapters that can materialise deltas into queryable destination tables. All three database adapters (PostgresAdapter, MySQLAdapter, BigQueryAdapter) implement this interface via the generic SqlDialect + executeMaterialise() pattern. See Materialise Protocol for details.
Source Polling
Base class and types for building source pollers. See Source Polling Ingest for usage.
abstract class BaseSourcePoller {
constructor(config: {
name: string;
intervalMs: number;
gateway: PushTarget;
memory?: PollerMemoryConfig;
});
start(): void;
stop(): void;
get isRunning(): boolean;
abstract poll(): Promise<void>;
protected pushDeltas(deltas: RowDelta[]): void;
protected accumulateDelta(delta: RowDelta): Promise<void>;
protected flushAccumulator(): Promise<void>;
}
interface PushTarget {
handlePush(push: SyncPush): unknown;
}
interface IngestTarget extends PushTarget {
flush(): Promise<Result<void, FlushError>>;
shouldFlush(): boolean;
readonly bufferStats: { logSize: number; indexSize: number; byteSize: number };
}
function isIngestTarget(target: PushTarget): target is IngestTarget;
interface PollerMemoryConfig {
chunkSize?: number;
memoryBudgetBytes?: number;
flushThreshold?: number;
}Poller Factories
Registry-based creation of source pollers by connector type.
interface PollerRegistry {
get(type: string): PollerFactory | undefined;
with(type: string, factory: PollerFactory): PollerRegistry;
}
function createPollerRegistry(factories?: Map<string, PollerFactory>): PollerRegistry;
function createPoller(config: ConnectorConfig, gateway: PushTarget, registry: PollerRegistry): BaseSourcePoller;
type PollerFactory = (config: ConnectorConfig, gateway: PushTarget) => BaseSourcePoller;Build a PollerRegistry with the factories you need. Connector packages export their factories as named exports (e.g. jiraPollerFactory from @lakesync/connector-jira).
CallbackPushTarget
A simple PushTarget that forwards pushes to a callback. Useful for testing.
class CallbackPushTarget implements PushTarget {
constructor(fn: (push: SyncPush) => unknown);
}Auth
function verifyToken(
token: string,
secret: string,
): Promise<Result<AuthClaims, AuthError>>;
interface AuthClaims {
clientId: string;
gatewayId: string;
role: string;
customClaims: Record<string, string | string[]>;
}
class AuthError extends Error {
constructor(message: string);
}Verifies a JWT signed with HMAC-SHA256 using the Web Crypto API. Checks expiry, required claims (sub, gw, exp), and extracts custom claims for sync rules evaluation. The role claim defaults to "client" if absent.