Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/feature-streamings' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
marihachi committed May 14, 2017
2 parents e0066ab + fa13434 commit fd14b15
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 3 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"mongodb": "^2.2.24",
"redis": "^2.7.0",
"request": "^2.81.0",
"rimraf": "^2.6.1"
"rimraf": "^2.6.1",
"socket.io": "^2.0.1"
},
"devDependencies": {
"eslint": "^3.18.0",
Expand Down
93 changes: 93 additions & 0 deletions src/helpers/server-streaming-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* SocketIO.Serverのラッパークラス
*/
class ServerStreamingManager {
constructor(ioServer, ioServerSocket, options) {
this.ioServer = ioServer;
this.ioServerSocket = ioServerSocket;
this.dataEventName = options.dataEventName != null ? options.dataEventName : 'data';
this.restEventName = options.restEventName != null ? options.restEventName : 'rest';
this.errorEventName = options.errorEventName != null ? options.errorEventName : 'error';
}

/**
* ストリームに基本的なイベントを発行します
*/
stream(arg1, arg2, arg3) {
if (arguments.length == 3) {
const prefix = arg1;
const type = arg2;
const data = arg3;

this.stream(`${prefix}:${type}`, data);
}
else if (arguments.length == 2) {
const type = arg1;
const data = arg2;

this.ioServer.to(this.ioServerSocket.id).emit(type, data);
}
else
throw new Error('invalid arguments count');
}

/**
* ストリームにデータ送信用イベントを発行します
*/
data(streamType, data) {
this.stream(this.dataEventName, streamType, data);
}

/**
* ストリームにREST APIの成功イベントを発行します
*/
rest(requestInfo, success, data) {
this.stream(this.restEventName, Object.assign({request: requestInfo, success: success}, data));
}

/**
* ストリームにエラーイベントを発行します
*/
error(data) {
this.stream(this.errorEventName, data);
}

/**
* イベントハンドラを登録します
*/
on(arg1, arg2, arg3) {
if (arguments.length == 3) {
const prefix = arg1;
const type = arg2;
const callback = arg3;

this.on(`${prefix}:${type}`, callback);
}
else if (arguments.length == 2) {
const type = arg1;
const callback = arg2;

this.ioServerSocket.on(type, data => {
callback(data);
});
}
else
throw new Error('invalid arguments count');
}

/**
* ストリームを切断します
*/
disconnect() {
this.ioServerSocket.disconnect();
}

/**
* ストリーム切断時のイベントハンドラを登録します
*/
onDisconnect(callback) {
this.ioServerSocket.on('disconnect', callback);
}
}

module.exports = ServerStreamingManager;
15 changes: 14 additions & 1 deletion src/routes/posts/post_status.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

const ApiResult = require('../../helpers/apiResult');

const redis = require('redis');
const publisher = redis.createClient(6379, 'localhost');
publisher.on('error', function(err) {
console.log('redis_err(publisher): ' + String(err));
});

exports.post = async (request) => {
const result = await request.checkRequestAsync({
body: [
Expand Down Expand Up @@ -34,5 +40,12 @@ exports.post = async (request) => {
if (postStatus == null)
return new ApiResult(500, 'faild to create postStatus');

return new ApiResult(200, {postStatus: await postStatus.serializeAsync(true)});
const serializedPostStatus = await postStatus.serializeAsync(true);

const json = JSON.stringify(serializedPostStatus);

publisher.publish(`${request.user.document._id.toString()}:status`, json);
publisher.publish('public:status', json);

return new ApiResult(200, {postStatus: serializedPostStatus});
};
16 changes: 16 additions & 0 deletions src/routes/users/id/follow.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ exports.post = async (request) => {
if (user == null)
return new ApiResult(404, 'user is not found');

const userId = user.document._id.toString();

// 購読
const subscriber = request.subscribers.get(userId);
if (subscriber != null) {
subscriber.subscribe(`${userId}:status`);
}

return new ApiResult(501, 'not implemented');
};

Expand All @@ -34,5 +42,13 @@ exports.del = async (request) => {
if (user == null)
return new ApiResult(404, 'user is not found');

const userId = user.document._id.toString();

// 購読解除
const subscriber = request.subscribers.get(userId);
if (subscriber != null) {
subscriber.unsubscribe(`${userId}:status`);
}

return new ApiResult(501, 'not implemented');
};
17 changes: 16 additions & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const i = require('./helpers/readline');
const bodyParser = require('body-parser');
const express = require('express');
const httpClass = require('http');

const loadConfig = require('./helpers/loadConfig');
const sanitize = require('mongo-sanitize');
const DbProvider = require('./helpers/dbProvider');
Expand All @@ -13,6 +15,9 @@ const apiSend = require('./helpers/middlewares/apiSend');
const checkRequest = require('./helpers/middlewares/checkRequest');

const questionResult = (ans) => (ans.toLowerCase()).indexOf('y') === 0;
Error.stackTraceLimit = 20;

process.on('unhandledRejection', console.dir);

module.exports = async () => {
try {
Expand All @@ -30,10 +35,12 @@ module.exports = async () => {

if (config != null) {
const app = express();
const http = httpClass.Server(app);
app.disable('x-powered-by');
const dbProvider = await DbProvider.connectApidbAsync(config);
const db = new Db(config, dbProvider);
const directoryRouter = new DirectoryRouter(app);
const subscribers = new Map();

app.use((req, res, next) => {
// services
Expand All @@ -56,6 +63,12 @@ module.exports = async () => {
app.use(apiSend);
app.use(checkRequest);

app.use((req, res, next) => {
req.subscribers = subscribers;

next();
});

// add routes
for(const route of require('./routeList')()) {
let method = route[0];
Expand All @@ -72,9 +85,11 @@ module.exports = async () => {
res.apiSend(new (require('./helpers/apiResult'))(404, 'not found'));
});

app.listen(config.api.port, () => {
http.listen(config.api.port, () => {
console.log(`listen on port: ${config.api.port}`);
});

require('./streaming-server')(http, subscribers, db, config);
}
}
catch(err) {
Expand Down
Loading

0 comments on commit fd14b15

Please sign in to comment.