The RabbitMQ Sink command (Command Type: 1200) enables you to publish messages directly to RabbitMQ exchanges. This command is the primary method for sending processed data to message queues and enabling robot communication.

Overview

The RabbitMQ Sink command provides functionality for:
  • Publishing messages directly to RabbitMQ exchanges
  • Configuring exchange and routing keys
  • Sending data as part of the message itself
  • Managing message routing and delivery
  • Enabling direct communication with robots

Command Type

Command Type ID: 1200

Parameters

| Parameter | Type | Description | Required |
|---|---|---|---|
| exchange | string | RabbitMQ exchange name | Yes |
| routingKey | string | Routing key for message routing | Yes |
| data | object | Data payload to publish | Yes |
| messageProperties | object | Additional message properties | No |
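
The parameter rules above can be checked before a workflow runs. The following is a minimal sketch, not part of the product: `PARAM_SPEC` and `validate_sink_params` are hypothetical names, and it assumes that an "object" parameter corresponds to a JSON object (a Python `dict`).

```python
# Hypothetical pre-flight check derived from the parameter table above.
# Assumption: "string" maps to str and "object" maps to dict.
PARAM_SPEC = {
    "exchange": (str, True),
    "routingKey": (str, True),
    "data": (dict, True),
    "messageProperties": (dict, False),
}


def validate_sink_params(params):
    """Return a list of problems; an empty list means the params look valid."""
    problems = []
    for name, (expected_type, required) in PARAM_SPEC.items():
        if name not in params:
            if required:
                problems.append(f"missing required parameter: {name}")
            continue
        if not isinstance(params[name], expected_type):
            problems.append(f"{name} must be of type {expected_type.__name__}")
    return problems
```

Running a check like this on the `params` block of each sink command catches a missing `data` payload or a mistyped `routingKey` before anything is published.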

Key Features

Direct Publishing

The RabbitMQ Sink command publishes messages directly, without going through the orchestrator flow:
  • No Orchestrator Flow: Direct publishing bypasses the bringits-stream-orchestrator
  • No Redis Storage: All data included in the message itself
  • Immediate Publishing: Messages sent as soon as the step completes

Message Configuration

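Configure message routing with the exchange and routingKey parameters. The required data payload is omitted from this fragment; the Usage Examples section shows complete commands: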
{
  "command": "sink",
  "params": {
    "exchange": "your-exchange-name",
    "routingKey": "your-routing-key"
  }
}

Usage Examples

Basic Message Publishing

{
  "command": "sink",
  "params": {
    "exchange": "events",
    "routingKey": "data.processed",
    "data": {
      "id": "123",
      "name": "Example",
      "status": "processed"
    }
  }
}

Publishing with Project Variables

{
  "command": "sink",
  "params": {
    "exchange": "events",
    "routingKey": "data.processed",
    "data": {
      "projectId": "@{PROJECT_ID}",
      "runId": "@{RUN_ID}",
      "timestamp": "@{START_TIMESTAMP}",
      "payload": "@{PREVIOUS_OUTPUT}"
    }
  }
}

Publishing Extracted Data

{
  "command": "sink",
  "params": {
    "exchange": "events",
    "routingKey": "data.extracted",
    "data": {
      "items": "@{PREVIOUS_OUTPUT}.extracted_items",
      "metadata": {
        "source": "api",
        "timestamp": "@{START_TIMESTAMP}"
      }
    }
  }
}

Publishing with Message Properties

{
  "command": "sink",
  "params": {
    "exchange": "events",
    "routingKey": "data.processed",
    "data": {
      "id": "123",
      "name": "Example"
    },
    "messageProperties": {
      "priority": 5,
      "expiration": "3600000",
      "headers": {
        "x-custom-header": "value"
      }
    }
  }
}
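
In the example above, the expiration value is a string, and "3600000" milliseconds is one hour. RabbitMQ itself expects a per-message expiration as a string of digits in milliseconds, and supports message priorities from 0 to 255. The sketch below checks a messageProperties object against those broker conventions. It assumes that the sink command passes these properties through to AMQP unchanged, which this page does not state; `check_message_properties` is a hypothetical helper.

```python
def check_message_properties(props):
    """Return a list of problems found in a messageProperties object.

    Assumption: the properties map one-to-one onto AMQP basic properties.
    """
    problems = []

    expiration = props.get("expiration")
    if expiration is not None:
        # RabbitMQ expects the TTL as a string of ASCII digits (milliseconds).
        is_digit_string = (
            isinstance(expiration, str)
            and expiration.isascii()
            and expiration.isdigit()
        )
        if not is_digit_string:
            problems.append("expiration should be a string of digits (milliseconds)")

    priority = props.get("priority")
    if priority is not None:
        # bool is a subclass of int in Python, so reject it explicitly.
        is_valid_int = isinstance(priority, int) and not isinstance(priority, bool)
        if not is_valid_int or not 0 <= priority <= 255:
            problems.append("priority should be an integer between 0 and 255")

    headers = props.get("headers")
    if headers is not None and not isinstance(headers, dict):
        problems.append("headers should be an object")

    return problems
```

Note that a priority only has an effect when the destination queue was declared as a priority queue; otherwise the broker ignores it.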

Variable Support

The RabbitMQ Sink command supports variable interpolation in:
  • Exchange: Use variables in exchange names
  • Routing Key: Use variables in routing keys
  • Data Payload: Use variables throughout the data payload

Common Variable Patterns

  • Project Variables: @{PROJECT_ID}, @{RUN_ID}, @{START_TIMESTAMP}
  • Previous Output: @{PREVIOUS_OUTPUT} - Data from previous commands
  • Extracted Data: Reference data from Json Path or Object Mapper commands
  • Secrets: %{SECRET_NAME} - Encrypted credentials if needed
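
To make the substitution behaviour concrete, here is an illustrative sketch of how `@{NAME}` placeholders could be resolved across the exchange, routing key, and data payload. It is not the platform's resolver: `interpolate` and the `variables` mapping are hypothetical, and the sketch does not handle `%{SECRET_NAME}` secrets or path suffixes such as `@{PREVIOUS_OUTPUT}.extracted_items`.

```python
import re

# Matches @{NAME} placeholders as written in the examples on this page.
PLACEHOLDER = re.compile(r"@\{([A-Z0-9_]+)\}")


def interpolate(value, variables):
    """Recursively substitute @{NAME} placeholders in strings, dicts, and lists."""
    if isinstance(value, dict):
        return {key: interpolate(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [interpolate(item, variables) for item in value]
    if not isinstance(value, str):
        return value

    whole = PLACEHOLDER.fullmatch(value)
    if whole and whole.group(1) in variables:
        # A string that is exactly one placeholder keeps the variable's type,
        # so an object such as a previous output is not flattened to text.
        return variables[whole.group(1)]

    # Otherwise substitute inline; unknown names are left untouched.
    return PLACEHOLDER.sub(
        lambda match: str(variables.get(match.group(1), match.group(0))), value
    )
```

For example, with `variables = {"PROJECT_ID": "project-123"}`, the string `"events-@{PROJECT_ID}"` resolves to `"events-project-123"`, while a placeholder whose name is missing from the mapping stays as written.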

Message Structure

Messages published to RabbitMQ carry the exchange, the routing key, and the data payload with all variables resolved.

Message Format

{
  "exchange": "events",
  "routingKey": "data.processed",
  "data": {
    "projectId": "project-123",
    "runId": "run-456",
    "timestamp": "2024-01-01T00:00:00Z",
    "payload": {}
  }
}

Best Practices

  • Place at End of Workflow: In most workflows, make the sink command the final command of the last step
  • Test Configuration: Test the sink configuration with simple messages before running the full workflow
  • Use Descriptive Routing Keys: Use clear routing keys for message organization
  • Include Metadata: Include project and run identifiers in message payloads
  • Verify Consumption: Confirm messages are being consumed successfully
  • Handle Errors: Implement error handling for publishing failures
  • Use Consistent Formats: Maintain consistent message formats across workflows

Common Use Cases

  • Message Queue Publishing: Publish messages directly to RabbitMQ exchanges
  • Data Pipeline Integration: Send data to downstream processing systems
  • Robot Communication: Send messages directly to robots; this command is the primary method for doing so
  • Event Streaming: Stream events to message brokers
  • Data Archival: Send processed data to storage systems

Troubleshooting

Messages Not Being Consumed

If messages aren’t being consumed:
  1. Verify Exchange Exists: Ensure the exchange is configured in RabbitMQ
  2. Check Routing Key: Verify routing key matches consumer configuration
  3. Test Connection: Ensure RabbitMQ connection is working
  4. Review Message Format: Verify message format matches consumer expectations
  5. Check Exchange Type: Verify exchange type matches routing requirements

Common Issues

  • Exchange Not Found: Verify exchange name is correct and exists
  • Routing Key Mismatch: Ensure routing key matches consumer bindings
  • Connection Issues: Check RabbitMQ connection and credentials
  • Message Format Errors: Verify data payload format is correct
  • Permission Errors: Ensure proper permissions for exchange and routing
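
A routing key mismatch is easiest to diagnose by checking the key against the consumer's binding key offline. The sketch below implements the standard AMQP topic-exchange matching rules, where `*` matches exactly one dot-separated word and `#` matches zero or more words. It applies only if your exchange is a topic exchange: a direct exchange requires the routing key to equal the binding key exactly, and a fanout exchange ignores the routing key. `topic_matches` is a hypothetical helper name.

```python
from functools import lru_cache


def topic_matches(binding_key, routing_key):
    """Check a routing key against a topic-exchange binding key.

    '*' matches exactly one dot-separated word; '#' matches zero or more.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i, j):
        if i == len(pattern):
            # The pattern is exhausted: succeed only if all words are consumed.
            return j == len(words)
        if pattern[i] == "#":
            # '#' either consumes nothing, or consumes one word and stays active.
            return match(i + 1, j) or (j < len(words) and match(i, j + 1))
        if j < len(words) and pattern[i] in ("*", words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

With the routing keys used on this page, a consumer bound with `data.*` receives both `data.processed` and `data.extracted`, while a consumer bound with `data.extracted` does not receive `data.processed`.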

Testing Sink Configuration

To test your sink configuration:
  1. Create a simple test message
  2. Publish it to a test exchange
  3. Verify that the message appears in RabbitMQ
  4. Confirm that the consumer receives the message
  5. Validate that the message format matches expectations