Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #144 from amarzavery/neweph2
Browse files Browse the repository at this point in the history
adding user-agent to the AMQP connection
  • Loading branch information
amarzavery authored Sep 13, 2018
2 parents e070e3c + 79c6a01 commit 4daff68
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 83 deletions.
13 changes: 5 additions & 8 deletions processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ async function main() {
const eph = EventProcessorHost.createFromConnectionString(
EventProcessorHost.createHostName("my-host"),
storageCS,
leasecontainerName,
ehCS,
{
eventHubPath: path,
leasecontainerName: leasecontainerName
eventHubPath: path
},
onEphError: (error) => {
console.log("This handler will notify you of any internal errors that happen " +
Expand Down Expand Up @@ -185,13 +185,10 @@ async function startEph(ephName /**string**/) {
const eph = EventProcessorHost.createFromConnectionString(
ephName,
storageCS,
leasecontainerName,
ehCS,
{
eventHubPath: path,
// If the lease container name is not provided, then the EPH will use it's name to create
// a new container. It is important to provide the same container name across different EPH
// instances for the paritions to be load balanced.
leasecontainerName: leasecontainerName,
// This method will provide errors that occur during lease and partition management. The
// errors that occur while receiving messages will be provided in the onError handler
// provided in the eph.start() method.
Expand Down Expand Up @@ -255,10 +252,10 @@ async function main() {
const eph = await EventProcessorHost.createFromIotHubConnectionString(
EventProcessorHost.createHostName("my-host"),
storageCS,
leasecontainerName,
iothubCS,
{
eventHubPath: path,
leasecontainerName: leasecontainerName
eventHubPath: path
}
);
let count = 0;
Expand Down
11 changes: 9 additions & 2 deletions processor/changelog.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
## 2018-09-12 0.2.0
- Added support to automatically balance the load od receiving messages across multiple partitions.
- Added support to automatically balance the load of receiving messages across multiple partitions.
- Added static method to create an EPH from an `IotHubConnectionString`
- Added user-agent to the underlying amqp-connection. This would help in tracking usage of EPH.
- Changed the overall design of EPH.
- Instead of attaching handlers on `eph:message` and `eph:error`, now the handlers need to be passed
as arguments to the `start()` method on EPH.
- Removed an optional property `autoCheckpoint` and added optional properties
- Apart from that an additional handler/method can be passed as an optional property `onEphError`
to EPH. This handler will receive notifications from EPH regarding any errors that occur during
partition management.
- Removed optional property `leasecontainerName` and replaced it with a required parameter `storageContainerName` wherever applicable in all the static methods on `EventProcessorHost`.
- Removed optional property `autoCheckpoint` and added optional properties
- `checkpointManager`
- `onEphError`
- `leaseRenewInterval`
- `leaseDuration`
- Please take a look at the [examples](https://github.com/Azure/azure-event-hubs-node/tree/master/processor/examples) for more details.

## 2018-07-16 0.1.4
- Added an option `autoCheckpoint: false` to not checkpoint the received messages by default.
Expand Down
13 changes: 6 additions & 7 deletions processor/examples/iothubEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import * as dotenv from "dotenv";
dotenv.config();

const storageCS = process.env.STORAGE_CONNECTION_STRING;
const ehCS = process.env.EVENTHUB_CONNECTION_STRING;
const leasecontainerName = "iothub-test-container";
const iotCS = process.env.IOTHUB_CONNECTION_STRING;
// creates a unique storageContainer name for every run
// if you wish to keep the name same between different runs then use the following then that is fine as well.
const storageContainerName = EventProcessorHost.createHostName("iothub-container");
const ephName = "my-iothub-eph";

/**
Expand Down Expand Up @@ -49,12 +51,9 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
const eph = await EventProcessorHost.createFromIotHubConnectionString(
ephName,
storageCS!,
ehCS!,
storageContainerName,
iotCS!,
{
// If the lease container name is not provided, then the EPH will use it's name to create
// a new container. It is important to provide the same container name across different EPH
// instances for the paritions to be load balanced.
leasecontainerName: leasecontainerName,
onEphError: (error) => {
console.log(">>>>>>> [%s] Error: %O", ephName, error);
}
Expand Down
12 changes: 6 additions & 6 deletions processor/examples/multiEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ const path = process.env[entityPath] || "";
const storageCS = process.env[storageConnectionString];
const ehCS = process.env[ehconnectionString];

// set the names of eph and the lease container.
const leasecontainerName = "test-container";
// set the names of eph and the storage container.
// creates a unique storageContainer name for every run
// if you wish to keep the name same between different runs then use the following then that is fine as well.
const storageContainerName = EventProcessorHost.createHostName("test-container");
console.log(">>>> The storage container name is: %s.", storageContainerName);
const ephName1 = "eph-1";
const ephName2 = "eph-2";

Expand Down Expand Up @@ -61,13 +64,10 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
const eph = EventProcessorHost.createFromConnectionString(
ephName,
storageCS!,
storageContainerName,
ehCS!,
{
eventHubPath: path,
// If the lease container name is not provided, then the EPH will use it's name to create
// a new container. It is important to provide the same container name across different EPH
// instances for the paritions to be load balanced.
leasecontainerName: leasecontainerName,
// This method will provide errors that occur during lease and partition management. The
// errors that occur while receiving messages will be provided in the onError handler
// provided in the eph.start() method.
Expand Down
9 changes: 4 additions & 5 deletions processor/examples/singleEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ dotenv.config();
const path = process.env.EVENTHUB_NAME;
const storageCS = process.env.STORAGE_CONNECTION_STRING;
const ehCS = process.env.EVENTHUB_CONNECTION_STRING;
const leasecontainerName = "test-container";
// creates a unique storageContainer name for every run
// if you wish to keep the name same between different runs then use the following then that is fine as well.
const storageContainerName = EventProcessorHost.createHostName("test-container");
const ephName = "my-eph";

/**
Expand Down Expand Up @@ -51,13 +53,10 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
const eph = EventProcessorHost.createFromConnectionString(
EventProcessorHost.createHostName(ephName),
storageCS!,
storageContainerName,
ehCS!,
{
eventHubPath: path,
// If the lease container name is not provided, then the EPH will use it's name to create
// a new container. It is important to provide the same container name across different EPH
// instances for the paritions to be load balanced.
leasecontainerName: leasecontainerName,
onEphError: (error) => {
console.log(">>>>>>> [%s] Error: %O", ephName, error);
}
Expand Down
8 changes: 4 additions & 4 deletions processor/lib/azureBlobLease.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ export class AzureBlobLease extends CompleteLease implements AzureBlobLeaseInfo
* @returns {LeaseInfo} LeaseInfo.
*/
getInfo(): LeaseInfo {
const info = super.getInfo();
(info as LeaseInfo).sequenceNumber = this.sequenceNumber;
(info as LeaseInfo).token = this.token;
(info as LeaseInfo).offset = this.offset;
const info = super.getInfo() as LeaseInfo;
info.sequenceNumber = this.sequenceNumber;
info.token = this.token;
info.offset = this.offset;
log.azurebloblease("[%s] [%s] Lease info is: %o", this.owner, this.partitionId, info);
return (info as LeaseInfo);
}
Expand Down
18 changes: 9 additions & 9 deletions processor/lib/azureStorageCheckpointLeaseManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
if (!result) {
const blobPath = `${this._context.composedBlobPrefix}${partitionId}`;
result = new AzureBlob(this._context.hostName, this._context.storageConnectionString!,
this._context.leasecontainerName, blobPath, this._context.blobService);
this._context.storageContainerName!, blobPath, this._context.blobService);
this._context.blobReferenceByPartition[partitionId] = result;
}
return result;
Expand All @@ -77,32 +77,32 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
}

async leaseStoreExists(): Promise<boolean> {
return await this._context.blobService!.doesContainerExist(this._context.leasecontainerName);
return await this._context.blobService!.doesContainerExist(this._context.storageContainerName!);
}

async createLeaseStoreIfNotExists(): Promise<void> {
await this._context.blobService!.ensureContainerExists(this._context.leasecontainerName);
await this._context.blobService!.ensureContainerExists(this._context.storageContainerName!);
return;
}

async deleteLeaseStore(): Promise<void> {
const blobService = this._context.blobService;
const leasecontainerName = this._context.leasecontainerName;
const storageContainerName = this._context.storageContainerName!;
try {
if (blobService) {
const listResult = await blobService.listBlobsSegmented(leasecontainerName);
const listResult = await blobService.listBlobsSegmented(storageContainerName);
const deleteBlobs: Promise<void>[] = [];
for (const blob of listResult.entries) {
deleteBlobs.push(blobService.deleteBlobIfExists(leasecontainerName, blob.name));
deleteBlobs.push(blobService.deleteBlobIfExists(storageContainerName, blob.name));
}
await Promise.all(deleteBlobs);
await blobService.deleteContainerIfExists(leasecontainerName);
await blobService.deleteContainerIfExists(storageContainerName);
} else {
throw new Error("'blobService' is not defined in the 'hostContext', hence cannot " +
"list all the blobs.");
}
} catch (err) {
const msg = `An error occurred while deleting the lease store '${leasecontainerName}': %O` +
const msg = `An error occurred while deleting the lease store '${storageContainerName}': %O` +
`${err ? err.stack : JSON.stringify(err)}`;
log.error(this._context.withHost(msg));
const info: EPHDiagnosticInfo = {
Expand Down Expand Up @@ -421,7 +421,7 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
const blobService = this._context.blobService;
const withHost = this._context.withHost;
if (blobService) {
const listResult = await blobService.listBlobsSegmented(this._context.leasecontainerName);
const listResult = await blobService.listBlobsSegmented(this._context.storageContainerName!);
log.checkpointLeaseMgr(withHost("Number of blobs: %d"), listResult.entries.length);
return listResult.entries;
} else {
Expand Down
6 changes: 3 additions & 3 deletions processor/lib/completeLease.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ export class CompleteLease extends BaseLease {
* @returns {CompleteLeaseInfo} CompleteLeaseInfo.
*/
getInfo(): CompleteLeaseInfo {
const info = super.getInfo();
(info as CompleteLeaseInfo).epoch = this.epoch;
const info = super.getInfo() as CompleteLeaseInfo;
info.epoch = this.epoch;
log.completeLease("[%s] [%s] Lease info is: %o", this.owner, this.partitionId, info);
return (info as CompleteLeaseInfo);
return info;
}
}
20 changes: 20 additions & 0 deletions processor/lib/eventProcessorHost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ export class EventProcessorHost {
* @param {string} storageConnectionString Connection string to Azure Storage account used for
* leases and checkpointing. Example DefaultEndpointsProtocol=https;AccountName=<account-name>;
* AccountKey=<account-key>;EndpointSuffix=core.windows.net
* @param {string} storageContainerName Azure Storage container name for use by built-in lease
* and checkpoint manager.
* @param {string} eventHubConnectionString Connection string for the Event Hub to receive from.
* Example: 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;
* SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key'
Expand All @@ -167,18 +169,21 @@ export class EventProcessorHost {
static createFromConnectionString(
hostName: string,
storageConnectionString: string,
storageContainerName: string,
eventHubConnectionString: string,
options?: FromConnectionStringOptions): EventProcessorHost {
if (!options) options = {};

validateType("hostName", hostName, true, "string");
validateType("storageConnectionString", storageConnectionString, true, "string");
validateType("storageContainerName", storageContainerName, true, "string");
validateType("eventHubConnectionString", eventHubConnectionString, true, "string");
validateType("options", options, false, "object");

const ephOptions: EventProcessorHostOptions = {
...options,
storageConnectionString: storageConnectionString,
storageContainerName: storageContainerName,
eventHubConnectionString: eventHubConnectionString
};
return new EventProcessorHost(hostName, ephOptions);
Expand Down Expand Up @@ -233,6 +238,8 @@ export class EventProcessorHost {
* @param {string} storageConnectionString Connection string to Azure Storage account used for
* leases and checkpointing. Example DefaultEndpointsProtocol=https;AccountName=<account-name>;
* AccountKey=<account-key>;EndpointSuffix=core.windows.net
* @param {string} storageContainerName Azure Storage container name for use by built-in lease
* and checkpoint manager.
* @param {string} namespace Fully qualified domain name for Event Hubs.
* Example: "{your-sb-namespace}.servicebus.windows.net"
* @param {string} eventHubPath The name of the EventHub.
Expand All @@ -245,6 +252,7 @@ export class EventProcessorHost {
static createFromTokenProvider(
hostName: string,
storageConnectionString: string,
storageContainerName: string,
namespace: string,
eventHubPath: string,
tokenProvider: TokenProvider,
Expand All @@ -253,6 +261,7 @@ export class EventProcessorHost {

validateType("hostName", hostName, true, "string");
validateType("storageConnectionString", storageConnectionString, true, "string");
validateType("storageContainerName", storageContainerName, true, "string");
validateType("namespace", namespace, true, "string");
validateType("eventHubPath", eventHubPath, true, "string");
validateType("tokenProvider", tokenProvider, true, "object");
Expand All @@ -265,6 +274,7 @@ export class EventProcessorHost {
...options,
tokenProvider: tokenProvider,
storageConnectionString: storageConnectionString,
storageContainerName: storageContainerName,
eventHubPath: eventHubPath,
eventHubConnectionString: connectionString
};
Expand Down Expand Up @@ -330,6 +340,8 @@ export class EventProcessorHost {
* @param {string} storageConnectionString Connection string to Azure Storage account used for
* leases and checkpointing. Example DefaultEndpointsProtocol=https;AccountName=<account-name>;
* AccountKey=<account-key>;EndpointSuffix=core.windows.net
* @param {string} storageContainerName Azure Storage container name for use by built-in lease
* and checkpoint manager.
* @param {string} namespace Fully qualified domain name for Event Hubs.
* Example: "{your-sb-namespace}.servicebus.windows.net"
* @param {string} eventHubPath The name of the EventHub.
Expand All @@ -344,6 +356,7 @@ export class EventProcessorHost {
static createFromAadTokenCredentials(
hostName: string,
storageConnectionString: string,
storageContainerName: string,
namespace: string,
eventHubPath: string,
credentials: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials,
Expand All @@ -352,6 +365,7 @@ export class EventProcessorHost {

validateType("hostName", hostName, true, "string");
validateType("storageConnectionString", storageConnectionString, true, "string");
validateType("storageContainerName", storageContainerName, true, "string");
validateType("namespace", namespace, true, "string");
validateType("eventHubPath", eventHubPath, true, "string");
validateType("credentials", credentials, true, "object");
Expand All @@ -365,6 +379,7 @@ export class EventProcessorHost {
...options,
tokenProvider: new AadTokenProvider(credentials),
storageConnectionString: storageConnectionString,
storageContainerName: storageContainerName,
eventHubPath: eventHubPath,
eventHubConnectionString: connectionString
};
Expand Down Expand Up @@ -432,6 +447,8 @@ export class EventProcessorHost {
* @param {string} storageConnectionString Connection string to Azure Storage account used for
* leases and checkpointing. Example DefaultEndpointsProtocol=https;AccountName=<account-name>;
* AccountKey=<account-key>;EndpointSuffix=core.windows.net
* @param {string} storageContainerName Azure Storage container name for use by built-in lease
* and checkpoint manager.
* @param {string} iotHubConnectionString Connection string for the IotHub.
* Example: 'Endpoint=iot-host-name;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key'
* @param {FromIotHubConnectionStringOptions} [options] Optional parameters for creating an
Expand All @@ -442,12 +459,14 @@ export class EventProcessorHost {
static async createFromIotHubConnectionString(
hostName: string,
storageConnectionString: string,
storageContainerName: string,
iotHubConnectionString: string,
options?: FromIotHubConnectionStringOptions): Promise<EventProcessorHost> {
if (!options) options = {};

validateType("hostName", hostName, true, "string");
validateType("storageConnectionString", storageConnectionString, true, "string");
validateType("storageContainerName", storageContainerName, true, "string");
validateType("iotHubConnectionString", iotHubConnectionString, true, "string");
validateType("options", options, false, "object");

Expand All @@ -457,6 +476,7 @@ export class EventProcessorHost {
const ephOptions: EventProcessorHostOptions = {
...options,
storageConnectionString: storageConnectionString,
storageContainerName: storageContainerName,
eventHubConnectionString: eventHubConnectionString,
eventHubPath: client.eventhubName
};
Expand Down
Loading

0 comments on commit 4daff68

Please sign in to comment.