Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC not working: What I am doing wrong? Is there an example of its implementation? #186

Open
acarlstein opened this issue Feb 18, 2019 · 14 comments

Comments

@acarlstein
Copy link

acarlstein commented Feb 18, 2019

Greetings,

Will you help me?

I apologize to bother but I am having troubles implementing RPC correctly.

Perhaps someone can point out what I am doing wrong in the code since I couldn't find in the documentation a full example on how to implemented.

// Publish RPC from this client to server
var extension = {
 id: UUID.random(),
}

var body = {...};

rabbit.publish('rpc.extension.request', {
    routingKey: 'extend.rpc',
    correlationId: extension.id,
    replyTo: "rpc.extension.response",
    contentType: "application/json",
    body: body,
    expiresAfter: 60000 // TTL in ms, in this example 60 seconds - can be removed after consumer working, just here to prevent junk accumulating on servers.
  }).then(
    () => { // a list of the messages of that succeeded
      log.info(`Success publishing extension for rpc ${extension.id}`);
    },
    (failed) => { // a list of failed messages and the errors `{ err, message }`
      log.info(`Error publishing extension for rpc ${extension.id} ->  ${failed.err}: ${failed.message}`);
    });

-----------------------
// Consuming RPC result sent by server to this client.

module.exports.setup = function () {
	...

	const handleRPC = rabbit.handle({queue: 'rpc.extension.response'}, handleExtendRPCResponse);
	handleRPC.catch(onHandleError);
};

function handleExtendRPCResponse(msg) {
	logger.info(`RPC Received message : ${msg}`)

	try {
		messageHandler.processMessage(msg, function (err) {
			if (err) {
				logger.error(`ERROR in RPC Process message: "${err.error}"`);
				msg.reject();
			} else {
				logger.info("Process RPC message passed");
				msg.ack();
			}
		})
	} catch(e){
		logger.error(`ERROR while Processing RPC message: "${e.message}"`);
		msg.reject();
	}
}

function onHandleError (err, msg) {
    // not able to hit this with a test...not sure how to.
    logger.error('Error:', JSON.stringify(err), 'Message:', JSON.stringify(msg));
    // Temporary, but if we can't handle it we don't want it requeued.
    msg.reject();
}

Thanks for your time and the creation of this package.

@acarlstein acarlstein changed the title RPC: Is there an example of its implementation? RPC not working: What I am doing wrong? Is there an example of its implementation? Feb 18, 2019
@manuel-reil
Copy link

Maybe I got your question wrong, but if you trying to send back a result from the rpc call, you could use rabbit.request instead of rabbit.publish.

We do it like this:

rabbit
  .request('exchange', {
    type: 'req',
    body: task,
    routingKey
  })
  /*.progress(function(reply) {
    // if multiple replies are provided, all but the last will be sent via the progress callback
  })*/

  .then((final) => {
    // the last message in a series OR the only reply will be sent to this callback

    final.ack();
    if (
      Array.isArray(task.params) &&
      typeof task.params[task.cbPos] === 'function'
    ) {
      task.params[task.cbPos].apply(this, final.body.args);
    } else {
      //...
    }
  });

@acarlstein
Copy link
Author

@manuel-reil, thanks for answering to my question.

Actually, I have a microservice that is trying to do an RPC request, via RabbitMQ, and then consume the result that comes back, via RabbitMQ.

Based on your feedback, I guess my RPC request was wrong because I was using publish() instead of request(). Am I correct?

Also, based on your feedback, I don't need to implement a consumer since request() is going to wait for the return of the result of the RPC request and put it into the chain .then((final) -> ...)
Am I correct also here?

@zlintz
Copy link

zlintz commented Feb 23, 2019

@acarlstein you are correct. You want request() not publish() for rpc. Under the covers during connection a unique rpc queue is automatically made for you to act as the listener queue to handle the return messages and abstracted to the promise level for ease of use. The rpc request uses the replyTo header that is added during the request() call in order to send it back the the right service that made the rpc request originally.

@acarlstein
Copy link
Author

I figured out the issue. In the topology, when creating the exchange, I needed to add 'subscribe:true' property since the queue was already created but used as publish instead of request.

Thanks anyways for the feedback. I appreciate it.

@zlintz
Copy link

zlintz commented Feb 26, 2019

Ah yes, the old subscribe:true I have been there myself

@jtsampson
Copy link

jtsampson commented Mar 15, 2019

@zlintz,

I am having an issue with this as well. following your exact example above...

rabbit
  .request('myExchange', {
    type: 'req',
    body: task,
    routingKey: 'key'
  })
  .then((final) => {
    final.ack();
    log.info('received message');
  });

I am ale to see that my consumer (a java process ) is responding and that there is a message in the temporary queue, but 'received message' is never logged. When I turn on DEBUG=* I see messages like:

  rabbot.exchange debug +15s Publishing message ( type: 'req' topic: 'key', sequence: '0', correlation: '', replyTo: '{"type":"req","contentType":"application/json","
contentEncoding":"utf8","correlationId":"","replyTo":"xxx.response.queue","messageId":"087f6300-4751-11e9-98b1-f3c4b7b64947","timestamp":1552674897204,"a
ppId":"JSampson5530.npm.18772","headers":{},"mandatory":false,"persistent":true}' ) to direct exchange 'assignment.requests' on connection 'default'
  rabbot.acknack debug +365ms New pending tag 1 on queue xxx.response.queue - default
  rabbot.unhandled warn +1ms Message of xxx.response.queue on queue 'xxx.response.queue', connection 'default' was not processed by an
y registered handlers
  rabbot.acknack debug +3ms Marking tag 1 as nack'd on queue xxx.response.queue - default
  rabbot.acknack debug +83ms nack ALL (1) tags on xxx.response.queue up to 1 - default.
  rabbot.queue debug +1ms Nacking tag 1 on 'xxx.response.queue' - 'default'

The use of subscribe:true on the connection definition does not seem to have an effect Any Ideas?

@zlintz
Copy link

zlintz commented Mar 21, 2019

@jtsampson can you provide a sample of the message coming back from the java consumer which is replying on the replyTo queue including all the headers. There are specific expectations that rabbot has around the RPC message coming back on the replyTo queue in order to connect the return message back the correct request() promise response. Most of these expectations are handled for you if you have rabbot on both sides.

@jtsampson
Copy link

@zlintz ,

Thanks for getting back to me!

[aside] For sake of this discussion, and if it matters, the REQUESTER is a node app originally on Rabbit 1.1.0, but now upgraded to Rabbot 2.1.0 , the RESPONDER is a Java app using ampq-client-3.3.5 We have this working asynchronously and want to add a synchronous functionality using the same (or slightly modified) RESPONDER code.

I was pursuing this line of debugging earlier this week to try and determine exactly what the form of the message should be comming back from my RESPONDER but I hadn't gotten around to finishing it up.

Here is what I did so far.

Using the example code and topology in request.spec.js, and dupmping the response object from line 99 to the console I see the following

response: { fields: 
   { consumerTag: 'MACHINE.NAME.PROCESS.NAME',
     deliveryTag: 2,
     redelivered: false,
     exchange: '',
     routingKey: 'MACHINE.NAME.PROCESS.NAME' },
  properties: 
   { contentType: 'text/plain',
     contentEncoding: 'utf8',
     headers: { sequence_end: true },
     deliveryMode: undefined,
     priority: undefined,
     correlationId: '41dd6c20-4c22-11e9-b0db-c5719578f79c',
     replyTo: 'MACHINE.NAME.PROCESS.NAME',
     expiration: undefined,
     messageId: undefined,
     timestamp: 1553204562918,
     type: 'polite.reply',
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 3a 44>,
  ack: { [Function: bound ack] [length]: 0, [name]: 'bound ack' },
  reject: { [Function: bound reject] [length]: 0, [name]: 'bound reject' },
  nack: { [Function: bound nack] [length]: 0, [name]: 'bound nack' },
  reply: 
   { [Function]
     [length]: 2,
     [name]: '',
     [arguments]: null,
     [caller]: null,
     [prototype]: { [constructor]: [Circular] } },
  type: 'polite.reply',
  queue: 'MACHINE.NAME.PROCESS.NAME',
  body: ':D' }

I am guessing that my Java consumer has to respond with similar headers and properties but I am not exactly sure which ones...I tried to set up the minum number of these in my RESPONDER code:

      channel = rabbitMqManager.createChannel();

      Map<String,Object> headers = new HashMap<>();
      headers.put("sequence_end", true); //  possibly expected by node.js rabbot library

      AMQP.BasicProperties props =
          new AMQP.BasicProperties.Builder()
                  .appId("CONSUME_APP") // this is the java consume app name not the rabbot publisher app name
                  .contentEncoding("UTF-8")
                  .contentType("application/json")
                  .correlationId(correlationId) // correlation id sent by publisher from rabbit properties
                  .type((String)properties.getType()) //  possibly expected by node.js rabbot library
                  .headers(headers).build();

      // reply to in this case is 'MACHINE.NAME.PROCESS.NAME'
      channel.basicPublish("", replyTo, props, (body.toJSONString().getBytes()));
      // thisgoes to the default exchange

Above I am assumung (possibly wrongly) that I do not need to have the RESPONDER reply with any of the properties that are undefined from the dump and that the appId does not matter. Then my message will look something like this, I haven't trapped the actuall message yet, but am faily confident it looks like...

{
	"headers": {
		"sequence_end": true
	},
	"properties": {
		"contentEncoding": "UTF-8",
		"contentType": "application/json",
		"corelationId": "passed from PUBLISHER",
		"type": "type passed from PUBLISHER"
	},
	"body": "body"
}

Hopefully I am on the right track.

Do you know exactly which properties and headers I need?

@zlintz
Copy link

zlintz commented Mar 23, 2019

@jtsampson

I did some testing and digging. In order for the response to make it back to the request() the consumer must take the properties.message_id from the message and then give that back as the properties.correlation_id on the return message on the reply to queue. The headers should also have "sequence_end": true

For example the message sent to be replied to on the queue...
image

And the response would be like...
image

I can explain more of the details if needed, let me know if this helps.

@jtsampson
Copy link

@zlintz,

Thanks for the run down, I will try this in my code, should take about a day or so to try out.. If that works I will definately be back for details, and I can certainly add a pull request for added documentation.

@zlintz
Copy link

zlintz commented Mar 28, 2019

@jtsampson any update?
@acarlstein does this address your scenario as well?

@jtsampson
Copy link

jtsampson commented Mar 29, 2019

@zlintz,

Sorry for the late reply. Yes, this does solve my issue perfectly. Additionally, PM'ed @acarlstein and this was actually the solution to his problem as well. You can close this issue.

@jtsampson
Copy link

Solution was also posted here: #76

@zlintz
Copy link

zlintz commented Mar 29, 2019

That makes sense why this problem seemed so familiar. I helped @fifthfloorsoftware with this back then

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants