const fs = require('fs'); const path = require('path'); const archiver = require('archiver'); const axios = require('axios'); const COS = require('cos-nodejs-sdk-v5'); const dayjs = require('dayjs'); const cosConfig = require('../config/cos.js'); // 并发控制:同时最多执行的打包任务数 const MAX_CONCURRENT = 2; let runningCount = 0; module.exports = class extends think.Service { /** * 启动任务处理循环 */ async startProcessing() { if (runningCount >= MAX_CONCURRENT) return; const taskModel = this.model('export_task'); const task = await taskModel.getPendingTask(); if (think.isEmpty(task)) return; runningCount++; try { await this.processTask(task); } catch (e) { think.logger.error(`[ExportTask] 任务 ${task.task_no} 异常:`, e); await taskModel.where({ id: task.id }).update({ status: 3, error_log: e.message || '未知错误', finished_at: think.datetime(new Date()) }); } finally { runningCount--; // 继续处理下一个 setTimeout(() => this.startProcessing(), 500); } } /** * 处理单个任务 */ async processTask(task) { const taskModel = this.model('export_task'); const patientModel = this.model('patient'); // 标记为打包中 await taskModel.where({ id: task.id }).update({ status: 1, started_at: think.datetime(new Date()) }); // 解析参数 let fileTypes = []; let filterParams = {}; try { fileTypes = JSON.parse(task.file_types || '[]'); filterParams = JSON.parse(task.filter_params || '{}'); } catch (e) { throw new Error('任务参数解析失败'); } // 查询患者列表 let patients = []; const exportScope = filterParams.export_scope === 'selected' ? 'selected' : 'filter'; if (exportScope === 'selected') { const patientIds = Array.isArray(filterParams.patient_ids) ? Array.from(new Set(filterParams.patient_ids.map(id => parseInt(id, 10)).filter(Boolean))) : []; if (patientIds.length) { patients = await patientModel .where({ id: ['in', patientIds], is_deleted: 0 }) .order('id DESC') .select(); } } else { patients = await patientModel.getAll(filterParams); } if (!patients.length) { await taskModel.where({ id: task.id }).update({ status: 2, total_files: 0, processed_files: 0, finished_at: think.datetime(new Date()), error_log: '没有符合条件的患者数据' }); return; } // 收集所有需要下载的文件 const downloadList = this._buildDownloadList(patients, fileTypes); const totalFiles = downloadList.reduce((sum, p) => sum + p.files.length, 0); await taskModel.where({ id: task.id }).update({ total_files: totalFiles }); if (totalFiles === 0) { await taskModel.where({ id: task.id }).update({ status: 2, processed_files: 0, finished_at: think.datetime(new Date()), error_log: '所选类型下没有可导出的附件' }); return; } // 创建临时 ZIP 文件 const tmpDir = path.join(think.ROOT_PATH, 'runtime/export'); if (!fs.existsSync(tmpDir)) { fs.mkdirSync(tmpDir, { recursive: true }); } const zipFileName = `${task.task_no}.zip`; const zipFilePath = path.join(tmpDir, zipFileName); // 打包 let processedFiles = 0; const errors = []; await new Promise((resolve, reject) => { const output = fs.createWriteStream(zipFilePath); const archive = archiver('zip', { zlib: { level: 5 } }); output.on('close', resolve); archive.on('error', reject); archive.pipe(output); // 用 Promise 链串行处理每个患者 const processAll = async () => { for (const patient of downloadList) { for (const file of patient.files) { try { const response = await axios.get(file.url, { responseType: 'arraybuffer', timeout: 30000 }); archive.append(Buffer.from(response.data), { name: `${patient.folder}/${file.name}` }); processedFiles++; // 每处理10个文件更新一次进度 if (processedFiles % 10 === 0) { await taskModel.where({ id: task.id }).update({ processed_files: processedFiles }); } } catch (e) { errors.push(`${patient.folder}/${file.name}: ${e.message}`); think.logger.warn(`[ExportTask] 下载失败: ${file.url} - ${e.message}`); } } } }; processAll().then(() => { archive.finalize(); }).catch(reject); }); // 更新已处理数 await taskModel.where({ id: task.id }).update({ processed_files: processedFiles }); // 上传到 COS const cosKey = `uploads/cytx/zip/${dayjs().format('YYYY/MM')}/${zipFileName}`; const fileSize = fs.statSync(zipFilePath).size; const cos = new COS({ SecretId: cosConfig.secretId, SecretKey: cosConfig.secretKey }); await new Promise((resolve, reject) => { cos.putObject({ Bucket: cosConfig.bucket, Region: cosConfig.region, Key: cosKey, Body: fs.createReadStream(zipFilePath), ContentType: 'application/zip' }, (err, data) => { if (err) reject(err); else resolve(data); }); }); // 生成下载 URL const fileUrl = `${cosConfig.cdnUrl}/${cosKey}`; // 更新任务状态 await taskModel.where({ id: task.id }).update({ status: 2, file_url: fileUrl, file_size: fileSize, processed_files: processedFiles, finished_at: think.datetime(new Date()), error_log: errors.length ? errors.join('\n') : '' }); // 删除本地临时文件 try { if (fs.existsSync(zipFilePath)) fs.unlinkSync(zipFilePath); } catch (e) { think.logger.warn(`[ExportTask] 删除临时文件失败: ${e.message}`); } think.logger.info(`[ExportTask] 任务 ${task.task_no} 完成,共 ${processedFiles}/${totalFiles} 个文件,${errors.length} 个失败`); } /** * 构建下载文件列表 */ _buildDownloadList(patients, fileTypes) { const list = []; for (const p of patients) { const folder = `${p.name}_${p.patient_no}`; const files = []; // 实名认证照片 if (fileTypes.includes('id_photos')) { if (p.id_card_front) { files.push({ name: `身份证人像面${this._getExt(p.id_card_front)}`, url: p.id_card_front }); } if (p.id_card_back) { files.push({ name: `身份证国徽面${this._getExt(p.id_card_back)}`, url: p.id_card_back }); } if (p.photo) { files.push({ name: `免冠照片${this._getExt(p.photo)}`, url: p.photo }); } } // 上传资料(检查报告/诊断证明) if (fileTypes.includes('documents')) { let docs = []; try { docs = JSON.parse(p.documents || '[]'); } catch (e) { /* ignore */ } docs.forEach((url, idx) => { if (url) { files.push({ name: `检查报告_${idx + 1}${this._getExt(url)}`, url }); } }); } // 签字材料 if (fileTypes.includes('signs')) { if (p.sign_income) { files.push({ name: `个人可支配收入声明${this._getExt(p.sign_income)}`, url: p.sign_income }); } if (p.sign_privacy) { files.push({ name: `个人信息处理同意书${this._getExt(p.sign_privacy)}`, url: p.sign_privacy }); } if (p.sign_promise) { files.push({ name: `声明与承诺${this._getExt(p.sign_promise)}`, url: p.sign_promise }); } if (p.sign_privacy_jhr) { files.push({ name: `监护人个人信息处理同意书${this._getExt(p.sign_privacy_jhr)}`, url: p.sign_privacy_jhr }); } } // 送检信息附件(送检单照片) if (fileTypes.includes('sample_photos')) { let samplePhotos = []; try { samplePhotos = Array.isArray(p.sample_photos) ? p.sample_photos : JSON.parse(p.sample_photos || '[]'); } catch (e) { /* ignore */ } samplePhotos.forEach((url, idx) => { if (url) { files.push({ name: `送检单照片_${idx + 1}${this._getExt(url)}`, url }); } }); } if (files.length) { list.push({ folder, files }); } } return list; } /** * 从 URL 提取文件扩展名 */ _getExt(url) { if (!url) return '.jpg'; const pathname = url.split('?')[0]; const ext = path.extname(pathname); return ext || '.jpg'; } };