这段时间突然对nodejs有点兴趣,虽然之前完全没有接触过,不过本着实践出真知的精神,四处查阅资料,做出了一个小轮子——videomerger,接收数个视频文件的url,在服务器端下载后合并(以及可能的转码),然后把合并后文件的链接返回给用户下载。配合分析各种视频网站真实下载地址的脚本,可以实现用户输入视频播放地址后,得到合并后的单文件视频下载(类似clipconverter.cc)。
做的东西时间长了容易忘掉,于是写个文章帮助记忆,也分享一下思路。
代码已开源:
https://github.com/youran/videomerger
代码写得很随意,基本功能实现了就好,等我有兴致了再整整。轮子再破也是轮子嘛,哈哈。
如果有什么问题、建议或者发现bug,可以在Github上发issue讨论。
首先是服务器环境的搭建:
安装nodejs。这里选择从官网直接下载二进制包安装:
# wget http://nodejs.org/dist/latest-v0.12.x/node-v0.12.12-linux-x64.tar.gz
# tar --strip-components 1 -xzvf node-v* -C /usr/local
安装MongoDB:
# rpm --import https://www.mongodb.org/static/pgp/server-3.2.asc
# vim /etc/yum.repos.d/mongodb-org-3.2.repo
[mongodb-org-3.2]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/3.2/x86_64/
gpgcheck=1
enabled=1
# yum install mongodb-org
修改package.json,加入这次需要的module:
# cat package.json
{
"name": "videomerger",
"main": "server.js",
"dependencies": {
"mongoose": "~4.4.7",
"express": "~4.13.1",
"body-parser": "~1.15.0",
"wget-improved": "~1.3.0",
"fluent-ffmpeg": "~2.0.1"
}
}
# npm install
磨刀不误砍柴工,首先要确定整个程序的架构,否则各种返工(实际上开发过程中为了验证各种方法的可行性,确实也推翻重来了好几次orz)。
本项目使用下图所示的执行流程,由主进程(server.js)负责接受HTTP请求,根据POST请求中的url数据,fork合适数目的下载进程(wget.js)进行下载;当全部下载完成后,主进程fork一个负责转码及合并的子进程(ffmpeg.js),生成最终的单个视频文件。
运行流程定下来以后,定义数据库结构。这里给每个task(包含了下载及合并)定义了下述属性:
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
var videoTask = new Schema({
taskId: Number,
downloadComplete: Boolean,
downloadError: Boolean,
mergeComplete: Boolean,
mergeError: Boolean,
filePath: String
});
module.exports = mongoose.model('videoTask', videoTask);
假设上述代码存放在models/videotask.js里。下面开搞主进程的代码。
把数据库连上,并且定义一个model实例,数据库查询就靠它了:
var mongoose = require('mongoose');
mongoose.connect('mongodb://127.0.0.1:27017/videoTasks');
var videoTask = require('./models/videotask');
顺便写一段代码,让程序被kill或者Ctrl-C终止时,mongoose不要打出一堆错误信息,改为一段friendly message:
// If the main process ends, close the Mongoose connection var db_server = process.env.DB_ENV || 'primary'; var gracefulExit = function() { mongoose.connection.close(function () { console.log('Mongoose default connection with DB :' + db_server + ' is disconnected through app termination'); process.exit(0); }); }; process.on('SIGINT', gracefulExit).on('SIGTERM', gracefulExit);
下面是服务器的HTTP请求处理部分。
先定义一个全局变量,后面用得上,便于任务状态管理:
// array member format: {id: id, wget: [{complete: false, error: false, proc: wgetProc1}, .... {...}], ffmpeg: [{complete: false, error: false, proc: ffpmegProc}]}
var taskArray = [];
以及使用HTTP request解析器:
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json());
再定义一个Express的Router,然后给它写一个处理GET请求的方法,用于取得MongoDB数据库里的所有条目,便于后面的调试。也就是说直接以HTTP GET方式访问http://xxx.xxx.xxx.xxx/tasks时,下面.get()里的逻辑得到执行:
var router = express.Router();
router.route('/tasks')
.get(function(req, res){
videoTask.find(function(err, tasks){
if (err) {
res.json(result: 'Error fetching tasks');
return;
};
res.json(tasks);
});
})
可以用postman这个chrome app来发送HTTP请求做测试,例如:
接着稍微复杂点,给router加一个处理POST请求的方法,根据客户端发来的url数据,分配下载进程。也就是说当以HTTP POST方式(body中带url信息)访问http://xxx.xxx.xxx.xxx/tasks时,下面.post()里的逻辑得到执行:
.post(function(req, res){
// initialize database entry
var newTask = new videoTask();
newTask.downloadComplete = false;
newTask.downloadError = false;
newTask.mergeComplete= false;
newTask.mergeError = false;
newTask.filePath = '';
// create an id for each task
var id = Date.now();
newTask.taskId = id;
// save task properties to database
var saved = true;
newTask.save(function(err){
if (err) {
res.json({result: 'error'});
saved = false;
} else {
console.log('taskId = '+ id);
}
});
if (saved === false) {
return;
}
// register task to taskArray
var taskObj = {id: id, wget: [], ffmpeg: []};
taskArray.push(taskObj);
// populate urls first
console.log('req.body = ' + req.body);
var urls = [];
for (var key in req.body) {
// skip loop if the property is from prototype
if (!req.body.hasOwnProperty(key)) continue;
urls.push(req.body[key]);
}
// fork wget child-processes to do the download
urls.forEach(function(url, index) {
var wgetProc = child_process.fork('./wget.js', [url, id, index]);
var wgetObj = {complete: false, error: false, proc: wgetProc};
taskObj.wget.push(wgetObj);
// communicate with child process
wgetProc.on('message', function(msg){
if (msg.progress !== undefined) {
//TODO: retrieve progress from child
}
if (msg.complete === true) {
wgetObj.complete = true;
}
if (msg.error === true) {
wgetObj.error = true;
newTask.downloadError = true;
newTask.save();
}
});
});
res.json({result: 'accepted', taskId: id});
});
这段代码首先创建了一个videoTask的Document,填入初始化参数。其中,id作为每个task的识别子,我用javascript的Date.now()生成,基本保证了唯一性。
将新建的document写入数据库后,为当前task创建一个taskObj对象,push入之前定义的taskArray中。
接着,从POST数据中取得url信息,为每一个url分配一个wget子进程。这里使用了nodejs的fork方法来创建子进程,好处是由此建立了一条进程间信息通道,方便后面使用.on()方法获取子进程返回的信息。
新建一个taskObj写入当前wget子进程的状态信息,连同fork的返回值一起push到taskObj.wget数组中。万事OK后,就可以向客户端返回”成功接收请求”的json信息了。
接下来需要定义另一条route。前的GET方法返回了数据库中所有task的信息,而很多时候只需要取得某个特定id的task信息。因此有了下面这段:
// status enquiry by taskId
router.route('/tasks/:taskId')
.get(function(req, res) {
// fetch download url in database
videoTask.findOne({taskId: req.params.taskId}, function(err, doc) {
if (err) {
res.json({result: 'error'});
return;
}
if (!doc) {
res.json({result : 'not found'});
return;
}
if (doc.downloadError === true) {
res.json({result: 'download error'});
return;
}
if (doc.mergeError === true) {
res.json({result: 'merge error'});
return;
}
if (doc.downloadError === false && doc.mergeError === false && doc.mergeComplete === false){
var count = 0, total = 0;
// look up download progress in task array
taskArray.forEach(function(task, index){
if (task.id === parseInt(req.params.taskId, 10)) {
task.wget.forEach(function(proc, index){
if (proc.error === true ) { // not likely, just in case
res.json({result: 'download error'});
}
if (proc.complete === true) {
count++;
}
});
total = task.wget.length;
}
});
res.json({result: 'downloading', count: count, total: total});
return;
}
if (doc.downloadComplete === true && doc.mergeComplete === false) {
res.json({result: 'merging'});
return;
}
if (doc.mergeComplete === true) {
var fileName = doc.filePath.split('/').pop();
var url = 'http://your.download.server.com/' + fileName;
res.json({result: 'complete', url: url});
}
});
});
这段的作用,是当以HTTP GET方式访问http://xxx.xxx.xxx.xxx/tasks/111111时(111111是taskID),服务器返回这个task的状态信息。
代码里先根据url中taskID信息,在数据库查找对应的task。根据数据库返回的document内容,解析task状态返回给客户端。结果示例:
有了取单个task的方法,还可以定义一个删除单个task的方法:
.delete(function(req, res) {
// find task by id
var id = parstInt(req.params.task_id, 10);
videoTask.remove({taskId: id}, function(err, doc){
if (err) {
res.send('Error finding task by id.');
return;
}
taskArray.forEach(function(task, index){
if (task.id === id) {
//TODO: kill child-process
}
});
});
});
现在HTTP request处理部分已经告一段落,但是有个问题:当所有wget进程完成下载后,怎样通知主进程进行下一步的转码合并操作?
最初定义的那个taskArray就是为此准备的。主进程中定义了一个interval函数,每隔几秒种就结合数据库检查一下当前taskArray中管理的task,如果发现某个task的wget部分已成功完成所有下载,就fork一个ffmpeg子进程,继续后面的转码合并操作。
// set a timer to watch for status of tasks
var interval = setInterval(function() {
taskArray.forEach(function(task, index){
videoTask.findOne({taskId: task.id}, function(err, doc){
if (err) {
console.log('videoTask.findOne failed!');
} else {
// skip invalid(error or already in processing) tasks
if (doc.downloadError === true || doc.mergeError === true
|| doc.downloadComplete === true || doc.mergeComplete === true) {
return;
}
// fetch status of all wget processes in a task
var allWgetCompleted = true;
task.wget.forEach(function(wget, index){
if (wget.complete === false) {
allWgetCompleted = false;
}
});
// if all parts downloaded, set the downloadComplete flag,
// and fork a videomerge child-process
if (allWgetCompleted === true) {
doc.downloadComplete = true;
doc.save();
if (task.wget.length === undefined || task.wget.length < 2) {
return;
}
var ffpmegProc = child_process.fork('./ffmpeg.js', [doc.taskId, task.wget.length]);
var ffmpegObj = {complete: false, error: false, proc: ffpmegProc};
task.ffmpeg.push(ffmpegObj);
console.log('forked ffmpeg process, id=' + doc.taskId + ', fileNum=' + task.wget.length);
// communicate with child process
ffpmegProc.on('message', function(msg){
if (msg.complete === true) {
ffmpegObj.complete = true;
doc.mergeComplete = true; // when true, the whole processing complete
doc.filePath = msg.filePath;
doc.save();
}
if (msg.error === true) {
ffmpegObj.error = true;
doc.mergeError = true;
doc.save();
}
});
}
}
});
});
}, 5000); // check database every 5 seconds
其实还有一种更elegant的方法。之前fork那些wget子进程的时候不是监听了它们的message吗?可以在taskObj里再加些参数,用来记录当前已完成的下载数和总下载数,如果发现全部下载完成了,就fork一个ffmpeg子进程。这样就可以消除interval方式造成的数秒延迟,代码也更加易读。
到这里,主进程的内容已经基本完成了,指定个端口号,以及当前app所在的url目录,然后listen一下就大功告成:
var port = 1000;
app.use('/', router);
app.listen(port);
请移步下一篇文章:wget子进程的实现。
References:
[1] https://scotch.io/tutorials/build-a-restful-api-using-node-and-express-4
[2] http://www.tutorialspoint.com/nodejs/nodejs_restful_api.htm
[3] https://nodejs.org/api/child_process.html
[4] http://javascript.ruanyifeng.com/nodejs/child-process.html
[5] https://docs.nodejitsu.com/articles/command-line/how-to-parse-command-line-arguments
[6] http://mongoosejs.com/docs/schematypes.html
[7] http://mongoosejs.com/docs/2.7.x/docs/finding-documents.html
[8] https://gist.github.com/pasupulaphani/9463004
本文是悠然居(wordpress.youran.me)原创文章,如转载必须保留此告示。
本文为悠然居(https://wordpress.youran.me/)的原创文章,转载请注明出处!