Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

179 changes: 179 additions & 0 deletions poc_asp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* POC: Atlas Stream Processing — create / start / stop / drop a stream processor.
*
* Run with:
* npx ts-node --skipProject poc_asp.ts
*
* Pipeline used:
* $source → sample_stream_solar
* $emit → __testLog
*/

import { MongoServerError } from './src/error';
import { StreamProcessingClient } from './src/stream_processing/stream_processing_client';

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

const WORKSPACE_URI =
'mongodb://atlas-stream-69ed590869155100cecc8b33-lulzki.virginia-usa.a.query.mongodb-dev.net/';
const USERNAME = 'streams';
const PASSWORD = 'letsdostreaming123';

const PROCESSOR_NAME = 'simpletestSP_node';

const PIPELINE = [
{
$source: {
connectionName: 'sample_stream_solar'
}
},
{
$emit: {
connectionName: '__testLog'
}
}
];

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

const sleep = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

// ---------------------------------------------------------------------------
// POC steps
// ---------------------------------------------------------------------------

async function main(): Promise<void> {
const client = new StreamProcessingClient(WORKSPACE_URI, {
auth: { username: USERNAME, password: PASSWORD }
});

try {
const sps = client.streamProcessors();

// ------------------------------------------------------------------
// 1. Create
// ------------------------------------------------------------------
console.log(`\n[1] Creating processor '${PROCESSOR_NAME}' ...`);
try {
await sps.create(PROCESSOR_NAME, PIPELINE);
console.log(' Created OK');
} catch (e) {
if (e instanceof MongoServerError) {
throw new Error(` Create failed (code ${e.code}): ${e.message}`);
}
throw e;
}

// ------------------------------------------------------------------
// 2. Inspect before starting
// ------------------------------------------------------------------
console.log('\n[2] Getting info ...');
let info = await sps.getInfo(PROCESSOR_NAME);
console.log(` state : ${info.state}`);
console.log(` pipelineVersion : ${info.pipelineVersion}`);
console.log(` hasStarted : ${info.hasStarted}`);

// ------------------------------------------------------------------
// 3. Start
// ------------------------------------------------------------------
const proc = sps.get(PROCESSOR_NAME);
console.log('\n[3] Starting processor ...');
try {
await proc.start();
console.log(' Start command sent OK');
} catch (e) {
if (e instanceof MongoServerError) {
throw new Error(` Start failed (code ${e.code}): ${e.message}`);
}
throw e;
}

await sleep(2000);

info = await sps.getInfo(PROCESSOR_NAME);
console.log(` state after start: ${info.state}`);

// ------------------------------------------------------------------
// 4. Stats
// ------------------------------------------------------------------
console.log('\n[4] Fetching stats ...');
try {
const rawStats = await proc.stats();
console.dir(rawStats, { depth: null });
} catch (e) {
if (e instanceof MongoServerError) {
console.log(` Stats unavailable (code ${e.code}): ${e.message}`);
} else {
throw e;
}
}

// ------------------------------------------------------------------
// 5. Sample (up to 5 docs)
// Note: breaking manually after N docs because the dev server does not
// signal cursor exhaustion with cursorId=0 as the spec requires.
// ------------------------------------------------------------------
console.log('\n[5] Sampling up to 5 documents ...');
try {
let count = 0;
for await (const doc of proc.sample()) {
console.log(` doc: ${JSON.stringify(doc)}`);
count += 1;
if (count >= 5) break;
}
console.log(` Sampled ${count} document(s)`);
} catch (e) {
if (e instanceof MongoServerError) {
console.log(` Sample unavailable (code ${e.code}): ${e.message}`);
} else {
throw e;
}
}

// ------------------------------------------------------------------
// 6. Stop
// ------------------------------------------------------------------
console.log('\n[6] Stopping processor ...');
try {
await proc.stop();
console.log(' Stop command sent OK');
} catch (e) {
if (e instanceof MongoServerError) {
throw new Error(` Stop failed (code ${e.code}): ${e.message}`);
}
throw e;
}

await sleep(1000);

info = await sps.getInfo(PROCESSOR_NAME);
console.log(` state after stop : ${info.state}`);

// ------------------------------------------------------------------
// 7. Drop (permanent — comment out to keep the processor alive)
// ------------------------------------------------------------------
console.log('\n[7] Dropping processor ...');
try {
await proc.drop();
console.log(' Dropped OK');
} catch (e) {
if (e instanceof MongoServerError) {
throw new Error(` Drop failed (code ${e.code}): ${e.message}`);
}
throw e;
}

console.log('\nDone.');
} finally {
await client.close();
}
}

main().catch(err => {
console.error(err);
process.exitCode = 1;
});
16 changes: 16 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export {
MongoBulkWriteError
} from './bulk/common';
export { ClientEncryption } from './client-side-encryption/client_encryption';
// Atlas Stream Processing (experimental)
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
export { ExplainableCursor } from './cursor/explainable_cursor';
export {
Expand Down Expand Up @@ -87,6 +88,12 @@ export {
MongoWriteConcernError,
WriteConcernErrorResult
} from './error';
export {
SampleCursor,
StreamProcessingClient,
StreamProcessor,
StreamProcessors
} from './stream_processing';
export {
AbstractCursor,
// Actual driver classes exported
Expand Down Expand Up @@ -612,6 +619,15 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type {
CreateStreamProcessorOptions,
GetStreamProcessorSamplesOptions,
GetStreamProcessorSamplesResult,
GetStreamProcessorStatsOptions,
StartStreamProcessorOptions,
StreamProcessorInfo,
StreamProcessorTier
} from './stream_processing';
export type {
CSOTTimeoutContext,
CSOTTimeoutContextOptions,
Expand Down
46 changes: 46 additions & 0 deletions src/operations/stream_processing/create_stream_processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { type Connection } from '../..';
import type { Document } from '../../bson';
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
import type { ClientSession } from '../../sessions';
import type { CreateStreamProcessorOptions } from '../../stream_processing/types';
import { CommandOperation, type CommandOperationOptions } from '../command';
import { Aspect, defineAspects } from '../operation';

/** @internal */
export class CreateStreamProcessorOperation extends CommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;

constructor(
readonly processorName: string,
readonly pipeline: Document[],
readonly aspOptions?: CreateStreamProcessorOptions,
options?: CommandOperationOptions
) {
super(undefined, options);
}

override get commandName() {
return 'createStreamProcessor' as const;
}

override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
const cmd: Document = {
createStreamProcessor: this.processorName,
pipeline: this.pipeline
};

if (this.aspOptions) {
const optsDoc: Document = {};
if (this.aspOptions.dlq != null) optsDoc.dlq = this.aspOptions.dlq;
if (this.aspOptions.streamMetaFieldName != null)
optsDoc.streamMetaFieldName = this.aspOptions.streamMetaFieldName;
if (this.aspOptions.tier != null) optsDoc.tier = this.aspOptions.tier;
if (this.aspOptions.failover != null) optsDoc.failover = this.aspOptions.failover;
if (Object.keys(optsDoc).length > 0) cmd.options = optsDoc;
}

return cmd;
}
}

defineAspects(CreateStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
28 changes: 28 additions & 0 deletions src/operations/stream_processing/drop_stream_processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { type Connection } from '../..';
import type { Document } from '../../bson';
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
import type { ClientSession } from '../../sessions';
import { CommandOperation, type CommandOperationOptions } from '../command';
import { Aspect, defineAspects } from '../operation';

/** @internal */
export class DropStreamProcessorOperation extends CommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;

constructor(
readonly processorName: string,
options?: CommandOperationOptions
) {
super(undefined, options);
}

override get commandName() {
return 'dropStreamProcessor' as const;
}

override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
return { dropStreamProcessor: this.processorName };
}
}

defineAspects(DropStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { type Connection } from '../..';
import type { Document } from '../../bson';
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
import type { ClientSession } from '../../sessions';
import { CommandOperation, type CommandOperationOptions } from '../command';
import { Aspect, defineAspects } from '../operation';

/** @internal */
export class GetMoreSampleStreamProcessorOperation extends CommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;

constructor(
readonly processorName: string,
readonly cursorId: bigint | number,
readonly batchSize?: number,
options?: CommandOperationOptions
) {
super(undefined, options);
}

override get commandName() {
return 'getMoreSampleStreamProcessor' as const;
}

override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
const cmd: Document = {
getMoreSampleStreamProcessor: this.processorName,
cursorId: this.cursorId
};
if (this.batchSize != null) cmd.batchSize = this.batchSize;
return cmd;
}
}

defineAspects(GetMoreSampleStreamProcessorOperation, [Aspect.WRITE_OPERATION]);
28 changes: 28 additions & 0 deletions src/operations/stream_processing/get_stream_processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { type Connection } from '../..';
import type { Document } from '../../bson';
import { MongoDBResponse } from '../../cmap/wire_protocol/responses';
import type { ClientSession } from '../../sessions';
import { CommandOperation, type CommandOperationOptions } from '../command';
import { Aspect, defineAspects } from '../operation';

/** @internal */
export class GetStreamProcessorOperation extends CommandOperation<Document> {
override SERVER_COMMAND_RESPONSE_TYPE = MongoDBResponse;

constructor(
readonly processorName: string,
options?: CommandOperationOptions
) {
super(undefined, options);
}

override get commandName() {
return 'getStreamProcessor' as const;
}

override buildCommandDocument(_connection: Connection, _session?: ClientSession): Document {
return { getStreamProcessor: this.processorName };
}
}

defineAspects(GetStreamProcessorOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE]);
Loading
Loading