Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a queue for bandwidth limit #110

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ Simulate network latency, delay, packet loss with clumsy on Windows 7/8/10:

![](clumsy-demo.gif)


## Bandwidth with tail dropping queue
Add a simple tail dropping queue for bandwidth limiting function. The queue size is configurable.
https://github.com/kuhnliu/clumsy/releases/download/0.3rc5_bw_queue/win_x64.zip
## License

MIT
157 changes: 123 additions & 34 deletions src/bandwidth.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
#define NAME "bandwidth"
#define BANDWIDTH_MIN "0"
#define BANDWIDTH_MAX "99999"
#define BANDWIDTH_DEFAULT 10
#define BANDWIDTH_DEFAULT 100
#define QUEUESIZE_MIN "0"
#define QUEUESIZE_MAX "99999"
#define QUEUESIZE_DEFAULT 100

//---------------------------------------------------------------------
// rate stats
Expand All @@ -26,6 +29,16 @@ typedef struct {
uint32_t *array_sample;
} CRateStats;

typedef struct {
PacketNode queueHeadNode;
PacketNode queueTailNode;
PacketNode *queueHead;
PacketNode *queueTail;
int queueSizeInBytes;
CRateStats *rateStats;
} CRateLimiter;

static CRateLimiter inboundRateLimiter = {0}, outboundRateLimiter = {0};

CRateStats* crate_stats_new(int window_size, float scale);

Expand All @@ -43,17 +56,24 @@ int32_t crate_stats_calculate(CRateStats *rate, uint32_t now_ts);
//---------------------------------------------------------------------
// configuration
//---------------------------------------------------------------------
static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput;
static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput, *queueSizeInput;

static volatile short bandwidthEnabled = 0,
bandwidthInbound = 1, bandwidthOutbound = 1;
static volatile short maxQueueSizeInKBytes = 0;

static volatile LONG bandwidthLimit = BANDWIDTH_DEFAULT;
static CRateStats *rateStats = NULL;

static INLINE_FUNCTION short isQueueEmpty(CRateLimiter *rateLimiter) {
short ret = rateLimiter->queueHead->next == rateLimiter->queueTail;
if (ret) assert(rateLimiter->queueSizeInBytes == 0);
return ret;
}

static Ihandle* bandwidthSetupUI() {
Ihandle *bandwidthControlsBox = IupHbox(
IupLabel("Queuesize(KB):"),
queueSizeInput = IupText(NULL),
inboundCheckbox = IupToggle("Inbound", NULL),
outboundCheckbox = IupToggle("Outbound", NULL),
IupLabel("Limit(KB/s):"),
Expand All @@ -71,6 +91,12 @@ static Ihandle* bandwidthSetupUI() {
IupSetAttribute(inboundCheckbox, SYNCED_VALUE, (char*)&bandwidthInbound);
IupSetCallback(outboundCheckbox, "ACTION", (Icallback)uiSyncToggle);
IupSetAttribute(outboundCheckbox, SYNCED_VALUE, (char*)&bandwidthOutbound);
IupSetAttribute(queueSizeInput, "VISIBLECOLUMNS", "3");
IupSetAttribute(queueSizeInput, "VALUE", STR(QUEUESIZE_DEFAULT));
IupSetCallback(queueSizeInput, "VALUECHANGED_CB", uiSyncInt32);
IupSetAttribute(queueSizeInput, SYNCED_VALUE, (char*)&maxQueueSizeInKBytes);
IupSetAttribute(queueSizeInput, INTEGER_MAX, QUEUESIZE_MAX);
IupSetAttribute(queueSizeInput, INTEGER_MIN, QUEUESIZE_MIN);

// enable by default to avoid confusing
IupSetAttribute(inboundCheckbox, "VALUE", "ON");
Expand All @@ -85,59 +111,122 @@ static Ihandle* bandwidthSetupUI() {
return bandwidthControlsBox;
}

static void initRateLimiter(CRateLimiter *rateLimiter) {
rateLimiter->queueHead = &rateLimiter->queueHeadNode;
rateLimiter->queueTail = &rateLimiter->queueTailNode;

if (rateLimiter->queueHead->next == NULL && rateLimiter->queueTail->next == NULL) {
rateLimiter->queueHead->next = rateLimiter->queueTail;
rateLimiter->queueTail->prev = rateLimiter->queueHead;
rateLimiter->queueSizeInBytes = 0;
} else {
assert(isQueueEmpty(rateLimiter));
}

if (rateLimiter->rateStats) crate_stats_delete(rateLimiter->rateStats);
rateLimiter->rateStats = crate_stats_new(1000, 1000);
}

static int uninitRateLimiter(CRateLimiter *rateLimiter, PacketNode *head, PacketNode *tail) {
PacketNode *oldLast = tail->prev;
UNREFERENCED_PARAMETER(head);
// flush all buffered packets
int packetCnt = 0;
while(!isQueueEmpty(rateLimiter)) {
rateLimiter->queueSizeInBytes -= rateLimiter->queueTail->prev->packetLen;
insertAfter(popNode(rateLimiter->queueTail->prev), oldLast);
++packetCnt;
}

if (rateLimiter->rateStats) {
crate_stats_delete(rateLimiter->rateStats);
rateLimiter->rateStats = NULL;
}

return packetCnt;
}

static void bandwidthStartUp() {
if (rateStats) crate_stats_delete(rateStats);
rateStats = crate_stats_new(1000, 1000);
initRateLimiter(&inboundRateLimiter);
initRateLimiter(&outboundRateLimiter);
startTimePeriod();
LOG("bandwidth enabled");
}

static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) {
UNREFERENCED_PARAMETER(head);
UNREFERENCED_PARAMETER(tail);
if (rateStats) crate_stats_delete(rateStats);
rateStats = NULL;
int packetCnt = 0;
packetCnt = uninitRateLimiter(&inboundRateLimiter, head, tail);
LOG("Closing down bandwidth, flushing inbound %d packets", packetCnt);
packetCnt = uninitRateLimiter(&outboundRateLimiter, head, tail);
LOG("Closing down bandwidth, flushing outbound %d packets", packetCnt);
endTimePeriod();
LOG("bandwidth disabled");
}


//---------------------------------------------------------------------
// process
//---------------------------------------------------------------------
static short bandwidthProcess(PacketNode *head, PacketNode* tail) {
static int rateLimiterProcess(CRateLimiter *rateLimiter, PacketNode *head, PacketNode* tail) {
PacketNode *pac;
DWORD now_ts = timeGetTime();
int limit = bandwidthLimit * 1024;

while (!isQueueEmpty(rateLimiter)) {
pac = rateLimiter->queueTail->prev;
// chance in range of [0, 10000]
int rate = crate_stats_calculate(rateLimiter->rateStats, now_ts);
int size = pac->packetLen;
if (rate + size > limit) {
break;
} else {
crate_stats_update(rateLimiter->rateStats, size, now_ts);
}
rateLimiter->queueSizeInBytes -= pac->packetLen;
insertAfter(popNode(pac), head);
}

int dropped = 0;
DWORD now_ts = timeGetTime();
int limit = bandwidthLimit * 1024;
while (rateLimiter->queueSizeInBytes > maxQueueSizeInKBytes * 1024 && !isQueueEmpty(rateLimiter)) {
pac = rateLimiter->queueHead->next;
LOG("dropped with bandwidth %dKB/s, direction %s",
(int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND");
rateLimiter->queueSizeInBytes -= pac->packetLen;
freeNode(popNode(pac));
++dropped;
}

// allow 0 limit which should drop all
if (limit < 0 || rateStats == NULL) {
assert(rateLimiter->queueSizeInBytes >= 0);

return dropped;
}

static short bandwidthProcess(PacketNode *head, PacketNode* tail) {
if (inboundRateLimiter.rateStats == NULL || outboundRateLimiter.rateStats == NULL) {
return 0;
}

while (head->next != tail) {
PacketNode *pac = head->next;
int discard = 0;
// chance in range of [0, 10000]
PacketNode *pac = tail->prev;
while (pac != head) {
if (checkDirection(pac->addr.Outbound, bandwidthInbound, bandwidthOutbound)) {
int rate = crate_stats_calculate(rateStats, now_ts);
int size = pac->packetLen;
if (rate + size > limit) {
LOG("dropped with bandwidth %dKB/s, direction %s",
(int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND");
discard = 1;
}
else {
crate_stats_update(rateStats, size, now_ts);
}
}
if (discard) {
freeNode(popNode(pac));
++dropped;
if (pac->addr.Outbound) {
outboundRateLimiter.queueSizeInBytes += pac->packetLen;
insertAfter(popNode(pac), outboundRateLimiter.queueHead);
} else {
inboundRateLimiter.queueSizeInBytes += pac->packetLen;
insertAfter(popNode(pac), inboundRateLimiter.queueHead);
}
pac = tail->prev;
} else {
head = head->next;
pac = pac->prev;
}
}

return dropped > 0;
int dropped;
dropped = rateLimiterProcess(&inboundRateLimiter, head, tail);
dropped += rateLimiterProcess(&outboundRateLimiter, head, tail);

return dropped > 0 || !isQueueEmpty(&inboundRateLimiter) || !isQueueEmpty(&outboundRateLimiter);
}


Expand Down
64 changes: 33 additions & 31 deletions src/divert.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
#define MAX_PACKETSIZE 0xFFFF
#define READ_TIME_PER_STEP 3
// FIXME does this need to be larger then the time to process the list?
#define CLOCK_WAITMS 40
#define CLOCK_WAITMS 5
#define QUEUE_LEN 2 << 10
#define QUEUE_TIME 2 << 9

static HANDLE divertHandle;
static volatile short stopLooping;
static volatile DWORD lastScheduleTime = 0;
static HANDLE loopThread, clockThread, mutex;

static DWORD divertReadLoop(LPVOID arg);
Expand Down Expand Up @@ -237,44 +238,44 @@ static void divertConsumeStep() {

// periodically try to consume packets to keep the network responsive and not blocked by recv
static DWORD divertClockLoop(LPVOID arg) {
DWORD startTick, stepTick, waitResult;
DWORD startTick, waitResult;
int ix;

UNREFERENCED_PARAMETER(arg);

for(;;) {
// use acquire as wait for yielding thread
startTick = GetTickCount();
waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS);
switch(waitResult) {
case WAIT_OBJECT_0:
/***************** enter critical region ************************/
divertConsumeStep();
/***************** leave critical region ************************/
if (!ReleaseMutex(mutex)) {
if (lastScheduleTime + CLOCK_WAITMS <= startTick) {
waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS);
switch(waitResult) {
case WAIT_OBJECT_0:
/***************** enter critical region ************************/
divertConsumeStep();
/***************** leave critical region ************************/
if (!ReleaseMutex(mutex)) {
InterlockedIncrement16(&stopLooping);
LOG("Fatal: Failed to release mutex (%lu)", GetLastError());
ABORT();
}
lastScheduleTime = GetTickCount();
break;
case WAIT_TIMEOUT:
// read loop is processing, so we can skip this run
LOG("!!! Skipping one run");
Sleep(CLOCK_WAITMS);
break;
case WAIT_ABANDONED:
LOG("Acquired abandoned mutex");
InterlockedIncrement16(&stopLooping);
LOG("Fatal: Failed to release mutex (%lu)", GetLastError());
ABORT();
}
// if didn't spent enough time, we sleep on it
stepTick = GetTickCount() - startTick;
if (stepTick < CLOCK_WAITMS) {
Sleep(CLOCK_WAITMS - stepTick);
}
break;
case WAIT_TIMEOUT:
// read loop is processing, so we can skip this run
LOG("!!! Skipping one run");
Sleep(CLOCK_WAITMS);
break;
case WAIT_ABANDONED:
LOG("Acquired abandoned mutex");
InterlockedIncrement16(&stopLooping);
break;
case WAIT_FAILED:
LOG("Acquire failed (%lu)", GetLastError());
InterlockedIncrement16(&stopLooping);
break;
break;
case WAIT_FAILED:
LOG("Acquire failed (%lu)", GetLastError());
InterlockedIncrement16(&stopLooping);
break;
}
} else {
Sleep(CLOCK_WAITMS - (startTick - lastScheduleTime));
}

// need to get the lock here
Expand Down Expand Up @@ -364,6 +365,7 @@ static DWORD divertReadLoop(LPVOID arg) {
pnode = createNode(packetBuf, readLen, &addrBuf);
appendNode(pnode);
divertConsumeStep();
lastScheduleTime = GetTickCount();
/***************** leave critical region ************************/
if (!ReleaseMutex(mutex)) {
LOG("Fatal: Failed to release mutex (%lu)", GetLastError());
Expand Down