Real-time bidirectional communication bridge connecting Angular browser clients to a RabbitMQ message broker over WebSocket/STOMP. Enables reactive server-to-browser push notifications, live updates, and pub/sub messaging patterns for JWebMP applications.
Built on RabbitMQ · Vert.x RabbitMQ Client · STOMP.js · SockJS · JPMS module com.jwebmp.rabbit · Java 25+
<dependency>
  <groupId>com.jwebmp</groupId>
  <artifactId>jwebmp-rabbitmq</artifactId>
  <version>2.0.0-SNAPSHOT</version>
</dependency>

Gradle (Kotlin DSL)

implementation("com.jwebmp:jwebmp-rabbitmq:2.0.0-SNAPSHOT")

Enable the RabbitMQ Web STOMP plugin on your RabbitMQ server:

rabbitmq-plugins enable rabbitmq_web_stomp

Default WebSocket endpoint: ws://localhost:15674/ws
- Real-Time Browser Communication: WebSocket-based bidirectional messaging between Angular clients and the RabbitMQ broker
- STOMP Protocol: Industry-standard STOMP over WebSocket with automatic reconnection and heartbeat support
- Group-Based Pub/Sub: Dynamic subscription management with RabbitMQ fanout exchanges for broadcast messaging
- Angular Directive Integration: Declarative HTML attribute `[data-rabbit-groups]` for automatic group subscription
- Automatic Exchange Management: Server-side exchange declaration and lifecycle management via the Vert.x RabbitMQ client
- Session-Aware Messaging: Automatic subscription to session-specific groups using `ContextIdService`
- Connection Resilience: Automatic reconnection with configurable delays and connection state observables
- Server-Side WebSocket Hooks: Integrates with GuicedEE WebSocket lifecycle events (`onAddToGroup`, `onRemoveFromGroup`, `onPublish`)
- TypeScript Client Generation: Fully typed Angular provider and directive generated from Java annotations
- SockJS Fallback: Graceful degradation for browsers/proxies without native WebSocket support
- JPMS Modular: Fully modular with explicit dependencies via the Java Platform Module System
Include jwebmp-rabbitmq in your JWebMP application:
<dependency>
  <groupId>com.jwebmp</groupId>
  <artifactId>jwebmp-rabbitmq</artifactId>
</dependency>

The module uses GuicedEE's RabbitMQ module. Configure the connection in your `module-info.java` or via environment variables:

module com.myapp {
    requires com.jwebmp.rabbit;
    requires com.guicedee.rabbit;
    opens com.myapp to com.google.guice;
}

Environment variables for RabbitMQ connection:

| Variable | Purpose | Default |
|---|---|---|
| `RABBITMQ_HOST` | RabbitMQ hostname | localhost |
| `RABBITMQ_PORT` | RabbitMQ port | 5672 |
| `RABBITMQ_WEB_STOMP_PORT` | WebSocket STOMP port | 15674 |
| `RABBITMQ_USERNAME` | Username | guest |
| `RABBITMQ_PASSWORD` | Password | guest |
| `RABBITMQ_VIRTUAL_HOST` | Virtual host | / |
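
To make the fallback behaviour of the table concrete, here is a minimal sketch of resolving those variables with their documented defaults. `RabbitEnvConfig` is a hypothetical helper written for illustration, not a class shipped by this module or by GuicedEE; the real modules may read these values differently.

```java
import java.util.Map;

/** Hypothetical helper (not part of jwebmp-rabbitmq): resolves the documented variables with their defaults. */
record RabbitEnvConfig(String host, int port, int webStompPort,
                       String username, String password, String virtualHost) {

    /** Reads settings from the given map, falling back to the defaults listed in the table. */
    static RabbitEnvConfig from(Map<String, String> env) {
        return new RabbitEnvConfig(
                env.getOrDefault("RABBITMQ_HOST", "localhost"),
                Integer.parseInt(env.getOrDefault("RABBITMQ_PORT", "5672")),
                Integer.parseInt(env.getOrDefault("RABBITMQ_WEB_STOMP_PORT", "15674")),
                env.getOrDefault("RABBITMQ_USERNAME", "guest"),
                env.getOrDefault("RABBITMQ_PASSWORD", "guest"),
                env.getOrDefault("RABBITMQ_VIRTUAL_HOST", "/"));
    }

    /** Builds the Web STOMP endpoint URL a browser client would connect to. */
    String webStompUrl() {
        return "ws://" + host + ":" + webStompPort + "/ws";
    }

    public static void main(String[] args) {
        // Print only the URL so the password never reaches the console.
        System.out.println(RabbitEnvConfig.from(System.getenv()).webStompUrl());
    }
}
```

Taking a `Map` rather than calling `System.getenv()` directly keeps the lookup easy to exercise in tests with a hand-built map.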
@Inject
private RabbitPublishToGroup publisher;

public void sendUpdate() {
    String message = "{\"type\":\"update\",\"data\":\"Hello from server!\"}";
    publisher.publish("Everyone", message);
}

public void sendToUser(String userId, String message) {
    publisher.publish("user-" + userId, message);
}

Use the `[data-rabbit-groups]` directive to automatically subscribe to RabbitMQ exchanges:
@NgComponent
@NgDataVariable(variableName = "messages", value = "[]")
public class DashboardComponent implements INgComponent<DashboardComponent> {
    @Override
    public String render() {
        return """
            <div [data-rabbit-groups]="'user-dashboard'">
                <h1>Live Dashboard</h1>
                <div *ngFor="let message of messages">
                    {{ message }}
                </div>
            </div>
            """;
    }
}

When the component mounts, it automatically:
- Subscribes to the `user-dashboard` exchange
- Receives all messages published to that exchange
- Unsubscribes when the component is destroyed
Inject RabbitMQProvider for manual control:
// TypeScript (generated from Java annotations)
import { Component, OnDestroy, OnInit } from '@angular/core';
import { RabbitMQProvider } from './rabbit-mq-provider';

@Component({
  selector: 'app-notifications',
  template: `<div>{{ notification }}</div>`
})
export class NotificationsComponent implements OnInit, OnDestroy {
  notification: string = '';

  constructor(private rabbitMqProvider: RabbitMQProvider) {}

  ngOnInit() {
    // Subscribe to a group
    this.rabbitMqProvider.addGroup('notifications');
    // Send a message
    this.rabbitMqProvider.sendMessage('/exchange/admin',
      JSON.stringify({ action: 'ping' }));
  }

  ngOnDestroy() {
    // Unsubscribe from group
    this.rabbitMqProvider.removeGroup('notifications');
  }
}

import { OnInit } from '@angular/core';
import { RabbitMQProvider } from './rabbit-mq-provider';

export class AppComponent implements OnInit {
  isConnected: boolean = false;

  constructor(private rabbitMqProvider: RabbitMQProvider) {}

  ngOnInit() {
    // Wait for connection before performing actions
    this.rabbitMqProvider.waitForConnection().then(() => {
      this.isConnected = true;
      console.log('RabbitMQ connected!');
    });
    // Or subscribe to connection state
    RabbitMQProvider.connected.subscribe(connected => {
      this.isConnected = connected;
    });
  }
}

┌───────────────────────────────────────────────────────────────────────┐
│                            RabbitMQ Broker                            │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │ Exchange: "Everyone" (fanout)                                   │  │
│  │ Exchange: "user-dashboard" (fanout)                             │  │
│  │ Exchange: "notifications" (fanout)                              │  │
│  │ Exchange: "__vertx.session-abc123" (fanout, auto-delete)        │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│          ▲                                  │                         │
│          │ AMQP 0.9.1                       │ STOMP/WebSocket         │
│          │ (port 5672)                      │ (port 15674)            │
└──────────┼──────────────────────────────────┼─────────────────────────┘
           │                                  │
           │                                  ▼
┌──────────┴─────────────┐         ┌──────────────────────────────┐
│ Server-Side (Java)     │         │ Browser (Angular/TypeScript) │
│                        │         │                              │
│ RabbitPublishToGroup   │         │ RabbitMQProvider             │
│   │                    │         │   │                          │
│   ├─ onAddToGroup      │         │   ├─ addGroup()              │
│   ├─ publish()         │         │   ├─ removeGroup()           │
│   └─ onRemoveFrom      │         │   ├─ sendMessage()           │
│      Group             │         │   └─ subscribe()             │
│                        │         │                              │
│ RabbitMQDirective      │         │ [data-rabbit-groups]         │
│ (Angular annotation)   │         │ (HTML directive)             │
└────────────────────────┘         └──────────────────────────────┘
| Component | Location | Purpose |
|---|---|---|
| RabbitMQProvider | Angular Service | STOMP.js client wrapper, connection management, subscription handling |
| RabbitMQDirective | Angular Directive | HTML attribute [data-rabbit-groups] for declarative subscriptions |
| RabbitPublishToGroup | Server (Java) | Server-side message publisher via Vert.x RabbitMQ client |
| GuicedEE WebSocket Hooks | Server (Java) | Integration with GuicedEE WebSocket lifecycle events |
- Client Subscription: Angular component subscribes to a group (e.g., `user-dashboard`)
- Exchange Declaration: Server-side `RabbitPublishToGroup.onAddToGroup()` declares a fanout exchange if it does not already exist
- Message Publishing: Server publishes to the exchange: `client.basicPublish("user-dashboard", "", buffer)`
- Message Distribution: The RabbitMQ fanout exchange broadcasts to all subscribed browser clients
- Client Processing: Angular `RabbitMQProvider.subscribe()` receives the message and processes it via `EventBusService`
- Cleanup: Component destruction triggers `removeGroup()`, unsubscribing from the exchange
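
The flow above can be pictured with a small in-memory model. This is only a sketch of fanout semantics under simplifying assumptions: `InMemoryFanout` is a hypothetical stand-in for the broker, with no network, no AMQP, and no STOMP involved, and it does not reflect how `RabbitPublishToGroup` or `RabbitMQProvider` are implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a broker with fanout exchanges; illustration only. */
class InMemoryFanout {
    private final Map<String, List<Consumer<String>>> exchanges = new ConcurrentHashMap<>();

    /** Subscription + declaration: creates the exchange on first use and registers the subscriber. */
    Runnable subscribe(String group, Consumer<String> subscriber) {
        List<Consumer<String>> subscribers =
                exchanges.computeIfAbsent(group, g -> new CopyOnWriteArrayList<>());
        subscribers.add(subscriber);
        // Cleanup: running the returned handle removes this subscriber again.
        return () -> subscribers.remove(subscriber);
    }

    /** Publishing + distribution: every current subscriber gets its own copy; returns the receiver count. */
    int publish(String group, String message) {
        List<Consumer<String>> subscribers = exchanges.getOrDefault(group, List.of());
        subscribers.forEach(s -> s.accept(message));
        return subscribers.size();
    }

    public static void main(String[] args) {
        InMemoryFanout broker = new InMemoryFanout();
        Runnable unsubscribe = broker.subscribe("user-dashboard", m -> System.out.println("received " + m));
        broker.publish("user-dashboard", "{\"type\":\"update\"}");
        unsubscribe.run();
    }
}
```

The model also shows why fanout delivery is fire-and-forget: a message published while nobody is subscribed reaches zero receivers and is not kept for later.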
The module automatically subscribes clients to session-specific groups:
- Everyone: Global broadcast group (all connected clients)
- Session Group: Unique group per browser session (e.g., `__vertx.session-abc123`)
- Custom Groups: Application-defined groups (e.g., `user-{userId}`, `room-{roomId}`)

Session groups are automatically created and deleted based on the `ContextIdService` observable.
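
As a rough illustration of the three group kinds, the sketch below lists the groups a single browser session would end up in. `SessionGroups` is hypothetical, and the `__vertx.session-` prefix is an assumption inferred from the example name above; the real session group name comes from `ContextIdService`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: lists the groups one browser session would belong to. */
final class SessionGroups {
    static final String EVERYONE = "Everyone";
    // Assumption: prefix inferred from the "__vertx.session-abc123" example, not from the module's code.
    static final String SESSION_PREFIX = "__vertx.session-";

    /** Returns the broadcast group, the session group, then any custom groups, without duplicates. */
    static Set<String> groupsFor(String sessionId, List<String> customGroups) {
        Set<String> groups = new LinkedHashSet<>();
        groups.add(EVERYONE);
        groups.add(SESSION_PREFIX + sessionId);
        groups.addAll(customGroups);
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(groupsFor("abc123", List.of("user-42", "room-7")));
    }
}
```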
Exchanges are declared with the following properties:
client.exchangeDeclare(
    groupName,                          // Exchange name
    "fanout",                           // Exchange type
    !groupName.startsWith("__vertx"),   // Durable (persistent)
    groupName.startsWith("__vertx")     // Auto-delete (temporary)
);

- Persistent exchanges: Groups without the `__vertx` prefix are durable and survive broker restarts
- Temporary exchanges: Groups with the `__vertx` prefix are auto-deleted when the last consumer disconnects
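
The prefix rule can be isolated into a small value type, which makes the two cases easy to check without a broker. `ExchangeFlags` is a hypothetical name used only for this sketch; it mirrors the two boolean arguments of the `exchangeDeclare` call shown above.

```java
/** Hypothetical value type mirroring the durable/auto-delete arguments of the declaration above. */
record ExchangeFlags(boolean durable, boolean autoDelete) {
    static final String TEMPORARY_PREFIX = "__vertx";

    /** Temporary (prefixed) groups are auto-delete and not durable; all others are the reverse. */
    static ExchangeFlags forGroup(String groupName) {
        boolean temporary = groupName.startsWith(TEMPORARY_PREFIX);
        return new ExchangeFlags(!temporary, temporary);
    }

    public static void main(String[] args) {
        System.out.println(forGroup("Everyone"));
        System.out.println(forGroup("__vertx.session-abc123"));
    }
}
```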
@Inject
private RabbitPublishToGroup publisher;

// Publish to a group
publisher.publish("groupName", jsonMessage);

// Publish with automatic exchange creation
publisher.onAddToGroup("newGroup").thenAccept(success -> {
    if (success) {
        publisher.publish("newGroup", "Hello!");
    }
});

The `RabbitMQProvider` exposes configurable fields:
@NgField("url : string = 'ws://127.0.0.1:15674/ws';")
@NgField("user : string = 'guest';")
@NgField("pass : string = 'guest';")
@NgField("virtualHost : string = 'UWEAssist';")

Override in your application:
import { RabbitMQProvider } from './rabbit-mq-provider';

export class AppModule {
  constructor(private rabbitMqProvider: RabbitMQProvider) {
    this.rabbitMqProvider.url = 'ws://production.example.com:15674/ws';
    this.rabbitMqProvider.user = 'app-user';
    this.rabbitMqProvider.pass = 'secure-password';
    this.rabbitMqProvider.virtualHost = 'production';
  }
}

STOMP client reconnection is configured in the constructor:
RabbitMQProvider.client = new Client({
  brokerURL: this.url,
  reconnectDelay: 5000, // 5 seconds between reconnection attempts
  // heartbeatIncoming: 4000, // Optional: incoming heartbeat interval
  // heartbeatOutgoing: 4000 // Optional: outgoing heartbeat interval
});

@Singleton
public class RabbitPublishToGroup {
    // Declare exchange for group (idempotent)
    CompletableFuture<Boolean> onAddToGroup(String groupName);

    // Publish message to group
    boolean publish(String groupName, String message);

    // Remove group exchange (only for __vertx.* groups)
    CompletableFuture<Boolean> onRemoveFromGroup(String groupName);
}

@NgDirective(value = "[data-rabbit-groups]", standalone = true)
public class RabbitMQDirective {
    // Add component to RabbitMQ group
    boolean addGroup(IComponentHierarchyBase<?, ?> component, String groupName);
}

Usage in Java component:
@NgComponent
public class MyComponent implements INgComponent<MyComponent> {
    @Override
    public String render() {
        return "<div [data-rabbit-groups]=\"'my-group'\">Content</div>";
    }
}

@Injectable({ providedIn: 'root' })
export class RabbitMQProvider {
  // Connection state observable
  static connected: BehaviorSubject<boolean>;
  // Subscribe to RabbitMQ exchange
  subscribe(destination: string): void;
  // Add subscription to group
  addGroup(group: string): void;
  // Remove subscription from group
  removeGroup(group: string): void;
  // Connect to broker
  connect(): void;
  // Disconnect from broker
  disconnect(): void;
  // Publish message to destination
  sendMessage(destination: string, body: string): void;
  // Wait for connection to be established
  waitForConnection(): Promise<boolean>;
}

<!-- Automatic subscription on mount, unsubscribe on destroy -->
<div [data-rabbit-groups]="'notifications'">
  Live notifications appear here
</div>

<!-- Dynamic group binding -->
<div [data-rabbit-groups]="currentUserGroup">
  User-specific content
</div>

<!-- Multiple components can subscribe to the same group -->
<app-dashboard [data-rabbit-groups]="'dashboard-updates'"></app-dashboard>
<app-chart [data-rabbit-groups]="'dashboard-updates'"></app-chart>

# Run all tests
mvn clean test

# Run specific test
mvn test -Dtest=RabbitMQWebTest

The module includes a test application demonstrating integration:
@NgApp(value = "RabbitMQWebTest", bootComponent = RabbitMQPage.class)
class RabbitMQWebTest extends NGApplication<RabbitMQWebTest> {
    public RabbitMQWebTest() {
        getOptions().setTitle("RabbitMQ Web Test");
    }
}

For integration tests, use Testcontainers with RabbitMQ:
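import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.RabbitMQContainer;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class RabbitMQIntegrationTest {
    // Web STOMP listens on 15674, which the container does not expose by default
    private static final int WEB_STOMP_PORT = 15674;

    private static final RabbitMQContainer rabbitMQ =
            new RabbitMQContainer("rabbitmq:3-management")
                    .withPluginsEnabled("rabbitmq_web_stomp")
                    // withExposedPorts replaces the default list, so AMQP and management are listed too
                    .withExposedPorts(5672, 15672, WEB_STOMP_PORT);

    @BeforeAll
    static void setup() {
        rabbitMQ.start();
        // Configure connection
        System.setProperty("rabbitmq.host", rabbitMQ.getHost());
        System.setProperty("rabbitmq.port",
                String.valueOf(rabbitMQ.getAmqpPort()));
        // getHttpPort() is the management UI (15672), not Web STOMP
        System.setProperty("rabbitmq.webstomp.port",
                String.valueOf(rabbitMQ.getMappedPort(WEB_STOMP_PORT)));
    }

    @Test
    void testMessagePublishing() {
        // Test implementation
    }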
}

com.jwebmp.rabbit
├── com.jwebmp.vertx (Vert.x integration, reactive runtime)
├── com.jwebmp.core.angular (Angular annotation framework)
├── com.jwebmp.core (JWebMP core)
├── com.guicedee.rabbit (GuicedEE RabbitMQ integration)
├── com.guicedee.guicedinjection (Guice DI)
├── io.vertx.rabbitmq (Vert.x RabbitMQ client)
└── TypeScript Dependencies:
    ├── @stomp/stompjs (STOMP over WebSocket client)
    ├── sockjs-client (SockJS fallback transport)
    └── @types/sockjs-client (TypeScript type definitions)
Problem: Client cannot connect to the RabbitMQ WebSocket endpoint
Solutions:
- Verify the `rabbitmq_web_stomp` plugin is enabled: `rabbitmq-plugins enable rabbitmq_web_stomp`
- Check that the firewall allows port 15674 (default Web STOMP port)
- Verify the broker URL matches your RabbitMQ server: `ws://your-host:15674/ws`
- Check the RabbitMQ logs for authentication errors
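
A quick sanity check of the broker URL can rule out the most common typos before looking at firewalls or logs. The sketch below uses only `java.net.URI`; `BrokerUrlCheck` is a hypothetical helper, and the rules it applies (a `ws`/`wss` scheme, an explicit port, and the `/ws` path) are assumptions taken from the default endpoint quoted in this README.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check for a Web STOMP broker URL; not part of the module. */
final class BrokerUrlCheck {

    /** Returns a list of human-readable problems; an empty list means the URL looks plausible. */
    static List<String> problems(String brokerUrl) {
        URI uri;
        try {
            uri = URI.create(brokerUrl);
        } catch (IllegalArgumentException e) {
            return List.of("not a valid URI: " + brokerUrl);
        }
        List<String> problems = new ArrayList<>();
        String scheme = uri.getScheme();
        if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
            problems.add("scheme should be ws or wss, was " + scheme);
        }
        if (uri.getHost() == null) {
            problems.add("host is missing");
        }
        if (uri.getPort() == -1) {
            problems.add("port is not explicit (Web STOMP defaults to 15674)");
        }
        if (!"/ws".equals(uri.getPath())) {
            problems.add("path should be /ws, was " + uri.getPath());
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(problems("http://localhost:15672"));
    }
}
```

A check like this only validates the shape of the URL; it does not prove that the plugin is enabled or that the port is reachable.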
Problem: Messages not received by clients
Solutions:
- Check server logs for exchange declaration errors
- Verify the RabbitMQ AMQP connection is established on the server side
- Use the RabbitMQ Management UI to inspect exchanges: `http://localhost:15672`
- Ensure the client subscribes after the connection is established (use `waitForConnection()`)
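
Another thing worth ruling out is a mismatch between the exchange the server publishes to and the destination the client subscribes to. RabbitMQ's STOMP adapter addresses exchanges as `/exchange/<name>`, optionally followed by a routing-key segment. The sketch below converts between the two forms; `StompDestinations` is a hypothetical helper, and whether `addGroup()` performs exactly this mapping internally is an assumption.

```java
/** Hypothetical helper: maps between group names and RabbitMQ STOMP exchange destinations. */
final class StompDestinations {
    private static final String EXCHANGE_PREFIX = "/exchange/";

    /** Builds the destination for a group, rejecting names that would change the path structure. */
    static String forGroup(String groupName) {
        if (groupName == null || groupName.isBlank() || groupName.contains("/")) {
            throw new IllegalArgumentException("invalid group name: " + groupName);
        }
        return EXCHANGE_PREFIX + groupName;
    }

    /** Extracts the exchange name, ignoring an optional trailing routing-key segment. */
    static String groupOf(String destination) {
        if (destination == null || !destination.startsWith(EXCHANGE_PREFIX)) {
            throw new IllegalArgumentException("not an exchange destination: " + destination);
        }
        String rest = destination.substring(EXCHANGE_PREFIX.length());
        int slash = rest.indexOf('/');
        return slash < 0 ? rest : rest.substring(0, slash);
    }

    public static void main(String[] args) {
        System.out.println(forGroup("notifications"));
        System.out.println(groupOf("/exchange/admin"));
    }
}
```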
Problem: Some messages are lost during client reconnection
Solutions:
- Use durable exchanges for critical messages (avoid the `__vertx` prefix)
- Implement message queues instead of fanout exchanges for guaranteed delivery
- Consider persistent queues and durable subscriptions for mission-critical data
- Use RabbitMQ acknowledgments for reliable delivery
Problem: Subscriptions not cleaned up
Solutions:
- Always call `removeGroup()` in the `ngOnDestroy()` lifecycle hook
- Use the `[data-rabbit-groups]` directive for automatic cleanup
- Unsubscribe from observables in component teardown
- Monitor the `declaredExchanges` set on the server side
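
When several components share one group, one way to reason about cleanup is reference counting: subscribe on the first holder and unsubscribe only when the last one leaves. The sketch below shows that bookkeeping in isolation. `GroupRefCounter` is hypothetical and is not how the module is known to track subscriptions; it is meant as a debugging aid for spotting unbalanced add/remove calls.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reference counter for group subscriptions shared by several components. */
final class GroupRefCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Returns true when this is the first holder, i.e. a real subscribe is needed. */
    synchronized boolean acquire(String group) {
        return counts.merge(group, 1, Integer::sum) == 1;
    }

    /** Returns true when the last holder left, i.e. a real unsubscribe is needed. */
    synchronized boolean release(String group) {
        Integer current = counts.get(group);
        if (current == null) {
            return false; // unbalanced release: nothing was held
        }
        if (current == 1) {
            counts.remove(group);
            return true;
        }
        counts.put(group, current - 1);
        return false;
    }

    /** Number of groups that still have at least one holder; non-zero at shutdown hints at a leak. */
    synchronized int activeGroups() {
        return counts.size();
    }
}
```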
- Use semantic group names: `user-{id}`, `room-{id}`, `notifications`, etc.
- Prefer directives: Use `[data-rabbit-groups]` for automatic lifecycle management
- Wait for connection: Always await `waitForConnection()` before publishing
- Handle errors: Subscribe to `onStompError` for error handling
- Clean up resources: Properly unsubscribe and disconnect on component/app destruction
- Use environment variables: Externalize RabbitMQ connection configuration
- Monitor connections: Use the RabbitMQ Management UI to monitor WebSocket connections
- Secure credentials: Never hardcode credentials; use environment variables or secret management
- JWebMP Core: `JWebMP/README.md`
- GuicedEE RabbitMQ: `GuicedEE/rabbitmq/README.md`
- Vert.x Integration: `JWebMP/vertx/README.md`
- Angular Integration: `JWebMP/plugins/angular/README.md`
Issues and pull requests are welcome.
- Include tests for new features
- Update documentation for behavior changes
- Follow existing code patterns and naming conventions
- Test with RabbitMQ 3.x and latest Web STOMP plugin
JWebMP RabbitMQ-Comms: Real-time browser-to-RabbitMQ connectivity for reactive web applications.
Built with ❤️ using Java 25+, Vert.x 5, Angular 20, RabbitMQ, and STOMP.