it-swarm-pt.tech

NodeJS | Cluster: Como enviar dados do mestre para todos ou filhos / trabalhadores únicos?

Tenho trabalhando (estoque) script de

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

No script acima, posso enviar dados do trabalhador para o processo mestre com facilidade. Mas como enviar dados do mestre para os trabalhadores? Com exemplos, se possível.

26
htonus

Como o cluster.fork é implementado em cima de child_process.fork , você pode enviar mensagens de um mestre para o trabalhador usando worker.send({ msg: 'test' }) e de um trabalhador para um mestre por process.send({ msg: 'test' });. Você recebe as mensagens da seguinte maneira: worker.on('message', callback) (de trabalhador para mestre) e process.on('message', callback); (de mestre para trabalhador).

Aqui está meu exemplo completo: você pode testá-lo navegando em http: // localhost: 8000 / Então o trabalhador envia uma mensagem ao mestre e o mestre responde:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
39
alessioalex

Encontrei esse tópico enquanto procurava uma maneira de enviar uma mensagem para todos os processos filhos e, felizmente, consegui descobrir isso graças aos comentários sobre matrizes. Só queria ilustrar uma solução em potencial para enviar uma mensagem a todos os processos filhos que utilizam essa abordagem.

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) {
  // Broadcast a message to all workers
  var broadcast = function() {
    for (var i in workers) {
      var worker = workers[i];
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    }
  }

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        }
    });

    // Add the worker to an array of known workers
    workers.Push(worker);
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch(msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}
8
Kevin Reilly

Eis como implementei uma solução para um problema semelhante. Ao conectar-se a cluster.on('fork'), você pode anexar manipuladores de mensagens aos trabalhadores conforme eles são bifurcados (em vez de armazená-los em uma matriz), que tem a vantagem adicional de lidar com casos em que os trabalhadores morrem ou se desconectam e um novo trabalhador é bifurcada.

Esse trecho enviaria uma mensagem do mestre para todos trabalhadores.

if (cluster.isMaster) {
    for (var i = 0; i < require('os').cpus.length; i++) {
        cluster.fork();
    }

    cluster.on('disconnect', function(worker) {
        cluster.fork();
    }

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    }

    var messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };
}
else {
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };
}
4
LiquidPony

Você deve poder enviar uma mensagem do mestre para o trabalhador assim:

worker.send({message:'hello'})

porque "cluster.fork é implementado em cima de child_process.fork" (cluster.fork é implementado em cima de child_process.fork)

1
cheng81

Entendo seu objetivo de transmitir para todos os processos de trabalho do nó em um cluster, embora você não possa enviar o componente de soquete como tal, mas há uma solução alternativa para o objetivo a ser atendido. Vou tentar uma explicação com um exemplo:

Etapa 1: quando uma ação do cliente requer uma transmissão:

Child.js (Process that has been forked) :

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message});
}) 

Etapa 2: no lado da criação do cluster

Server.js (Place where cluster forking happens):

if (cluster.isMaster) {

  for (var i = 0; i < numCPUs; i++) {

    var worker = cluster.fork();

    worker.on('message', function (data) {
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
       console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  }

  cluster.on('exit', function (worker, code, signal) {
    var newWorker = cluster.fork();
    newWorker.on('message', function (data) {
      console.log(data);
      if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
        console.log(data.cmd,data);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  });
} 
else {
  //Node Js App Entry
  require("./Child.js");
}

Etapa 3: Para transmitir no processo filho -

-> Coloque isso antes de io.on ("conexão") em Child.js

process.on("message", function(data){
    if(data.cmd === "BROADCAST_TO_WORKER"){
        io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id });
    }
});

Eu espero que isso ajude. Entre em contato se precisar de mais esclarecimentos.

1
Raman