Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log node ids with smart client #35

Merged
merged 8 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {Node[]} [opts.nodes]
* @param {Node} [opts.node]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
Expand All @@ -83,17 +85,16 @@ export class Saturn {
}
}

let origins = options.origins
if (!origins || origins.length === 0) {
const replacementUrl = options.url ?? options.cdnURL
origins = [replacementUrl]
let nodes = options.nodes
if (!nodes || nodes.length === 0) {
const replacementNode = options.node ?? { url: this.opts.cdnURL }
nodes = [replacementNode]
}
const controllers = []

const createFetchPromise = async (origin) => {
const fetchOptions = { ...options, url: origin }
const createFetchPromise = async (node) => {
const fetchOptions = { ...options, url: node.url }
const url = this.createRequestURL(cidPath, fetchOptions)

const controller = new AbortController()
controllers.push(controller)
const connectTimeout = setTimeout(() => {
Expand All @@ -103,11 +104,10 @@ export class Saturn {
try {
res = await fetch(parseUrl(url), { signal: controller.signal, ...options })
clearTimeout(connectTimeout)
return { res, url, controller }
return { res, url, node, controller }
} catch (err) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
err.node = node
throw err
}
}

Expand All @@ -119,17 +119,18 @@ export class Saturn {
})
}

const fetchPromises = Promise.any(origins.map((origin) => createFetchPromise(origin)))
const fetchPromises = Promise.any(nodes.map((node) => createFetchPromise(node)))

let log = {
startTime: new Date()
}

let res, url, controller
let res, url, controller, node
try {
({ res, url, controller } = await fetchPromises)
({ res, url, controller, node } = await fetchPromises)

abortRemainingFetches(controller, controllers)
log.nodeId = node.id
log = Object.assign(log, this._generateLog(res, log), { url })
if (!res.ok) {
const error = new Error(
Expand All @@ -142,6 +143,8 @@ export class Saturn {
if (!res) {
log.error = err.message
}
if (err.node) log.nodeId = err.node.id

// Report now if error, otherwise report after download is done.
this._finalizeLog(log)

Expand All @@ -156,6 +159,7 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {Node} [opts.node]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<object>}
Expand All @@ -167,11 +171,15 @@ export class Saturn {
const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const url = this.createRequestURL(cidPath, options)
const node = options.node
const origin = node?.url ?? this.opts.cdnURL
const url = this.createRequestURL(cidPath, { ...options, url: origin })

let log = {
url,
startTime: new Date()
}
if (node?.id) log.nodeId = node.id

const controller = options.controller ?? new AbortController()
const connectTimeout = setTimeout(() => {
Expand Down Expand Up @@ -220,7 +228,7 @@ export class Saturn {
const { headers } = res
log.httpStatusCode = res.status
log.cacheHit = headers.get('saturn-cache-status') === 'HIT'
log.nodeId = headers.get('saturn-node-id')
log.nodeId = log.nodeId ?? headers.get('saturn-node-id')
log.requestId = headers.get('saturn-transfer-id')
log.httpProtocol = headers.get('quic-status')

Expand Down Expand Up @@ -294,10 +302,9 @@ export class Saturn {
return
}
if (opts.raceNodes) {
const origins = nodes.slice(i, i + Saturn.defaultRaceCount).map((node) => node.url)
opts.origins = origins
opts.nodes = nodes.slice(i, i + Saturn.defaultRaceCount)
} else {
opts.url = nodes[i].url
opts.node = nodes[i]
}

try {
Expand Down Expand Up @@ -376,10 +383,11 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {string} [opts.url]
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url || (opts.origins && opts.origins[0]) || opts.cdnURL
let origin = opts.url ?? this.opts.cdnURL
origin = addHttpPrefix(origin)
const url = new URL(`${origin}/ipfs/${cidPath}`)

Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/**
*
* @typedef {object} Node
* @property {string} id
* @property {string} ip
* @property {number} weight
* @property {number} distance
Expand Down
62 changes: 34 additions & 28 deletions test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import { describe, mock, test } from 'node:test'
import { Saturn } from '#src/index.js'
import { concatChunks, generateNodes, getMockServer, HTTP_STATUS_GONE, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'

const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl/nodes'
const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl.test/nodes'
const TEST_NODES_LIST_KEY = 'saturn-nodes'
const TEST_AUTH = 'https://fz3dyeyxmebszwhuiky7vggmsu0rlkoy.lambda-url.us-west-2.on.aws/'
const TEST_ORIGIN_DOMAIN = 'saturn.ms'
const TEST_AUTH = 'https://auth.test/'
const TEST_ORIGIN_DOMAIN = 'l1s.saturn.test'
const CLIENT_KEY = 'key'

const experimental = true
const options = {
cdnURL: TEST_ORIGIN_DOMAIN,
orchURL: TEST_DEFAULT_ORCH,
authURL: TEST_AUTH,
experimental: true,
clientKey: CLIENT_KEY,
clientId: 'test'
}

describe('Client Fallback', () => {
test('Nodes are loaded from the orchestrator if no storage is passed', async (t) => {
Expand All @@ -24,7 +31,7 @@ describe('Client Fallback', () => {
const expectedNodes = generateNodes(2, TEST_ORIGIN_DOMAIN)

// No Storage is injected
const saturn = new Saturn({ clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ ...options })
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }

await saturn._loadNodes(mockOpts)
Expand All @@ -37,7 +44,7 @@ describe('Client Fallback', () => {

test('Storage is invoked correctly when supplied', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms')
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
Expand All @@ -53,7 +60,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand All @@ -75,7 +82,7 @@ describe('Client Fallback', () => {

test('Storage is loaded first when the orch is slower', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms', 1000)
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN, 500)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
Expand All @@ -90,7 +97,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand All @@ -111,7 +118,7 @@ describe('Client Fallback', () => {

test('Content Fallback fetches a cid properly', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(2, TEST_ORIGIN_DOMAIN)
Expand All @@ -129,7 +136,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

Expand All @@ -144,7 +151,7 @@ describe('Client Fallback', () => {

test('Content Fallback fetches a cid properly with race', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(5, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN)
Expand All @@ -162,8 +169,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
// const origins =
const saturn = new Saturn({ ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -178,7 +184,7 @@ describe('Client Fallback', () => {

test('Content Fallback with race fetches from consecutive nodes on failure', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(5, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN, 2)
Expand All @@ -196,7 +202,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -211,14 +217,14 @@ describe('Client Fallback', () => {

test('should fetch content from the first node successfully', async () => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(2, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

const fetchContentMock = mock.fn(async function * (cidPath, opts) {
yield Buffer.from('chunk1')
Expand All @@ -239,14 +245,14 @@ describe('Client Fallback', () => {
test('should try all nodes and fail if all nodes fail', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

const fetchContentMock = mock.fn(async function * (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line
saturn.fetchContent = fetchContentMock
Expand All @@ -270,14 +276,14 @@ describe('Client Fallback', () => {
test('Should abort fallback on 410s', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })
await saturn.loadNodesPromise

let error
Expand All @@ -299,14 +305,14 @@ describe('Client Fallback', () => {
test('Should abort fallback on specific errors', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })
await saturn.loadNodesPromise

let callCount = 0
Expand Down Expand Up @@ -334,14 +340,14 @@ describe('Client Fallback', () => {
test('Handles fallback with chunk overlap correctly', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down Expand Up @@ -373,14 +379,14 @@ describe('Client Fallback', () => {
test('should handle byte chunk overlaps correctly', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

let callCount = 0
let fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down
2 changes: 2 additions & 0 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export function generateNodes (count, originDomain) {
const nodeIp = `node${i}`
const node = {
ip: nodeIp,
id: nodeIp,
weight: 50,
distance: 100,
url: `https://${nodeIp}.${originDomain}`
Expand All @@ -52,6 +53,7 @@ export function generateNodes (count, originDomain) {
*/
export function mockSaturnOriginHandler (cdnURL, delay = 0, error = false) {
cdnURL = addHttpPrefix(cdnURL)
cdnURL = `${cdnURL}/ipfs/:cid`
return rest.get(cdnURL, (req, res, ctx) => {
if (error) {
throw Error('Simulated Error')
Expand Down