diff --git a/README.md b/README.md index f98fe02..5f72400 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,9 @@ Simulate network latency, delay, packet loss with clumsy on Windows 7/8/10: ![](clumsy-demo.gif) - +## Bandwidth with tail dropping queue +Add a simple tail dropping queue for bandwidth limiting function. The queue size is configurable. +https://github.com/kuhnliu/clumsy/releases/download/0.3rc5_bw_queue/win_x64.zip ## License MIT diff --git a/src/bandwidth.c b/src/bandwidth.c index 5b6a472..0501758 100644 --- a/src/bandwidth.c +++ b/src/bandwidth.c @@ -9,7 +9,10 @@ #define NAME "bandwidth" #define BANDWIDTH_MIN "0" #define BANDWIDTH_MAX "99999" -#define BANDWIDTH_DEFAULT 10 +#define BANDWIDTH_DEFAULT 100 +#define QUEUESIZE_MIN "0" +#define QUEUESIZE_MAX "99999" +#define QUEUESIZE_DEFAULT 100 //--------------------------------------------------------------------- // rate stats @@ -26,6 +29,16 @@ typedef struct { uint32_t *array_sample; } CRateStats; +typedef struct { + PacketNode queueHeadNode; + PacketNode queueTailNode; + PacketNode *queueHead; + PacketNode *queueTail; + int queueSizeInBytes; + CRateStats *rateStats; +} CRateLimiter; + +static CRateLimiter inboundRateLimiter = {0}, outboundRateLimiter = {0}; CRateStats* crate_stats_new(int window_size, float scale); @@ -43,17 +56,24 @@ int32_t crate_stats_calculate(CRateStats *rate, uint32_t now_ts); //--------------------------------------------------------------------- // configuration //--------------------------------------------------------------------- -static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput; +static Ihandle *inboundCheckbox, *outboundCheckbox, *bandwidthInput, *queueSizeInput; static volatile short bandwidthEnabled = 0, bandwidthInbound = 1, bandwidthOutbound = 1; +static volatile short maxQueueSizeInKBytes = 0; static volatile LONG bandwidthLimit = BANDWIDTH_DEFAULT; -static CRateStats *rateStats = NULL; +static INLINE_FUNCTION short isQueueEmpty(CRateLimiter *rateLimiter) { + short ret = rateLimiter->queueHead->next == rateLimiter->queueTail; + if (ret) assert(rateLimiter->queueSizeInBytes == 0); + return ret; +} static Ihandle* bandwidthSetupUI() { Ihandle *bandwidthControlsBox = IupHbox( + IupLabel("Queuesize(KB):"), + queueSizeInput = IupText(NULL), inboundCheckbox = IupToggle("Inbound", NULL), outboundCheckbox = IupToggle("Outbound", NULL), IupLabel("Limit(KB/s):"), @@ -71,6 +91,12 @@ static Ihandle* bandwidthSetupUI() { IupSetAttribute(inboundCheckbox, SYNCED_VALUE, (char*)&bandwidthInbound); IupSetCallback(outboundCheckbox, "ACTION", (Icallback)uiSyncToggle); IupSetAttribute(outboundCheckbox, SYNCED_VALUE, (char*)&bandwidthOutbound); + IupSetAttribute(queueSizeInput, "VISIBLECOLUMNS", "3"); + IupSetAttribute(queueSizeInput, "VALUE", STR(QUEUESIZE_DEFAULT)); + IupSetCallback(queueSizeInput, "VALUECHANGED_CB", uiSyncInt32); + IupSetAttribute(queueSizeInput, SYNCED_VALUE, (char*)&maxQueueSizeInKBytes); + IupSetAttribute(queueSizeInput, INTEGER_MAX, QUEUESIZE_MAX); + IupSetAttribute(queueSizeInput, INTEGER_MIN, QUEUESIZE_MIN); // enable by default to avoid confusing IupSetAttribute(inboundCheckbox, "VALUE", "ON"); @@ -85,17 +111,55 @@ static Ihandle* bandwidthSetupUI() { return bandwidthControlsBox; } +static void initRateLimiter(CRateLimiter *rateLimiter) { + rateLimiter->queueHead = &rateLimiter->queueHeadNode; + rateLimiter->queueTail = &rateLimiter->queueTailNode; + + if (rateLimiter->queueHead->next == NULL && rateLimiter->queueTail->next == NULL) { + rateLimiter->queueHead->next = rateLimiter->queueTail; + rateLimiter->queueTail->prev = rateLimiter->queueHead; + rateLimiter->queueSizeInBytes = 0; + } else { + assert(isQueueEmpty(rateLimiter)); + } + + if (rateLimiter->rateStats) crate_stats_delete(rateLimiter->rateStats); + rateLimiter->rateStats = crate_stats_new(1000, 1000); +} + +static int uninitRateLimiter(CRateLimiter *rateLimiter, PacketNode *head, PacketNode *tail) { + PacketNode *oldLast = tail->prev; + UNREFERENCED_PARAMETER(head); + // flush all buffered packets + int packetCnt = 0; + while(!isQueueEmpty(rateLimiter)) { + rateLimiter->queueSizeInBytes -= rateLimiter->queueTail->prev->packetLen; + insertAfter(popNode(rateLimiter->queueTail->prev), oldLast); + ++packetCnt; + } + + if (rateLimiter->rateStats) { + crate_stats_delete(rateLimiter->rateStats); + rateLimiter->rateStats = NULL; + } + + return packetCnt; +} + static void bandwidthStartUp() { - if (rateStats) crate_stats_delete(rateStats); - rateStats = crate_stats_new(1000, 1000); + initRateLimiter(&inboundRateLimiter); + initRateLimiter(&outboundRateLimiter); + startTimePeriod(); LOG("bandwidth enabled"); } static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { - UNREFERENCED_PARAMETER(head); - UNREFERENCED_PARAMETER(tail); - if (rateStats) crate_stats_delete(rateStats); - rateStats = NULL; + int packetCnt = 0; + packetCnt = uninitRateLimiter(&inboundRateLimiter, head, tail); + LOG("Closing down bandwidth, flushing inbound %d packets", packetCnt); + packetCnt = uninitRateLimiter(&outboundRateLimiter, head, tail); + LOG("Closing down bandwidth, flushing outbound %d packets", packetCnt); + endTimePeriod(); LOG("bandwidth disabled"); } @@ -103,41 +167,66 @@ static void bandwidthCloseDown(PacketNode *head, PacketNode *tail) { //--------------------------------------------------------------------- // process //--------------------------------------------------------------------- -static short bandwidthProcess(PacketNode *head, PacketNode* tail) { +static int rateLimiterProcess(CRateLimiter *rateLimiter, PacketNode *head, PacketNode* tail) { + PacketNode *pac; + DWORD now_ts = timeGetTime(); + int limit = bandwidthLimit * 1024; + + while (!isQueueEmpty(rateLimiter)) { + pac = rateLimiter->queueTail->prev; + // chance in range of [0, 10000] + int rate = crate_stats_calculate(rateLimiter->rateStats, now_ts); + int size = pac->packetLen; + if (rate + size > limit) { + break; + } else { + crate_stats_update(rateLimiter->rateStats, size, now_ts); + } + rateLimiter->queueSizeInBytes -= pac->packetLen; + insertAfter(popNode(pac), head); + } + int dropped = 0; - DWORD now_ts = timeGetTime(); - int limit = bandwidthLimit * 1024; + while (rateLimiter->queueSizeInBytes > maxQueueSizeInKBytes * 1024 && !isQueueEmpty(rateLimiter)) { + pac = rateLimiter->queueHead->next; + LOG("dropped with bandwidth %dKB/s, direction %s", + (int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND"); + rateLimiter->queueSizeInBytes -= pac->packetLen; + freeNode(popNode(pac)); + ++dropped; + } - // allow 0 limit which should drop all - if (limit < 0 || rateStats == NULL) { + assert(rateLimiter->queueSizeInBytes >= 0); + + return dropped; +} + +static short bandwidthProcess(PacketNode *head, PacketNode* tail) { + if (inboundRateLimiter.rateStats == NULL || outboundRateLimiter.rateStats == NULL) { return 0; } - while (head->next != tail) { - PacketNode *pac = head->next; - int discard = 0; - // chance in range of [0, 10000] + PacketNode *pac = tail->prev; + while (pac != head) { if (checkDirection(pac->addr.Outbound, bandwidthInbound, bandwidthOutbound)) { - int rate = crate_stats_calculate(rateStats, now_ts); - int size = pac->packetLen; - if (rate + size > limit) { - LOG("dropped with bandwidth %dKB/s, direction %s", - (int)bandwidthLimit, pac->addr.Outbound ? "OUTBOUND" : "INBOUND"); - discard = 1; - } - else { - crate_stats_update(rateStats, size, now_ts); - } - } - if (discard) { - freeNode(popNode(pac)); - ++dropped; + if (pac->addr.Outbound) { + outboundRateLimiter.queueSizeInBytes += pac->packetLen; + insertAfter(popNode(pac), outboundRateLimiter.queueHead); + } else { + inboundRateLimiter.queueSizeInBytes += pac->packetLen; + insertAfter(popNode(pac), inboundRateLimiter.queueHead); + } + pac = tail->prev; } else { - head = head->next; + pac = pac->prev; } } - return dropped > 0; + int dropped; + dropped = rateLimiterProcess(&inboundRateLimiter, head, tail); + dropped += rateLimiterProcess(&outboundRateLimiter, head, tail); + + return dropped > 0 || !isQueueEmpty(&inboundRateLimiter) || !isQueueEmpty(&outboundRateLimiter); } diff --git a/src/divert.c b/src/divert.c index 0e1b5df..7b8ab04 100644 --- a/src/divert.c +++ b/src/divert.c @@ -8,12 +8,13 @@ #define MAX_PACKETSIZE 0xFFFF #define READ_TIME_PER_STEP 3 // FIXME does this need to be larger then the time to process the list? -#define CLOCK_WAITMS 40 +#define CLOCK_WAITMS 5 #define QUEUE_LEN 2 << 10 #define QUEUE_TIME 2 << 9 static HANDLE divertHandle; static volatile short stopLooping; +static volatile DWORD lastScheduleTime = 0; static HANDLE loopThread, clockThread, mutex; static DWORD divertReadLoop(LPVOID arg); @@ -237,7 +238,7 @@ static void divertConsumeStep() { // periodically try to consume packets to keep the network responsive and not blocked by recv static DWORD divertClockLoop(LPVOID arg) { - DWORD startTick, stepTick, waitResult; + DWORD startTick, waitResult; int ix; UNREFERENCED_PARAMETER(arg); @@ -245,36 +246,36 @@ static DWORD divertClockLoop(LPVOID arg) { for(;;) { // use acquire as wait for yielding thread startTick = GetTickCount(); - waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS); - switch(waitResult) { - case WAIT_OBJECT_0: - /***************** enter critical region ************************/ - divertConsumeStep(); - /***************** leave critical region ************************/ - if (!ReleaseMutex(mutex)) { + if (lastScheduleTime + CLOCK_WAITMS <= startTick) { + waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS); + switch(waitResult) { + case WAIT_OBJECT_0: + /***************** enter critical region ************************/ + divertConsumeStep(); + /***************** leave critical region ************************/ + if (!ReleaseMutex(mutex)) { + InterlockedIncrement16(&stopLooping); + LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); + ABORT(); + } + lastScheduleTime = GetTickCount(); + break; + case WAIT_TIMEOUT: + // read loop is processing, so we can skip this run + LOG("!!! Skipping one run"); + Sleep(CLOCK_WAITMS); + break; + case WAIT_ABANDONED: + LOG("Acquired abandoned mutex"); InterlockedIncrement16(&stopLooping); - LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); - ABORT(); - } - // if didn't spent enough time, we sleep on it - stepTick = GetTickCount() - startTick; - if (stepTick < CLOCK_WAITMS) { - Sleep(CLOCK_WAITMS - stepTick); - } - break; - case WAIT_TIMEOUT: - // read loop is processing, so we can skip this run - LOG("!!! Skipping one run"); - Sleep(CLOCK_WAITMS); - break; - case WAIT_ABANDONED: - LOG("Acquired abandoned mutex"); - InterlockedIncrement16(&stopLooping); - break; - case WAIT_FAILED: - LOG("Acquire failed (%lu)", GetLastError()); - InterlockedIncrement16(&stopLooping); - break; + break; + case WAIT_FAILED: + LOG("Acquire failed (%lu)", GetLastError()); + InterlockedIncrement16(&stopLooping); + break; + } + } else { + Sleep(CLOCK_WAITMS - (startTick - lastScheduleTime)); } // need to get the lock here @@ -364,6 +365,7 @@ static DWORD divertReadLoop(LPVOID arg) { pnode = createNode(packetBuf, readLen, &addrBuf); appendNode(pnode); divertConsumeStep(); + lastScheduleTime = GetTickCount(); /***************** leave critical region ************************/ if (!ReleaseMutex(mutex)) { LOG("Fatal: Failed to release mutex (%lu)", GetLastError());