diff --git a/packages/browser/src/cache/index.ts b/packages/browser/src/cache/index.ts new file mode 100644 index 00000000..d8f0a45e --- /dev/null +++ b/packages/browser/src/cache/index.ts @@ -0,0 +1,41 @@ +import * as common from '../@internal' + +export class LocalStorageCacheManager implements common.cache.PersistentCacheManager { + persistPath: string; + + constructor(persistLocation: string) { + this.persistPath = persistLocation + } + + async get(key: string): Promise { + const itemKey = this.getPersistKey(key) + const raw = localStorage.getItem(itemKey) + if (!raw) { + return null + } + return this.parse(raw) + } + + async set(key: string, val: T): Promise { + const itemKey = this.getPersistKey(key) + const raw = this.stringify(val) + localStorage.setItem(itemKey, raw) + } + + async delete(key: string): Promise { + const itemKey = this.getPersistKey(key) + localStorage.removeItem(itemKey) + } + + private getPersistKey(key: string): string { + return `qnsdk:${this.persistPath}:${key}` + } + + private parse(val: string): T { + return JSON.parse(val) + } + + private stringify(val: T): string { + return JSON.stringify(val) + } +} diff --git a/packages/browser/src/http/index.ts b/packages/browser/src/http/index.ts index ab283a4e..61293794 100644 --- a/packages/browser/src/http/index.ts +++ b/packages/browser/src/http/index.ts @@ -96,9 +96,17 @@ export class HttpClient implements common.HttpClient { mockProgress?.end() + const reqId = xhr.getResponseHeader('x-reqId') + if (!reqId) { + resolve({ + error: new UploadError('HijackedError', 'Response header x-reqId not found') + }) + return + } + resolve({ result: { - reqId: xhr.getResponseHeader('x-reqId') || undefined, + reqId, code: xhr.status, data: xhr.responseText } diff --git a/packages/common/package.json b/packages/common/package.json index 99a665a6..ebb84306 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -9,9 +9,10 @@ "build": "run-p build:*", "dev": "run-p \"build:* -- -w\"", "postinstall": "run-p build:*", - "build:browser": "cpx \"src/**/*\" ../browser/src/@internal", - "build:wechat-miniprogram": "cpx \"src/**/*\" ../wechat-miniprogram/src/@internal", - "build:harmony": "cpx \"src/**/*\" ../harmony/library/src/main/ets/components/@internal" + "cleanup": "rm -fr ../browser/src/@internal ../wechat-miniprogram/src/@internal ../harmony/library/src/main/ets/components/@internal", + "build:browser": "cpx \"src/**/!(*.test,*.mock).ts\" ../browser/src/@internal", + "build:wechat-miniprogram": "cpx \"src/**/!(*.test,*.mock).ts\" ../wechat-miniprogram/src/@internal", + "build:harmony": "cpx \"src/**/!(*.test,*.mock).ts\" ../harmony/library/src/main/ets/components/@internal" }, "author": "", "license": "ISC" diff --git a/packages/common/src/api/index.ts b/packages/common/src/api/index.ts index e7c47fe5..8368fd33 100644 --- a/packages/common/src/api/index.ts +++ b/packages/common/src/api/index.ts @@ -431,17 +431,20 @@ export class UploadApis { } interface GetHostConfigParams { + // TODO: typo assessKey: string bucket: string + serverUrl?: string } -interface HostConfig { +export interface HostConfig { hosts: Array<{ region: string ttl: number up: { domains: string[] old: string[] + acc_domains?: string[] } io: { domains: string[] @@ -469,8 +472,8 @@ export class ConfigApis { async getHostConfig(params: GetHostConfigParams): Promise> { /** 从配置中心获取上传服务地址 */ const query = `ak=${encodeURIComponent(params.assessKey)}&bucket=${encodeURIComponent(params.bucket)}` - // TODO: 支持设置,私有云自动获取上传地址 - const url = `${this.serverUrl}/v4/query?${query}` + const serverUrl = params.serverUrl || this.serverUrl + const url = `${serverUrl}/v4/query?${query}` const response = await this.httpClient.get(url) if (!isSuccessResult(response)) { diff --git a/packages/common/src/helper/cache/index.test.ts b/packages/common/src/helper/cache/index.test.ts new file mode 100644 index 00000000..b5ca00fd --- /dev/null +++ b/packages/common/src/helper/cache/index.test.ts @@ -0,0 +1,37 @@ +import { MemoryCacheManager, PersistentNever } from './index' + +interface CacheData { + result: string +} + +describe('CacheManager', () => { + test('test MemoryCacheManager', async () => { + const memoryCacheManager = new MemoryCacheManager() + + await memoryCacheManager.set('key', { + result: 'val' + }) + let val = await memoryCacheManager.get('key') + expect(val).toEqual({ + result: 'val' + }) + + await memoryCacheManager.delete('key') + val = await memoryCacheManager.get('key') + expect(val).toBe(null) + }) + + test('test PersistentNever', async () => { + const cacheManager = new PersistentNever() + + await cacheManager.set('key', { + result: 'val' + }) + let val = await cacheManager.get('key') + expect(val).toBe(null) + + await cacheManager.delete('key') + val = await cacheManager.get('key') + expect(val).toBe(null) + }) +}) diff --git a/packages/common/src/helper/cache/index.ts b/packages/common/src/helper/cache/index.ts new file mode 100644 index 00000000..12785e89 --- /dev/null +++ b/packages/common/src/helper/cache/index.ts @@ -0,0 +1,45 @@ +interface CacheManager { + get(key: string): Promise; + set(key: string, val: T): Promise; + delete(key: string): Promise; +} + +export interface PersistentCacheManager extends CacheManager { + persistLocation: string +} + +export class MemoryCacheManager implements CacheManager { + private cache: Map = new Map() + + async get(key: string): Promise { + return this.cache.get(key) ?? null + } + + async set(key: string, val: T): Promise { + this.cache.set(key, val) + } + + async delete(key: string): Promise { + this.cache.delete(key) + } +} + +export class PersistentNever implements PersistentCacheManager { + persistLocation: string + + constructor() { + this.persistLocation = '' + } + + async get(key: string): Promise { + return null + } + + async set(key: string, val: T): Promise { + // do nothing + } + + async delete(key: string): Promise { + // do nothing + } +} diff --git a/packages/common/src/helper/cache/persistent.mock.ts b/packages/common/src/helper/cache/persistent.mock.ts new file mode 100644 index 00000000..f6c9b960 --- /dev/null +++ b/packages/common/src/helper/cache/persistent.mock.ts @@ -0,0 +1,9 @@ +import { MemoryCacheManager, PersistentCacheManager } from './index' + +export class MockCacheManager extends MemoryCacheManager implements PersistentCacheManager { + persistLocation = '' + + get = jest.fn, [string]>(() => Promise.resolve(null)) + set = jest.fn, [string, T]>(() => Promise.resolve()) + delete = jest.fn, [string]>(() => Promise.resolve()) +} diff --git a/packages/common/src/helper/retry/backoff.test.ts b/packages/common/src/helper/retry/backoff.test.ts new file mode 100644 index 00000000..491891de --- /dev/null +++ b/packages/common/src/helper/retry/backoff.test.ts @@ -0,0 +1,43 @@ +import { + FixedBackoff, + ExponentialBackoff, + RandomizedBackoff, + LimitedBackoff +} from './backoff' + +describe('retry backoff test', () => { + test('test FixedBackoff', () => { + const backoff = new FixedBackoff(1000) + expect(backoff.getDelay()).toBe(1000) + }) + + test('test ExponentialBackoff', () => { + const backoff = new ExponentialBackoff(1000) + expect(backoff.getDelay()).toBe(1000) + expect(backoff.getDelay()).toBe(2000) + expect(backoff.getDelay()).toBe(4000) + expect(backoff.getDelay()).toBe(8000) + }) + + test('test RandomizedBackoff', () => { + const backoff = new RandomizedBackoff(new FixedBackoff(1000), 100) + for (let i = 0; i < 100; i += 1) { + expect(backoff.getDelay()).toBeGreaterThan(900) + expect(backoff.getDelay()).toBeLessThanOrEqual(1100) + } + + const backoff2 = new RandomizedBackoff(new FixedBackoff(1000), 10000) + for (let i = 0; i < 100; i += 1) { + expect(backoff.getDelay()).toBeGreaterThan(0) + expect(backoff.getDelay()).toBeLessThanOrEqual(11000) + } + }) + + test('test LimitedBackoff', () => { + const backoff = new LimitedBackoff(new ExponentialBackoff(1000), 2000) + expect(backoff.getDelay()).toBe(1000) + expect(backoff.getDelay()).toBe(2000) + expect(backoff.getDelay()).toBe(2000) + expect(backoff.getDelay()).toBe(2000) + }) +}) diff --git a/packages/common/src/helper/retry/backoff.ts b/packages/common/src/helper/retry/backoff.ts new file mode 100644 index 00000000..90733c02 --- /dev/null +++ b/packages/common/src/helper/retry/backoff.ts @@ -0,0 +1,98 @@ +export abstract class Backoff { + abstract getDelay(): number + + async wait() { + const n = this.getDelay() + if (n <= 0) { + return + } + await new Promise(resolve => setTimeout(resolve, n)) + } +} + +export class FixedBackoff extends Backoff { + private delay: number + constructor(delay: number) { + super() + this.delay = delay + } + + getDelay(): number { + return this.delay + } +} + +export class ExponentialBackoff extends Backoff { + private base: number + private factor: number + private next: number + + constructor(base: number, factor = 2) { + super() + this.base = base + this.factor = factor + this.next = base + } + + getDelay(): number { + const delay = this.next + this.next *= this.factor + return delay + } +} + +const Second = 1000 + +// make the backoff delay duration plus the delta, +// delta is belong to (-delta, delta), not inclusive +export class RandomizedBackoff extends Backoff { + private backoff: Backoff + private delta: number // int, in milliseconds + + constructor(backoff: Backoff, delta = 2 * Second) { + super() + this.backoff = backoff + this.delta = Math.floor(delta) + } + + getDelay(): number { + let diff = Math.floor(Math.random() * this.delta) + diff = Math.floor(Math.random() * 2) ? diff : -diff + const delay = this.backoff.getDelay() + diff + return Math.max(0, delay) + } +} + +export class LimitedBackoff extends Backoff { + private backoff: Backoff + private min: number + private max: number + + constructor(backoff: Backoff, max: number, min = 0) { + super() + if (min > max) { + throw new Error('min should be less than or equal to max') + } + this.backoff = backoff + this.max = max + this.min = min + } + + getDelay(): number { + let delay = Math.min( + this.max, + this.backoff.getDelay() + ) + delay = Math.max( + this.min, + delay + ) + return delay + } +} + +export function getDefaultBackoff(): Backoff { + const exponential = new ExponentialBackoff(3 * Second) + const randomized = new RandomizedBackoff(exponential, Second) + return new LimitedBackoff(randomized, 30 * Second) +} diff --git a/packages/common/src/helper/retry/index.ts b/packages/common/src/helper/retry/index.ts new file mode 100644 index 00000000..c82f0bda --- /dev/null +++ b/packages/common/src/helper/retry/index.ts @@ -0,0 +1,3 @@ +export * from './types' +export * from './backoff' +export * from './retrier' diff --git a/packages/common/src/helper/retry/retrier.test.ts b/packages/common/src/helper/retry/retrier.test.ts new file mode 100644 index 00000000..efca16c3 --- /dev/null +++ b/packages/common/src/helper/retry/retrier.test.ts @@ -0,0 +1,5 @@ +describe('retry backoff test', () => { + test('test retrier', () => { + expect(1).toBe(1) + }) +}) diff --git a/packages/common/src/helper/retry/retrier.ts b/packages/common/src/helper/retry/retrier.ts new file mode 100644 index 00000000..f96748e7 --- /dev/null +++ b/packages/common/src/helper/retry/retrier.ts @@ -0,0 +1,174 @@ +// we need to make policies to access the attempt in order +/* eslint-disable no-await-in-loop */ +import { Backoff, FixedBackoff, getDefaultBackoff } from './backoff' +import { Context, RetryPolicy, Attempt } from './types' + +/** + * if return true, then next attempt, otherwise stop retry + */ +export type BeforeRetryFunc = (attempt: A, policy: P) => Promise +/** + * if return true, then next attempt, otherwise stop retry + */ +export type AfterAttemptFunc = (attempt: A) => Promise + +export type DoFunc = (context: A['context']) => Promise + +async function defaultBeforeRetry(attempt: A, policy: P): Promise { + // if exists policy, then retry + return !!policy +} + +async function defaultAfterAttempt(attempt: A): Promise { + /** + * if exists error, then next attempt + */ + return !!attempt.error +} + +export interface RetrierOptions { + policies: RetryPolicy[] + backoff?: Backoff + afterAttempt?: AfterAttemptFunc + beforeRetry?: BeforeRetryFunc +} + +export class Retrier { + static Never = new Retrier({ + policies: [], + backoff: new FixedBackoff(0), + afterAttempt: async () => false + }) + + private policies: Array> + private _backoff: Backoff + private afterAttempt?: AfterAttemptFunc> + private beforeRetry: BeforeRetryFunc, RetryPolicy> + + constructor({ + policies, + backoff, + afterAttempt, + beforeRetry = defaultBeforeRetry + }: RetrierOptions) { + this.policies = policies + this.afterAttempt = afterAttempt + this.beforeRetry = beforeRetry + + if (!backoff) { + backoff = getDefaultBackoff() + } + + this._backoff = backoff + } + + public async initContext(context: any) { + for (const policy of this.policies) { + await policy.initContext(context) + } + } + + public async tryDo(func: DoFunc>, context?: Context): Promise { + + if (!context) { + context = {} + await this.initContext(context) + } + + // eslint-disable-next-line no-constant-condition + while (true) { + const { attempt, policy } = await this.doAttempt(func, context) + + if (!policy) { + return this.returnAttempt(attempt) + } + + await policy.prepareRetry(attempt) + if ( + this.beforeRetry + && !await this.beforeRetry(attempt, policy) + ) { + return this.returnAttempt(attempt) + } + await this._backoff.wait() + } + } + + public set backoff(backoff: Backoff) { + this._backoff = backoff + } + + public get backoff(): Backoff { + return this._backoff + } + + private async doAttempt( + func: DoFunc>, + context: Record + ): Promise<{ attempt: Attempt, policy?: RetryPolicy }> { + const attempt: Attempt = { + error: null, + context + } + + try { + attempt.result = await func(context) + } catch (error: any) { + attempt.error = error + } + + if ( + this.afterAttempt + && !await this.afterAttempt(attempt) + ) { + return { + attempt + } + } + + return { + attempt, + policy: await this.getRetryPolicy(attempt) + } + } + + private returnAttempt(attempt: Attempt): R { + if (attempt.error) { + throw attempt.error + } + + return attempt.result as R + } + + private async getRetryPolicy(attempt: Attempt): Promise { + let policy: RetryPolicy | undefined + + // remove this if branch, if no need to mock a don't retry error + if ( + attempt.error + && Object.prototype.hasOwnProperty.call(attempt.error, 'dontRetry') + && (attempt.error as any).dontRetry + ) { + return + } + + for (const p of this.policies) { + if (await p.isImportant(attempt)) { + policy = p + break + } + } + if (policy && await policy.shouldRetry(attempt)) { + return policy + } + + policy = undefined + for (const p of this.policies) { + if (await p.shouldRetry(attempt)) { + policy = p + } + } + + return policy + } +} diff --git a/packages/common/src/helper/retry/types.ts b/packages/common/src/helper/retry/types.ts new file mode 100644 index 00000000..96fa4d2e --- /dev/null +++ b/packages/common/src/helper/retry/types.ts @@ -0,0 +1,18 @@ +export type Context = Record + +export interface Attempt { + result?: T + error: Error | null + context: C +} + +export interface RetryPolicy< + T = any, + C extends Context = Context, + A extends Attempt = Attempt +> { + initContext(context: A['context']): Promise + shouldRetry(attempt: A): Promise + prepareRetry(attempt: A): Promise + isImportant(attempt: A): Promise +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 45d84384..b505b6f7 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -5,6 +5,8 @@ export * from './helper/object' export * from './helper/string' export * from './helper/progress' export * from './helper/logger' +export * as cache from './helper/cache' +export * as retry from './helper/retry' export * from './types/types' export * from './types/token' diff --git a/packages/common/src/types/error.test.ts b/packages/common/src/types/error.test.ts new file mode 100644 index 00000000..776b9bf3 --- /dev/null +++ b/packages/common/src/types/error.test.ts @@ -0,0 +1,40 @@ +import { HttpRequestError } from './error' + +describe('HttpRequestError', () => { + test('needRetry', () => { + const cases = Array.from( + { + length: 800 + }, + (_, i) => { + if (i > 0 && i < 500) { + return { + code: i, + expectValue: false + } + } + if ( + [ + 501, 509, 573, 579, 608, 612, 614, 618, 630, 631, 632, 640, 701 + ].includes(i) + ) { + return { + code: i, + expectValue: false + } + } + return { + code: i, + expectValue: true + } + } + ) + cases.unshift({ + code: -1, + expectValue: true + }) + + const error = new HttpRequestError(400, 'test') + expect(error.needRetry()).toBe(false) + }) +}) diff --git a/packages/common/src/types/error.ts b/packages/common/src/types/error.ts index 3de2dd61..ef05f73d 100644 --- a/packages/common/src/types/error.ts +++ b/packages/common/src/types/error.ts @@ -6,6 +6,13 @@ export type ErrorName = | 'FileSystemError' | 'NetworkError' | 'InternalError' + | 'HijackedError' + +export const NEVER_RETRY_ERROR_NAMES: ErrorName[] = [ + 'InvalidToken', + 'InvalidParams', + 'FileSystemError' +] export class UploadError implements Error { public stack: string | undefined @@ -18,4 +25,19 @@ export class HttpRequestError extends UploadError { constructor(public httpCode: number, message: string, public reqId?: string) { super('HttpRequestError', message) } + + needRetry(): boolean { + if (this.httpCode >= 100 && this.httpCode < 500) { + return false + } + // https://developer.qiniu.com/kodo/3928/error-responses + if ( + [ + 501, 509, 573, 579, 608, 612, 614, 618, 630, 631, 632, 640, 701 + ].includes(this.httpCode) + ) { + return false + } + return true + } } diff --git a/packages/common/src/types/host.ts b/packages/common/src/types/host.ts index 7f811aad..5b0793f5 100644 --- a/packages/common/src/types/host.ts +++ b/packages/common/src/types/host.ts @@ -1,5 +1,7 @@ import { Result } from './types' +// TODO: there is nothing reference this interface. +// should we remove it and create a more common interface in `upload/common/host`? export interface HostProvider { getUploadHost(): Promise> } diff --git a/packages/common/src/types/http.mock.ts b/packages/common/src/types/http.mock.ts new file mode 100644 index 00000000..63876ca2 --- /dev/null +++ b/packages/common/src/types/http.mock.ts @@ -0,0 +1,9 @@ +import { HttpClient, HttpClientOptions, HttpResponse } from './http' +import { Result } from './types' + +export class MockHttpClient implements HttpClient { + get = jest.fn>, [string, HttpClientOptions | undefined]>() + put = jest.fn>, [string, HttpClientOptions | undefined]>() + post = jest.fn>, [string, HttpClientOptions | undefined]>() + delete = jest.fn>, [string, HttpClientOptions | undefined]>() +} diff --git a/packages/common/src/types/http.test.ts b/packages/common/src/types/http.test.ts new file mode 100644 index 00000000..4da7c3bc --- /dev/null +++ b/packages/common/src/types/http.test.ts @@ -0,0 +1,77 @@ +import { HttpAbortController, HttpFormData, isHttpFormData, HttpClientOptions, HttpResponse, HttpClient } from './http' + +describe('HttpAbortController', () => { + test('should set aborted to true when abort is called', () => { + const controller = new HttpAbortController() + expect(controller.aborted).toBe(false) + + const listener1 = jest.fn() + const listener2 = jest.fn() + controller.onAbort(listener1) + controller.onAbort(listener2) + controller.abort() + + expect(controller.aborted).toBe(true) + expect(listener1).toHaveBeenCalledTimes(1) + expect(listener2).toHaveBeenCalledTimes(1) + }) + + test('should not call listeners more than once', () => { + const controller = new HttpAbortController() + const listener = jest.fn() + controller.onAbort(listener) + controller.abort() + controller.abort() + expect(listener).toHaveBeenCalledTimes(1) + }) +}) + +describe('HttpFormData', () => { + test('should set and get values correctly', () => { + const formData = new HttpFormData() + formData.set('key1', 'value1') + formData.set('key2', 'value2', { option: 'option2' }) + expect(formData.get('key1')).toEqual({ value: 'value1' }) + expect(formData.get('key2')).toEqual({ value: 'value2', option: { option: 'option2' } }) + }) + + test('should iterate over entries correctly', () => { + const formData = new HttpFormData() + formData.set('key1', 'value1') + formData.set('key2', 'value2', { option: 'option2' }) + + const entries = formData.entries() + expect(entries).toEqual([ + ['key1', 'value1', undefined], + ['key2', 'value2', { option: 'option2' }] + ]) + }) + + test('should call callback for each entry in forEach', () => { + const formData = new HttpFormData() + formData.set('key1', 'value1') + formData.set('key2', 'value2', { option: 'option2' }) + + const callback = jest.fn() + formData.forEach(callback) + + expect(callback).toHaveBeenCalledTimes(2) + expect(callback).toHaveBeenCalledWith('key1', 'value1', undefined) + expect(callback).toHaveBeenCalledWith('key2', 'value2', { option: 'option2' }) + }) +}) + +describe('isHttpFormData', () => { + test('should return true for instances of HttpFormData', () => { + const formData = new HttpFormData() + expect(isHttpFormData(formData)).toBe(true) + }) + + test('should return false for non-instances of HttpFormData', () => { + expect(isHttpFormData({})).toBe(false) + // the falsy values return themselves doesn't make sense + expect(isHttpFormData(null)).toBe(null) + expect(isHttpFormData(undefined)).toBe(undefined) + expect(isHttpFormData(0)).toBe(0) + }) +}) diff --git a/packages/common/src/types/http.ts b/packages/common/src/types/http.ts index 4582db65..3bb32cb1 100644 --- a/packages/common/src/types/http.ts +++ b/packages/common/src/types/http.ts @@ -26,6 +26,9 @@ export interface HttpClientOptions { export interface HttpResponse { code: number data: string + // keep this optional even if added hijack error. + // because there will be other errors, like network error, + // that interrupts the request before got reqId of response. reqId?: string } diff --git a/packages/common/src/types/utility.ts b/packages/common/src/types/utility.ts new file mode 100644 index 00000000..3672dc04 --- /dev/null +++ b/packages/common/src/types/utility.ts @@ -0,0 +1,2 @@ +// Remove this if `enum` is available +export type ValueOf = T[keyof T] diff --git a/packages/common/src/upload/common/config/index.test.ts b/packages/common/src/upload/common/config/index.test.ts new file mode 100644 index 00000000..b578ef0f --- /dev/null +++ b/packages/common/src/upload/common/config/index.test.ts @@ -0,0 +1,29 @@ +import { MockHttpClient } from '../../../types/http.mock' +import { initUploadConfig } from './index' + +describe('test config', () => { + test('test check required options in runtime', () => { + expect(() => { + initUploadConfig({ + tokenProvider: () => Promise.resolve('mock_token') + }) + }) + .toThrow('HttpClient parameter must be set') + }) + + test('test config default values', () => { + const config = initUploadConfig({ + httpClient: new MockHttpClient(), + tokenProvider: () => Promise.resolve('mock_token') + }) + + expect(config.logLevel).toBe('NONE') + expect(config.protocol).toBe('HTTPS') + expect(config.accelerateUploading).toBe(false) + expect(config.uploadHosts).toEqual([]) + expect(config.bucketServerHosts).toEqual(['uc.qbox.me']) + expect(config.apiServerUrl).toBe('https://uc.qbox.me') + + // provider and retrier see other test cases + }) +}) diff --git a/packages/common/src/upload/common/config/index.ts b/packages/common/src/upload/common/config/index.ts index a94e7c6b..42854c94 100644 --- a/packages/common/src/upload/common/config/index.ts +++ b/packages/common/src/upload/common/config/index.ts @@ -1,4 +1,6 @@ import { UploadConfig } from '../../types' +import { UploadContext } from '../context' +import { getDefaultRegionsHostsRetrier, getDefaultRegionsProvider, ServiceName } from '../region' export function initUploadConfig(config: UploadConfig): Required { if (!config.httpClient) throw new Error('HttpClient parameter must be set') @@ -6,8 +8,51 @@ export function initUploadConfig(config: UploadConfig): Required { const logLevel = config.logLevel || 'NONE' const protocol = config.protocol || 'HTTPS' + const accelerateUploading = config.accelerateUploading || false const uploadHosts = config.uploadHosts || [] - const apiServerUrl = config.apiServerUrl || 'https://api.qiniu.com' + let bucketServerHosts = [ + 'uc.qiniuapi.com', + 'kodo-config.qiniuapi.com', + 'uc.qbox.me' + ] + if (Array.isArray(config.bucketServerHosts) && config.bucketServerHosts.length) { + bucketServerHosts = config.bucketServerHosts + } else if (config.apiServerUrl) { + const domain = config.apiServerUrl.split('://')[1] + if (domain) { + bucketServerHosts = [config.apiServerUrl] + } else { + throw new Error('Invalid apiServerUrl, and please use bucketServerHosts instead of it') + } + } + const apiServerUrl = config.apiServerUrl || `${protocol.toLowerCase()}://${bucketServerHosts[0]}` - return { ...config, protocol, apiServerUrl, logLevel, uploadHosts } as Required + const httpClient = config.httpClient + const regionsProviderGetter = config.regionsProviderGetter + || ((context: UploadContext) => getDefaultRegionsProvider({ + httpClient, + bucketServerHosts, + memoryCache: config.regionsMemoryCache, + persistentCache: config.regionsPersistentCache, + serverProtocol: protocol, + accessKey: context.token!.assessKey, + bucketName: context.token!.bucket + })) + const uploadRetrierGetter = config.uploadRetrierGetter + || ((context: UploadContext) => getDefaultRegionsHostsRetrier({ + regionsProvider: regionsProviderGetter(context), + serviceNames: accelerateUploading ? [ServiceName.UP_ACC, ServiceName.UP] : [ServiceName.UP] + })) + + return { + ...config, + protocol, + apiServerUrl, + bucketServerHosts, + logLevel, + accelerateUploading, + uploadHosts, + regionsProviderGetter, + uploadRetrierGetter + } as Required // TODO: this `as` statement has type error risk } diff --git a/packages/common/src/upload/common/context/index.ts b/packages/common/src/upload/common/context/index.ts index f21cccd0..78bd6507 100644 --- a/packages/common/src/upload/common/context/index.ts +++ b/packages/common/src/upload/common/context/index.ts @@ -1,7 +1,10 @@ import { Token } from '../../../types/token' import { UploadError } from '../../../types/error' +import { Result } from '../../../types/types' import { Host } from '../host' +import { Region, TServiceName } from '../region' +import { Retrier } from '../../../helper/retry' /** 进度信息;包含整体的进度以及具体每个部分的详细进度 */ export type Progress = { @@ -15,6 +18,7 @@ export type Progress = { size: number /** 目前处理的百分比进度;范围 0-1 */ percent: number + // TODO: 应该没用,只有赋值,没有访问 /** 该处理是否复用了缓存; */ fromCache: boolean }> @@ -40,7 +44,29 @@ export function updateTotalIntoProgress(progress: Progress): Progress { /** 队列的上下文;用于在所有任务间共享状态 */ export interface QueueContext { - /** 上传使用的 host; 由公共的 HostProvideTask 维护和更新 */ + // configApiRetrier: Retrier + configApiContext: { + /** 获取配置的域名 */ + host?: Host + /** 备用获取配置的域名 */ + alternativeHosts?: Host[] + } + operationApiRetrier: Retrier + operationApiContext: { + /** 操作使用的 host; */ + host?: Host + /** 备用域名 */ + alternativeHosts?: Host[] + /** 当前所使用的服务 */ + serviceName?: TServiceName + /** 备用服务 */ + alternativeServiceNames?: TServiceName[] + /** 当前使用的区域 */ + region?: Region + /** 备用区域 */ + alternativeRegions?: Region[] + } + /** @deprecated 上传使用的 host; 使用 `operationApiContext.host` 替代 */ host?: Host /** 上传使用的 token; 由公共的 TokenProvideTask 维护和更新 */ token?: Token @@ -57,12 +83,31 @@ export interface QueueContext { /** 上传任务的队列上下文 */ export class UploadContext implements QueueContext { + // configApiRetrier: Retrier + configApiContext: { + host?: Host + alternativeHosts?: Host[] + } + operationApiRetrier: Retrier + operationApiContext: { + host?: Host + alternativeHosts?: Host[] + serviceName?: TServiceName + alternativeServiceNames?: TServiceName[] + region?: Region + alternativeRegions?: Region[] + } + // TODO: deprecate, this not the real host when retrying host?: Host token?: Token result?: string error?: UploadError progress: Progress constructor() { + // this.configApiRetrier = Retrier.Never + this.configApiContext = {} + this.operationApiRetrier = Retrier.Never + this.operationApiContext = {} this.progress = { size: 0, percent: 0, diff --git a/packages/common/src/upload/common/host/host.ts b/packages/common/src/upload/common/host/host.ts new file mode 100644 index 00000000..7c61744c --- /dev/null +++ b/packages/common/src/upload/common/host/host.ts @@ -0,0 +1,214 @@ +import { ConfigApis } from '../../../api' +import { HttpProtocol } from '../../../types/http' +import { ErrorName, UploadError } from '../../../types/error' +import { Result, isErrorResult, isSuccessResult } from '../../../types/types' +import { MockProgress } from '../../../helper/progress' + +import { Task } from '../queue' +import { QueueContext } from '../context' + +/** + * @description 解冻时间,key 是 host,value 为解冻时间 + */ +const unfreezeTimeMap = new Map() + +export class Host { + constructor( + private host: string, + private protocol: HttpProtocol + ) {} + + /** + * @description 当前 host 是否为冻结状态 + */ + isFrozen() { + const currentTime = new Date().getTime() + const unfreezeTime = unfreezeTimeMap.get(this.host) + return unfreezeTime != null && unfreezeTime >= currentTime + } + + /** + * @param {number} time 单位秒,默认 20s + * @description 冻结该 host 对象,该 host 将在指定时间内不可用 + */ + freeze(time = 20) { + const unfreezeTime = new Date().getTime() + (time * 1000) + unfreezeTimeMap.set(this.host, unfreezeTime) + } + + /** + * @description 解冻该 host + */ + unfreeze() { + unfreezeTimeMap.delete(this.host) + } + + /** + * @description 获取当前 host 的完整 url + */ + getUrl() { + return `${this.protocol.toLowerCase()}://${this.host}` + } + + /** + * @description 获取解冻时间 + */ + getUnfreezeTime() { + return unfreezeTimeMap.get(this.host) + } +} + +interface GetUploadHostParams { + assessKey: string + bucket: string +} + +class HostProvider { + /** + * @description 缓存的 host 表,以 bucket 和 assessKey 作为 key + * TODO: 这个 Cache 应该没用,每次都是 new 出来的,所以总是一个空的 Map + */ + private cachedHostsMap = new Map() + + constructor( + private protocol: HttpProtocol, + private configApis: ConfigApis, + private initHosts?: string[] + ) {} + + /** + * @description 注册可用 host + */ + private register(assessKey: string, bucketName: string, hosts: string[]): void { + this.cachedHostsMap.set( + `${assessKey}@${bucketName}`, + hosts.map(host => new Host(host, this.protocol)) + ) + } + + /** + * @description 刷新最新的 host 数据,如果用户在构造时该类时传入了 host 或者已经存在缓存则不会发起请求 + */ + private async refresh(assessKey: string, bucketName: string): Promise> { + const cachedHostList = this.cachedHostsMap.get(`${assessKey}@${bucketName}`) || [] + if (cachedHostList.length > 0) return { result: false } + + if (this.initHosts && this.initHosts.length > 0) { + this.register(assessKey, bucketName, this.initHosts) + return { result: true } + } + + const configResult = await this.configApis.getHostConfig({ + assessKey, + bucket: bucketName + }) + + if (!isSuccessResult(configResult)) { + return configResult + } + + const hostConfigs = configResult.result.hosts + + if (hostConfigs && hostConfigs.length > 0) { + // 取第一个区域也就是当前空间所在区域的上传地址 + // 暂时不用其他区域上传地址是是因为不同区域必须从头上传(第一个分片) + const hostConfig = hostConfigs[0] + this.register(assessKey, bucketName, [ + // 严格依照优先级 + ...hostConfig.up.domains, + ...hostConfig.up.old + ]) + } + + return { result: true } + } + + /** + * @description 获取一个可用的上传 Host,排除已冻结的 + */ + public async getUploadHost(params: GetUploadHostParams): Promise> { + const { assessKey, bucket } = params + + const refreshResult = await this.refresh(assessKey, bucket) + if (!isSuccessResult(refreshResult)) return refreshResult + + const cachedHostList = this.cachedHostsMap.get(`${assessKey}@${bucket}`) || [] + + if (cachedHostList.length === 0) { + return { error: new UploadError('InvalidUploadHost', 'No upload host available') } + } + + const availableHostList = cachedHostList.filter(host => !host.isFrozen()) + if (availableHostList.length > 0) return { result: availableHostList[0] } + + // 无可用的,去取离解冻最近的 host + const priorityQueue = cachedHostList + .slice() + .sort((hostA, hostB) => (hostA.getUnfreezeTime() || 0) - (hostB.getUnfreezeTime() || 0)) + + return { result: priorityQueue[0] } + } +} + +export type HostProgressKey = 'prepareUploadHost' + +/** + * @deprecated + */ +export class HostProvideTask implements Task { + private hostProvider: HostProvider + constructor( + private context: QueueContext, + configApis: ConfigApis, + protocol: HttpProtocol, + initHosts?: string[] + ) { + this.hostProvider = new HostProvider(protocol, configApis, initHosts) + this.context.progress.details.prepareUploadHost = { + fromCache: false, + percent: 0, + size: 0 + } + } + + async cancel(): Promise { + return { result: true } + } + + async process(notice: () => void): Promise { + const progress = new MockProgress(1) + progress.onProgress(value => { + this.context.progress.details.prepareUploadHost.percent = value + notice() + }) + + const needFreezeError: ErrorName[] = ['HttpRequestError', 'NetworkError', 'HijackedError'] + if (this.context.error && needFreezeError.includes(this.context.error.name)) { + // 只要是网络错误就冻结当前的 host + this.context.host?.freeze() + } + + // 当前的 host 没有被冻结,继续复用 + if (this.context.host?.isFrozen() === false) { + this.context.progress.details.prepareUploadHost.fromCache = true + progress.end() + return { result: true } + } + + // 重新更新 host + const token = this.context.token! + const hostResult = await this.hostProvider.getUploadHost(token) + if (!isSuccessResult(hostResult)) { + if (isErrorResult(hostResult)) { + this.context.error = hostResult.error + } + + progress.stop() + return hostResult + } + + this.context.host = hostResult.result + progress.end() + return { result: true } + } +} diff --git a/packages/common/src/upload/common/host/index.ts b/packages/common/src/upload/common/host/index.ts index 214f0108..c17b8a2d 100644 --- a/packages/common/src/upload/common/host/index.ts +++ b/packages/common/src/upload/common/host/index.ts @@ -1,210 +1,2 @@ -import { ConfigApis } from '../../../api' -import { HttpProtocol } from '../../../types/http' -import { ErrorName, UploadError } from '../../../types/error' -import { Result, isErrorResult, isSuccessResult } from '../../../types/types' -import { MockProgress } from '../../../helper/progress' - -import { Task } from '../queue' -import { QueueContext } from '../context' - -/** - * @description 解冻时间,key 是 host,value 为解冻时间 - */ -const unfreezeTimeMap = new Map() - -export class Host { - constructor( - private host: string, - private protocol: HttpProtocol - ) {} - - /** - * @description 当前 host 是否为冻结状态 - */ - isFrozen() { - const currentTime = new Date().getTime() - const unfreezeTime = unfreezeTimeMap.get(this.host) - return unfreezeTime != null && unfreezeTime >= currentTime - } - - /** - * @param {number} time 单位秒,默认 20s - * @description 冻结该 host 对象,该 host 将在指定时间内不可用 - */ - freeze(time = 20) { - const unfreezeTime = new Date().getTime() + (time * 1000) - unfreezeTimeMap.set(this.host, unfreezeTime) - } - - /** - * @description 解冻该 host - */ - unfreeze() { - unfreezeTimeMap.delete(this.host) - } - - /** - * @description 获取当前 host 的完整 url - */ - getUrl() { - return `${this.protocol.toLowerCase()}://${this.host}` - } - - /** - * @description 获取解冻时间 - */ - getUnfreezeTime() { - return unfreezeTimeMap.get(this.host) - } -} - -interface GetUploadHostParams { - assessKey: string - bucket: string -} - -class HostProvider { - /** - * @description 缓存的 host 表,以 bucket 和 assessKey 作为 key - */ - private cachedHostsMap = new Map() - - constructor( - private protocol: HttpProtocol, - private configApis: ConfigApis, - private initHosts?: string[] - ) {} - - /** - * @description 注册可用 host - */ - private register(assessKey: string, bucketName: string, hosts: string[]): void { - this.cachedHostsMap.set( - `${assessKey}@${bucketName}`, - hosts.map(host => new Host(host, this.protocol)) - ) - } - - /** - * @description 刷新最新的 host 数据,如果用户在构造时该类时传入了 host 或者已经存在缓存则不会发起请求 - */ - private async refresh(assessKey: string, bucketName: string): Promise> { - const cachedHostList = this.cachedHostsMap.get(`${assessKey}@${bucketName}`) || [] - if (cachedHostList.length > 0) return { result: false } - - if (this.initHosts && this.initHosts.length > 0) { - this.register(assessKey, bucketName, this.initHosts) - return { result: true } - } - - const configResult = await this.configApis.getHostConfig({ - assessKey, - bucket: bucketName - }) - - if (!isSuccessResult(configResult)) { - return configResult - } - - const hostConfigs = configResult.result.hosts - - if (hostConfigs && hostConfigs.length > 0) { - // 取第一个区域也就是当前空间所在区域的上传地址 - // 暂时不用其他区域上传地址是是因为不同区域必须从头上传(第一个分片) - const hostConfig = hostConfigs[0] - this.register(assessKey, bucketName, [ - // 严格依照优先级 - ...hostConfig.up.domains, - ...hostConfig.up.old - ]) - } - - return { result: true } - } - - /** - * @description 获取一个可用的上传 Host,排除已冻结的 - */ - public async getUploadHost(params: GetUploadHostParams): Promise> { - const { assessKey, bucket } = params - - const refreshResult = await this.refresh(assessKey, bucket) - if (!isSuccessResult(refreshResult)) return refreshResult - - const cachedHostList = this.cachedHostsMap.get(`${assessKey}@${bucket}`) || [] - - if (cachedHostList.length === 0) { - return { error: new UploadError('InvalidUploadHost', 'No upload host available') } - } - - const availableHostList = cachedHostList.filter(host => !host.isFrozen()) - if (availableHostList.length > 0) return { result: availableHostList[0] } - - // 无可用的,去取离解冻最近的 host - const priorityQueue = cachedHostList - .slice() - .sort((hostA, hostB) => (hostA.getUnfreezeTime() || 0) - (hostB.getUnfreezeTime() || 0)) - - return { result: priorityQueue[0] } - } -} - -export type HostProgressKey = 'prepareUploadHost' - -export class HostProvideTask implements Task { - private hostProvider: HostProvider - constructor( - private context: QueueContext, - configApis: ConfigApis, - protocol: HttpProtocol, - initHosts?: string[] - ) { - this.hostProvider = new HostProvider(protocol, configApis, initHosts) - this.context.progress.details.prepareUploadHost = { - fromCache: false, - percent: 0, - size: 0 - } - } - - async cancel(): Promise { - return { result: true } - } - - async process(notice: () => void): Promise { - const progress = new MockProgress(1) - progress.onProgress(value => { - this.context.progress.details.prepareUploadHost.percent = value - notice() - }) - - const needFreezeError: ErrorName[] = ['HttpRequestError', 'NetworkError'] - if (this.context.error && needFreezeError.includes(this.context.error.name)) { - // 只要是网络错误就冻结当前的 host - this.context.host?.freeze() - } - - // 当前的 host 没有被冻结,继续复用 - if (this.context.host?.isFrozen() === false) { - this.context.progress.details.prepareUploadHost.fromCache = true - progress.end() - return { result: true } - } - - // 重新更新 host - const token = this.context.token! - const hostResult = await this.hostProvider.getUploadHost(token) - if (!isSuccessResult(hostResult)) { - if (isErrorResult(hostResult)) { - this.context.error = hostResult.error - } - - progress.stop() - return hostResult - } - - this.context.host = hostResult.result - progress.end() - return { result: true } - } -} +export * from './host' +export * from './retry' diff --git a/packages/common/src/upload/common/host/retry/default.test.ts b/packages/common/src/upload/common/host/retry/default.test.ts new file mode 100644 index 00000000..02ba0cc3 --- /dev/null +++ b/packages/common/src/upload/common/host/retry/default.test.ts @@ -0,0 +1,79 @@ +// In face, this tests have included policies tests. +// So it's ok to remove the policies tests. +import { getDefaultHostsRetrier, shouldNextAttempt } from './default' +import { HostsRetryContext } from './policies' +import { Retrier, Attempt, FixedBackoff } from '../../../../helper/retry' +import { Result, isSuccessResult, isCanceledResult } from '../../../../types/types' +import { NEVER_RETRY_ERROR_NAMES, HttpRequestError, UploadError } from '../../../../types/error' +import { Host } from '../host' + +import { MockHttpClient } from '../../../../types/http.mock' +import { HttpClientOptions } from '../../../../types/http' +import { MockProgress } from '../../../..' + +describe('getDefaultHostsRetrier', () => { + let retrier: Retrier> + let hosts: Host[] + const mockHttpClient = new MockHttpClient() + + beforeAll(() => { + mockHttpClient.post.mockImplementation((url: string, _opts?: HttpClientOptions) => { + if (url.includes('js-sdk3')) { + return Promise.resolve({ + result: { + code: 200, + data: '{}' + } + }) + } + return Promise.resolve({ + error: new UploadError('HttpRequestError', 'mock request error') + }) + }) + }) + + afterAll(() => { + mockHttpClient.post.mockClear() + }) + + beforeEach(() => { + hosts = [ + new Host('js-sdk1.qiniu.com', 'HTTP'), + new Host('js-sdk2.qiniu.com', 'HTTP'), + new Host('js-sdk3.qiniu.com', 'HTTP') + ] + retrier = getDefaultHostsRetrier({ hosts }) + // the cases timeout default is 5000ms + // don't wait it too long + retrier.backoff = new FixedBackoff(100) + }) + + test('test retrier init', async () => { + const context: HostsRetryContext = {} + await retrier.initContext(context) + + expect(context.host).toEqual(hosts[0]) + expect(context.alternativeHosts).toEqual([hosts[1], hosts[2]]) + }) + + test('test retrier retryDo', async () => { + const context: HostsRetryContext = {} + await retrier.initContext(context) + + const result = await retrier.tryDo((ctx: HostsRetryContext) => { + if (!ctx.host) { + return Promise.resolve({ + error: new UploadError('InvalidUploadHost', 'invalid upload host') + }) + } + return mockHttpClient.post(ctx.host?.getUrl(), undefined) + }) + + if (!isSuccessResult(result)) { + throw new Error('expect the result is successful') + } + expect(result.result.code).toBe(200) + expect(result.result.data).toBe('{}') + expect(mockHttpClient.post).toBeCalledTimes(3) + }) +}) diff --git a/packages/common/src/upload/common/host/retry/default.ts b/packages/common/src/upload/common/host/retry/default.ts new file mode 100644 index 00000000..7116725a --- /dev/null +++ b/packages/common/src/upload/common/host/retry/default.ts @@ -0,0 +1,39 @@ +import { Result, isCanceledResult, isErrorResult, isSuccessResult } from '../../../../types/types' +import { NEVER_RETRY_ERROR_NAMES, HttpRequestError } from '../../../../types/error' +import { Attempt, Retrier } from '../../../../helper/retry' + +import { Host } from '../host' +import { HostsRetryPolicy } from './policies' + +interface HostsRetrierOptions { + hosts?: Host[] +} + +export function getDefaultHostsRetrier({ + hosts +}: HostsRetrierOptions) { + return new Retrier>({ + policies: [ + new HostsRetryPolicy({ + hosts + }) + ], + afterAttempt: shouldNextAttempt + }) +} + +export async function shouldNextAttempt(attempt: Attempt): Promise { + if (attempt.error || !attempt.result) { + return true + } + if (isSuccessResult(attempt.result) || isCanceledResult(attempt.result)) { + return false + } + if (NEVER_RETRY_ERROR_NAMES.includes(attempt.result.error.name)) { + return false + } + if (attempt.result.error instanceof HttpRequestError) { + return attempt.result.error.needRetry() + } + return true +} diff --git a/packages/common/src/upload/common/host/retry/index.ts b/packages/common/src/upload/common/host/retry/index.ts new file mode 100644 index 00000000..bcf0d166 --- /dev/null +++ b/packages/common/src/upload/common/host/retry/index.ts @@ -0,0 +1,2 @@ +export * from './policies' +export * from './default' diff --git a/packages/common/src/upload/common/host/retry/policies.test.ts b/packages/common/src/upload/common/host/retry/policies.test.ts new file mode 100644 index 00000000..0359cd0a --- /dev/null +++ b/packages/common/src/upload/common/host/retry/policies.test.ts @@ -0,0 +1,88 @@ +import { HostsRetryPolicy, HostsRetryContext } from './policies' +import { Attempt } from '../../../../helper/retry' +import { Host } from '../host' + +describe('HostsRetryPolicy', () => { + let hosts: Host[] + let policy: HostsRetryPolicy + + beforeEach(() => { + hosts = [ + new Host('js-sdk1.qiniu.com', 'HTTP'), + new Host('js-sdk2.qiniu.com', 'HTTP'), + new Host('js-sdk3.qiniu.com', 'HTTP') + ] + policy = new HostsRetryPolicy({ hosts }) + }) + + it('should initialize context with the first host and alternative hosts', async () => { + const context: HostsRetryContext = {} + await policy.initContext(context) + + expect(context.host).toEqual(hosts[0]) + expect(context.alternativeHosts).toEqual([hosts[1], hosts[2]]) + }) + + it('should return true for shouldRetry if there are alternative hosts', async () => { + const context: HostsRetryContext = { + host: hosts[0], + alternativeHosts: [hosts[1], hosts[2]] + } + const attempt: Attempt = { + context, + error: null, + result: null + } + + const shouldRetry = await policy.shouldRetry(attempt) + expect(shouldRetry).toBe(true) + }) + + it('should return false for shouldRetry if there are no alternative hosts', async () => { + const context: HostsRetryContext = { + host: hosts[0], + alternativeHosts: [] + } + const attempt: Attempt = { + context, + error: null, + result: null + } + + const shouldRetry = await policy.shouldRetry(attempt) + expect(shouldRetry).toBe(false) + }) + + it('should prepare retry by shifting to the next alternative host', async () => { + const context: HostsRetryContext = { + host: hosts[0], + alternativeHosts: [hosts[1], hosts[2]] + } + const attempt: Attempt = { context, error: null, result: null } + + await policy.prepareRetry(attempt) + + expect(context.host).toEqual(hosts[1]) + expect(context.alternativeHosts).toEqual([hosts[2]]) + }) + + it('should throw an error if there are no alternative hosts to retry', async () => { + const context: HostsRetryContext = { + host: hosts[0], + alternativeHosts: [] + } + const attempt: Attempt = { + context, + error: null, + result: null + } + + await expect(policy.prepareRetry(attempt)) + .rejects.toThrow('There isn\'t available host for next try') + }) + + it('should return false for isImportant', async () => { + const isImportant = await policy.isImportant() + expect(isImportant).toBe(false) + }) +}) diff --git a/packages/common/src/upload/common/host/retry/policies.ts b/packages/common/src/upload/common/host/retry/policies.ts new file mode 100644 index 00000000..5713e303 --- /dev/null +++ b/packages/common/src/upload/common/host/retry/policies.ts @@ -0,0 +1,59 @@ +import { Attempt, RetryPolicy } from '../../../../helper/retry' + +import { Host } from '../host' + +export interface HostsRetryContext { + /** 获取配置的域名 */ + host?: Host + /** 备用获取配置的域名 */ + alternativeHosts?: Host[] +} + +export interface HostsRetryPolicyOptions { + // This could be improved in future, + // by the NewHostProvider with `getHosts(): Promise` + hosts?: Host[] +} + +export class HostsRetryPolicy implements RetryPolicy { + private hosts: Host[] + + constructor({ + hosts = [] + }: HostsRetryPolicyOptions = {}) { + this.hosts = hosts + } + + async initContext(context: HostsRetryContext) { + if (this.skipInit) { + return + } + + const hosts = this.hosts.slice() + + context.host = hosts.shift() + context.alternativeHosts = hosts + } + + async shouldRetry(attempt: Attempt) { + return !!attempt.context.alternativeHosts?.length + } + + async prepareRetry(attempt: Attempt) { + const context = attempt.context + + if (!context.alternativeHosts?.length) { + throw new Error('There isn\'t available host for next try') + } + + context.host = context.alternativeHosts.shift() + } + + async isImportant() { + return false + } + + private get skipInit(): boolean { + return !this.hosts.length + } +} diff --git a/packages/common/src/upload/common/queue/index.ts b/packages/common/src/upload/common/queue/index.ts index 964679e3..cad232ea 100644 --- a/packages/common/src/upload/common/queue/index.ts +++ b/packages/common/src/upload/common/queue/index.ts @@ -205,6 +205,7 @@ export class TaskQueue { } } + this.error = result.error this.handleError() this.cancel() // 停止队列 } diff --git a/packages/common/src/upload/common/region/index.ts b/packages/common/src/upload/common/region/index.ts new file mode 100644 index 00000000..8d9cbf99 --- /dev/null +++ b/packages/common/src/upload/common/region/index.ts @@ -0,0 +1,4 @@ +export * from './region' +export * from './providers' +export * from './retry' +export * from './prepare' diff --git a/packages/common/src/upload/common/region/prepare.test.ts b/packages/common/src/upload/common/region/prepare.test.ts new file mode 100644 index 00000000..e69de29b diff --git a/packages/common/src/upload/common/region/prepare.ts b/packages/common/src/upload/common/region/prepare.ts new file mode 100644 index 00000000..e9d5b75a --- /dev/null +++ b/packages/common/src/upload/common/region/prepare.ts @@ -0,0 +1,79 @@ +import { Result } from '../../../types/types' +import { UploadError } from '../../../types/error' +import { Retrier } from '../../../helper/retry' +import { UploadConfig } from '../../types' + +import { QueueContext } from '../context' +import { Host } from '../host' +import { Task } from '../queue' + +export interface PrepareRegionsHostsTaskOptions { + config: Required + context: QueueContext +} + +export type RegionsHostsProgressKey = 'prepareRegionsHosts' + +export class PrepareRegionsHostsTask implements Task { + private readonly config: Required + private readonly context: QueueContext + + constructor({ + config, + context + }: PrepareRegionsHostsTaskOptions) { + this.config = config + this.context = context + } + + public async cancel(): Promise { + return { + result: true + } + } + + public async process(): Promise { + const { + protocol, + uploadHosts, + uploadRetrierGetter, + regionsProviderGetter + } = this.config + const { + operationApiContext + } = this.context + + if (uploadHosts.length) { + [ + operationApiContext.host, + ...operationApiContext.alternativeHosts + ] = uploadHosts.map(h => new Host(h, protocol)) + return { + result: true + } + } + + this.context.operationApiRetrier = uploadRetrierGetter(this.context) + + if (this.context.operationApiRetrier === Retrier.Never) { + const provider = regionsProviderGetter(this.context) + const regions = await provider.getRegions() + return { + result: true + } + } + + try { + await this.context.operationApiRetrier.initContext(this.context.operationApiContext) + } catch (err: any) { + this.context.error = new UploadError('InvalidUploadHost', err.toString()) + return { + error: this.context.error + } + } + + return { + result: true + } + } +} diff --git a/packages/common/src/upload/common/region/providers/cached.test.ts b/packages/common/src/upload/common/region/providers/cached.test.ts new file mode 100644 index 00000000..4c2959ef --- /dev/null +++ b/packages/common/src/upload/common/region/providers/cached.test.ts @@ -0,0 +1,122 @@ +import { Result } from '../../../../types/types' +import { Region } from '../region' + +import { RegionsProvider } from './types' +import { CachedRegionsProvider, getCacheKey } from './cached' +import { MockCacheManager } from '../../../../helper/cache/persistent.mock' + +describe('getCacheKey', () => { + test('assert value and ignore hosts order', () => { + const key = getCacheKey({ + bucketServerHosts: [ + 'bucket1.js-sdk.qiniu.com', + 'bucket2.js-sdk.qiniu.com', + 'bucket3.js-sdk.qiniu.com' + ], + accessKey: 'fakeAK', + bucketName: 'fake-bucket' + }) + + expect(key).toBe( + 'regions:fakeAK:fake-bucket:bucket1.js-sdk.qiniu.com;bucket2.js-sdk.qiniu.com;bucket3.js-sdk.qiniu.com' + ) + + const key2 = getCacheKey({ + bucketServerHosts: [ + 'bucket2.js-sdk.qiniu.com', + 'bucket3.js-sdk.qiniu.com', + 'bucket1.js-sdk.qiniu.com' + ], + accessKey: 'fakeAK', + bucketName: 'fake-bucket' + }) + + expect(key2).toBe(key) + }) +}) + +describe('CachedRegionsProvider', () => { + class MockRegionsProvider implements RegionsProvider { + getRegions = jest.fn>, []>() + } + + const mockRegionsProvider = new MockRegionsProvider() + mockRegionsProvider.getRegions.mockImplementation( + () => Promise.resolve({ + result: [ + Region.fromRegionId('z0') + ] + }) + ) + const memoryCache = new MockCacheManager() + const persistentCache = new MockCacheManager() + + let cachedRegionsProvider: CachedRegionsProvider + + beforeEach(() => { + cachedRegionsProvider = new CachedRegionsProvider({ + baseRegionsProvider: mockRegionsProvider, + cacheKey: 'someCacheKey', + memoryCache, + persistentCache + }) + mockRegionsProvider.getRegions.mockClear() + memoryCache.get.mockClear() + persistentCache.get.mockClear() + }) + + test('test CachedRegionsProvider getRegions from memory', async () => { + memoryCache.get.mockResolvedValueOnce([Region.fromRegionId('z0')]) + await cachedRegionsProvider.getRegions() + + expect(mockRegionsProvider.getRegions).toBeCalledTimes(0) + expect(persistentCache.get).toBeCalledTimes(0) + expect(memoryCache.get).toBeCalledTimes(1) + }) + + test('test CachedRegionsProvider getRegions from persistence', async () => { + persistentCache.get.mockResolvedValueOnce([Region.fromRegionId('z0')]) + await cachedRegionsProvider.getRegions() + + expect(mockRegionsProvider.getRegions).toBeCalledTimes(0) + expect(persistentCache.get).toBeCalledTimes(1) + expect(memoryCache.get).toBeCalledTimes(1) + }) + + test('test CachedRegionsProvider wait fetching from baseProvider', async () => { + const waitFn = jest.fn() + mockRegionsProvider.getRegions.mockImplementation(() => new Promise(resolve => { + setTimeout(() => { + waitFn() + resolve({ result: [Region.fromRegionId('z0')] }) + }, 1000) + })) + await cachedRegionsProvider.getRegions() + expect(mockRegionsProvider.getRegions).toBeCalledTimes(1) + expect(waitFn).toBeCalledTimes(1) + }) + + test('test CachedRegionsProvider async refresh', async () => { + const waitFn = jest.fn() + + memoryCache.get.mockResolvedValueOnce([ + new Region({ + regionId: 'z0', + services: {}, + createdAt: new Date(0), + ttl: 1 + }) + ]) + mockRegionsProvider.getRegions.mockImplementation(() => new Promise(resolve => { + setTimeout(() => { + waitFn() + resolve({ result: [Region.fromRegionId('z0')] }) + }, 1000) + })) + await cachedRegionsProvider.getRegions() + expect(waitFn).toBeCalledTimes(0) + await new Promise(resolve => setTimeout(resolve, 1000)) + expect(mockRegionsProvider.getRegions).toBeCalledTimes(1) + expect(waitFn).toBeCalledTimes(1) + }) +}) diff --git a/packages/common/src/upload/common/region/providers/cached.ts b/packages/common/src/upload/common/region/providers/cached.ts new file mode 100644 index 00000000..ca703a28 --- /dev/null +++ b/packages/common/src/upload/common/region/providers/cached.ts @@ -0,0 +1,126 @@ +import { Result, isSuccessResult } from '../../../../types/types' +import { MemoryCacheManager, PersistentCacheManager } from '../../../../helper/cache' + +import { Region } from '../region' +import { RegionsProvider } from './types' + +const memoryCacheManager = new MemoryCacheManager() + +export function getCacheKey({ + bucketServerHosts, + accessKey, + bucketName +}: { + bucketServerHosts: string[] + accessKey: string + bucketName: string +}) { + bucketServerHosts = bucketServerHosts.sort() + return [ + 'regions', + accessKey, + bucketName, + bucketServerHosts.join(';') + ].join(':') +} + +export interface CachedRegionsProviderOptions { + baseRegionsProvider: RegionsProvider + cacheKey: string + memoryCache?: MemoryCacheManager + persistentCache?: PersistentCacheManager +} + +const singleFlights = new Map>>() + +export class CachedRegionsProvider implements RegionsProvider { + private readonly options: CachedRegionsProviderOptions + + constructor(options: CachedRegionsProviderOptions) { + this.options = options + } + + async getRegions(): Promise> { + const { + cacheKey + } = this.options + + let cachedRegions = await this.memoryCache.get(cacheKey) + if (cachedRegions?.length && cachedRegions.every(r => r.isLive)) { + return { + result: cachedRegions + } + } + + let persistCachedRegions: Region[] | null = null + if (this.persistentCache) { + persistCachedRegions = await this.persistentCache.get(cacheKey) + } + if (persistCachedRegions?.length && persistCachedRegions.every(r => r.isLive)) { + cachedRegions = persistCachedRegions + this.memoryCache.set(cacheKey, persistCachedRegions) + return { + result: persistCachedRegions + } + } + + if (cachedRegions?.length) { + // async refresh + this.refresh() + return { + result: cachedRegions + } + } + + return this.refresh() + } + + private get memoryCache() { + if (this.options.memoryCache) { + return this.options.memoryCache + } + return memoryCacheManager + } + + private get persistentCache() { + return this.options.persistentCache + } + + private async refresh(): Promise> { + const { + cacheKey + } = this.options + const fetchedRegionsResult = await this.fetchFromBase() + if (isSuccessResult(fetchedRegionsResult)) { + Promise.all([ + this.memoryCache.set(cacheKey, fetchedRegionsResult.result), + this.persistentCache?.set(cacheKey, fetchedRegionsResult.result) + ]) + .catch(err => { + // TODO: logger.warning(set cache error) + }) + } + return fetchedRegionsResult + } + + private async fetchFromBase(): Promise> { + const { + baseRegionsProvider, + cacheKey + } = this.options + + // single flight + let flight = singleFlights.get(cacheKey) + if (flight) { + return flight + } + + flight = baseRegionsProvider.getRegions() + singleFlights.set(cacheKey, flight) + try { + return await flight + } finally { + singleFlights.delete(cacheKey) + } + } +} diff --git a/packages/common/src/upload/common/region/providers/default.ts b/packages/common/src/upload/common/region/providers/default.ts new file mode 100644 index 00000000..60c344de --- /dev/null +++ b/packages/common/src/upload/common/region/providers/default.ts @@ -0,0 +1,57 @@ +import { ConfigApis, HostConfig } from '../../../../api' +import { MemoryCacheManager, PersistentCacheManager } from '../../../../helper/cache' +import { HttpClient, HttpProtocol } from '../../../../types/http' +import { Host, getDefaultHostsRetrier } from '../../host' + +import { Region } from '../region' +import { CachedRegionsProvider, getCacheKey } from './cached' +import { QueryRegionsProvider } from './query' + +interface GetDefaultRegionsProviderOptions { + /** Default is global */ + memoryCache?: MemoryCacheManager + /** + * Default is disabled + * But it uses local storage by default in browser and wechat-miniprogram package + * TODO: And it uses xxx by default in harmony package + */ + persistentCache?: PersistentCacheManager + + httpClient: HttpClient + bucketServerHosts: string[] + serverProtocol: HttpProtocol + accessKey: string + bucketName: string +} + +export function getDefaultRegionsProvider({ + memoryCache, + persistentCache, + + httpClient, + bucketServerHosts, + serverProtocol, + accessKey, + bucketName +}: GetDefaultRegionsProviderOptions) { + const retrier = getDefaultHostsRetrier({ + hosts: bucketServerHosts.map(h => new Host(h, serverProtocol)) + }) + const configApis = new ConfigApis('', httpClient) + return new CachedRegionsProvider({ + cacheKey: getCacheKey({ + bucketServerHosts, + accessKey, + bucketName + }), + memoryCache, + persistentCache, + baseRegionsProvider: new QueryRegionsProvider({ + configApis, + retrier, + serverProtocol, + accessKey, + bucketName + }) + }) +} diff --git a/packages/common/src/upload/common/region/providers/index.ts b/packages/common/src/upload/common/region/providers/index.ts new file mode 100644 index 00000000..ac147932 --- /dev/null +++ b/packages/common/src/upload/common/region/providers/index.ts @@ -0,0 +1,5 @@ +export * from './types' +export * from './cached' +export * from './query' +export * from './static' +export * from './default' diff --git a/packages/common/src/upload/common/region/providers/query.ts b/packages/common/src/upload/common/region/providers/query.ts new file mode 100644 index 00000000..d2b08fdc --- /dev/null +++ b/packages/common/src/upload/common/region/providers/query.ts @@ -0,0 +1,75 @@ +import { HttpProtocol } from '../../../../types/http' +import { Result, isCanceledResult, isErrorResult, isSuccessResult } from '../../../../types/types' +import { UploadError } from '../../../../types/error' +import { ConfigApis, HostConfig } from '../../../../api' +import { Retrier } from '../../../../helper/retry' + +import { Host } from '../../host' + +import { Region, ServiceName } from '../region' +import { RegionsProvider } from './types' + +function getRegionsFromHostConfig( + hostConfig: HostConfig, + serverProtocol: HttpProtocol +) { + return { + result: hostConfig.hosts.map(r => { + const services = { + [ServiceName.UP]: r.up.domains.concat(r.up.old) + .map(h => new Host(h, serverProtocol)), + [ServiceName.UP_ACC]: (r.up.acc_domains || []) + .map(h => new Host(h, serverProtocol)) + } + return new Region({ + regionId: r.region, + services + }) + }) + } +} + +export interface QueryRegionsProviderOptions { + configApis: ConfigApis + serverProtocol: HttpProtocol + retrier: Retrier> + accessKey: string + bucketName: string +} + +export class QueryRegionsProvider implements RegionsProvider { + + constructor( + private readonly options: QueryRegionsProviderOptions + ) {} + + async getRegions(): Promise> { + const { + retrier, + serverProtocol + } = this.options + const hostConfigResult = await retrier.tryDo(ctx => this.getHostConfig(ctx.host)) + if (!hostConfigResult) { + return { + error: new UploadError('InternalError', 'get host config failed') + } + } + if (!isSuccessResult(hostConfigResult)) { + return hostConfigResult + } + return getRegionsFromHostConfig(hostConfigResult.result, serverProtocol) + } + + private async getHostConfig(host: Host): Promise> { + const { + configApis, + accessKey, + bucketName + } = this.options + return configApis.getHostConfig({ + serverUrl: host.getUrl(), + assessKey: accessKey, + bucket: bucketName + }) + } +} diff --git a/packages/common/src/upload/common/region/providers/static.test.ts b/packages/common/src/upload/common/region/providers/static.test.ts new file mode 100644 index 00000000..bac304a9 --- /dev/null +++ b/packages/common/src/upload/common/region/providers/static.test.ts @@ -0,0 +1,21 @@ +import { isSuccessResult } from '../../../../types/types' +import { Region } from '../region' + +import { StaticRegionsProvider } from './static' + +describe('StaticRegionsProvider', () => { + test('test getRegions', async () => { + const regions = [ + Region.fromRegionId('z0'), + Region.fromRegionId('z1') + ] + const provider = new StaticRegionsProvider(regions) + const result = await provider.getRegions() + + if (!isSuccessResult(result)) { + throw new Error('expect the result is successful') + } + + expect(result.result).toEqual(regions) + }) +}) diff --git a/packages/common/src/upload/common/region/providers/static.ts b/packages/common/src/upload/common/region/providers/static.ts new file mode 100644 index 00000000..1cdce69e --- /dev/null +++ b/packages/common/src/upload/common/region/providers/static.ts @@ -0,0 +1,15 @@ +import { Region } from '../region' +import { RegionsProvider } from './types' + +export class StaticRegionsProvider implements RegionsProvider { + regions: Region[] + + constructor(regions: Region[]) { + this.regions = regions.slice() + } + async getRegions() { + return { + result: this.regions + } + } +} diff --git a/packages/common/src/upload/common/region/providers/types.ts b/packages/common/src/upload/common/region/providers/types.ts new file mode 100644 index 00000000..8e990d3c --- /dev/null +++ b/packages/common/src/upload/common/region/providers/types.ts @@ -0,0 +1,8 @@ +import { Result } from '../../../../types/types' +import { Retrier } from '../../../../helper/retry' + +import { Region } from '../region' + +export interface RegionsProvider { + getRegions(): Promise> +} diff --git a/packages/common/src/upload/common/region/region.test.ts b/packages/common/src/upload/common/region/region.test.ts new file mode 100644 index 00000000..794c5f85 --- /dev/null +++ b/packages/common/src/upload/common/region/region.test.ts @@ -0,0 +1,58 @@ +import { Region, ServiceName } from './region' + +describe('Region', () => { + test('test fromRegionId', () => { + const region = Region.fromRegionId('z0') + + expect(region.createdAt.getTime()).toBeGreaterThanOrEqual(Date.now() - 100) + expect(region.regionId).toBe('z0') + expect(region.ttl).toBe(-1) + expect(region.isLive).toBeTruthy() + expect(region.services[ServiceName.UP_ACC]).toBe([]) + expect(region.services[ServiceName.UP].map(h => h.getUrl())) + .toEqual([ + 'https://upload-z0.qiniup.com', + 'https://up-z0.qiniup.com', + 'https://upload-z0.qiniuio.com' + ]) + + const regionHttp = Region.fromRegionId('z1', 'HTTP') + expect(regionHttp.services[ServiceName.UP].map(h => h.getUrl())) + .toEqual([ + 'http://upload-z0.qiniup.com', + 'http://up-z0.qiniup.com', + 'http://upload-z0.qiniuio.com' + ]) + }) + + const regionIsLiveCases = [ + { + params: { + ttl: 0 + }, + expectVal: false + }, { + params: { + ttl: 10 + }, + expectVal: true + }, { + params: { + ttl: 10, + createTime: new Date(0) + }, + expectVal: false + } + ] + regionIsLiveCases.forEach(caseItem => { + test(`test region isLive ${JSON.stringify(caseItem.params)}`, () => { + const region = new Region({ + regionId: 'fake', + services: {}, + ...caseItem.params + }) + + expect(region.isLive).toBe(caseItem.expectVal) + }) + }) +}) diff --git a/packages/common/src/upload/common/region/region.ts b/packages/common/src/upload/common/region/region.ts new file mode 100644 index 00000000..00cb5f6b --- /dev/null +++ b/packages/common/src/upload/common/region/region.ts @@ -0,0 +1,83 @@ +import { HttpProtocol } from '../../../types/http' +import { ValueOf } from '../../../types/utility' +import { Host } from '../host' + +// TODO: Is it feasible to use enum type on all platforms? +// - yes, browser compile by tsc +// - yes, wechat mini program compile by tsc +// - ?, harmony +// enum ServiceName { +// UP = 'up', +// UP_ACC = 'up_acc' +// } +export const ServiceName = { + UP: 'up', + UP_ACC: 'up_acc' +} +export type TServiceName = ValueOf + +export interface RegionOptions { + regionId: string + services: Record + createdAt?: Date + ttl?: number // seconds +} + +// TODO: The Region class could be improved +// by implement `clone` method for immutable usage +export class Region { + static fromRegionId(regionId: string, protocol: HttpProtocol = 'HTTPS'): Region { + const upHosts = [ + `upload-${regionId}.qiniup.com`, + `up-${regionId}.qiniup.com`, + `upload-${regionId}.qiniuio.com` + ] + const services: Record = { + [ServiceName.UP]: upHosts.map(host => new Host(host, protocol)), + [ServiceName.UP_ACC]: [] + } + return new Region({ + regionId, + services + }) + } + + readonly regionId: string + readonly services: Record + readonly createdAt: Date + ttl: number // seconds + + constructor({ + regionId, + services = {}, + ttl = -1, + createdAt = new Date() + }: RegionOptions) { + this.regionId = regionId + + // handle services. make sure all entries are array. + this.services = services + for (const sn of Object.values(ServiceName)) { + if ( + !Array.isArray(this.services[sn]) + || !this.services[sn].length + ) { + this.services[sn] = [] + } + } + + this.ttl = ttl + this.createdAt = createdAt + } + + get isLive(): boolean { + if (this.ttl < 0) { + return true + } + const liveDuration = Math.floor( + (Date.now() - this.createdAt.getTime()) + / 1000 + ) + return liveDuration <= this.ttl + } +} diff --git a/packages/common/src/upload/common/region/retry/default.ts b/packages/common/src/upload/common/region/retry/default.ts new file mode 100644 index 00000000..b4594088 --- /dev/null +++ b/packages/common/src/upload/common/region/retry/default.ts @@ -0,0 +1,27 @@ +import { Retrier } from '../../../../helper/retry' +import { HostsRetryPolicy, shouldNextAttempt } from '../../host' + +import { RegionsProvider } from '../providers' +import { TServiceName } from '../region' +import { RegionsRetryPolicy } from './policies' + +interface RegionsRetrierOptions { + regionsProvider?: RegionsProvider + serviceNames: TServiceName[] +} + +export function getDefaultRegionsHostsRetrier({ + regionsProvider, + serviceNames +}: RegionsRetrierOptions) { + return new Retrier({ + policies: [ + new HostsRetryPolicy(), + new RegionsRetryPolicy({ + regionsProvider, + serviceNames + }) + ], + afterAttempt: shouldNextAttempt + }) +} diff --git a/packages/common/src/upload/common/region/retry/index.ts b/packages/common/src/upload/common/region/retry/index.ts new file mode 100644 index 00000000..bcf0d166 --- /dev/null +++ b/packages/common/src/upload/common/region/retry/index.ts @@ -0,0 +1,2 @@ +export * from './policies' +export * from './default' diff --git a/packages/common/src/upload/common/region/retry/policies.ts b/packages/common/src/upload/common/region/retry/policies.ts new file mode 100644 index 00000000..274fc442 --- /dev/null +++ b/packages/common/src/upload/common/region/retry/policies.ts @@ -0,0 +1,144 @@ +import { isCanceledResult, isErrorResult } from '../../../../types/types' +import { Attempt, RetryPolicy } from '../../../../helper/retry' + +import { Host } from '../../host' +import { Region, TServiceName } from '../region' +import { RegionsProvider } from '../providers' + +export interface RegionsRetryContext { + /** 操作使用的 host; */ + host?: Host + /** 备用域名 */ + alternativeHosts?: Host[] + /** 当前所使用的服务 */ + serviceName?: TServiceName + /** 备用服务 */ + alternativeServiceNames?: TServiceName[] + /** 当前使用的区域 */ + region?: Region + /** 备用区域 */ + alternativeRegions?: Region[] +} + +export interface RegionsRetryPolicyOptions { + regionsProvider?: RegionsProvider + serviceNames: TServiceName[] + // used for resume from break point + // preferredHostsProvider: PreferredHostsProvider + // the type `QueueContext['operationApiContext` depends on the attempt type + onChangeRegion?: (context: RegionsRetryContext) => Promise +} + +export class RegionsRetryPolicy implements RetryPolicy { + private regionsProvider?: RegionsProvider + private serviceNames: TServiceName[] + private onChangeRegion?: (context: RegionsRetryContext) => Promise + + constructor({ + regionsProvider, + serviceNames, + // preferredHostsProvider, + onChangeRegion + }: RegionsRetryPolicyOptions) { + this.regionsProvider = regionsProvider + this.serviceNames = serviceNames + this.onChangeRegion = onChangeRegion + } + + async initContext(context: RegionsRetryContext) { + if (!this.regionsProvider) { + return + } + await this.initRegions(context, this.regionsProvider) + await this.prepareHosts(context) + } + + async shouldRetry(attempt: Attempt): Promise { + const context = attempt.context + return !!context.alternativeRegions?.length + || !!context.alternativeServiceNames?.length + } + + async prepareRetry(attempt: Attempt) { + await this.prepareHosts(attempt.context) + } + + async isImportant() { + return false + } + + private async initRegions(context: RegionsRetryContext, regionsProvider: RegionsProvider) { + const regionsResult = await regionsProvider.getRegions() + // TODO: is there a need for resuming? + // seems the resuming not storage on local. so it's may ok to reupload from the beginning. + // const preferredHosts = await this.preferredHostsProvider.getPreferredHosts() + if (isErrorResult(regionsResult)) { + throw regionsResult.error + } + if (isCanceledResult(regionsResult)) { + return + } + const regions = regionsResult.result.slice() + context.region = regions.shift() + context.alternativeRegions = regions + context.alternativeServiceNames = this.serviceNames.slice() + } + + private async prepareHosts(context: RegionsRetryContext) { + let hosts: Host[] | undefined + let regionChanged = false + + while (!hosts?.length) { + const sn = context.alternativeServiceNames?.shift() + if (sn) { + context.serviceName = sn + hosts = context.region?.services[context.serviceName]?.slice() + continue + } + + const r = context.alternativeRegions?.shift() + if (r) { + context.region = r + context.alternativeServiceNames = this.serviceNames.slice() + context.serviceName = context.alternativeServiceNames.shift() + regionChanged = true + continue + } + + throw new Error('There isn\'t available service or region for next try') + } + + context.alternativeHosts = hosts + context.host = context.alternativeHosts.shift() + if (regionChanged && this.onChangeRegion) { + await this.onChangeRegion(context) + } + } +} + +export class AccUnavailableRetryPolicy implements RetryPolicy { + async initContext(context: RegionsRetryContext) { + // do nothing + } + + async shouldRetry(attempt: Attempt): Promise { + if (isErrorResult(attempt.result)) { + return attempt.result.error.message.includes('transfer acceleration is not configured on this bucket') + } + return false + } + + async prepareRetry(attempt: Attempt) { + const ctx = attempt.context + const nextServiceName = ctx.alternativeServiceNames?.shift() + if (!nextServiceName) { + throw new Error('There isn\'t available service for next try') + } + ctx.serviceName = nextServiceName + ctx.alternativeHosts = ctx.region?.services[ctx.serviceName]?.slice() + } + + async isImportant() { + return false + } +} diff --git a/packages/common/src/upload/direct/index.ts b/packages/common/src/upload/direct/index.ts index db7ffb63..fae24527 100644 --- a/packages/common/src/upload/direct/index.ts +++ b/packages/common/src/upload/direct/index.ts @@ -1,6 +1,6 @@ import { UploadFile } from '../../types/file' import { UploadTask, UploadConfig } from '../types' -import { ConfigApis, UploadApis } from '../../api' +import { UploadApis } from '../../api' import { HttpAbortController } from '../../types/http' import { generateRandomString } from '../../helper/string' import { Result, isErrorResult, isSuccessResult } from '../../types/types' @@ -8,11 +8,11 @@ import { Result, isErrorResult, isSuccessResult } from '../../types/types' import { Task, TaskQueue } from '../common/queue' import { UploadContext, updateTotalIntoProgress } from '../common/context' import { initUploadConfig } from '../common/config' -import { HostProgressKey, HostProvideTask } from '../common/host' +import { PrepareRegionsHostsTask, RegionsHostsProgressKey } from '../common/region' import { TokenProgressKey, TokenProvideTask } from '../common/token' export type DirectUploadProgressKey = - | HostProgressKey + | RegionsHostsProgressKey | TokenProgressKey | 'directUpload' @@ -73,14 +73,15 @@ class DirectUploadTask implements Task { return fileSizeResult } - this.abort = new HttpAbortController() - const result = await this.uploadApis.directUpload({ + const abort = new HttpAbortController() + this.abort = abort + const result = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.directUpload({ file: this.file, - abort: this.abort, + abort, customVars: this.vars, token: this.context!.token!, metadata: fileMetaResult.result, - uploadHostUrl: this.context!.host!.getUrl(), + uploadHostUrl: this.context.operationApiContext.host!.getUrl(), fileName: filenameResult.result || generateRandomString(), // 接口要求必传且建议没有有效文件名时传随机字符串 key: fileKeyResult.result || filenameResult.result || undefined, onProgress: progress => { @@ -88,7 +89,7 @@ class DirectUploadTask implements Task { this.context!.progress.details.directUpload.size = fileSizeResult.result notify() } - }) + }), this.context.operationApiContext) if (isErrorResult(result)) { this.context.error = result.error @@ -104,18 +105,15 @@ class DirectUploadTask implements Task { export const createDirectUploadTask = (file: UploadFile, config: UploadConfig): UploadTask => { const normalizedConfig = initUploadConfig(config) - const uploadApis = new UploadApis(normalizedConfig.httpClient) - const configApis = new ConfigApis(normalizedConfig.apiServerUrl, normalizedConfig.httpClient) const context = new DirectUploadContext() + const prepareRegionsHostsTask = new PrepareRegionsHostsTask({ + config: normalizedConfig, + context + }) + const uploadApis = new UploadApis(normalizedConfig.httpClient) const directUploadTask = new DirectUploadTask(context, uploadApis, config.vars, file) const tokenProvideTask = new TokenProvideTask(context, normalizedConfig.tokenProvider) - const hostProvideTask = new HostProvideTask( - context, - configApis, - normalizedConfig.protocol, - normalizedConfig.uploadHosts - ) const taskQueue = new TaskQueue({ logger: { level: normalizedConfig.logLevel, prefix: 'directUploadQueue' }, @@ -124,7 +122,7 @@ export const createDirectUploadTask = (file: UploadFile, config: UploadConfig): taskQueue.enqueue( tokenProvideTask, - hostProvideTask, + prepareRegionsHostsTask, directUploadTask ) diff --git a/packages/common/src/upload/multipartv1/index.ts b/packages/common/src/upload/multipartv1/index.ts index 1c2f9b15..1bfed6c4 100644 --- a/packages/common/src/upload/multipartv1/index.ts +++ b/packages/common/src/upload/multipartv1/index.ts @@ -9,12 +9,12 @@ import { UploadConfig, UploadTask } from '../types' import { Task, TaskQueue } from '../common/queue' import { initUploadConfig } from '../common/config' -import { HostProgressKey, HostProvideTask } from '../common/host' +import { PrepareRegionsHostsTask, RegionsHostsProgressKey } from '../common/region' import { TokenProgressKey, TokenProvideTask } from '../common/token' import { UploadContext, updateTotalIntoProgress } from '../common/context' export type MultipartUploadV1ProgressKey = - | HostProgressKey + | RegionsHostsProgressKey | TokenProgressKey | 'completeMultipartUpload' | `multipartUpload:${number}` @@ -72,16 +72,17 @@ class MkblkTask implements Task { } } - this.abort = new HttpAbortController() + const abort = new HttpAbortController() + this.abort = abort this.updateProgress(false, 0, notify) - const uploadedPartResult = await this.uploadApis.mkblk({ - abort: this.abort, + const uploadedPartResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.mkblk({ + abort, token: this.context!.token!, firstChunkBinary: this.blob, uploadHostUrl: this.context!.host!.getUrl(), onProgress: progress => { this.updateProgress(false, progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isCanceledResult(uploadedPartResult)) { return uploadedPartResult @@ -146,9 +147,11 @@ class MkfileTask implements Task { const metadataResult = await this.file.metadata() if (!isSuccessResult(metadataResult)) return metadataResult - this.abort = new HttpAbortController() - const completeResult = await this.uploadApis.mkfile({ - abort: this.abort, + const abort = new HttpAbortController() + this.abort = abort + + const completeResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.mkfile({ + abort, userVars: this.vars, token: this.context.token!, fileSize: fileSizeResult.result, @@ -157,7 +160,7 @@ class MkfileTask implements Task { lastCtxOfBlock: this.context.uploadBlocks.map(i => i.ctx), key: fileKeyResult.result || filenameResult.result || undefined, onProgress: progress => { this.updateProgress(progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isSuccessResult(completeResult)) { this.context!.result = completeResult.result @@ -171,21 +174,20 @@ class MkfileTask implements Task { } } -// eslint-disable-next-line max-len -export const createMultipartUploadV1Task = (file: UploadFile, config: UploadConfig): UploadTask => { +export const createMultipartUploadV1Task = ( + file: UploadFile, + config: UploadConfig +): UploadTask => { const normalizedConfig = initUploadConfig(config) + const context = new MultipartUploadV1Context() + const prepareRegionsHostsTask = new PrepareRegionsHostsTask({ + config: normalizedConfig, + context + }) const uploadApis = new UploadApis(normalizedConfig.httpClient) - const configApis = new ConfigApis(normalizedConfig.apiServerUrl, normalizedConfig.httpClient) - const context = new MultipartUploadV1Context() const tokenProvideTask = new TokenProvideTask(context, normalizedConfig.tokenProvider) - const hostProvideTask = new HostProvideTask( - context, - configApis, - normalizedConfig.protocol, - normalizedConfig.uploadHosts - ) const mainQueue = new TaskQueue({ logger: { @@ -224,7 +226,7 @@ export const createMultipartUploadV1Task = (file: UploadFile, config: UploadConf mainQueue.enqueue( tokenProvideTask, - hostProvideTask, + prepareRegionsHostsTask, putQueue, mkfileTask ) diff --git a/packages/common/src/upload/multipartv2/index.ts b/packages/common/src/upload/multipartv2/index.ts index f33d0de4..82715390 100644 --- a/packages/common/src/upload/multipartv2/index.ts +++ b/packages/common/src/upload/multipartv2/index.ts @@ -9,11 +9,11 @@ import { UploadTask, UploadConfig } from '../types' import { UploadContext, updateTotalIntoProgress } from '../common/context' import { Task, TaskQueue } from '../common/queue' import { initUploadConfig } from '../common/config' -import { HostProgressKey, HostProvideTask } from '../common/host' +import { PrepareRegionsHostsTask, RegionsHostsProgressKey } from '../common/region' import { TokenProgressKey, TokenProvideTask } from '../common/token' export type MultipartUploadV2ProgressKey = - | HostProgressKey + | RegionsHostsProgressKey | TokenProgressKey | 'initMultipartUpload' | 'completeMultipartUpload' @@ -57,7 +57,8 @@ class InitPartUploadTask implements Task { } async process(notify: () => void): Promise { - this.abort = new HttpAbortController() + const abort = new HttpAbortController() + this.abort = abort this.updateProgress(0, notify) @@ -68,21 +69,22 @@ class InitPartUploadTask implements Task { if (!isSuccessResult(fileKeyResult)) return fileKeyResult // 首先检查 context 上的 upload id 有没有过期 - if (this.context.uploadPartId) { + const uploadPartId = this.context.uploadPartId + if (uploadPartId) { const nowTime = Date.now() / 1e3 // 上次的 uploadPartId 还没过期 (至少剩余 60s),继续使用 - if ((this.context.uploadPartId.expireAt - 60) > nowTime) { + if ((uploadPartId.expireAt - 60) > nowTime) { // 从服务端获取已上传的分片信息 - const uploadedPartResult = await this.uploadApis.listParts({ - abort: this.abort, + const uploadedPartResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.listParts({ + abort, token: this.context!.token!, bucket: this.context.token!.bucket, uploadHostUrl: this.context!.host!.getUrl(), - uploadId: this.context.uploadPartId.uploadId, + uploadId: uploadPartId.uploadId, key: fileKeyResult.result || filenameResult.result || undefined, onProgress: progress => { this.updateProgress(progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isCanceledResult(uploadedPartResult)) { return uploadedPartResult @@ -104,14 +106,14 @@ class InitPartUploadTask implements Task { } } - const initResult = await this.uploadApis.initMultipartUpload({ - abort: this.abort, + const initResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.initMultipartUpload({ + abort, token: this.context!.token!, bucket: this.context!.token!.bucket, uploadHostUrl: this.context!.host!.getUrl(), key: fileKeyResult.result || filenameResult.result || undefined, onProgress: progress => { this.updateProgress(progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isSuccessResult(initResult)) { this.context!.uploadPartId = initResult.result @@ -177,10 +179,11 @@ class UploadPartTask implements Task { const fileSizeResult = await this.file.size() if (!isSuccessResult(fileSizeResult)) return fileSizeResult - this.abort = new HttpAbortController() - const uploadPartResult = await this.uploadApis.uploadPart({ + const abort = new HttpAbortController() + this.abort = abort + const uploadPartResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.uploadPart({ part: this.blob, - abort: this.abort, + abort, partIndex: this.index, token: this.context!.token!, bucket: this.context!.token!.bucket, @@ -188,7 +191,7 @@ class UploadPartTask implements Task { uploadId: this.context!.uploadPartId!.uploadId!, key: fileKeyResult.result || filenameResult.result || undefined, onProgress: progress => { this.updateProgress(false, fileSizeResult.result, progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isSuccessResult(uploadPartResult)) { if (this.context!.uploadedParts == null) { @@ -256,9 +259,10 @@ class CompletePartUploadTask implements Task { .map(item => ({ partNumber: item.partNumber, etag: item.etag })) .sort((a, b) => a.partNumber - b.partNumber) - this.abort = new HttpAbortController() - const completeResult = await this.uploadApis.completeMultipartUpload({ - abort: this.abort, + const abort = new HttpAbortController() + this.abort = abort + const completeResult = await this.context.operationApiRetrier.tryDo(() => this.uploadApis.completeMultipartUpload({ + abort, parts: sortedParts, customVars: this.vars, token: this.context!.token!, @@ -269,7 +273,7 @@ class CompletePartUploadTask implements Task { key: fileKeyResult.result || filenameResult.result || undefined, fileName: filenameResult.result || generateRandomString(), // 和直传行为保持一致 onProgress: progress => { this.updateProgress(progress.percent, notify) } - }) + }), this.context.operationApiContext) if (isSuccessResult(completeResult)) { this.context!.result = completeResult.result @@ -283,21 +287,20 @@ class CompletePartUploadTask implements Task { } } -// eslint-disable-next-line max-len -export const createMultipartUploadV2Task = (file: UploadFile, config: UploadConfig): UploadTask => { +export const createMultipartUploadV2Task = ( + file: UploadFile, + config: UploadConfig +): UploadTask => { const normalizedConfig = initUploadConfig(config) + const context = new MultipartUploadV2Context() + const prepareRegionsHostsTask = new PrepareRegionsHostsTask({ + config: normalizedConfig, + context + }) const uploadApis = new UploadApis(normalizedConfig.httpClient) - const configApis = new ConfigApis(normalizedConfig.apiServerUrl, normalizedConfig.httpClient) - const context = new MultipartUploadV2Context() const tokenProvideTask = new TokenProvideTask(context, normalizedConfig.tokenProvider) - const hostProvideTask = new HostProvideTask( - context, - configApis, - normalizedConfig.protocol, - normalizedConfig.uploadHosts - ) const initPartUploadTask = new InitPartUploadTask(context, uploadApis, file) const completePartUploadTask = new CompletePartUploadTask(context, uploadApis, config.vars, file) @@ -338,7 +341,7 @@ export const createMultipartUploadV2Task = (file: UploadFile, config: UploadConf mainQueue.enqueue( tokenProvideTask, - hostProvideTask, + prepareRegionsHostsTask, initPartUploadTask, partQueue, completePartUploadTask diff --git a/packages/common/src/upload/types.ts b/packages/common/src/upload/types.ts index 0ff623ad..c86dd7cc 100644 --- a/packages/common/src/upload/types.ts +++ b/packages/common/src/upload/types.ts @@ -1,25 +1,38 @@ -import { UploadFile } from '../types/file' import { Result } from '../types/types' import { TokenProvider } from '../types/token' import { HttpClient, HttpProtocol } from '../types/http' import { LogLevel } from '../helper/logger' +import { MemoryCacheManager, PersistentCacheManager } from '../helper/cache' +import { Retrier } from '../helper/retry' import { UploadContext, Progress as BaseProgress } from './common/context' +import { Region, RegionsProvider } from './common/region' import { DirectUploadContext, DirectUploadProgressKey } from './direct' import { MultipartUploadV1Context, MultipartUploadV1ProgressKey } from './multipartv1' import { MultipartUploadV2Context, MultipartUploadV2ProgressKey } from './multipartv2' +type RegionsProviderGetter = (context: UploadContext) => RegionsProvider +type UploadRetrierGetter = (context: UploadContext) => Retrier + export interface UploadConfig { /** 自定义变量;本次上传任务的自定义变量,关于使用请参考:https://developer.qiniu.com/kodo/1235/vars#xvar */ vars?: Record /** + * @deprecated 使用 bucketServerHosts 替代 + * * 服务的接口地址;默认为七牛公有云,示例:https://api.qiniu.com * 该配置仅当未设置 uploadHosts 时生效 SDK 将会通过指定的 api 服务提供的接口来动态获取上传地址 * 私有云请联系集群运维人员,并确认集群是否支持 v4/query 接口 */ apiServerUrl?: string + /** + * 服务的接口地址;默认为七牛公有云, + * 该配置仅当未设置 uploadHosts 时生效 SDK 将会通过指定的 bucket 服务提供的接口来动态获取上传地址 + * 私有云请联系集群运维人员,并确认集群是否支持 v4/query 接口 + */ + bucketServerHosts?: string[] /** 上传服务地址,手动指定上传服务地址,示例:up.qiniu.com */ uploadHosts?: string[] /** 日志级别;默认为 NONE,即不输出任何日志 */ @@ -30,6 +43,31 @@ export interface UploadConfig { httpClient?: HttpClient /** 上传 token 提供器;SDK 通过该接口获取上传 Token */ tokenProvider: TokenProvider + /** + * 是否开启空间级别上传加速;默认为 false + * 当配置 uploadHosts 时,该配置无效 + * 开启后,将优先使用 regions 配置中获取到的加速上传域名进行上传 + * 若加速域名不可用或上传失败,默认重试器将使用其他域名进行重试 + */ + accelerateUploading?: boolean + /** + * 控制如何获取 bucket 所在区域提供器,该配置仅当未设置 uploadHosts 时生效 + * 默认通过 bucket 服务提供的接口来动态获取上传区域 + */ + regionsProviderGetter?: RegionsProviderGetter + /** + * 控制如何获取上传重试器 + * 默认将会开启域名重试、区域重试 + */ + uploadRetrierGetter?: UploadRetrierGetter + /** 内存缓存管理器;默认值为 undefined,且为开启状态 */ + regionsMemoryCache?: MemoryCacheManager | undefined + /** + * 持久化缓存管理器;默认值为 undefined,且为关闭状态 + * 但在 browser 与 wechat-miniprogram 包中默认使用 LocalStorage + * TODO: 在 harmony 包中默认使用 xxx + */ + regionsPersistentCache?: PersistentCacheManager | undefined } export type Context = DirectUploadContext | MultipartUploadV1Context | MultipartUploadV2Context diff --git a/packages/wechat-miniprogram/rollup.config.js b/packages/wechat-miniprogram/rollup.config.js index 6cd7fe50..a048ae95 100644 --- a/packages/wechat-miniprogram/rollup.config.js +++ b/packages/wechat-miniprogram/rollup.config.js @@ -2,31 +2,32 @@ import typescript from 'rollup-plugin-typescript2' import resolve from '@rollup/plugin-node-resolve' import commonjs from '@rollup/plugin-commonjs' import { babel } from '@rollup/plugin-babel' + import pkg from './package.json' const baseOutputOptions = { - sourcemap: true, - name: pkg.jsName + sourcemap: true, + name: pkg.jsName } const umdOutputOptions = { - ...baseOutputOptions, - file: pkg.main, - format: 'umd', - name: 'qiniu-js' + ...baseOutputOptions, + file: pkg.main, + format: 'umd', + name: 'qiniu-js' } export default [ - { - input: 'src/index.ts', - plugins: [ - resolve({ browser: true }), - commonjs(), - typescript(), - babel({ babelHelpers: 'bundled' }) - ], - output: [ - umdOutputOptions - ] - } + { + input: 'src/index.ts', + plugins: [ + resolve({ browser: true }), + commonjs(), + typescript(), + babel({ babelHelpers: 'bundled' }) + ], + output: [ + umdOutputOptions + ] + } ]