Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Grzegorz Kołakowski committed Aug 28, 2024
1 parent 1181334 commit c68fca9
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ protected HttpSinkInternal(
);
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl),
"The endpoint URL must be set when initializing HTTP Sink.");
Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
"Only at-least-once and none delivery guarantees are supported.");
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.httpPostRequestCallback =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.getindata.connectors.http.internal.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand All @@ -21,13 +21,15 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem;

@Slf4j
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -59,42 +61,125 @@ public void setUp() {
when(metricGroup.getNumRecordsSendErrorsCounter()).thenReturn(errorCounter);
when(metricGroup.getIOMetricGroup()).thenReturn(operatorIOMetricGroup);
when(context.metricGroup()).thenReturn(metricGroup);
}

private void createHttpSinkWriter(DeliveryGuarantee deliveryGuarantee) {
Collection<BufferedRequestState<HttpSinkRequestEntry>> stateBuffer = new ArrayList<>();

this.httpSinkWriter = new HttpSinkWriter<>(
elementConverter,
context,
10,
10,
100,
10,
10,
10,
DeliveryGuarantee.NONE,
"http://localhost/client",
httpClient,
stateBuffer,
new Properties());
elementConverter,
context,
10,
10,
100,
10,
10,
10,
deliveryGuarantee,
"http://localhost/client",
httpClient,
stateBuffer,
new Properties());
}

@Test
public void testErrorMetric() throws InterruptedException {
public void testErrorMetricWhenAllRequestsFailed() throws InterruptedException {
createHttpSinkWriter(DeliveryGuarantee.NONE);

CompletableFuture<SinkHttpClientResponse> future = new CompletableFuture<>();
future.completeExceptionally(new Exception("Test Exception"));

when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes());
HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes());
HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes());
Consumer<List<HttpSinkRequestEntry>> requestResult =
httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries));

List<HttpSinkRequestEntry> requestEntries = Collections.singletonList(request);
List<HttpSinkRequestEntry> requestEntries = Arrays.asList(request1, request2);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);

// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);
verify(errorCounter).inc(2);
}

@Test
public void testErrorMetricWhenAPartOfRequestsFailed() throws InterruptedException {
createHttpSinkWriter(DeliveryGuarantee.NONE);

CompletableFuture<SinkHttpClientResponse> future = new CompletableFuture<>();
future.complete(new SinkHttpClientResponse(
Arrays.asList(
new ResponseItem(null, false),
new ResponseItem(null, true))
));

when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes());
HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes());
Consumer<List<HttpSinkRequestEntry>> requestResult =
httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries));

List<HttpSinkRequestEntry> requestEntries = Arrays.asList(request1, request2);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);

// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);
verify(errorCounter).inc(1);
}

@Test
public void testRetryWhenAllRequestsFailed() throws InterruptedException {
createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE);

CompletableFuture<SinkHttpClientResponse> future = new CompletableFuture<>();
future.completeExceptionally(new Exception("Test Exception"));

when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

final List<HttpSinkRequestEntry> entriesToRetry = new ArrayList<>();

HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes());
HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes());
Consumer<List<HttpSinkRequestEntry>> requestResult = entriesToRetry::addAll;

List<HttpSinkRequestEntry> requestEntries = Arrays.asList(request1, request2);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);

// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);
verify(errorCounter).inc(2);
assertEquals(2, entriesToRetry.size());
}


@Test
public void testRetryWhenAPartOfRequestsFailed() throws InterruptedException {
createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE);

CompletableFuture<SinkHttpClientResponse> future = new CompletableFuture<>();
future.complete(new SinkHttpClientResponse(
Arrays.asList(
new ResponseItem(null, false),
new ResponseItem(null, true))
));

when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

final List<HttpSinkRequestEntry> entriesToRetry = new ArrayList<>();

HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes());
HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes());
Consumer<List<HttpSinkRequestEntry>> requestResult = entriesToRetry::addAll;

List<HttpSinkRequestEntry> requestEntries = Arrays.asList(request1, request2);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);

// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);
verify(errorCounter).inc(requestEntries.size());
verify(errorCounter).inc(1);
assertEquals(1, entriesToRetry.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,24 @@ public void nonexistentOptionsTest() {
assertThrows(ValidationException.class,
() -> tEnv.executeSql("INSERT INTO http VALUES (1)").await());
}

@Test
public void invalidSinkDeliveryGuaranteeOptionTests() {
final String invalidOptionCreateSql =
String.format(
"CREATE TABLE http (\n"
+ " id bigint\n"
+ ") with (\n"
+ " 'connector' = '%s',\n"
+ " 'url' = '%s',\n"
+ " 'format' = 'json',\n"
+ " 'sink.delivery-guarantee' = 'invalid'\n"
+ ")",
HttpDynamicTableSinkFactory.IDENTIFIER,
"http://localhost/"
);
tEnv.executeSql(invalidOptionCreateSql);
assertThrows(ValidationException.class,
() -> tEnv.executeSql("INSERT INTO http VALUES (1)").await());
}
}

0 comments on commit c68fca9

Please sign in to comment.