Skip to content

JWebMP/RabbitMQ-Comms

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

6 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

JWebMP RabbitMQ-Comms

Maven Central License

Java 25+ Modular Angular TypeScript

Vert.x RabbitMQ STOMP WebSocket

Real-time bidirectional communication bridge connecting Angular browser clients to RabbitMQ message broker via WebSocket/STOMP. Enables reactive server-to-browser push notifications, live updates, and pub/sub messaging patterns for JWebMP applications.

Built on RabbitMQ Β· Vert.x RabbitMQ Client Β· STOMP.js Β· SockJS Β· JPMS module com.jwebmp.rabbit Β· Java 25+

πŸ“¦ Installation

<dependency>
  <groupId>com.jwebmp</groupId>
  <artifactId>jwebmp-rabbitmq</artifactId>
  <version>2.0.0-SNAPSHOT</version>
</dependency>
Gradle (Kotlin DSL)
implementation("com.jwebmp:jwebmp-rabbitmq:2.0.0-SNAPSHOT")

RabbitMQ Web STOMP Plugin

Enable the RabbitMQ Web STOMP plugin on your RabbitMQ server:

rabbitmq-plugins enable rabbitmq_web_stomp

Default WebSocket endpoint: ws://localhost:15674/ws

✨ Features

  • Real-Time Browser Communication β€” WebSocket-based bidirectional messaging between Angular clients and RabbitMQ broker
  • STOMP Protocol β€” Industry-standard STOMP over WebSocket with automatic reconnection and heartbeat support
  • Group-Based Pub/Sub β€” Dynamic subscription management with RabbitMQ fanout exchanges for broadcast messaging
  • Angular Directive Integration β€” Declarative HTML attribute [data-rabbit-groups] for automatic group subscription
  • Automatic Exchange Management β€” Server-side exchange declaration and lifecycle management via Vert.x RabbitMQ client
  • Session-Aware Messaging β€” Automatic subscription to session-specific groups using ContextIdService
  • Connection Resilience β€” Automatic reconnection with configurable delays and connection state observables
  • Server-Side WebSocket Hooks β€” Integrates with GuicedEE WebSocket lifecycle events (onAddToGroup, onRemoveFromGroup, onPublish)
  • TypeScript Client Generation β€” Fully typed Angular provider and directive generated from Java annotations
  • SockJS Fallback β€” Graceful degradation for browsers/proxies without native WebSocket support
  • JPMS Modular β€” Fully modular with explicit dependencies via Java Platform Module System

πŸš€ Quick Start

Server-Side Configuration

1. Add Dependency

Include jwebmp-rabbitmq in your JWebMP application:

<dependency>
  <groupId>com.jwebmp</groupId>
  <artifactId>jwebmp-rabbitmq</artifactId>
</dependency>

2. Configure RabbitMQ Connection

The module uses GuicedEE's RabbitMQ module. Configure connection in your module-info.java or via environment variables:

module com.myapp {
    requires com.jwebmp.rabbit;
    requires com.guicedee.rabbit;

    opens com.myapp to com.google.guice;
}

Environment variables for RabbitMQ connection:

Variable Purpose Default
RABBITMQ_HOST RabbitMQ hostname localhost
RABBITMQ_PORT RabbitMQ port 5672
RABBITMQ_WEB_STOMP_PORT WebSocket STOMP port 15674
RABBITMQ_USERNAME Username guest
RABBITMQ_PASSWORD Password guest
RABBITMQ_VIRTUAL_HOST Virtual host /

3. Publish Messages from Server

@Inject
private RabbitPublishToGroup publisher;

public void sendUpdate() {
    String message = "{\"type\":\"update\",\"data\":\"Hello from server!\"}";
    publisher.publish("Everyone", message);
}

public void sendToUser(String userId, String message) {
    publisher.publish("user-" + userId, message);
}

Client-Side Integration (Angular)

1. Component with RabbitMQ Directive

Use the [data-rabbit-groups] directive to automatically subscribe to RabbitMQ exchanges:

@NgComponent
@NgDataVariable(variableName = "messages", value = "[]")
public class DashboardComponent implements INgComponent<DashboardComponent> {

    @Override
    public String render() {
        return """
            <div [data-rabbit-groups]="'user-dashboard'">
                <h1>Live Dashboard</h1>
                <div *ngFor="let message of messages">
                    {{ message }}
                </div>
            </div>
            """;
    }
}

When the component mounts, it automatically:

  1. Subscribes to the user-dashboard exchange
  2. Receives all messages published to that exchange
  3. Unsubscribes when the component is destroyed

2. Programmatic Subscription

Inject RabbitMQProvider for manual control:

// TypeScript (generated from Java annotations)
import { RabbitMQProvider } from './rabbit-mq-provider';

@Component({
  selector: 'app-notifications',
  template: `<div>{{ notification }}</div>`
})
export class NotificationsComponent implements OnInit, OnDestroy {
  notification: string = '';

  constructor(private rabbitMqProvider: RabbitMQProvider) {}

  ngOnInit() {
    // Subscribe to a group
    this.rabbitMqProvider.addGroup('notifications');

    // Send a message
    this.rabbitMqProvider.sendMessage('/exchange/admin',
      JSON.stringify({ action: 'ping' }));
  }

  ngOnDestroy() {
    // Unsubscribe from group
    this.rabbitMqProvider.removeGroup('notifications');
  }
}

3. Connection State Monitoring

import { RabbitMQProvider } from './rabbit-mq-provider';

export class AppComponent implements OnInit {
  isConnected: boolean = false;

  constructor(private rabbitMqProvider: RabbitMQProvider) {}

  ngOnInit() {
    // Wait for connection before performing actions
    this.rabbitMqProvider.waitForConnection().then(() => {
      this.isConnected = true;
      console.log('RabbitMQ connected!');
    });

    // Or subscribe to connection state
    RabbitMQProvider.connected.subscribe(connected => {
      this.isConnected = connected;
    });
  }
}

πŸ“ Architecture

Message Flow

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          RabbitMQ Broker                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  Exchange: "Everyone" (fanout)                                  β”‚    β”‚
β”‚  β”‚  Exchange: "user-dashboard" (fanout)                            β”‚    β”‚
β”‚  β”‚  Exchange: "notifications" (fanout)                             β”‚    β”‚
β”‚  β”‚  Exchange: "__vertx.session-abc123" (fanout, auto-delete)      β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚         β–²                                            β”‚                   β”‚
β”‚         β”‚ AMQP 0.9.1                                 β”‚ STOMP/WebSocket  β”‚
β”‚         β”‚ (port 5672)                                β”‚ (port 15674)     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
          β”‚                                            β”‚
          β”‚                                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Server-Side (Java)   β”‚              β”‚   Browser (Angular/TypeScript) β”‚
β”‚                        β”‚              β”‚                               β”‚
β”‚  RabbitPublishToGroup  β”‚              β”‚     RabbitMQProvider          β”‚
β”‚         β”‚              β”‚              β”‚            β”‚                  β”‚
β”‚         β”œβ”€ onAddToGroup β”‚              β”‚            β”œβ”€ addGroup()      β”‚
β”‚         β”œβ”€ publish()    β”‚              β”‚            β”œβ”€ removeGroup()   β”‚
β”‚         └─ onRemoveFrom β”‚              β”‚            β”œβ”€ sendMessage()   β”‚
β”‚            Group        β”‚              β”‚            └─ subscribe()     β”‚
β”‚                        β”‚              β”‚                               β”‚
β”‚  RabbitMQDirective     β”‚              β”‚     [data-rabbit-groups]      β”‚
β”‚    (Angular annotation)β”‚              β”‚       (HTML directive)        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Component Overview

Component Location Purpose
RabbitMQProvider Angular Service STOMP.js client wrapper, connection management, subscription handling
RabbitMQDirective Angular Directive HTML attribute [data-rabbit-groups] for declarative subscriptions
RabbitPublishToGroup Server (Java) Server-side message publisher via Vert.x RabbitMQ client
GuicedEE WebSocket Hooks Server (Java) Integration with GuicedEE WebSocket lifecycle events

Exchange Lifecycle

  1. Client Subscription β€” Angular component subscribes to a group (e.g., user-dashboard)
  2. Exchange Declaration β€” Server-side RabbitPublishToGroup.onAddToGroup() declares a fanout exchange if not exists
  3. Message Publishing β€” Server publishes to exchange: client.basicPublish("user-dashboard", "", buffer)
  4. Message Distribution β€” RabbitMQ fanout exchange broadcasts to all subscribed browser clients
  5. Client Processing β€” Angular RabbitMQProvider.subscribe() receives message and processes via EventBusService
  6. Cleanup β€” Component destruction triggers removeGroup(), unsubscribing from exchange

Session-Aware Groups

The module automatically subscribes clients to session-specific groups:

  • Everyone β€” Global broadcast group (all connected clients)
  • Session Group β€” Unique group per browser session (e.g., __vertx.session-abc123)
  • Custom Groups β€” Application-defined groups (e.g., user-{userId}, room-{roomId})

Session groups are automatically created and deleted based on ContextIdService observable.

πŸ”§ Configuration

Server-Side (Java)

Exchange Declaration Behavior

Exchanges are declared with the following properties:

client.exchangeDeclare(
    groupName,                              // Exchange name
    "fanout",                               // Exchange type
    !groupName.startsWith("__vertx"),      // Durable (persistent)
    groupName.startsWith("__vertx")        // Auto-delete (temporary)
);
  • Persistent exchanges β€” Groups without __vertx prefix are durable and survive broker restarts
  • Temporary exchanges β€” Groups with __vertx prefix are auto-deleted when last consumer disconnects

Publishing Messages

@Inject
private RabbitPublishToGroup publisher;

// Publish to a group
publisher.publish("groupName", jsonMessage);

// Publish with automatic exchange creation
publisher.onAddToGroup("newGroup").thenAccept(success -> {
    if (success) {
        publisher.publish("newGroup", "Hello!");
    }
});

Client-Side (Angular)

Connection Configuration

The RabbitMQProvider exposes configurable fields:

@NgField("url : string = 'ws://127.0.0.1:15674/ws';")
@NgField("user : string = 'guest';")
@NgField("pass : string = 'guest';")
@NgField("virtualHost : string = 'UWEAssist';")

Override in your application:

import { RabbitMQProvider } from './rabbit-mq-provider';

export class AppModule {
  constructor(private rabbitMqProvider: RabbitMQProvider) {
    this.rabbitMqProvider.url = 'ws://production.example.com:15674/ws';
    this.rabbitMqProvider.user = 'app-user';
    this.rabbitMqProvider.pass = 'secure-password';
    this.rabbitMqProvider.virtualHost = 'production';
  }
}

Reconnection Settings

STOMP client reconnection is configured in the constructor:

RabbitMQProvider.client = new Client({
  brokerURL: this.url,
  reconnectDelay: 5000,        // 5 seconds between reconnection attempts
  // heartbeatIncoming: 4000,  // Optional: incoming heartbeat interval
  // heartbeatOutgoing: 4000   // Optional: outgoing heartbeat interval
});

πŸ”Œ API Reference

Java API

RabbitPublishToGroup

@Singleton
public class RabbitPublishToGroup {

    // Declare exchange for group (idempotent)
    CompletableFuture<Boolean> onAddToGroup(String groupName);

    // Publish message to group
    boolean publish(String groupName, String message);

    // Remove group exchange (only for __vertx.* groups)
    CompletableFuture<Boolean> onRemoveFromGroup(String groupName);
}

RabbitMQDirective

@NgDirective(value = "[data-rabbit-groups]", standalone = true)
public class RabbitMQDirective {

    // Add component to RabbitMQ group
    boolean addGroup(IComponentHierarchyBase<?, ?> component, String groupName);
}

Usage in Java component:

@NgComponent
public class MyComponent implements INgComponent<MyComponent> {
    @Override
    public String render() {
        return "<div [data-rabbit-groups]=\"'my-group'\">Content</div>";
    }
}

TypeScript API

RabbitMQProvider Service

@Injectable({ providedIn: 'root' })
export class RabbitMQProvider {

    // Connection state observable
    static connected: BehaviorSubject<boolean>;

    // Subscribe to RabbitMQ exchange
    subscribe(destination: string): void;

    // Add subscription to group
    addGroup(group: string): void;

    // Remove subscription from group
    removeGroup(group: string): void;

    // Connect to broker
    connect(): void;

    // Disconnect from broker
    disconnect(): void;

    // Publish message to destination
    sendMessage(destination: string, body: string): void;

    // Wait for connection to be established
    waitForConnection(): Promise<boolean>;
}

Directive Usage

<!-- Automatic subscription on mount, unsubscribe on destroy -->
<div [data-rabbit-groups]="'notifications'">
  Live notifications appear here
</div>

<!-- Dynamic group binding -->
<div [data-rabbit-groups]="currentUserGroup">
  User-specific content
</div>

<!-- Multiple components can subscribe to the same group -->
<app-dashboard [data-rabbit-groups]="'dashboard-updates'"></app-dashboard>
<app-chart [data-rabbit-groups]="'dashboard-updates'"></app-chart>

πŸ§ͺ Testing

Running Tests

# Run all tests
mvn clean test

# Run specific test
mvn test -Dtest=RabbitMQWebTest

Test Application

The module includes a test application demonstrating integration:

@NgApp(value = "RabbitMQWebTest", bootComponent = RabbitMQPage.class)
class RabbitMQWebTest extends NGApplication<RabbitMQWebTest> {
    public RabbitMQWebTest() {
        getOptions().setTitle("RabbitMQ Web Test");
    }
}

Integration Testing

For integration tests, use Testcontainers with RabbitMQ:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class RabbitMQIntegrationTest {

    private static final RabbitMQContainer rabbitMQ =
        new RabbitMQContainer("rabbitmq:3-management")
            .withPluginsEnabled("rabbitmq_web_stomp");

    @BeforeAll
    static void setup() {
        rabbitMQ.start();

        // Configure connection
        System.setProperty("rabbitmq.host", rabbitMQ.getHost());
        System.setProperty("rabbitmq.port",
            String.valueOf(rabbitMQ.getAmqpPort()));
        System.setProperty("rabbitmq.webstomp.port",
            String.valueOf(rabbitMQ.getHttpPort()));
    }

    @Test
    void testMessagePublishing() {
        // Test implementation
    }
}

πŸ—ΊοΈ Module Graph

com.jwebmp.rabbit
 β”œβ”€β”€ com.jwebmp.vertx              (Vert.x integration, reactive runtime)
 β”œβ”€β”€ com.jwebmp.core.angular       (Angular annotation framework)
 β”œβ”€β”€ com.jwebmp.core               (JWebMP core)
 β”œβ”€β”€ com.guicedee.rabbit           (GuicedEE RabbitMQ integration)
 β”œβ”€β”€ com.guicedee.guicedinjection  (Guice DI)
 β”œβ”€β”€ io.vertx.rabbitmq             (Vert.x RabbitMQ client)
 └── TypeScript Dependencies:
     β”œβ”€β”€ @stomp/stompjs            (STOMP over WebSocket client)
     β”œβ”€β”€ sockjs-client             (SockJS fallback transport)
     └── @types/sockjs-client      (TypeScript type definitions)

🧰 Troubleshooting & Best Practices

Connection Issues

Problem: Client cannot connect to RabbitMQ WebSocket endpoint

Solutions:

  • Verify rabbitmq_web_stomp plugin is enabled: rabbitmq-plugins enable rabbitmq_web_stomp
  • Check firewall allows port 15674 (default Web STOMP port)
  • Verify broker URL matches your RabbitMQ server: ws://your-host:15674/ws
  • Check RabbitMQ logs for authentication errors

Exchange Not Created

Problem: Messages not received by clients

Solutions:

  • Check server logs for exchange declaration errors
  • Verify RabbitMQ AMQP connection is established on server side
  • Use RabbitMQ Management UI to inspect exchanges: http://localhost:15672
  • Ensure client subscribes after connection is established (use waitForConnection())

Message Loss

Problem: Some messages are lost during client reconnection

Solutions:

  • Use durable exchanges for critical messages (avoid __vertx prefix)
  • Implement message queues instead of fanout exchanges for guaranteed delivery
  • Consider persistent queues and durable subscriptions for mission-critical data
  • Use RabbitMQ acknowledgments for reliable delivery

Memory Leaks

Problem: Subscriptions not cleaned up

Solutions:

  • Always call removeGroup() in ngOnDestroy() lifecycle hook
  • Use [data-rabbit-groups] directive for automatic cleanup
  • Unsubscribe from observables in component teardown
  • Monitor declaredExchanges set on server side

Best Practices

  • Use semantic group names β€” user-{id}, room-{id}, notifications, etc.
  • Prefer directives β€” Use [data-rabbit-groups] for automatic lifecycle management
  • Wait for connection β€” Always await waitForConnection() before publishing
  • Handle errors β€” Subscribe to onStompError for error handling
  • Clean up resources β€” Properly unsubscribe and disconnect on component/app destruction
  • Use environment variables β€” Externalize RabbitMQ connection configuration
  • Monitor connections β€” Use RabbitMQ Management UI to monitor WebSocket connections
  • Secure credentials β€” Never hardcode credentials, use environment variables or secret management

🧭 Documentation

Related Documentation

External Resources

🀝 Contributing

Issues and pull requests are welcome.

Guidelines

  • Include tests for new features
  • Update documentation for behavior changes
  • Follow existing code patterns and naming conventions
  • Test with RabbitMQ 3.x and latest Web STOMP plugin

πŸ“„ License

Apache 2.0


JWebMP RabbitMQ-Comms β€” Real-time browser-to-RabbitMQ connectivity for reactive web applications.

Built with ❀️ using Java 25+, Vert.x 5, Angular 20, RabbitMQ, and STOMP.

About

Connects browsers to RabbitMQ for JWebMP site updates

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages