javascript

Cluster + EXPRESS + socket.io без REDIS

  • среда, 12 апреля 2017 г. в 03:15:30
https://habrahabr.ru/post/326172/
  • Node.JS
  • JavaScript


Мне очень понравилась библиотека soket.io. С помощью нее можно просто реализовать realtime приложения. Она сама выбирает протокол в зависимости от браузера, если надо то создает WebSoket.

Это конечно все хорошо, но не стоит забывать о том, что Node.js работает в одном потоке. Для этого у нас есть отличный инструмент Cluset.

Передо мной встал вопрос, а можно ли сделать приложения с кластеризацией, но не использовать для этого REDIS? Также для удобства используем EXPRESS.

Создадим простой пример (пока без кластера): Пример из официальной документации.

var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);

server.listen(3038);

app.get('/', function (req, res) {
  res.sendfile(__dirname + '/index.html');
});

io.on('connection', function (socket) {
  socket.emit('news', { hello: 'world' });
  socket.on('my other event', function (data) {
    console.log(data);
  });
});

Пока ничего сложного, все понятно, вешаем сервер на порт 3038 с express и soket.io. Клиентский код будет неизменным:

<html>
<head>
	<title>Test socket.io</title>
	<script src="socket.io.js"></script>
	<script>
	var connect_error_count = 0;
	
	
	var socket = io.connect('http://localhost:3038/',
		{
			'reconnectionDelay': 10 // defaults to 500
		}
	);

	

	socket.on('connect_error', function(){
	   	console.log('Connection Failed');
	    //Если более 5 попыток переподключения, то отключаем подключение
	    connect_error_count++;
	    if (connect_error_count>=5){
	    	socket.disconnect();
	    	console.log("stop reconection");
	    }
	});
	
	
	
	socket.on('reconnect', function(){
	    console.log('reconnect');
		connect_error_count=0;
	});
	
	
	socket.on('news', function (data) {
	  console.log(data);
	});
	
	
	</script>
</head>
<body>

</body>
</html>

Немного пояснений по клиентскому коду. Создаем объект soket с настройками (порт 3038, таймаут подключения 10мс). В случае ошибок подключения более 5 раз, отключаем работу soket, если менее, то обнуляем счетчик. А также подключаемся к комнате news.

Следующим этапом будет создание кластера. Т.к. сокет нельзя создать на одном порту, то каждый новый worker будет создавать soket.io на новом порту:

var cluster = require('cluster');//Включаем cluster
var cpuCount = require('os').cpus().length;//Количество ядер процессора
var io = [];
//В мастере создаем worker'ов равное количеству ядер процессоров
if (cluster.isMaster) {
    for (var i = 0; i < cpuCount; i += 1) {
    	var worker = cluster.fork();
    }
} 
//В воркере
if (cluster.isWorker) {
                var worker_id = cluster.worker.id;
		var express  = require('express');
var app = express();
		var server = require('http').Server(app);
		io[worker_id] = require('socket.io')(server);
		server.listen(3030+worker_id);


		io[worker_id].on('connection', function (socket) {
			console.log( socket.id );
			console.log( "WORKER ID :"+worker_id );
			socket.emit('news', { hello: 'world' });
			socket.on('my other event', function (data) {
			console.log(data);
			});
		});

var app_express = express();
		app_express.listen(8081);
		app_express.use(express.static('public'));//отдаем статичные данные (см. код клиента)
		app_express.get('/', function (request, response) {
			response.send('Hello from Worker '+worker_id);   
			console.log( '------' );
			
		});
		
		
		app_express.get('/get_port', function (request, response) {
			response.send(3030+worker_id);   
			console.log( 'get_port' );
		});
}

		app_express.get('/api', function (request, response) {
			response.setHeader('Content-Type', 'application/json');
			var id = request.param('id');
			var msg = request.param('msg');
                        var port = request.param('port');
			var JSON_DATA = {
					"worker_id":worker_id
					,"id":id
					,"msg":msg
                                        ,"port":port
			};
		});

Схематично покажем как все это выглядит:

image

Когда мы открываем в браузере localhost:8081 cluster сервирует нас на worker по своему правилу. У нас есть вызов:

app_express.get('/api', function (request, response){...}

В этом вызове при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы хотим передать клиенту подключенному по порту 3032 id=100500 сообщение с msg=teest.

Самый простой способ, создать клиента и открыть soket.io по определенному порту

var http = require('http');
var client = http.createClient(3032 , "localhost");
request = client.request();
request.on('response', function( res ) {
    res.on('data', function( data ) {
        console.log( data );
    } );
} );
request.end();

Но к примеру нам нужно отправить broadcast сообщение, и нам неизвестен порт на котором сервируется клиента. Проблема в следующем, мы достоверно не знаем что при открытии localhost:8081/api?msg=test&port=3032&id=100500 мы будем сервироваться на worker 2.

image

На рисунке показано что при отправки broadcast сообщения с worker1 мы не имеем возможности отправить на прямую клиентам soket.io 2, soket.io 3, soket.io 4, soket.io…

Для решения этой задачи мы можем воспользоваться методами в cluster. В cluster'e мы можем передавать сообщения из master → worker и из worker → master. Для наглядности изобразим это схематично.

image

Пример работы с отправкой master → worker и worker → master представлен ниже:

// Кластерное приложение
var cluster = require('cluster');
var io = [];
var cpuCount = require('os').cpus().length;
var workers = [];

if (cluster.isMaster) {
    // Count the machine's CPUs
    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
    	var worker = cluster.fork();
    	
    	//Слушаем сообщение от workera
        worker.on('message', function(data) {
        	//отправляем всем worker'ам сообщение
        	for (var j in workers) {workers[j].send(data);}
        });
        //Добавляем объект worker в массив
        workers.push(worker);
    }
    
    
} 


if (cluster.isWorker) {
		//------------------------------------------------------------------------------------//
		var worker_id = cluster.worker.id;
		var express  = require('express');
		var app = express();
		var server = require('http').Server(app);
		io[worker_id] = require('socket.io')(server);
		server.listen(3030+worker_id);
		
		io[worker_id].on('connection', function (socket) {
			console.log( socket.id );
			console.log( "WORKER ID :"+worker_id );
			socket.emit('news', { hello: 'world' });
			socket.on('my other event', function (data) {
			console.log(data);
			});
		});
		//------------------------------------------------------------------------------------//
		var app_express = express();
		app_express.listen(8081);
		app_express.use(express.static('public'));//отдаем статичные данные
		app_express.get('/', function (request, response) {
			response.send('Hello from Worker '+worker_id);   
			console.log( '------' );
			
		});
		
		
		app_express.get('/get_port', function (request, response) {
			response.send(3030+worker_id);   
			console.log( 'get_port' );
		});
		
		
		app_express.get('/api', function (request, response) {
			//process.send("----------------------");
			response.setHeader('Content-Type', 'application/json');
			var id = request.param('id');
			var msg = request.param('msg');
			var JSON_DATA = {
					"worker_id":worker_id
					,"id":id
					,"msg":msg
			};
			
			io[port-3030].to(msg.id).emit('news', msg.msg);

			
			response.send(	JSON.stringify(JSON_DATA) );  
			
			//Отправляем всем процессам данные
			process.send(JSON_DATA);
			
		});
		

		//Обработка сообщений от worker//
		process.on('message', function(msg){
			console.log(worker_id);
		    console.log(msg.id);
		    console.log(msg.msg);
		    io[worker_id].to(msg.id).emit('news', msg.msg);
		    
		});
		
}

Надеюсь статья будет полезна для новичков, чтобы познакомиться с cluster и общением между master → worker и worker → master. Кратко рассмотрен soket.io. И самое главное не используется redis.