From b8a0cf40168131385d4d54bcdbdcbab5d1cc8f45 Mon Sep 17 00:00:00 2001 From: hzliuxinkun Date: Sat, 7 May 2022 22:17:41 +0800 Subject: [PATCH 1/7] add a queue for bandwidth limit --- src/bandwidth.c | 100 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/src/bandwidth.c b/src/bandwidth.c index 5b6a472..96a5b8e 100644 --- a/src/bandwidth.c +++ b/src/bandwidth.c @@ -10,6 +10,9 @@ #define BANDWIDTH_MIN "0" #define BANDWIDTH_MAX "99999" #define BANDWIDTH_DEFAULT 10 +#define QUEUESIZE_MIN "0" +#define QUEUESIZE_MAX "99999" +#define QUEUESIZE_DEFAULT 10 //--------------------------------------------------------------------- // rate stats @@ -26,6 +29,9 @@ typedef struct { uint32_t *array_sample; } CRateStats; +static PacketNode queueHeadNode = {0}, queueTailNode = {0}; +static PacketNode *queueHead = &queueHeadNode, *queueTail = &queueTailNode; +static int queueSizeInBytes = 0; CRateStats* crate_stats_new(int window_size, float scale); @@ -43,17 +49,25 @@ int32_t crate_stats_calculate(CRateStats *rate, uint32_t now_ts); //--------------------------------------------------------------------- // configuration //--------------------------------------------------------------------- -static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput; +static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput, *queueSizeInput; static volatile short bandwidthEnabled = 0, bandwidthInbound = 1, bandwidthOutbound = 1; +static volatile short maxQueueSizeInKBytes = 0; static volatile LONG bandwidthLimit = BANDWIDTH_DEFAULT; static CRateStats *rateStats = NULL; +static INLINE_FUNCTION short isQueueEmpty() { + short ret = queueHead->next == queueTail; + if (ret) assert(queueSizeInBytes == 0); + return ret; +} static Ihandle* bandwidthSetupUI() { Ihandle *bandwidthControlsBox = IupHbox( + IupLabel("Queuesize(KB):"), + queueSizeInput = IupText(NULL), inboundCheckbox = IupToggle("Inbound", NULL), outboundCheckbox = IupToggle("Outbound", NULL), IupLabel("Limit(KB/s):"), @@ -71,6 +85,12 @@ static Ihandle* bandwidthSetupUI() { IupSetAttribute(inboundCheckbox, SYNCED_VALUE, (char*)&bandwidthInbound); IupSetCallback(outboundCheckbox, "ACTION", (Icallback)uiSyncToggle); IupSetAttribute(outboundCheckbox, SYNCED_VALUE, (char*)&bandwidthOutbound); + IupSetAttribute(queueSizeInput, "VISIBLECOLUMNS", "3"); + IupSetAttribute(queueSizeInput, "VALUE", STR(QUEUESIZE_DEFAULT)); + IupSetCallback(queueSizeInput, "VALUECHANGED_CB", uiSyncInt32); + IupSetAttribute(queueSizeInput, SYNCED_VALUE, (char*)&maxQueueSizeInKBytes); + IupSetAttribute(queueSizeInput, INTEGER_MAX, QUEUESIZE_MAX); + IupSetAttribute(queueSizeInput, INTEGER_MIN, QUEUESIZE_MIN); // enable by default to avoid confusing IupSetAttribute(inboundCheckbox, "VALUE", "ON"); @@ -86,14 +106,33 @@ static Ihandle* bandwidthSetupUI() { } static void bandwidthStartUp() { + if (queueHead->next == NULL && queueTail->next == NULL) { + queueHead->next = queueTail; + queueTail->prev = queueHead; + queueSizeInBytes = 0; + } else { + assert(isQueueEmpty()); + } + if (rateStats) crate_stats_delete(rateStats); rateStats = crate_stats_new(1000, 1000); + startTimePeriod(); LOG("bandwidth enabled"); } static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { + PacketNode *oldLast = tail->prev; UNREFERENCED_PARAMETER(head); - UNREFERENCED_PARAMETER(tail); + // flush all buffered packets + int packetCnt = 0; + while(!isQueueEmpty()) { + queueSizeInBytes -= queueTail->prev->packetLen; + insertAfter(popNode(queueTail->prev), oldLast); + ++packetCnt; + } + LOG("Closing down bandwidth, flushing %d packets", packetCnt); + endTimePeriod(); + if (rateStats) crate_stats_delete(rateStats); rateStats = NULL; LOG("bandwidth disabled"); @@ -104,40 +143,51 @@ static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { // process //--------------------------------------------------------------------- static short bandwidthProcess(PacketNode *head, PacketNode* tail) { - int dropped = 0; - DWORD now_ts = timeGetTime(); - int limit = bandwidthLimit * 1024; + int limit = bandwidthLimit * 1024; // allow 0 limit which should drop all if (limit < 0 || rateStats == NULL) { return 0; } - while (head->next != tail) { - PacketNode *pac = head->next; - int discard = 0; - // chance in range of [0, 10000] + PacketNode *pac = tail->prev; + while (pac != head) { if (checkDirection(pac->addr.Outbound, bandwidthInbound, bandwidthOutbound)) { - int rate = crate_stats_calculate(rateStats, now_ts); - int size = pac->packetLen; - if (rate + size > limit) { - LOG("dropped with bandwidth %dKB/s, direction %s", - (int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND"); - discard = 1; - } - else { - crate_stats_update(rateStats, size, now_ts); - } - } - if (discard) { - freeNode(popNode(pac)); - ++dropped; + queueSizeInBytes += pac->packetLen; + insertAfter(popNode(pac), queueHead); + pac = tail->prev; } else { - head = head->next; + pac = pac->prev; } } - return dropped > 0; + DWORD now_ts = timeGetTime(); + + while (!isQueueEmpty()) { + pac = queueTail->prev; + // chance in range of [0, 10000] + int rate = crate_stats_calculate(rateStats, now_ts); + int size = pac->packetLen; + if (rate + size > limit) { + break; + } else { + crate_stats_update(rateStats, size, now_ts); + } + queueSizeInBytes -= queueTail->prev->packetLen; + insertAfter(popNode(queueTail->prev), head); + } + + int dropped = 0; + while (queueSizeInBytes > maxQueueSizeInKBytes * 1024 && !isQueueEmpty()) { + pac = queueHead->next; + LOG("dropped with bandwidth %dKB/s, direction %s", + (int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND"); + queueSizeInBytes -= pac->packetLen; + freeNode(popNode(pac)); + ++dropped; + } + + return dropped > 0 || !isQueueEmpty(); } From a87c0d01bb5ba78fc4186061640ff2b84d4eebba Mon Sep 17 00:00:00 2001 From: kuhnliu Date: Mon, 9 May 2022 20:55:13 +0800 Subject: [PATCH 2/7] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index f98fe02..c3a0637 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,8 @@ Simulate network latency, delay, packet loss with clumsy on Windows 7/8/10: ![](clumsy-demo.gif) +## Bandwidth with tail dropping queue +Add a simple tail dropping queue for bandwidth limiting function. The queue size is configurable. ## License From 15ff940c09d07abd37c2d7d99aaf96c6948e8e85 Mon Sep 17 00:00:00 2001 From: kuhnliu Date: Tue, 10 May 2022 11:32:35 +0800 Subject: [PATCH 3/7] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c3a0637..ffbe820 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Simulate network latency, delay, packet loss with clumsy on Windows 7/8/10: ## Bandwidth with tail dropping queue Add a simple tail dropping queue for bandwidth limiting function. The queue size is configurable. - +https://github.com/kuhnliu/clumsy/releases/download/0.3rc4_bw_queue/win_x64.zip ## License MIT From 81a26f4cf3b2b48614712472de88dad7511e18e3 Mon Sep 17 00:00:00 2001 From: hzliuxinkun Date: Wed, 28 Sep 2022 10:19:23 +0800 Subject: [PATCH 4/7] 1. update init bandwidth limit 2. refine some code --- src/bandwidth.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/bandwidth.c b/src/bandwidth.c index 96a5b8e..3b59ac2 100644 --- a/src/bandwidth.c +++ b/src/bandwidth.c @@ -9,10 +9,10 @@ #define NAME "bandwidth" #define BANDWIDTH_MIN "0" #define BANDWIDTH_MAX "99999" -#define BANDWIDTH_DEFAULT 10 +#define BANDWIDTH_DEFAULT 100 #define QUEUESIZE_MIN "0" #define QUEUESIZE_MAX "99999" -#define QUEUESIZE_DEFAULT 10 +#define QUEUESIZE_DEFAULT 100 //--------------------------------------------------------------------- // rate stats @@ -145,8 +145,7 @@ static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { static short bandwidthProcess(PacketNode *head, PacketNode* tail) { int limit = bandwidthLimit * 1024; - // allow 0 limit which should drop all - if (limit < 0 || rateStats == NULL) { + if (rateStats == NULL) { return 0; } @@ -173,8 +172,8 @@ static short bandwidthProcess(PacketNode *head, PacketNode* tail) { } else { crate_stats_update(rateStats, size, now_ts); } - queueSizeInBytes -= queueTail->prev->packetLen; - insertAfter(popNode(queueTail->prev), head); + queueSizeInBytes -= pac->packetLen; + insertAfter(popNode(pac), head); } int dropped = 0; @@ -187,6 +186,8 @@ static short bandwidthProcess(PacketNode *head, PacketNode* tail) { ++dropped; } + assert(queueSizeInBytes >= 0); + return dropped > 0 || !isQueueEmpty(); } From 7de864dddf8c1b632a8db2b7bf950c491afaf2ee Mon Sep 17 00:00:00 2001 From: hzliuxinkun Date: Thu, 9 Nov 2023 20:02:48 +0800 Subject: [PATCH 5/7] schedule clockloop every 5ms --- src/divert.c | 64 +++++++++++++++++++++++++++------------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/divert.c b/src/divert.c index 0e1b5df..7b8ab04 100644 --- a/src/divert.c +++ b/src/divert.c @@ -8,12 +8,13 @@ #define MAX_PACKETSIZE 0xFFFF #define READ_TIME_PER_STEP 3 // FIXME does this need to be larger then the time to process the list? -#define CLOCK_WAITMS 40 +#define CLOCK_WAITMS 5 #define QUEUE_LEN 2 << 10 #define QUEUE_TIME 2 << 9 static HANDLE divertHandle; static volatile short stopLooping; +static volatile DWORD lastScheduleTime = 0; static HANDLE loopThread, clockThread, mutex; static DWORD divertReadLoop(LPVOID arg); @@ -237,7 +238,7 @@ static void divertConsumeStep() { // periodically try to consume packets to keep the network responsive and not blocked by recv static DWORD divertClockLoop(LPVOID arg) { - DWORD startTick, stepTick, waitResult; + DWORD startTick, waitResult; int ix; UNREFERENCED_PARAMETER(arg); @@ -245,36 +246,36 @@ static DWORD divertClockLoop(LPVOID arg) { for(;;) { // use acquire as wait for yielding thread startTick = GetTickCount(); - waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS); - switch(waitResult) { - case WAIT_OBJECT_0: - /***************** enter critical region ************************/ - divertConsumeStep(); - /***************** leave critical region ************************/ - if (!ReleaseMutex(mutex)) { + if (lastScheduleTime + CLOCK_WAITMS <= startTick) { + waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS); + switch(waitResult) { + case WAIT_OBJECT_0: + /***************** enter critical region ************************/ + divertConsumeStep(); + /***************** leave critical region ************************/ + if (!ReleaseMutex(mutex)) { + InterlockedIncrement16(&stopLooping); + LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); + ABORT(); + } + lastScheduleTime = GetTickCount(); + break; + case WAIT_TIMEOUT: + // read loop is processing, so we can skip this run + LOG("!!! Skipping one run"); + Sleep(CLOCK_WAITMS); + break; + case WAIT_ABANDONED: + LOG("Acquired abandoned mutex"); InterlockedIncrement16(&stopLooping); - LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); - ABORT(); - } - // if didn't spent enough time, we sleep on it - stepTick = GetTickCount() - startTick; - if (stepTick < CLOCK_WAITMS) { - Sleep(CLOCK_WAITMS - stepTick); - } - break; - case WAIT_TIMEOUT: - // read loop is processing, so we can skip this run - LOG("!!! Skipping one run"); - Sleep(CLOCK_WAITMS); - break; - case WAIT_ABANDONED: - LOG("Acquired abandoned mutex"); - InterlockedIncrement16(&stopLooping); - break; - case WAIT_FAILED: - LOG("Acquire failed (%lu)", GetLastError()); - InterlockedIncrement16(&stopLooping); - break; + break; + case WAIT_FAILED: + LOG("Acquire failed (%lu)", GetLastError()); + InterlockedIncrement16(&stopLooping); + break; + } + } else { + Sleep(CLOCK_WAITMS - (startTick - lastScheduleTime)); } // need to get the lock here @@ -364,6 +365,7 @@ static DWORD divertReadLoop(LPVOID arg) { pnode = createNode(packetBuf, readLen, &addrBuf); appendNode(pnode); divertConsumeStep(); + lastScheduleTime = GetTickCount(); /***************** leave critical region ************************/ if (!ReleaseMutex(mutex)) { LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); From 6d5a8e7a6a67bde78716cc11ac1260313268050a Mon Sep 17 00:00:00 2001 From: hzliuxinkun Date: Thu, 9 Nov 2023 22:43:13 +0800 Subject: [PATCH 6/7] use independent rate limiter for inband and outband packets --- src/bandwidth.c | 144 ++++++++++++++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 53 deletions(-) diff --git a/src/bandwidth.c b/src/bandwidth.c index 3b59ac2..0501758 100644 --- a/src/bandwidth.c +++ b/src/bandwidth.c @@ -29,9 +29,16 @@ typedef struct { uint32_t *array_sample; } CRateStats; -static PacketNode queueHeadNode = {0}, queueTailNode = {0}; -static PacketNode *queueHead = &queueHeadNode, *queueTail = &queueTailNode; -static int queueSizeInBytes = 0; +typedef struct { + PacketNode queueHeadNode; + PacketNode queueTailNode; + PacketNode *queueHead; + PacketNode *queueTail; + int queueSizeInBytes; + CRateStats *rateStats; +} CRateLimiter; + +static CRateLimiter inboundRateLimiter = {0}, outboundRateLimiter = {0}; CRateStats* crate_stats_new(int window_size, float scale); @@ -56,11 +63,10 @@ static volatile short bandwidthEnabled = 0, static volatile short maxQueueSizeInKBytes = 0; static volatile LONG bandwidthLimit = BANDWIDTH_DEFAULT; -static CRateStats *rateStats = NULL; -static INLINE_FUNCTION short isQueueEmpty() { - short ret = queueHead->next == queueTail; - if (ret) assert(queueSizeInBytes == 0); +static INLINE_FUNCTION short isQueueEmpty(CRateLimiter *rateLimiter) { + short ret = rateLimiter->queueHead->next == rateLimiter->queueTail; + if (ret) assert(rateLimiter->queueSizeInBytes == 0); return ret; } @@ -105,36 +111,55 @@ static Ihandle* bandwidthSetupUI() { return bandwidthControlsBox; } -static void bandwidthStartUp() { - if (queueHead->next == NULL && queueTail->next == NULL) { - queueHead->next = queueTail; - queueTail->prev = queueHead; - queueSizeInBytes = 0; +static void initRateLimiter(CRateLimiter *rateLimiter) { + rateLimiter->queueHead = &rateLimiter->queueHeadNode; + rateLimiter->queueTail = &rateLimiter->queueTailNode; + + if (rateLimiter->queueHead->next == NULL && rateLimiter->queueTail->next == NULL) { + rateLimiter->queueHead->next = rateLimiter->queueTail; + rateLimiter->queueTail->prev = rateLimiter->queueHead; + rateLimiter->queueSizeInBytes = 0; } else { - assert(isQueueEmpty()); + assert(isQueueEmpty(rateLimiter)); } - if (rateStats) crate_stats_delete(rateStats); - rateStats = crate_stats_new(1000, 1000); - startTimePeriod(); - LOG("bandwidth enabled"); + if (rateLimiter->rateStats) crate_stats_delete(rateLimiter->rateStats); + rateLimiter->rateStats = crate_stats_new(1000, 1000); } -static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { +static int uninitRateLimiter(CRateLimiter *rateLimiter, PacketNode *head, PacketNode *tail) { PacketNode *oldLast = tail->prev; UNREFERENCED_PARAMETER(head); // flush all buffered packets int packetCnt = 0; - while(!isQueueEmpty()) { - queueSizeInBytes -= queueTail->prev->packetLen; - insertAfter(popNode(queueTail->prev), oldLast); + while(!isQueueEmpty(rateLimiter)) { + rateLimiter->queueSizeInBytes -= rateLimiter->queueTail->prev->packetLen; + insertAfter(popNode(rateLimiter->queueTail->prev), oldLast); ++packetCnt; } - LOG("Closing down bandwidth, flushing %d packets", packetCnt); - endTimePeriod(); - if (rateStats) crate_stats_delete(rateStats); - rateStats = NULL; + if (rateLimiter->rateStats) { + crate_stats_delete(rateLimiter->rateStats); + rateLimiter->rateStats = NULL; + } + + return packetCnt; +} + +static void bandwidthStartUp() { + initRateLimiter(&inboundRateLimiter); + initRateLimiter(&outboundRateLimiter); + startTimePeriod(); + LOG("bandwidth enabled"); +} + +static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { + int packetCnt = 0; + packetCnt = uninitRateLimiter(&inboundRateLimiter, head, tail); + LOG("Closing down bandwidth, flushing inbound %d packets", packetCnt); + packetCnt = uninitRateLimiter(&outboundRateLimiter, head, tail); + LOG("Closing down bandwidth, flushing outbound %d packets", packetCnt); + endTimePeriod(); LOG("bandwidth disabled"); } @@ -142,53 +167,66 @@ static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { //--------------------------------------------------------------------- // process //--------------------------------------------------------------------- -static short bandwidthProcess(PacketNode *head, PacketNode* tail) { - int limit = bandwidthLimit * 1024; - - if (rateStats == NULL) { - return 0; - } - - PacketNode *pac = tail->prev; - while (pac != head) { - if (checkDirection(pac->addr.Outbound, bandwidthInbound, bandwidthOutbound)) { - queueSizeInBytes += pac->packetLen; - insertAfter(popNode(pac), queueHead); - pac = tail->prev; - } else { - pac = pac->prev; - } - } - +static int rateLimiterProcess(CRateLimiter *rateLimiter, PacketNode *head, PacketNode* tail) { + PacketNode *pac; DWORD now_ts = timeGetTime(); + int limit = bandwidthLimit * 1024; - while (!isQueueEmpty()) { - pac = queueTail->prev; + while (!isQueueEmpty(rateLimiter)) { + pac = rateLimiter->queueTail->prev; // chance in range of [0, 10000] - int rate = crate_stats_calculate(rateStats, now_ts); + int rate = crate_stats_calculate(rateLimiter->rateStats, now_ts); int size = pac->packetLen; if (rate + size > limit) { break; } else { - crate_stats_update(rateStats, size, now_ts); + crate_stats_update(rateLimiter->rateStats, size, now_ts); } - queueSizeInBytes -= pac->packetLen; + rateLimiter->queueSizeInBytes -= pac->packetLen; insertAfter(popNode(pac), head); } int dropped = 0; - while (queueSizeInBytes > maxQueueSizeInKBytes * 1024 && !isQueueEmpty()) { - pac = queueHead->next; + while (rateLimiter->queueSizeInBytes > maxQueueSizeInKBytes * 1024 && !isQueueEmpty(rateLimiter)) { + pac = rateLimiter->queueHead->next; LOG("dropped with bandwidth %dKB/s, direction %s", (int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND"); - queueSizeInBytes -= pac->packetLen; + rateLimiter->queueSizeInBytes -= pac->packetLen; freeNode(popNode(pac)); ++dropped; } - assert(queueSizeInBytes >= 0); + assert(rateLimiter->queueSizeInBytes >= 0); + + return dropped; +} + +static short bandwidthProcess(PacketNode *head, PacketNode* tail) { + if (inboundRateLimiter.rateStats == NULL || outboundRateLimiter.rateStats == NULL) { + return 0; + } + + PacketNode *pac = tail->prev; + while (pac != head) { + if (checkDirection(pac->addr.Outbound, bandwidthInbound, bandwidthOutbound)) { + if (pac->addr.Outbound) { + outboundRateLimiter.queueSizeInBytes += pac->packetLen; + insertAfter(popNode(pac), outboundRateLimiter.queueHead); + } else { + inboundRateLimiter.queueSizeInBytes += pac->packetLen; + insertAfter(popNode(pac), inboundRateLimiter.queueHead); + } + pac = tail->prev; + } else { + pac = pac->prev; + } + } + + int dropped; + dropped = rateLimiterProcess(&inboundRateLimiter, head, tail); + dropped += rateLimiterProcess(&outboundRateLimiter, head, tail); - return dropped > 0 || !isQueueEmpty(); + return dropped > 0 || !isQueueEmpty(&inboundRateLimiter) || !isQueueEmpty(&outboundRateLimiter); } From f734708d960886e9244a0509fe56bd12146e546a Mon Sep 17 00:00:00 2001 From: hzliuxinkun Date: Thu, 9 Nov 2023 23:24:21 +0800 Subject: [PATCH 7/7] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ffbe820..5f72400 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Simulate network latency, delay, packet loss with clumsy on Windows 7/8/10: ## Bandwidth with tail dropping queue Add a simple tail dropping queue for bandwidth limiting function. The queue size is configurable. -https://github.com/kuhnliu/clumsy/releases/download/0.3rc4_bw_queue/win_x64.zip +https://github.com/kuhnliu/clumsy/releases/download/0.3rc5_bw_queue/win_x64.zip ## License MIT