A simple RESTful web service in nodejs (1)

这段时间突然对nodejs有点兴趣,虽然之前完全没有接触过,不过本着实践出真知的精神,四处查阅资料,做出了一个小轮子——videomerger,接收数个视频文件的url,在服务器端下载后合并(以及可能的转码),然后把合并后文件的链接返回给用户下载。配合分析各种视频网站真实下载地址的脚本,可以实现用户输入视频播放地址后,得到合并后的单文件视频下载(类似clipconverter.cc)。

做的东西时间长了容易忘掉,于是写个文章帮助记忆,也分享一下思路。
代码已开源:
https://github.com/youran/videomerger
代码写得很随意,基本功能实现了就好,等我有兴致了再整整。轮子再破也是轮子嘛,哈哈。
如果有什么问题、建议或者发现bug,可以在Github上发issue讨论。

首先是服务器环境的搭建:
安装nodejs。这里选择从官网直接下载二进制包安装:

# wget http://nodejs.org/dist/latest-v0.12.x/node-v0.12.12-linux-x64.tar.gz
# tar --strip-components 1 -xzvf node-v* -C /usr/local

安装MongoDB:

# rpm --import https://www.mongodb.org/static/pgp/server-3.2.asc
# vim /etc/yum.repos.d/mongodb-org-3.2.repo
[mongodb-org-3.2]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/3.2/x86_64/
gpgcheck=1
enabled=1
# yum install mongodb-org

修改package.json,加入这次需要的module:

# cat package.json
{
  "name": "videomerger",
  "main": "server.js",
  "dependencies": {
    "mongoose": "~4.4.7",
    "express": "~4.13.1",
    "body-parser": "~1.15.0",
    "wget-improved": "~1.3.0",
    "fluent-ffmpeg": "~2.0.1"
  }
}

# npm install

磨刀不误砍柴工,首先要确定整个程序的架构,否则各种返工(实际上开发过程中为了验证各种方法的可行性,确实也推翻重来了好几次orz)。
本项目使用下图所示的执行流程,由主进程(server.js)负责接受HTTP请求,根据POST请求中的url数据,fork合适数目的下载进程(wget.js)进行下载;当全部下载完成后,主进程fork一个负责转码及合并的子进程(ffmpeg.js),生成最终的单个视频文件。
download

运行流程定下来以后,定义数据库结构。这里给每个task(包含了下载及合并)定义了下述属性:

var mongoose = require('mongoose');
var Schema = mongoose.Schema;

var videoTask = new Schema({
    taskId: Number,
    downloadComplete: Boolean,
    downloadError: Boolean,
    mergeComplete: Boolean,
    mergeError: Boolean,
    filePath: String
});

module.exports = mongoose.model('videoTask', videoTask);

假设上述代码存放在models/videotask.js里。下面开搞主进程的代码。

把数据库连上,并且定义一个model实例,数据库查询就靠它了:

var mongoose = require('mongoose');
mongoose.connect('mongodb://127.0.0.1:27017/videoTasks');
var videoTask = require('./models/videotask');

顺便写一段代码,让程序被kill或者Ctrl-C终止时,mongoose不要打出一堆错误信息,改为一段friendly message:

// If the main process ends, close the Mongoose connection
var db_server  = process.env.DB_ENV || 'primary';
var gracefulExit = function() {
    mongoose.connection.close(function () {
        console.log('Mongoose default connection with DB :' + db_server + ' is disconnected through app termination');
        process.exit(0);
    });
};
process.on('SIGINT', gracefulExit).on('SIGTERM', gracefulExit);

下面是服务器的HTTP请求处理部分。
先定义一个全局变量,后面用得上,便于任务状态管理:

// array member format: {id: id, wget: [{complete: false, error: false, proc: wgetProc1}, .... {...}], ffmpeg: [{complete: false, error: false, proc: ffpmegProc}]}
var taskArray = [];

以及使用HTTP request解析器:

app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json());

再定义一个Express的Router,然后给它写一个处理GET请求的方法,用于取得MongoDB数据库里的所有条目,便于后面的调试。也就是说直接以HTTP GET方式访问http://xxx.xxx.xxx.xxx/tasks时,下面.get()里的逻辑得到执行:

var router = express.Router();

router.route('/tasks')
.get(function(req, res){
    videoTask.find(function(err, tasks){
        if (err) {
            res.json(result: 'Error fetching tasks');
            return;
    };
        res.json(tasks);
    });
})

可以用postman这个chrome app来发送HTTP请求做测试,例如:
restful-get-all

接着稍微复杂点,给router加一个处理POST请求的方法,根据客户端发来的url数据,分配下载进程。也就是说当以HTTP POST方式(body中带url信息)访问http://xxx.xxx.xxx.xxx/tasks时,下面.post()里的逻辑得到执行:

.post(function(req, res){
    // initialize database entry
    var newTask = new videoTask();
    newTask.downloadComplete = false;
    newTask.downloadError = false;
    newTask.mergeComplete= false;
    newTask.mergeError = false;
    newTask.filePath = '';

    // create an id for each task
    var id = Date.now();
    newTask.taskId = id;

    // save task properties to database
    var saved = true;
    newTask.save(function(err){
        if (err) {
            res.json({result: 'error'});
            saved = false;
        } else {
            console.log('taskId = '+ id);
        }
    });
    if (saved === false) {
        return;
    }

    // register task to taskArray
    var taskObj = {id: id, wget: [], ffmpeg: []};
    taskArray.push(taskObj);

    // populate urls first
    console.log('req.body = ' + req.body);
    var urls = [];
    for (var key in req.body) {
        // skip loop if the property is from prototype
        if (!req.body.hasOwnProperty(key)) continue;

        urls.push(req.body[key]);
    }

    // fork wget child-processes to do the download
    urls.forEach(function(url, index) {
        var wgetProc = child_process.fork('./wget.js', [url, id, index]);
        var wgetObj = {complete: false, error: false, proc: wgetProc};
        taskObj.wget.push(wgetObj);

        // communicate with child process
        wgetProc.on('message', function(msg){
            if (msg.progress !== undefined) {
                //TODO: retrieve progress from child
            }

            if (msg.complete === true) {
                wgetObj.complete = true;
            }

            if (msg.error === true) {
                wgetObj.error = true;
                newTask.downloadError = true;
                newTask.save();
            }
        });
    });

    res.json({result: 'accepted', taskId: id});
});

这段代码首先创建了一个videoTask的Document,填入初始化参数。其中,id作为每个task的识别子,我用javascript的Date.now()生成,基本保证了唯一性。
将新建的document写入数据库后,为当前task创建一个taskObj对象,push入之前定义的taskArray中。
接着,从POST数据中取得url信息,为每一个url分配一个wget子进程。这里使用了nodejs的fork方法来创建子进程,好处是由此建立了一条进程间信息通道,方便后面使用.on()方法获取子进程返回的信息。
新建一个taskObj写入当前wget子进程的状态信息,连同fork的返回值一起push到taskObj.wget数组中。万事OK后,就可以向客户端返回”成功接收请求”的json信息了。

接下来需要定义另一条route。前的GET方法返回了数据库中所有task的信息,而很多时候只需要取得某个特定id的task信息。因此有了下面这段:

// status enquiry by taskId
router.route('/tasks/:taskId')
.get(function(req, res) {
    // fetch download url in database
    videoTask.findOne({taskId: req.params.taskId}, function(err, doc) {
        if (err) {
            res.json({result: 'error'});
            return;
        }
        
        if (!doc) {
            res.json({result : 'not found'});
            return;
        }

        if (doc.downloadError === true) {
            res.json({result: 'download error'});
            return;
        }

        if (doc.mergeError === true) {
            res.json({result: 'merge error'});
            return;
        }

        if (doc.downloadError === false && doc.mergeError === false && doc.mergeComplete === false){
            var count = 0, total = 0;
            // look up download progress in task array
            taskArray.forEach(function(task, index){
                if (task.id === parseInt(req.params.taskId, 10)) {
                    task.wget.forEach(function(proc, index){
                        if (proc.error === true ) {    // not likely, just in case
                            res.json({result: 'download error'});
                        }

                        if (proc.complete === true) {
                            count++;
                        }
                    });
                    total = task.wget.length;
                }
            });

            res.json({result: 'downloading', count: count, total: total});
            return;
        }

        if (doc.downloadComplete === true && doc.mergeComplete === false) {
            res.json({result: 'merging'});
            return;
        }

        if (doc.mergeComplete === true) {
            var fileName = doc.filePath.split('/').pop();
            var url = 'http://your.download.server.com/' + fileName;
            res.json({result: 'complete', url: url});
        }
    });
});

这段的作用,是当以HTTP GET方式访问http://xxx.xxx.xxx.xxx/tasks/111111时(111111是taskID),服务器返回这个task的状态信息。
代码里先根据url中taskID信息,在数据库查找对应的task。根据数据库返回的document内容,解析task状态返回给客户端。结果示例:
restful-get-one

有了取单个task的方法,还可以定义一个删除单个task的方法:

.delete(function(req, res) {
    // find task by id
    var id = parstInt(req.params.task_id, 10);
    videoTask.remove({taskId: id}, function(err, doc){
        if (err) {
            res.send('Error finding task by id.');
            return;
        }

        taskArray.forEach(function(task, index){
            if (task.id === id) {
                //TODO: kill child-process
            }
        });
    });
});

现在HTTP request处理部分已经告一段落,但是有个问题:当所有wget进程完成下载后,怎样通知主进程进行下一步的转码合并操作?
最初定义的那个taskArray就是为此准备的。主进程中定义了一个interval函数,每隔几秒种就结合数据库检查一下当前taskArray中管理的task,如果发现某个task的wget部分已成功完成所有下载,就fork一个ffmpeg子进程,继续后面的转码合并操作。

// set a timer to watch for status of tasks
var interval = setInterval(function() {
    taskArray.forEach(function(task, index){
        videoTask.findOne({taskId: task.id}, function(err, doc){
            if (err) {
                console.log('videoTask.findOne failed!');
            } else {
                // skip invalid(error or already in processing) tasks
                if (doc.downloadError === true || doc.mergeError === true
                    || doc.downloadComplete === true || doc.mergeComplete === true) {
                    return;
                }
                
                // fetch status of all wget processes in a task
                var allWgetCompleted = true;
                task.wget.forEach(function(wget, index){
                    if (wget.complete === false) {
                        allWgetCompleted = false;
                    }
                });
                
                // if all parts downloaded, set the downloadComplete flag,
                // and fork a videomerge child-process
                if (allWgetCompleted === true) {
                    doc.downloadComplete = true;
                    doc.save();

                    if (task.wget.length === undefined || task.wget.length < 2) {
                        return;
                    }
                    var ffpmegProc = child_process.fork('./ffmpeg.js', [doc.taskId, task.wget.length]);
                    var ffmpegObj = {complete: false, error: false, proc: ffpmegProc};
                    task.ffmpeg.push(ffmpegObj);
                    console.log('forked ffmpeg process, id=' + doc.taskId + ', fileNum=' + task.wget.length);

                    // communicate with child process
                    ffpmegProc.on('message', function(msg){
                        if (msg.complete === true) {
                            ffmpegObj.complete = true;
                            doc.mergeComplete = true;    // when true, the whole processing complete
                            doc.filePath = msg.filePath;
                            doc.save();
                        }

                        if (msg.error === true) {
                            ffmpegObj.error = true;
                            doc.mergeError = true;
                            doc.save();
                        }
                    });
                }
            }
        });
    });
}, 5000);   // check database every 5 seconds

其实还有一种更elegant的方法。之前fork那些wget子进程的时候不是监听了它们的message吗?可以在taskObj里再加些参数,用来记录当前已完成的下载数和总下载数,如果发现全部下载完成了,就fork一个ffmpeg子进程。这样就可以消除interval方式造成的数秒延迟,代码也更加易读。

到这里,主进程的内容已经基本完成了,指定个端口号,以及当前app所在的url目录,然后listen一下就大功告成:

var port = 1000;
app.use('/', router);
app.listen(port);

请移步下一篇文章:wget子进程的实现。

References:
[1] https://scotch.io/tutorials/build-a-restful-api-using-node-and-express-4
[2] http://www.tutorialspoint.com/nodejs/nodejs_restful_api.htm
[3] https://nodejs.org/api/child_process.html
[4] http://javascript.ruanyifeng.com/nodejs/child-process.html
[5] https://docs.nodejitsu.com/articles/command-line/how-to-parse-command-line-arguments
[6] http://mongoosejs.com/docs/schematypes.html
[7] http://mongoosejs.com/docs/2.7.x/docs/finding-documents.html
[8] https://gist.github.com/pasupulaphani/9463004

本文是悠然居(wordpress.youran.me)原创文章,如转载必须保留此告示。

本文为悠然居(https://wordpress.youran.me/)的原创文章,转载请注明出处!

Leave a Reply

Your email address will not be published. Required fields are marked *