Skip to content

Commit

Permalink
Delete subscriptions when topic is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
davidMcneil committed Jun 23, 2018
1 parent 83f068f commit b831337
Show file tree
Hide file tree
Showing 22 changed files with 269 additions and 152 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ rust:
- stable
- beta
- nightly
before_script:
- rustup component add rustfmt-preview
cache: cargo
matrix:
allow_failures:
- rust: nightly
script:
- cargo fmt --all -- --check
- cargo clean
- cargo build
- cargo test
Expand Down
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
"getpid",
"gitignore",
"hoverable",
"lish",
"marginless",
"millis",
"minireset",
"mixins",
"musl",
"nexts",
"paddingless",
"psutil",
"pubsub",
"rocket",
"rustfmt",
"rustup",
"serde",
"serializer",
"structopt",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ open = "1.2.1"
parking_lot = "0.6.2"
psutil = "1.1.1"
rand = "0.5.2"
reqwest = "0.8.6"
serde = "1.0.66"
serde_derive = "1.0.66"
serde_json = "1.0.21"
Expand Down
135 changes: 87 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,30 @@ Courier provides an in-memory, non-distributed pub/sub service with an http, jso

**Linux** - Simply grab the [latest release](https://github.com/davidMcneil/courier/releases/latest). It is 100% statically linked and _should_ run on any x86, unix-like system.

**macOS** - Currently Courier is not distributed for macOS.

**Windows** - Currently Courier is not distributed for Windows.

## HTTP JSON API
## HTTP JSON API <a name="http_json_api"></a>

**Table of Contents**

* [Topic End Points](#topic_end_points)
* [Create](#topic_create)
* [Update](#topic_update)
* [Delete](#topic_delete)
* [Get](#topic_get)
* [List](#topic_list)
* [Subscriptions](#topic_subscriptions)
* [Publish](#topic_publish)
* [Subscription End Points](#subscription_end_points)
* [Create](#subscription_create)
* [Update](#subscription_update)
* [Delete](#subscription_delete)
* [Get](#subscription_get)
* [List](#subscription_list)
* [Pull](#subscription_pull)
* [Ack](#subscription_ack)
- [Topic End Points](#topic_end_points)
- [Create](#topic_create)
- [Update](#topic_update)
- [Delete](#topic_delete)
- [Get](#topic_get)
- [List](#topic_list)
- [Subscriptions](#topic_subscriptions)
- [Publish](#topic_publish)
- [Subscription End Points](#subscription_end_points)
- [Create](#subscription_create)
- [Update](#subscription_update)
- [Delete](#subscription_delete)
- [Get](#subscription_get)
- [List](#subscription_list)
- [Pull](#subscription_pull)
- [Ack](#subscription_ack)

All messages require the following HTTP headers to be set:

Expand All @@ -50,7 +52,10 @@ All messages require the following HTTP headers to be set:
```json
{
"name": "string",
"message_ttl": "u32"
"message_ttl": "i64",
"ttl": "i64",
"created": "string",
"updated": "string"
}
```

Expand All @@ -68,7 +73,10 @@ All messages require the following HTTP headers to be set:
{
"name": "string",
"topic": "string",
"ack_deadline": "u32"
"ack_deadline": "i64",
"ttl": "i64",
"created": "string",
"updated": "string"
}
```

Expand Down Expand Up @@ -124,14 +132,16 @@ Create a new topic.

```json
{
"message_ttl": "u32"
"message_ttl": "u32",
"ttl": "u32"
}
```

| Parameter | Description | Units | Format | Required |
| ----------- | ---------------------------------------------------------------------- | ------- | ------ | -------- |
| topic | The unique name of the topic, a random name will be generated if empty | n/a | path | false |
| message_ttl | The time to live (ttl) applied to all messages | seconds | body | false |
| message_ttl | The time to live (ttl) applied to all messages, use 0 for no ttl | seconds | body | false |
| ttl | The time to live (ttl) of the topic, use 0 for no ttl | seconds | body | false |

##### Response

Expand All @@ -142,20 +152,22 @@ Create a new topic.

#### Update - (PATCH) /api/v0/topics/&lt;topic&gt; <a name="topic_update"></a>

Update a topic.
Update a topic. Updates the topic's `updated` field regardless of if a value is actually updated.

##### Request

```json
{
"message_ttl": "u32"
"message_ttl": "u32",
"ttl": "u32"
}
```

| Parameter | Description | Units | Format | Required |
| ----------- | ---------------------------------------------- | ------- | ------ | -------- |
| topic | The name of the topic | n/a | path | true |
| message_ttl | The time to live (ttl) applied to all messages | seconds | body | false |
| Parameter | Description | Units | Format | Required |
| ----------- | ---------------------------------------------------------------- | ------- | ------ | -------- |
| topic | The name of the topic | n/a | path | true |
| message_ttl | The time to live (ttl) applied to all messages, use 0 for no ttl | seconds | body | false |
| ttl | The time to live (ttl) of the topic, use 0 for no ttl | seconds | body | false |

##### Response

Expand All @@ -166,7 +178,7 @@ Update a topic.

#### Delete - (DELETE) /api/v0/topics/&lt;topic&gt; <a name="topic_delete"></a>

Delete a topic.
Delete a topic. This will also delete all the subscriptions subscribed to this topic.

##### Request

Expand Down Expand Up @@ -233,7 +245,7 @@ List all of the subscription names which are subscribed to this topic.

#### Publish - (POST) /api/v0/topics/&lt;topic&gt;/publish <a name="topic_publish"></a>

Add messages to a topic.
Add messages to a topic. Updates the topics `updated` fields

```json
{
Expand Down Expand Up @@ -267,6 +279,7 @@ Create a new subscription.
{
"topic": "string",
"ack_deadline": "u32",
"ttl": "u32",
"historical": "bool"
}
```
Expand All @@ -276,6 +289,7 @@ Create a new subscription.
| subscription | The unique name of the subscription, a random name will be generated if empty | | path | false |
| topic | The name of the topic to subscribe | | body | true |
| ack_deadline | The amount of time given to ack a message before it is resent | seconds | body | false |
| ttl | The time to live (ttl) of the subscription, use 0 for no ttl | seconds | body | false |
| historical | Should this subscription start pulling from the first message that is part of the subscribed topic, otherwise it will only pull messages added after the subscription is created | | body | false |

##### Response
Expand All @@ -287,20 +301,22 @@ Create a new subscription.

#### Update - (PATCH) /api/v0/subscriptions/&lt;subscription&gt; <a name="subscription_update"></a>

Update a subscription.
Update a subscription. Update the subscriptions `updated` field regardless of if a value is actually updated.

##### Request

```json
{
"ack_deadline": "u32"
"ack_deadline": "u32",
"ttl": "u32"
}
```

| Parameter | Description | Units | Format | Required |
| ------------ | ------------------------------------------------------------- | ------- | ------ | -------- |
| subscription | The name of the subscription | n/a | path | true |
| ack_deadline | The amount of time given to ack a message before it is resent | seconds | body | false |
| ttl | The time to live (ttl) of the subscription, use 0 for no ttl | seconds | body | false |

##### Response

Expand Down Expand Up @@ -361,7 +377,7 @@ List all of the subscriptions.

#### Pull - (POST) /api/v0/subscriptions/&lt;subscription&gt;/pull <a name="subscription_pull"></a>

Pull messages from a subscription.
Pull messages from a subscription. Updates the subscriptions `updated` field.

```json
{
Expand All @@ -385,7 +401,7 @@ Pull messages from a subscription.

#### Ack - (POST) /api/v0/subscriptions/&lt;subscription&gt;/ack <a name="subscription_ack"></a>

Acknowledged that messages have been processed.
Acknowledged that messages have been processed. Updates the subscriptions `updated` field.

```json
{
Expand Down Expand Up @@ -416,8 +432,17 @@ For the time being, Courier uses a nightly version of the rust compiler due to i
> rustup install nightly-2018-03-29
> rustup default nightly-2018-03-29

Courier also uses [tarpaulin](https://github.com/xd009642/tarpaulin) and [cross](https://github.com/japaric/cross) for doing code coverage and cross compilation respectively. They can be installed with:
Courier also uses:

- [clippy](https://github.com/rust-lang-nursery/rust-clippy) - for linting
- [rustfmt](https://github.com/xd009642/tarpaulin) - for code formatting
- [tarpaulin](https://github.com/xd009642/tarpaulin) - for code coverage
- [cross](https://github.com/japaric/cross) - for cross compilation

These can be installed with:

> cargo +nightly install clippy
> rustup component add rustfmt-preview
> cargo install cargo-tarpaulin
> cargo install cross

Expand All @@ -429,27 +454,41 @@ Run the test suite

> cargo test

Format the code

> cargo fmt

Lint with clippy

> cargo clippy --lib --bins

Check test coverage

> cargo tarpaulin --ignore-tests --line --no-count

Perform a cross release build with the [msul](https://www.musl-libc.org/) target
Perform a cross release build with the [musl](https://www.musl-libc.org/) target

> cross build --release --target=x86_64-unknown-linux-musl

## Road Map

* [x] Add metrics and an api to retrieve them
* [ ] Add management ui page
* [ ] Benchmark performance
* [ ] Use a concurrent hashmap instead of having the state of the app behind a single mutex
* [ ] Add other protocols
* TCP Binary
* gRPC
* ...
* [ ] Write drivers for a variety of languages
* [ ] Add durability (write to disk)
* [ ] Make distributed and fault tolerant
## Web Development

Courier uses [yarn](https://yarnpkg.com/) for web development, but the commands should work equally well with npm.

Install dependencies

> yarn install

Start the development server

> yarn start

Create a production build

> yarn build

Clean

> yarn clean

## Example Commands

Expand Down
3 changes: 3 additions & 0 deletions examples/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub fn main() {
println!("example!");
}
4 changes: 2 additions & 2 deletions src/http_protocol/general_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::sync::Arc;
use http_protocol::HttpState;

static HTML: &'static str = include_str!("../../web/dist/index.html");
static CSS: &'static str = include_str!("../../web/dist/src.036c3682.css");
static JS: &'static str = include_str!("../../web/dist/src.b423b4bf.js");
static CSS: &'static str = include_str!("../../web/dist/src.71107acb.css");
static JS: &'static str = include_str!("../../web/dist/src.d29a4435.js");

pub fn html(_: HttpRequest) -> HttpResponse {
HttpResponse::Ok()
Expand Down
4 changes: 2 additions & 2 deletions src/http_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub fn create(config: Config) -> impl Fn() -> Vec<Box<server::HttpHandler>> {
App::new()
.prefix("/web")
.route("/ui", Method::GET, general_handlers::html)
.route("/src.036c3682.css", Method::GET, general_handlers::css)
.route("/src.b423b4bf.js", Method::GET, general_handlers::js)
.route("/src.71107acb.css", Method::GET, general_handlers::css)
.route("/src.d29a4435.js", Method::GET, general_handlers::js)
.middleware(Logger::new(LOGGER_FORMAT))
.middleware(cors::Cors::build().finish())
.boxed(),
Expand Down
47 changes: 47 additions & 0 deletions src/http_protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,53 @@ fn http_protocol_ttls() {
assert_eq!(StatusCode::NOT_FOUND, status);
}

#[test]
fn http_protocol_delete_topic_subscriptions() {
let (_, mut server) = get_server();

// Create topics
let topic_config = TopicCreateConfig {
message_ttl: Some(2),
ttl: Some(0),
};
get_status(&mut server, "topics/topic0", Method::PUT, topic_config);

// Create subscriptions
let subscription_config = SubscriptionCreateConfig {
topic: String::from("topic0"),
ack_deadline: Some(1),
ttl: None,
historical: Some(true),
};
get_status(
&mut server,
"subscriptions/sub0",
Method::PUT,
subscription_config,
);
let subscription_config = SubscriptionCreateConfig {
topic: String::from("topic0"),
ack_deadline: Some(1),
ttl: Some(1),
historical: Some(true),
};
get_status(
&mut server,
"subscriptions/sub1",
Method::PUT,
subscription_config,
);

// Delete the topic
get_status(&mut server, "topics/topic0", Method::DELETE, ());

// Try and get the subscriptions
let status = get_status(&mut server, "subscriptions/sub0", Method::GET, ());
assert_eq!(StatusCode::NOT_FOUND, status);
let status = get_status(&mut server, "subscriptions/sub1", Method::GET, ());
assert_eq!(StatusCode::NOT_FOUND, status);
}

#[test]
fn http_protocol_basic() {
let (_, mut server) = get_server();
Expand Down
Loading

0 comments on commit b831337

Please sign in to comment.