大文件上传
背景
我们的SaaS平台中包含企业资料、会议视频等大文件的上传,如果不作特殊处理,将遇到以下问题:
- 网络中断、程序异常退出等问题导致文件上传失败,从而不得不全部重新上传
- 同一文件被不同用户反复上传,白白占用网络和服务器存储资源
因此,需要一个针对大文件上传的方案来解决上述问题。
问题和方案
大文件上传的普遍方案是文件分片上传。
如果把文件上传看做是一个不可分割的事务,那么分片的目标就是把一个耗时的大事务划分为一个一个的小事务。
由于公司使用BFF层来承接前端的文件请求,因此需要打通前后端所有跟文件上传的障碍。
分片上传的主要障碍集中在:
- 如何减少页面阻塞
- 前后端如何协调
- 代码如何组织
- 前端代码中的复杂逻辑
- BFF代码中的复杂逻辑
下面分开阐述
如何减少页面阻塞?
分片上传的一个首要目标就是要尽量避免相同的分片重复上传。服务器必须要能够识别来自各个客户端的各个上传请求中,是否存在与过去分片相同的上传请求。
服务器如何识别哪些分片是相同的呢?
首先需要对相同下一个准确的定义: 文件内容一样即为相同。
可是对文件内容进行二进制的对比是一个非常耗时的操作,于是可以选择基于内容的hash来进行对比。
hash是一种算法,可以将任何长度的数据转换为定长的数据,常见的hash算法包括MD5、SHA-1。
本节课使用MD5进行hash计算,使用第三方库 Spark-MD5。
不仅针对分片如此,针对整个文件也是如此。
可见,客户端需要承担两件重要的事情:
- 对文件进行分片,并计算每个分片的hash值
- 根据所有分片的hash值,计算整个文件的hash值
而计算Hash是一件CPU密集型的操作,如果不加处理将会导致长时间阻塞主线程。
为了解决这个问题,我们可以对大文件上传做一个大胆的假设:绝大部分的文件上传都是新文件上传。
有了这个假设,我们就无须等待整体hash的计算结果,直接上传分片即可,同时可以把分片操作使用多线程+异步的方式进行上传处理。
这样做的好处是,页面完全无阻塞,也无须等待整体hash即可启动上传,相比于传统方案:
- 对于新文件上传可以缩短整体上传时间,消除页面的阻塞。
- 对于旧文件上传可能会产生一些无效的请求,但这些请求仅传递的是hash,并不真实上传文件数据,所以对网络和服务器影响很小,加之旧文件上传情况相对较少,所以整体影响可以忽略不计。
前后端如何协调?
文件上传涉及到前后端的交互,需要建立一个标准的通信协议,通过协议要能完成下面几件核心交互:
- 创建文件
- hash校验
- 分片数据上传
- 分片合并
创建文件协议
当客户端发送分片到服务器时,需要告知服务器分片属于哪一次文件上传,因此需要一个唯一标识来标识某一次文件上传。
创建文件协议就是用于获取文件上传的唯一标识。
uploadToken
: 文件上传的唯一标识chunkSize
: 分片大小,单位字节
hash校验协议
客户端有时需要校验单个分片或整个文件的hash,服务器需要告知客户端它们目前的具体情况。
Upload-Hash-Type
: 取值chunk
或file
,分别代表分片hash和文件整体hashUpload-Hash
: 分片或文件的具体hash值hasFile
: 指示服务器是否已经存储了对应的分片或文件rest
: 当校验文件hash时特有的响应字段, 指示该文件还剩余哪些hash没有上传url
: 当校验文件hash时特有的响应字段, 如果该文件已完成上传出现该字段, 表示文件的请求地址
分片数据上传协议
通过此协议,上传具体的文件分片数据
分片合并协议
当所有的分片全部上传后,通过此协议请求服务器完成分片合并。
代码如何组织?
大文件上传SDK的搭建分为三层:
上传协议 约定前后端的通信格式
upload-core 基于协议的API,提供协议字段的创建、读取、前后端通用工具函数等核心功能
upload-client 应用于客户端的SDK
upload-server 应用于BFF的SDK
upload-core中的通用函数
EventEmitter
统一前后端涉及到的基于各种事件的处理,使用发布订阅模式提供统一的EventEmitter类。
- 前端可能出现的各种事件: 上传进度改变事件、上传暂停/恢复事件等等
- 后端可能出现的各种事件: 分片写入完成事件、分片合并完成事件等等
export class EventEmitter<T extends string> {
private events: Map<T, Set<Function>>;
constructor() {
this.events = new Map();
}
on(event: T, listener: Function) {
if (!this.events.has(event)) {
this.events.set(event, new Set());
}
this.events.get(event)!.add(listener);
}
off(event: T, listener: Function) {
if (!this.events.has(event)) {
return;
}
this.events.get(event)!.delete(listener);
}
once(event: T, listener: Function) {
const onceListener = (...args: any[]) => {
listener(...args);
this.off(event, onceListener);
};
this.on(event, onceListener);
}
emit(event: T, ...args: any[]) {
if (!this.events.has(event)) {
return;
}
this.events.get(event)!.forEach((listener) => {
listener(...args);
});
}
}
TaskQueue
为支撑前后端的多任务并发执行,提供TaskQueue类
- 前端可能的并发执行: 并发请求
- 后端可能的并发执行: 并发的分片Hash校验
// 任务构造器
export class Task {
fn: Function; // 任务关联的执行函数
payload?: any; // 任务关联的其他信息
constructor(fn: Function, payload?: any) {
this.fn = fn;
this.payload = payload;
}
// 执行任务
run() {
return this.fn(this.payload);
}
}
// 可并发执行的任务队列
export class TaskQueue extends EventEmitter<'start' | 'pause' | 'drain'> {
// 待执行的任务
private tasks: Set<Task> = new Set();
// 当前正在执行的任务数
private currentCount = 0;
// 任务状态
private status: 'paused' | 'running' = 'paused';
// 最大并发数
private concurrency: number = 4;
constructor(concurrency: number = 4) {
super();
this.concurrency = concurrency;
}
// 添加任务
add(...tasks: Task[]) {
for (const t of tasks) {
this.tasks.add(t);
}
}
// 添加任务并启动执行
addAndStart(...tasks: Task[]) {
this.add(...tasks);
this.start();
}
// 启动任务
start() {
if (this.status === 'running') {
return; // 任务正在进行中,结束
}
if (this.tasks.size === 0) {
// 当前已无任务,触发drain事件
this.emit('drain');
return;
}
// 设置任务状态为running
this.status = 'running';
this.emit('start'); // 触发start事件
this.runNext(); // 开始执行下一个任务
}
// 取出第一个任务
private takeHeadTask() {
const task = this.tasks.values().next().value;
if (task) {
this.tasks.delete(task);
}
return task;
}
// 执行下一个任务
private runNext() {
if (this.status !== 'running') {
return; // 如果整体的任务状态不是running,结束
}
if (this.currentCount >= this.concurrency) {
// 并发数已满,结束
return;
}
// 取出第一个任务
const task = this.takeHeadTask();
if (!task) {
// 没有任务了
this.status = 'paused'; // 暂停执行
this.emit('drain'); // 触发drain事件
return;
}
this.currentCount++; // 当前任务数+1
// 执行任务
Promise.resolve(task.run()).finally(() => {
// 任务执行完成后,当前任务数-1,继续执行下一个任务
this.currentCount--;
this.runNext();
});
}
// 暂停任务
pause() {
this.status = 'paused';
this.emit('pause');
}
}
前端代码中的复杂问题
前端涉及到两个核心问题:
- 如何对文件分片
- 如何控制请求
如何对文件分片?
首先要实现分片对象的处理
// chunk.ts
export interface Chunk {
blob: Blob; // 分片的二进制数据
start: number; // 分片的起始位置
end: number; // 分片的结束位置
hash: string; // 分片的hash值
index: number; // 分片在文件中的索引
}
// 创建一个不带hash的chunk
export function createChunk(
file: File,
index: number,
chunkSize: number
): Chunk {
const start = index * chunkSize;
const end = Math.min((index + 1) * chunkSize, file.size);
const blob = file.slice(start, end);
return {
blob,
start,
end,
hash: '',
index,
};
}
// 计算chunk的hash值
export function calcChunkHash(chunk: Chunk): Promise<string> {
return new Promise((resolve) => {
const spark = new SparkMD5.ArrayBuffer();
const fileReader = new FileReader();
fileReader.onload = (e) => {
spark.append(e.target?.result as ArrayBuffer);
resolve(spark.end());
};
fileReader.readAsArrayBuffer(chunk.blob);
});
}
接下来,要对整个文件进行分片,分片的方式有很多,比如:
- 普通分片
- 基于多线程的分片
- 基于主线程时间切片的分片(React Fiber)
- 其他分片模式
考虑到通用性,必须要向上层提供不同的分片模式,同时还要允许上层自定义分片模式,因此在设计上,使用基于抽象类的模板模式来完成处理。
// ChunkSplitor.ts
// 分片的相关事件
// chunks: 一部分分片产生了
// wholeHash: 整个文件的hash计算完成
// drain: 所有分片处理完成
export type ChunkSplitorEvents = 'chunks' | 'wholeHash' | 'drain';
export abstract class ChunkSplitor extends EventEmitter<ChunkSplitorEvents> {
protected chunkSize: number; // 分片大小(单位字节)
protected file: File; // 待分片的文件
protected hash?: string; // 整个文件的hash
protected chunks: Chunk[]; // 分片列表
private handleChunkCount = 0; // 已计算hash的分片数量
private spark = new SparkMD5(); // 计算hash的工具
private hasSplited = false; // 是否已经分片
constructor(file: File, chunkSize: number = 1024 * 1024 * 5) {
super();
this.file = file;
this.chunkSize = chunkSize;
// 获取分片数组
const chunkCount = Math.ceil(this.file.size / this.chunkSize);
this.chunks = new Array(chunkCount)
.fill(0)
.map((_, index) => createChunk(this.file, index, this.chunkSize));
}
split() {
if (this.hasSplited) {
return;
}
this.hasSplited = true;
const emitter = new EventEmitter<'chunks'>();
const chunksHanlder = (chunks: Chunk[]) => {
this.emit('chunks', chunks);
chunks.forEach((chunk) => {
this.spark.append(chunk.hash);
});
this.handleChunkCount += chunks.length;
if (this.handleChunkCount === this.chunks.length) {
// 计算完成
emitter.off('chunks', chunksHanlder);
this.emit('wholeHash', this.spark.end());
this.spark.destroy();
this.emit('drain');
}
};
emitter.on('chunks', chunksHanlder);
this.calcHash(this.chunks, emitter);
}
// 计算每一个分片的hash
abstract calcHash(chunks: Chunk[], emitter: EventEmitter<'chunks'>): void;
// 分片完成后一些需要销毁的工作
abstract dispose(): void;
}
基于此抽象类,即可实现多种形式的分片模式,每种模式只需要继承ChunkSplitor
,实现计算分片的hash即可。
比如,基于多线程的分片类可以非常简单的实现:
// MutilThreadSplitor.ts
export class MultiThreadSplitor extends ChunkSplitor {
private workers: Worker[] = new Array(navigator.hardwareConcurrency || 4)
.fill(0)
.map(
() =>
new Worker(new URL('./SplitWorker.ts', import.meta.url), {
type: 'module',
})
);
calcHash(chunks: Chunk[], emitter: EventEmitter<'chunks'>): void {
const workerSize = Math.ceil(chunks.length / this.workers.length);
for(let i = 0; i < this.workers.length; i++) {
const worker = this.workers[i];
const start = i * workerSize;
const end = Math.min((i + 1) * workerSize, chunks.length);
const workerChunks = chunks.slice(start, end);
worker.postMessage(workerChunks);
worker.onmessage = (e) => {
emitter.emit('chunks', e.data);
};
}
}
dispose(): void {
this.workers.forEach((worker) => worker.terminate());
}
}
// SplitWorker.ts
onmessage = function (e) {
const chunks = e.data as Chunk[];
for (const chunk of chunks) {
calcChunkHash(chunk).then((hash) => {
chunk.hash = hash;
postMessage([chunk]);
});
}
};
如何控制请求?
对请求的控制涉及到多个方面的问题:
如何充分利用带宽 分片上传中涉及到大量的请求发送,这些请求既不能一起发送造成网络阻塞,也不能依次发送浪费带宽资源,因此需要有请求并发控制的机制。 方案: 利用基础库的TaskQueue实现并发控制
如何与上层请求库解耦
考虑到通用性,上层应用可能会使用各种请求库来发送请求,因此前端SDK不能绑定任何的请求库。 方案: 这里使用策略模式对请求库解耦。
这个类比较复杂,下面贴出核心代码结构
请求策略
// 请求策略
export interface RequestStrategy {
// 文件创建请求,返回token
createFile(file: File): Promise<string>;
// 分片上传请求
uploadChunk(chunk: Chunk): Promise<void>;
// 文件合并请求,返回文件url
mergeFile(token: string): Promise<string>;
// hash校验请求
patchHash<T extends 'file' | 'chunk'>(
token: string,
hash: string,
type: T
): Promise<
T extends 'file'
? { hasFile: boolean }
: { hasFile: boolean; rest: number[]; url: string }
>;
}
请求控制
export class UploadController {
private requestStrategy: RequestStrategy; // 请求策略,没有传递则使用默认策略
private splitStrategy: ChunkSplitor; // 分片策略,没有传递则默认多线程分片
private taskQueue: TaskQueue; // 任务队列
// 其他属性略
// 初始化
async init() {
// 获取文件token
this.token = await this.requestStrategy.createFile(this.file);
// 分片事件监听
this.splitStrategy.on('chunks', this.handleChunks.bind(this));
this.splitStrategy.on('wholeHash', this.handleWholeHash.bind(this));
}
// 分片事件处理
private handleChunks(chunks: Chunk[]) {
// 分片上传任务加入队列
chunks.forEach((chunk) => {
this.taskQueue.addAndStart(new Task(this.uploadChunk.bind(this), chunk));
});
}
async uploadChunk(chunk: Chunk) {
// hash校验
const resp = await this.requestStrategy.patchHash(this.token, chunk.hash, 'chunk');
if (resp.hasFile) {
// 文件已存在
return;
}
// 分片上传
await this.requestStrategy.uploadChunk(chunk, this.uploadEmitter);
}
// 整体hash事件处理
private async handleWholeHash(hash: string) {
// hash校验
const resp = await this.requestStrategy.patchHash(this.token, hash, 'file');
if (resp.hasFile) {
// 文件已存在
this.emit('end', resp.url);
return;
}
// 根据resp.rest重新编排后续任务
// ...
}
}
后端代码中的复杂问题
相对于客户端而言,服务器面临着更大的挑战。
如何隔离不同的文件上传?
在创建文件协议中,服务器使用uuid + jwt 生成一个不可篡改的唯一编码,用于标识不同的文件上传。
如何保证分片不重复?
这里的重复是指:
- 不保存重复分片
- 不上传重复分片
这就要求分片跨文件唯一,并且永不删除
也就是说: 服务器并不保存合并之后的文件,仅记录文件中的分片顺序
合并分片到底做什么?
合并会造成很多问题,最主要的是:
- 极其耗时
- 数据冗余
所以服务器并不发生真正的合并,而是在数据库中记录文件中包含的分片。
因此,合并操作时,服务器仅做简单的处理:
- 校验文件大小
- 校验文件hash
- 标记文件状态
- 生成文件访问地址
- ...
以上操作效率极高
访问文件怎么办?
由于服务器并未发生真正的文件合并,当后续请求该文件时,服务器需要动态处理,具体做法是:
- 服务器收到对文件的请求,并在数据库中找到了对应的文件
- 服务器读取文件的所有分片ID,依次找到对应的分片文件
- 服务器利用TaskQueue的并发控制能力,逐步产生文件读取流,并利用管道直接输出到网络I/O