Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish/Subscribe pattern #3140

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions publish-subsribe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
---
title: "Publish/Subscribe"
shortTitle: "Pub/Sub Pattern"
description: "A messaging pattern that enables loose coupling between publishers and subscribers through message topics"
category: Behavioral
language: en
tag:
- Messaging
- Event-Driven
- Decoupling
---

## Also known as

- Pub/Sub
- Observer Pattern (similar but not identical)

## Intent

Establish a one-to-many, many-to-one, or many-to-many dependency between objects where multiple publishers send messages to topics, and multiple subscribers can receive those messages without direct knowledge of each other, promoting loose coupling and scalability.

## Core Concepts

### Topics and Messaging Components

A topic is a distribution mechanism for publishing messages between message producers and message consumers in a one-to-many relationship. Key components include:

- **Topics**: Destinations where publishers send messages and from which subscribers receive them. Unlike queues (used in point-to-point messaging), topics can have multiple consumers.

- **Message Producers**: Lightweight objects created to send messages to a topic. They are typically created per topic and can be created for each message since they don't consume significant resources.

- **Message Consumers**: Objects that subscribe to topics and receive messages. Consumers can receive messages either:
- Synchronously: By calling a `receive()` method that blocks until a message arrives
- Asynchronously: By implementing a message listener with an `onMessage()` callback (which is discussed in this implementation)

### Types of Subscriptions

1. **Non-Durable Subscriptions**

- Simplest form of subscription
- Exists only while the consumer is active
- Messages sent while the subscriber is inactive are missed
- Best for real-time data where missing messages is acceptable
- Example: Live sports score updates

2. **Durable Subscriptions**

- Maintains subscription state even when subscriber is offline
- Messages are stored by JMS provider until consumer reconnects
- Requires a unique client identifier for persistence
- Two subtypes:
- Unshared: Only one consumer can use the subscription
- Shared: Multiple consumers can share the subscription
- Example: Critical business notifications

3. **Shared Subscriptions**
- Allows multiple consumers to share message load
- Messages are distributed among active consumers
- Can be combined with durability
- Useful for load balancing and high-throughput scenarios
- Example: Distributed processing of bank transactions

### Messaging Infrastructure

The JMS (Java Message Service) provider handles:

- Message persistence for durable subscriptions
- Message distribution to appropriate subscribers
- Connection and session management
- Transaction support when integrated with JTA
- Load balancing for shared subscriptions

## Detailed Explanation with Real-World Examples

Real-world example

> Consider a news distribution system where different types of subscribers receive news updates:
>
> - Regular subscribers who only receive messages while they're online
> - Durable subscribers who receive missed messages when they reconnect
> - Shared subscribers who distribute the message load among multiple instances
> This is exactly what our demonstration implements in the App.java examples.

In plain words

> A news publishing system where publishers can send news to topics, and different types of subscribers (basic, durable, shared) can receive these updates in various ways, as demonstrated in our three main scenarios: basic publish-subscribe, durable subscriptions, and shared subscriptions.

Wikipedia says

> Publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, may be interested.

## Programmatic Example

### 1. Basic Publish-Subscribe Pattern

The most straightforward implementation where all subscribers receive all messages:

```java
// Create basic subscribers that receive messages only while online
TopicSubscriber basicSub1 = new TopicSubscriber(
"BasicSub1", "NEWS", SubscriberType.NONDURABLE, null
);
TopicSubscriber basicSub2 = new TopicSubscriber(
"BasicSub2", "NEWS", SubscriberType.NONDURABLE, null
);

// Create publisher and send messages
TopicPublisher publisher = new TopicPublisher("NEWS");
publisher.publish(new Message("Basic message 1", "NEWS"));
publisher.publish(new Message("Basic message 2", "NEWS"));

// Both BasicSub1 and BasicSub2 will receive all messages
```

### 2. Durable Subscriptions Pattern

Demonstrates how subscribers can receive messages that were published while they were offline:

```java
// Create durable subscriber with client ID for persistence
TopicSubscriber durableSub = new TopicSubscriber(
"DurableSub", "NEWS", SubscriberType.DURABLE, "durable-client"
);

// First message - subscriber receives while online
publisher.publish(new Message("Durable message while online", "NEWS"));

// Subscriber goes offline (close connection)
durableSub.close();

// Message sent while subscriber is offline
publisher.publish(new Message("Durable message while offline", "NEWS"));

// When subscriber reconnects, it receives the missed message
durableSub = new TopicSubscriber(
"DurableSub", "NEWS", SubscriberType.DURABLE, "durable-client"
);
```

### 3. Shared Subscriptions Pattern

Shows how messages can be distributed among multiple subscribers for load balancing:

```java
// Create shared subscribers that will distribute the message load
TopicSubscriber sharedSub1 = new TopicSubscriber(
"SharedSub1", "NEWS", SubscriberType.SHARED, null
);
TopicSubscriber sharedSub2 = new TopicSubscriber(
"SharedSub2", "NEWS", SubscriberType.SHARED, null
);

// Send multiple messages that will be distributed
publisher.publish(new Message("Shared message 1", "NEWS"));
publisher.publish(new Message("Shared message 2", "NEWS"));
publisher.publish(new Message("Shared message 3", "NEWS"));
publisher.publish(new Message("Shared message 4", "NEWS"));

// Messages are distributed between SharedSub1 and SharedSub2
// Each subscriber receives approximately half of the messages
```

## Implementation Details

- Basic subscribers demonstrate the simplest form of pub/sub where all subscribers receive all messages
- Durable subscribers maintain their subscription state even when offline, ensuring no messages are missed
- Shared subscribers enable load balancing by distributing messages among multiple consumers
- Messages are delivered asynchronously through the `onMessage()` callback
- The JMS provider (ActiveMQ in this implementation) handles message persistence and distribution

## When to Use

- When you need different types of message consumption patterns:
- Basic subscribers for simple real-time updates
- Durable subscribers for critical messages that can't be missed
- Shared subscribers for load balancing
- When subscribers need to receive messages even after being offline (demonstrated in durableSubscriptions() example)
- When message load needs to be distributed among multiple consumers (demonstrated in sharedSubscriptions() example)

## Real-World Applications

Any event-driven system that requires loose coupling between publishers and subscribers can benefit from the pub/sub pattern. Some common examples include:

- IOT systems where multiple devices publish data to a central server
- Enterprise messaging systems (JMS) for inter-application communication
- Microservices architectures where services communicate through message brokers
- News distribution systems (as demonstrated in our NEWS topic example)
- Load-balanced message processing systems (using shared subscriptions)
- Message broadcasting systems (using basic pub/sub)

## Benefits and Trade-offs

Benefits:

- Loose coupling between publishers and subscribers
- Scalability through message distribution
- Flexibility to add/remove components
- Support for offline components through durable subscriptions
- Asynchronous communication

Trade-offs:

- Additional complexity in message delivery guarantees
- Potential performance overhead from message broker
- Message ordering challenges in distributed systems
- Durable subscriptions which allow for message persistence add a coniderable overhead

## Related Patterns

- Observer Pattern
- Mediator Pattern
- Event Sourcing
- Message Queue Pattern

## References

- Java EE Tutorial - JMS Messaging
- Enterprise Integration Patterns
Binary file added publish-subsribe/pom.xml
Binary file not shown.
144 changes: 144 additions & 0 deletions publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.iluwatar.publishsubscribe;

import com.iluwatar.publishsubscribe.jms.JmsUtil;
import com.iluwatar.publishsubscribe.model.Message;
import com.iluwatar.publishsubscribe.publisher.TopicPublisher;
import com.iluwatar.publishsubscribe.subscriber.SubscriberType;
import com.iluwatar.publishsubscribe.subscriber.TopicSubscriber;
import java.util.ArrayList;
import java.util.List;

/**
* Main application demonstrating different aspects of JMS publish-subscribe pattern.
*/
Comment on lines +11 to +13
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Describe the pattern and explain how this example implements it

public final class App {
private static TopicPublisher publisher;

private App() {
}

/**
* Main method to run the demo.
*/
public static void main(String[] args) {
try {
publisher = new TopicPublisher("NEWS");
demonstrateBasicPubSub();
demonstrateDurableSubscriptions();
demonstrateSharedSubscriptions();
} catch (Exception e) {
System.err.println("Error in publish-subscribe demo: " + e.getMessage());
e.printStackTrace();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a proper logger

} finally {
cleanup(null);
}
JmsUtil.closeConnection();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be inside the finally block?

}

/**
* Demonstrates basic publish-subscribe with non-durable subscribers.
* All subscribers receive all messages.
*/
private static void demonstrateBasicPubSub() throws Exception {
System.out.println("\n=== Basic Publish-Subscribe Demonstration ===");
List<TopicSubscriber> subscribers = new ArrayList<>();

try {
// Create basic subscribers
subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use constants for strings like this

subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null));
Thread.sleep(100); // Wait for subscribers to initialize

// Publish messages - all subscribers should receive all messages
publisher.publish(new Message("Basic message 1", "NEWS"));
publisher.publish(new Message("Basic message 2", "NEWS"));

Thread.sleep(1000); // Wait for message processing
} finally {
cleanup(subscribers);
System.out.println("=== Basic Demonstration Completed ===\n");
}
}

/**
* Demonstrates durable subscriptions that persist messages when subscribers are
* offline.
*/
private static void demonstrateDurableSubscriptions() throws Exception {
System.out.println("\n=== Durable Subscriptions Demonstration ===");
List<TopicSubscriber> subscribers = new ArrayList<>();

try {
// Create durable subscriber
TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS",
SubscriberType.DURABLE, "durable-client");
subscribers.add(durableSub);
Thread.sleep(100);

// First message - subscriber is online
publisher.publish(new Message("Durable message while online", "NEWS"));
Thread.sleep(500);

// Disconnect subscriber
durableSub.close();
subscribers.clear();

// Send message while subscriber is offline
publisher.publish(new Message("Durable message while offline", "NEWS"));
Thread.sleep(500);

// Reconnect subscriber - should receive offline message
subscribers.add(new TopicSubscriber("DurableSub", "NEWS",
SubscriberType.DURABLE, "durable-client"));
Thread.sleep(1000);

} finally {
cleanup(subscribers);
System.out.println("=== Durable Demonstration Completed ===\n");
}
}

/**
* Demonstrates shared subscriptions where messages are distributed among
* subscribers.
*/
private static void demonstrateSharedSubscriptions() throws Exception {
System.out.println("\n=== Shared Subscriptions Demonstration ===");
List<TopicSubscriber> subscribers = new ArrayList<>();

try {
// Create shared subscribers
subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null));
subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null));
Thread.sleep(100);

// Messages should be distributed between subscribers
for (int i = 1; i <= 4; i++) {
publisher.publish(new Message("Shared message " + i, "NEWS"));
Thread.sleep(100);
}

Thread.sleep(1000);
} finally {
cleanup(subscribers);
System.out.println("=== Shared Demonstration Completed ===\n");
}
}

/**
* Cleanup specified subscribers and optionally the publisher.
*/
private static void cleanup(List<TopicSubscriber> subscribers) {
try {
if (subscribers != null) {
for (TopicSubscriber subscriber : subscribers) {
if (subscriber != null) {
subscriber.close();
}
}
}
} catch (Exception e) {
System.err.println("Error during subscriber cleanup: " + e.getMessage());
}
}
}
Loading
Loading