
fix: use web standard event apis for twilio websocket #127

Status: Open. Wants to merge 6 commits into base: main.
7 changes: 7 additions & 0 deletions .changeset/shiny-berries-kiss.md
@@ -0,0 +1,7 @@
---
'@openai/agents': patch
'@openai/agents-extensions': patch
'@openai/agents-realtime': patch
---

fix: use web standard event apis for twilio websocket
2 changes: 1 addition & 1 deletion examples/docs/extensions/twilio-basic.ts
@@ -9,7 +9,7 @@ const agent = new RealtimeAgent({
// the OpenAI Realtime API.
const twilioTransport = new TwilioRealtimeTransportLayer({
// @ts-expect-error - this is not defined
twilioWebSocket: websoketConnection,
twilioWebSocket: websocketConnection,
});

const session = new RealtimeSession(agent, {
140 changes: 76 additions & 64 deletions packages/agents-extensions/src/TwilioRealtimeTransport.ts
@@ -7,7 +7,13 @@ import {
RealtimeSessionConfig,
Review comment (Contributor Author): My IDE autoformatted this file, but the actual changes are minimal and are pointed out below.

} from '@openai/agents/realtime';
import { getLogger } from '@openai/agents';
import type { WebSocket, MessageEvent } from 'ws';
import type {
WebSocket as NodeWebSocket,
MessageEvent as NodeMessageEvent,
ErrorEvent as NodeErrorEvent,
} from 'ws';

import type { ErrorEvent } from 'undici-types';
Review comment on lines +10 to +16 (Contributor Author): Import the `ws` types under `Node*` aliases, plus `ErrorEvent` from `undici-types`.


/**
* The options for the Twilio Realtime Transport Layer.
@@ -18,7 +24,7 @@ export type TwilioRealtimeTransportLayerOptions =
* The websocket that is receiving messages from Twilio's Media Streams API. Typically the
* connection gets passed into your request handler when running your WebSocket server.
*/
twilioWebSocket: WebSocket;
twilioWebSocket: WebSocket | NodeWebSocket;
Review comment (Contributor Author): Accept the standard `WebSocket` type as well as the one from `ws`.

};

/**
@@ -48,7 +54,7 @@ export type TwilioRealtimeTransportLayerOptions =
* ```
*/
export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
#twilioWebSocket: WebSocket;
#twilioWebSocket: WebSocket | NodeWebSocket;
Review comment (Contributor Author): Change the type of this field to accept either the `ws` WebSocket or a standard `WebSocket`.

#streamSid: string | null = null;
#audioChunkCount: number = 0;
#lastPlayedChunkCount: number = 0;
@@ -82,74 +88,80 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
options.initialSessionConfig,
);
// listen to Twilio messages as quickly as possible
this.#twilioWebSocket.on('message', (message: MessageEvent) => {
try {
const data = JSON.parse(message.toString());
if (this.#logger.dontLogModelData) {
this.#logger.debug('Twilio message:', data.event);
} else {
this.#logger.debug('Twilio message:', data);
}
this.emit('*', {
type: 'twilio_message',
message: data,
});
switch (data.event) {
case 'media':
if (this.status === 'connected') {
this.sendAudio(utils.base64ToArrayBuffer(data.media.payload));
}
break;
case 'mark':
if (
!data.mark.name.startsWith('done:') &&
data.mark.name.includes(':')
) {
// keeping track of what the last chunk was that the user heard fully
const count = Number(data.mark.name.split(':')[1]);
if (Number.isFinite(count)) {
this.#lastPlayedChunkCount = count;
} else {
this.#logger.warn(
'Invalid mark name received:',
data.mark.name,
);
this.#twilioWebSocket.addEventListener(
Review comment (Contributor Author): Switch from `.on` to `.addEventListener`.

'message',
(message: MessageEvent | NodeMessageEvent) => {
try {
const data = JSON.parse(message.data.toString());
Review comment (Contributor Author): Read the payload from `message.data` here, since `addEventListener` listeners receive an event object rather than the raw payload.

if (this.#logger.dontLogModelData) {
this.#logger.debug('Twilio message:', data.event);
} else {
this.#logger.debug('Twilio message:', data);
}
this.emit('*', {
type: 'twilio_message',
message: data,
});
switch (data.event) {
case 'media':
if (this.status === 'connected') {
this.sendAudio(utils.base64ToArrayBuffer(data.media.payload));
}
} else if (data.mark.name.startsWith('done:')) {
this.#lastPlayedChunkCount = 0;
}
break;
case 'start':
this.#streamSid = data.start.streamSid;
break;
default:
break;
break;
case 'mark':
if (
!data.mark.name.startsWith('done:') &&
data.mark.name.includes(':')
) {
// keeping track of what the last chunk was that the user heard fully
const count = Number(data.mark.name.split(':')[1]);
if (Number.isFinite(count)) {
this.#lastPlayedChunkCount = count;
} else {
this.#logger.warn(
'Invalid mark name received:',
data.mark.name,
);
}
} else if (data.mark.name.startsWith('done:')) {
this.#lastPlayedChunkCount = 0;
}
break;
case 'start':
this.#streamSid = data.start.streamSid;
break;
default:
break;
}
} catch (error) {
this.#logger.error(
'Error parsing message:',
error,
'Message:',
message,
);
this.emit('error', {
type: 'error',
error,
});
}
} catch (error) {
this.#logger.error(
'Error parsing message:',
error,
'Message:',
message,
);
},
);
this.#twilioWebSocket.addEventListener('close', () => {
Review comment (Contributor Author): Switch this to `.addEventListener` as well.

if (this.status !== 'disconnected') {
this.close();
}
});
this.#twilioWebSocket.addEventListener(
Review comment (Contributor Author): Switch this to `.addEventListener` as well.

'error',
(error: ErrorEvent | NodeErrorEvent) => {
this.emit('error', {
type: 'error',
error,
});
}
});
this.#twilioWebSocket.on('close', () => {
if (this.status !== 'disconnected') {
this.close();
}
});
this.#twilioWebSocket.on('error', (error) => {
this.emit('error', {
type: 'error',
error,
});
this.close();
});
},
);
this.on('audio_done', () => {
this.#twilioWebSocket.send(
JSON.stringify({
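
For readers comparing the two listener styles in the hunk above, here is a minimal sketch of the message handling in isolation. It assumes Twilio sends JSON text frames and covers only the three events the diff handles; `MessageLike`, `TwilioEvent`, `parseTwilioMessage`, and `nextPlayedChunkCount` are hypothetical names that do not appear in the PR.

```typescript
// Minimal shape shared by `ws` and web-standard `message` events.
// All names in this sketch are hypothetical, for illustration only.
interface MessageLike {
  data: unknown;
}

// Only the events handled in the diff; Twilio may send others.
type TwilioEvent =
  | { event: 'start'; start: { streamSid: string } }
  | { event: 'media'; media: { payload: string } }
  | { event: 'mark'; mark: { name: string } };

// With addEventListener, the payload lives on `event.data`. With the
// `.on('message', ...)` style from `ws`, the payload was the first argument.
function parseTwilioMessage(message: MessageLike): TwilioEvent {
  return JSON.parse(String(message.data)) as TwilioEvent;
}

// Mirrors the `mark` branch in the diff: `done:*` resets the counter,
// `<prefix>:<n>` records chunk n, and anything else leaves it unchanged.
function nextPlayedChunkCount(markName: string, current: number): number {
  if (markName.startsWith('done:')) return 0;
  if (!markName.includes(':')) return current;
  const count = Number(markName.split(':')[1]);
  return Number.isFinite(count) ? count : current;
}

const parsed = parseTwilioMessage({
  data: JSON.stringify({ event: 'mark', mark: { name: 'item_1:3' } }),
});
if (parsed.event === 'mark') {
  console.log(nextPlayedChunkCount(parsed.mark.name, 0)); // 3
}
```

Keeping the parsing behind a `{ data }` shape is what lets one listener serve both socket implementations; the PR itself does this inline with `message.data.toString()`.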
11 changes: 11 additions & 0 deletions packages/agents-extensions/test/TwilioRealtimeTransport.test.ts
@@ -2,6 +2,9 @@ import { describe, test, expect, vi, beforeEach } from 'vitest';
import { EventEmitter } from 'events';
import { TwilioRealtimeTransportLayer } from '../src/TwilioRealtimeTransport';

import type { MessageEvent as NodeMessageEvent } from 'ws';
import type { MessageEvent } from 'undici-types';

vi.mock('@openai/agents/realtime', () => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const { EventEmitter } = require('events');
@@ -31,6 +34,14 @@ class FakeTwilioWebSocket extends EventEmitter {
close = vi.fn();
}

// @ts-expect-error - we're making the node event emitter compatible with the browser event emitter
FakeTwilioWebSocket.prototype.addEventListener = function (
type: string,
listener: (evt: MessageEvent | NodeMessageEvent) => void,
) {
this.on(type, (evt) => listener(type === 'message' ? { data: evt } : evt));
};

const base64 = (data: string) => Buffer.from(data).toString('base64');

describe('TwilioRealtimeTransportLayer', () => {
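
The prototype patch in this test file adapts a Node `EventEmitter` so it can be driven through `addEventListener`. The same idea can be sketched as a class method, which avoids the `@ts-expect-error`. This is an illustrative alternative, not what the PR does; `FakeSocket` is a hypothetical name.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical test double; not the FakeTwilioWebSocket from the test files.
class FakeSocket extends EventEmitter {
  // Forward emitter events to a web-style listener. For 'message', wrap the
  // raw payload in `{ data }` so the listener can read `evt.data`.
  addEventListener(type: string, listener: (evt: unknown) => void): void {
    this.on(type, (payload: unknown) =>
      listener(type === 'message' ? { data: payload } : payload),
    );
  }
}

const socket = new FakeSocket();
const received: unknown[] = [];
socket.addEventListener('message', (evt) => received.push(evt));
socket.addEventListener('close', (evt) => received.push(evt));

// Tests can keep calling `emit` exactly as they did before the change.
socket.emit('message', '{"event":"start"}');
socket.emit('close', 'closed');
console.log(received);
```

Only `message` payloads are wrapped because that is the only event whose listener in the transport reads a `.data` property.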
9 changes: 9 additions & 0 deletions packages/agents-extensions/test/index.test.ts
@@ -1,6 +1,7 @@
import { describe, test, expect, vi } from 'vitest';
import { EventEmitter } from 'events';
import { TwilioRealtimeTransportLayer } from '../src';
import type { MessageEvent as NodeMessageEvent } from 'ws';

vi.mock('ws', () => {
class FakeWebSocket {
@@ -30,6 +31,14 @@ class FakeTwilioWebSocket extends EventEmitter {
close = vi.fn();
}

// @ts-expect-error - we're making the node event emitter compatible with the browser event emitter
FakeTwilioWebSocket.prototype.addEventListener = function (
type: string,
listener: (evt: MessageEvent | NodeMessageEvent) => void,
) {
this.on(type, (evt) => listener(type === 'message' ? { data: evt } : evt));
};

describe('TwilioRealtimeTransportLayer', () => {
test('should be available', () => {
const transport = new TwilioRealtimeTransportLayer({
5 changes: 5 additions & 0 deletions packages/agents-realtime/package.json
@@ -23,6 +23,11 @@
"default": "./dist/index.mjs"
},
"./_shims": {
"workerd": {
"require": "./dist/shims/shims-workerd.js",
"types": "./dist/shims/shims-workerd.d.ts",
"default": "./dist/shims/shims-workerd.mjs"
},
"browser": {
"require": "./dist/shims/shims-browser.js",
"types": "./dist/shims/shims-browser.d.ts",
4 changes: 3 additions & 1 deletion packages/agents-realtime/src/openaiRealtimeWebsocket.ts
@@ -1,5 +1,6 @@
import {
isBrowserEnvironment,
useWebSocketProtocols,
WebSocket,
} from '@openai/agents-realtime/_shims';
import {
@@ -149,7 +150,8 @@ export class OpenAIRealtimeWebSocket
);
}

const websocketArguments = isBrowserEnvironment()
// browsers and workerd should use the protocols argument, node should use the headers argument
Review comment (Contributor Author): workerd's WebSocket constructor is standards-based, so it cannot accept headers.

const websocketArguments = useWebSocketProtocols
? [
'realtime',
// Auth
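
The hunk above is truncated, so the following is only a rough sketch of the branching it introduces: a standards-based constructor takes a list of subprotocols, while `ws` on Node can take an options object with headers. The helper name, the `example-auth.` protocol string, and the header layout are assumptions for illustration and are not taken from the SDK.

```typescript
// Hypothetical helper; the name, the auth protocol string, and the header
// layout are assumptions made for illustration, not the SDK's actual values.
type WebSocketArguments = string[] | { headers: Record<string, string> };

function buildWebSocketArguments(
  useWebSocketProtocols: boolean,
  token: string,
): WebSocketArguments {
  return useWebSocketProtocols
    ? // Standards-based constructors: new WebSocket(url, protocols)
      ['realtime', `example-auth.${token}`]
    : // `ws` on Node: new WebSocket(url, { headers })
      { headers: { Authorization: `Bearer ${token}` } };
}

console.log(buildWebSocketArguments(true, 'TOKEN'));
console.log(buildWebSocketArguments(false, 'TOKEN'));
```

Driving the choice from a per-runtime constant, rather than from `isBrowserEnvironment()`, lets workerd take the protocols branch while still reporting that it is not a browser.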
1 change: 1 addition & 0 deletions packages/agents-realtime/src/shims/shims-browser.ts
@@ -4,3 +4,4 @@ export const WebSocket = globalThis.WebSocket;
export function isBrowserEnvironment(): boolean {
return true;
}
export const useWebSocketProtocols = true;
1 change: 1 addition & 0 deletions packages/agents-realtime/src/shims/shims-node.ts
@@ -2,3 +2,4 @@ export { WebSocket } from 'ws';
export function isBrowserEnvironment(): boolean {
return false;
}
export const useWebSocketProtocols = false;
5 changes: 5 additions & 0 deletions packages/agents-realtime/src/shims/shims-workerd.ts
@@ -0,0 +1,5 @@
export const WebSocket = globalThis.WebSocket;
export function isBrowserEnvironment(): boolean {
return false;
}
export const useWebSocketProtocols = true;
Review comment (Contributor Author): Open to another name for this, but this one seemed clear.

3 changes: 3 additions & 0 deletions pnpm-workspace.yaml
@@ -2,7 +2,10 @@ packages:
- packages/*
- examples/*
- docs

onlyBuiltDependencies:
- '@tailwindcss/oxide'
- esbuild
- sharp

publishBranch: main