From abb52589a28cf6c830e8007f2436ab552d81b6d6 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Wed, 23 Oct 2024 18:13:56 +0300 Subject: [PATCH 01/12] forward auto start request from edge to origin --- .../antmedia/AntMediaApplicationAdapter.java | 77 ++++++++++++++++- .../java/io/antmedia/filter/IPFilter.java | 1 + .../io/antmedia/filter/RestProxyFilter.java | 9 ++ .../antmedia/filter/TokenFilterManager.java | 5 +- .../antmedia/rest/BroadcastRestService.java | 1 + .../AntMediaApplicationAdaptorUnitTest.java | 83 +++++++++++++++++-- 6 files changed, 166 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 0795a098b..0a4df6334 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -23,11 +23,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import io.antmedia.filter.JWTFilter; +import io.antmedia.filter.TokenFilterManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.HttpEntity; +import org.apache.http.HttpRequest; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; @@ -904,7 +907,7 @@ public Broadcast updateBroadcastStatus(String streamId, long absoluteStartTimeMs public ServerSettings getServerSettings() { if (serverSettings == null) { - serverSettings = (ServerSettings)scope.getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME); + serverSettings = (ServerSettings)getScope().getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME); } return serverSettings; } @@ -1191,6 +1194,14 @@ public void sendPOST(String url, Map variables, int retryAttempt .build(); httpPost.setConfig(requestConfig); + if (variables.containsKey(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION)) { + Object clusterAuthJwt = variables.get(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION); + if (clusterAuthJwt != null) { + httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterAuthJwt.toString()); + variables.remove(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION); + } + } + if (ContentType.APPLICATION_FORM_URLENCODED.getMimeType().equals(contentType)) { List urlParameters = new ArrayList<>(); @@ -1290,13 +1301,37 @@ public static final boolean isStreaming(Broadcast broadcast) { public Result startStreaming(Broadcast broadcast) { + logger.info("yunus START STREAMING REQUEST CAME!"); Result result = new Result(false); if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE) || broadcast.getType().equals(AntMediaApplicationAdapter.VOD) ) { - result = getStreamFetcherManager().startStreaming(broadcast); + + if(isClusterMode()){ + logger.info("yunus IS IS CLUSTER MODE"); + + String broadcastOriginAddress = broadcast.getOriginAdress(); + logger.info("yunus broadcast origin address: {}", broadcastOriginAddress); + logger.info("yunus server settings host address: {}", getServerSettings().getHostAddress()); + + + if(broadcastOriginAddress.equals(getServerSettings().getHostAddress())){ + logger.info("yunus I AM THE ORIGIN I AM STARTING ON MYSELF"); + + result = getStreamFetcherManager().startStreaming(broadcast); + return result; + } + + forwardStartStreaming(broadcast); + result.setSuccess(true); + return result; + + }else{ + result = getStreamFetcherManager().startStreaming(broadcast); + } + } else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { result = getStreamFetcherManager().startPlaylist(broadcast); @@ -1308,6 +1343,36 @@ else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { return result; } + public void forwardStartStreaming(Broadcast broadcast){ + logger.info("yunus FORWARD START STREAMING CALLED!"); + String jwtToken = JWTFilter.generateJwtToken(getAppSettings().getClusterCommunicationKey(), System.currentTimeMillis() + 5000); + logger.info("yunus GENERATED JWT TOKEN: {}", jwtToken); + + String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + getServerSettings().getDefaultHttpPort() + File.separator + getAppSettings().getAppName() + File.separator+ "rest" + + File.separator +"v2" + File.separator +"broadcasts" + + File.separator + broadcast.getStreamId() + File.separator + "start"; + + logger.info("yunus REST ROUTE: {}", jwtToken); + + + Map variables = new HashMap<>(); + + variables.put(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, jwtToken); + + + vertx.executeBlocking(() -> { + try { + sendPOST(restRouteOfNode, variables, getAppSettings().getWebhookRetryCount(), getAppSettings().getWebhookContentType()); + } catch (Exception exception) { + logger.error(ExceptionUtils.getStackTrace(exception)); + } + + return null; + + }, false); + + } + public Result stopStreaming(Broadcast broadcast) { Result result = new Result(false); @@ -1594,8 +1659,8 @@ public void stopApplication(boolean deleteDB) { } public void closeDB(boolean deleteDB) { - boolean isClusterMode = getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME); - if (deleteDB && isClusterMode) + boolean clusterMode = isClusterMode(); + if (deleteDB && clusterMode) { //let the other nodes have enough time to synch getVertx().setTimer(ClusterNode.NODE_UPDATE_PERIOD + 1000, l-> @@ -2232,6 +2297,10 @@ public void stopPublish(String streamId) { }); } + public boolean isClusterMode() { + return getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME); + } + public void joinedTheRoom(String roomId, String streamId) { //No need to implement here. } diff --git a/src/main/java/io/antmedia/filter/IPFilter.java b/src/main/java/io/antmedia/filter/IPFilter.java index ca724768f..95ef9415c 100644 --- a/src/main/java/io/antmedia/filter/IPFilter.java +++ b/src/main/java/io/antmedia/filter/IPFilter.java @@ -31,6 +31,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletRequest httpRequest = (HttpServletRequest) request; if (isAllowed(request.getRemoteAddr()) || RestProxyFilter.isNodeCommunicationTokenValid(httpRequest.getHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION), getAppSettings().getClusterCommunicationKey(), httpRequest.getRequestURI())) { + logger.info("yunus IT IS VALID, REREDIRECTING!!!"); chain.doFilter(request, response); return; } diff --git a/src/main/java/io/antmedia/filter/RestProxyFilter.java b/src/main/java/io/antmedia/filter/RestProxyFilter.java index 9f2b78ee4..66d54e210 100644 --- a/src/main/java/io/antmedia/filter/RestProxyFilter.java +++ b/src/main/java/io/antmedia/filter/RestProxyFilter.java @@ -97,6 +97,12 @@ && isHostRunning(subscriber.getRegisteredNodeIp(), getServerSettings().getDefaul * forward the request to the origin address. This also handles the scenario if the origin server is dead or broadcast stuck * because AntMediaApplicationAdapter.isStreaming checks the last update time */ + + // stop durumunda yönlendirme yok. + // broadcast rtsp ise offline olsa bile yönlendir. + + //1-) origin ölmüş olabilir. (kubernetes pod is dead) + //2-) node doluysa ne yapacağız. else if (broadcast != null && AntMediaApplicationAdapter.isStreaming(broadcast) && !isRequestDestinedForThisNode(request.getRemoteAddr(), broadcast.getOriginAdress()) && isHostRunning(broadcast.getOriginAdress(), getServerSettings().getDefaultHttpPort())) @@ -251,6 +257,9 @@ public boolean isRequestDestinedForThisNode(String requestAddress, String nodeA */ public static boolean isNodeCommunicationTokenValid(String jwtInternalCommunicationToken, String jwtSecretKey, String requestURI) { + logger.info("yunus CHECKING IF NODE COMMUNICATION TOKEN IS VALID OR NOT..."); + logger.info("yunus token:{} ", jwtInternalCommunicationToken); + boolean result = false; if (jwtInternalCommunicationToken != null) { diff --git a/src/main/java/io/antmedia/filter/TokenFilterManager.java b/src/main/java/io/antmedia/filter/TokenFilterManager.java index cb2f500f8..6a8742110 100644 --- a/src/main/java/io/antmedia/filter/TokenFilterManager.java +++ b/src/main/java/io/antmedia/filter/TokenFilterManager.java @@ -79,8 +79,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha */ - if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method)) + if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method)) { + logger.info("yunus INSIDE TOKEN FILTER MANAGER!!"); if (streamId == null) { logger.warn("No streamId found in the request: {}", httpRequest.getRequestURI()); } @@ -98,12 +99,14 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha if (jwtInternalCommunicationToken != null) { + logger.info("yunus INTERNAL COMM TOKEN IS NOT NULL. WILL CHECK IT!"); //if jwtInternalCommunicationToken is not null, //it means that this is the origin instance and receiving request from the edge node directly boolean checkJwtToken = false; if (streamId != null) { checkJwtToken = tokenServiceTmp.isJwtTokenValid(jwtInternalCommunicationToken, appSettings.getClusterCommunicationKey(), streamId, Token.PLAY_TOKEN); + logger.info("yunus CHECKED INTERNAL COMM TOKEN AND ITS {}", checkJwtToken); } if (!checkJwtToken) { diff --git a/src/main/java/io/antmedia/rest/BroadcastRestService.java b/src/main/java/io/antmedia/rest/BroadcastRestService.java index c0a9e3d32..0d11da15f 100644 --- a/src/main/java/io/antmedia/rest/BroadcastRestService.java +++ b/src/main/java/io/antmedia/rest/BroadcastRestService.java @@ -1322,6 +1322,7 @@ public Result getCameraErrorV2(@Parameter(description = "StreamId of the IP Came @Produces(MediaType.APPLICATION_JSON) public Result startStreamSourceV2(@Parameter(description = "the id of the stream. The broadcast type should be IP Camera or Stream Source otherwise it does not work", required = true) @PathParam("id") String id) { + logger.info("yunus I RECEIVED START STREAMING!!!"); return super.startStreamSource(id); } diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index e221d82a2..d50eb781b 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -6,10 +6,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -37,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import io.antmedia.filter.TokenFilterManager; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.http.HttpEntity; @@ -1816,10 +1814,85 @@ public void testStartStreaming() { startStreaming = spyAdapter.startStreaming(broadcast).isSuccess(); assertFalse(startStreaming); + } + @Test + public void testStartStreamingForwardToOrigin(){ + IScope scope = mock(IScope.class); + when(scope.getName()).thenReturn("junit"); - } + DataStore dataStore = new InMemoryDataStore("dbname"); + DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); + Mockito.when(dsf.getDataStore()).thenReturn(dataStore); + + AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); + IContext context = mock(IContext.class); + when(context.getBean(spyAdapter.VERTX_BEAN_NAME)).thenReturn(vertx); + + when(context.getBean(IDataStoreFactory.BEAN_NAME)).thenReturn(dsf); + + + ApplicationContext appContext = Mockito.mock(ApplicationContext.class); + when(context.getApplicationContext()).thenReturn(appContext); + when(context.getResource(Mockito.anyString())).thenReturn(Mockito.mock(org.springframework.core.io.Resource.class)); + + AntMediaApplicationAdapter appAdaptor = Mockito.mock(AntMediaApplicationAdapter.class); + ServerSettings serverSettings = Mockito.mock(ServerSettings.class); + when(spyAdapter.getScope()).thenReturn(scope); + when(scope.getContext()).thenReturn(context); + + when(spyAdapter.getServerSettings()).thenReturn(serverSettings); + when(serverSettings.getHostAddress()).thenReturn("1.1.1.2"); + + AppSettings appSettings = new AppSettings(); + appSettings.setAppName("WebRTCAppEE"); + + spyAdapter.setAppSettings(appSettings); + spyAdapter.setDataStore(dataStore); + + when(appContext.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(appAdaptor); + + when(appContext.containsBean(AppSettings.BEAN_NAME)).thenReturn(true); + when(appContext.containsBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(true); + when(appContext.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); + when(appContext.getBean(AppSettings.BEAN_NAME)).thenReturn(new AppSettings()); + + when(scope.getContext()).thenReturn(context); + spyAdapter.setDataStoreFactory(dsf); + + Mockito.doReturn(dataStore).when(spyAdapter).getDataStore(); + spyAdapter.setScope(scope); + + ILicenceService licenseService = Mockito.mock(ILicenceService.class); + Mockito.when(context.getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString())).thenReturn(licenseService); + when(licenseService.isLicenceSuspended()).thenReturn(false); + + Broadcast broadcast = new Broadcast(); + broadcast.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast.setOriginAdress("1.1.1.2"); + broadcast.setStreamUrl("https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerMeltdowns.mp4"); + dataStore.save(broadcast); + + when(spyAdapter.isClusterMode()).thenReturn(true); + + boolean startStreaming = spyAdapter.startStreaming(broadcast).isSuccess(); + assertTrue(startStreaming); + + when(serverSettings.getHostAddress()).thenReturn("1.1.1.3"); + when(serverSettings.getDefaultHttpPort()).thenReturn(Integer.valueOf("5080")); + + String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + "5080" + File.separator + appSettings.getAppName() + File.separator+ "rest" + + File.separator +"v2" + File.separator +"broadcasts" + + File.separator + broadcast.getStreamId() + File.separator + "start"; + + startStreaming = spyAdapter.startStreaming(broadcast).isSuccess(); + assertTrue(startStreaming); + + verify(spyAdapter,times(1)).forwardStartStreaming(broadcast); + verify(spyAdapter,times(1)).sendPOST(eq(restRouteOfNode), argThat(map -> map.containsKey(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION)), anyInt(), anyString()); + + } @Test public void testStreamFetcherNotStartAutomatically() { From 2822a96e2572d234dcb0395bd3ea47c5c4fb80e1 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Thu, 24 Oct 2024 13:44:00 +0300 Subject: [PATCH 02/12] delete debug logs --- .../io/antmedia/AntMediaApplicationAdapter.java | 17 +---------------- src/main/java/io/antmedia/filter/IPFilter.java | 1 - .../io/antmedia/filter/RestProxyFilter.java | 3 --- .../io/antmedia/filter/TokenFilterManager.java | 5 +---- .../io/antmedia/rest/BroadcastRestService.java | 1 - 5 files changed, 2 insertions(+), 25 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 0a4df6334..2e5fa70d6 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -1301,7 +1301,6 @@ public static final boolean isStreaming(Broadcast broadcast) { public Result startStreaming(Broadcast broadcast) { - logger.info("yunus START STREAMING REQUEST CAME!"); Result result = new Result(false); if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || @@ -1310,16 +1309,9 @@ public Result startStreaming(Broadcast broadcast) ) { if(isClusterMode()){ - logger.info("yunus IS IS CLUSTER MODE"); - String broadcastOriginAddress = broadcast.getOriginAdress(); - logger.info("yunus broadcast origin address: {}", broadcastOriginAddress); - logger.info("yunus server settings host address: {}", getServerSettings().getHostAddress()); - if(broadcastOriginAddress.equals(getServerSettings().getHostAddress())){ - logger.info("yunus I AM THE ORIGIN I AM STARTING ON MYSELF"); - result = getStreamFetcherManager().startStreaming(broadcast); return result; } @@ -1344,22 +1336,15 @@ else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { } public void forwardStartStreaming(Broadcast broadcast){ - logger.info("yunus FORWARD START STREAMING CALLED!"); String jwtToken = JWTFilter.generateJwtToken(getAppSettings().getClusterCommunicationKey(), System.currentTimeMillis() + 5000); - logger.info("yunus GENERATED JWT TOKEN: {}", jwtToken); - String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + getServerSettings().getDefaultHttpPort() + File.separator + getAppSettings().getAppName() + File.separator+ "rest" + File.separator +"v2" + File.separator +"broadcasts" + File.separator + broadcast.getStreamId() + File.separator + "start"; - logger.info("yunus REST ROUTE: {}", jwtToken); - - Map variables = new HashMap<>(); variables.put(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, jwtToken); - - + vertx.executeBlocking(() -> { try { sendPOST(restRouteOfNode, variables, getAppSettings().getWebhookRetryCount(), getAppSettings().getWebhookContentType()); diff --git a/src/main/java/io/antmedia/filter/IPFilter.java b/src/main/java/io/antmedia/filter/IPFilter.java index 95ef9415c..ca724768f 100644 --- a/src/main/java/io/antmedia/filter/IPFilter.java +++ b/src/main/java/io/antmedia/filter/IPFilter.java @@ -31,7 +31,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletRequest httpRequest = (HttpServletRequest) request; if (isAllowed(request.getRemoteAddr()) || RestProxyFilter.isNodeCommunicationTokenValid(httpRequest.getHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION), getAppSettings().getClusterCommunicationKey(), httpRequest.getRequestURI())) { - logger.info("yunus IT IS VALID, REREDIRECTING!!!"); chain.doFilter(request, response); return; } diff --git a/src/main/java/io/antmedia/filter/RestProxyFilter.java b/src/main/java/io/antmedia/filter/RestProxyFilter.java index 66d54e210..eaed61588 100644 --- a/src/main/java/io/antmedia/filter/RestProxyFilter.java +++ b/src/main/java/io/antmedia/filter/RestProxyFilter.java @@ -257,9 +257,6 @@ public boolean isRequestDestinedForThisNode(String requestAddress, String nodeA */ public static boolean isNodeCommunicationTokenValid(String jwtInternalCommunicationToken, String jwtSecretKey, String requestURI) { - logger.info("yunus CHECKING IF NODE COMMUNICATION TOKEN IS VALID OR NOT..."); - logger.info("yunus token:{} ", jwtInternalCommunicationToken); - boolean result = false; if (jwtInternalCommunicationToken != null) { diff --git a/src/main/java/io/antmedia/filter/TokenFilterManager.java b/src/main/java/io/antmedia/filter/TokenFilterManager.java index 6a8742110..2f63cb546 100644 --- a/src/main/java/io/antmedia/filter/TokenFilterManager.java +++ b/src/main/java/io/antmedia/filter/TokenFilterManager.java @@ -81,7 +81,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method)) { - logger.info("yunus INSIDE TOKEN FILTER MANAGER!!"); if (streamId == null) { logger.warn("No streamId found in the request: {}", httpRequest.getRequestURI()); } @@ -99,14 +98,12 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha if (jwtInternalCommunicationToken != null) { - logger.info("yunus INTERNAL COMM TOKEN IS NOT NULL. WILL CHECK IT!"); - //if jwtInternalCommunicationToken is not null, + //if jwtInternalCommunicationToken is not null, //it means that this is the origin instance and receiving request from the edge node directly boolean checkJwtToken = false; if (streamId != null) { checkJwtToken = tokenServiceTmp.isJwtTokenValid(jwtInternalCommunicationToken, appSettings.getClusterCommunicationKey(), streamId, Token.PLAY_TOKEN); - logger.info("yunus CHECKED INTERNAL COMM TOKEN AND ITS {}", checkJwtToken); } if (!checkJwtToken) { diff --git a/src/main/java/io/antmedia/rest/BroadcastRestService.java b/src/main/java/io/antmedia/rest/BroadcastRestService.java index 0d11da15f..c0a9e3d32 100644 --- a/src/main/java/io/antmedia/rest/BroadcastRestService.java +++ b/src/main/java/io/antmedia/rest/BroadcastRestService.java @@ -1322,7 +1322,6 @@ public Result getCameraErrorV2(@Parameter(description = "StreamId of the IP Came @Produces(MediaType.APPLICATION_JSON) public Result startStreamSourceV2(@Parameter(description = "the id of the stream. The broadcast type should be IP Camera or Stream Source otherwise it does not work", required = true) @PathParam("id") String id) { - logger.info("yunus I RECEIVED START STREAMING!!!"); return super.startStreamSource(id); } From 308e914012903384da2ad2903acc7580d5479567 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Thu, 24 Oct 2024 14:13:32 +0300 Subject: [PATCH 03/12] remove unnecessary comment --- src/main/java/io/antmedia/filter/RestProxyFilter.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/main/java/io/antmedia/filter/RestProxyFilter.java b/src/main/java/io/antmedia/filter/RestProxyFilter.java index eaed61588..24e3b4a95 100644 --- a/src/main/java/io/antmedia/filter/RestProxyFilter.java +++ b/src/main/java/io/antmedia/filter/RestProxyFilter.java @@ -97,18 +97,10 @@ && isHostRunning(subscriber.getRegisteredNodeIp(), getServerSettings().getDefaul * forward the request to the origin address. This also handles the scenario if the origin server is dead or broadcast stuck * because AntMediaApplicationAdapter.isStreaming checks the last update time */ - - // stop durumunda yönlendirme yok. - // broadcast rtsp ise offline olsa bile yönlendir. - - //1-) origin ölmüş olabilir. (kubernetes pod is dead) - //2-) node doluysa ne yapacağız. else if (broadcast != null && AntMediaApplicationAdapter.isStreaming(broadcast) && !isRequestDestinedForThisNode(request.getRemoteAddr(), broadcast.getOriginAdress()) && isHostRunning(broadcast.getOriginAdress(), getServerSettings().getDefaultHttpPort())) { - - forwardRequestToNode(request, response, broadcast.getOriginAdress()); } else From 94fcc91bdd88dc93473713371dc0ea771aa7dd71 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Fri, 25 Oct 2024 16:57:25 +0300 Subject: [PATCH 04/12] retry logic --- .../antmedia/AntMediaApplicationAdapter.java | 107 ++++++++++++++---- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 2e5fa70d6..fb54038b8 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -19,9 +19,7 @@ import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.*; import io.antmedia.filter.JWTFilter; import io.antmedia.filter.TokenFilterManager; @@ -172,6 +170,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter public static final String STREAM_SOURCE = "streamSource"; public static final String PLAY_LIST = "playlist"; protected static final int END_POINT_LIMIT = 20; + public static final int CLUSTER_POST_RETRY_ATTEMPT = 3; + public static final int CLUSTER_POST_TIMEOUT_MS = 1000; //Allow any sub directory under / private static final String VOD_IMPORT_ALLOWED_DIRECTORY = "/"; @@ -1176,6 +1176,64 @@ private void putToMap(String keyName, Object keyValue, Map map) } } + + public boolean sendClusterPost(String url, String clusterCommunicationToken, int retryAttempts) { + logger.info("Sending cluster POST request to {}", url); + try (CloseableHttpClient httpClient = getHttpClient()) { + HttpPost httpPost = new HttpPost(url); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(CLUSTER_POST_TIMEOUT_MS) + .setConnectionRequestTimeout(CLUSTER_POST_TIMEOUT_MS) + .setSocketTimeout(CLUSTER_POST_TIMEOUT_MS) + .build(); + httpPost.setConfig(requestConfig); + + httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterCommunicationToken); + + try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) { + int statusCode = httpResponse.getStatusLine().getStatusCode(); + logger.info("Cluster POST Response Status: {}", statusCode); + if (statusCode != HttpStatus.SC_OK) { + if (retryAttempts >= 1) { + logger.info("Retry attempt for Cluster POST in {} milliseconds due to non-200 response: {}", + appSettings.getWebhookRetryDelay(), statusCode); + return retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1); + } else { + logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); + return false; + } + } + return true; + } + } catch (IOException e) { + if (retryAttempts >= 1) { + logger.info("Retry attempt for Cluster POST in {} milliseconds due to IO exception: {}", + appSettings.getWebhookRetryDelay(), ExceptionUtils.getStackTrace(e)); + return retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1); + } else { + logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); + return false; + } + } + } + + public boolean retrySendClusterPostWithDelay(String url, String clusterCommunicationToken, int retryAttempts) { + CompletableFuture future = new CompletableFuture<>(); + + vertx.setTimer(appSettings.getWebhookRetryDelay(), timerId -> { + boolean result = sendClusterPost(url, clusterCommunicationToken, retryAttempts); + future.complete(result); + }); + + try { + // Timeout for each retry = retry delay + request timeout + buffer + return future.get(appSettings.getWebhookRetryDelay() + CLUSTER_POST_TIMEOUT_MS + 500, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + logger.error("Error waiting for retry POST request completion", e); + return false; + } + } + /** * * @param url @@ -1194,13 +1252,6 @@ public void sendPOST(String url, Map variables, int retryAttempt .build(); httpPost.setConfig(requestConfig); - if (variables.containsKey(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION)) { - Object clusterAuthJwt = variables.get(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION); - if (clusterAuthJwt != null) { - httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterAuthJwt.toString()); - variables.remove(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION); - } - } if (ContentType.APPLICATION_FORM_URLENCODED.getMimeType().equals(contentType)) { @@ -1316,9 +1367,13 @@ public Result startStreaming(Broadcast broadcast) return result; } - forwardStartStreaming(broadcast); - result.setSuccess(true); - return result; + if(forwardStartStreaming(broadcast)){ + result.setSuccess(true); + return result; + } + + result = getStreamFetcherManager().startStreaming(broadcast); + }else{ result = getStreamFetcherManager().startStreaming(broadcast); @@ -1335,27 +1390,35 @@ else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { return result; } - public void forwardStartStreaming(Broadcast broadcast){ + public boolean forwardStartStreaming(Broadcast broadcast) { String jwtToken = JWTFilter.generateJwtToken(getAppSettings().getClusterCommunicationKey(), System.currentTimeMillis() + 5000); - String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + getServerSettings().getDefaultHttpPort() + File.separator + getAppSettings().getAppName() + File.separator+ "rest" + - File.separator +"v2" + File.separator +"broadcasts" + + String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + getServerSettings().getDefaultHttpPort() + + File.separator + getAppSettings().getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast.getStreamId() + File.separator + "start"; - Map variables = new HashMap<>(); + CompletableFuture future = new CompletableFuture<>(); - variables.put(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, jwtToken); - vertx.executeBlocking(() -> { try { - sendPOST(restRouteOfNode, variables, getAppSettings().getWebhookRetryCount(), getAppSettings().getWebhookContentType()); + boolean result = sendClusterPost(restRouteOfNode, jwtToken, CLUSTER_POST_RETRY_ATTEMPT); + future.complete(result); } catch (Exception exception) { logger.error(ExceptionUtils.getStackTrace(exception)); + future.complete(false); } - return null; - }, false); + try { + // Total timeout = retry attempts * (retry delay + request timeout) + buffer + long totalTimeout = CLUSTER_POST_RETRY_ATTEMPT * (appSettings.getWebhookRetryDelay() + 1000) + 1000; + return future.get(totalTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + logger.error("Error waiting for Cluster POST request completion", e); + return false; + } + } public Result stopStreaming(Broadcast broadcast) From b9dd4c78f40df13cc60fdc135ba7a6a62c9e3c63 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Mon, 28 Oct 2024 13:09:06 +0300 Subject: [PATCH 05/12] extend and change unit test --- .../AntMediaApplicationAdaptorUnitTest.java | 323 ++++++++++++++++-- 1 file changed, 288 insertions(+), 35 deletions(-) diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index d50eb781b..58f7642fc 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -7,15 +7,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.File; @@ -31,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -38,7 +31,9 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; import org.apache.http.StatusLine; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -1817,7 +1812,7 @@ public void testStartStreaming() { } @Test - public void testStartStreamingForwardToOrigin(){ + public void testStartStreamingForwardToOrigin() { IScope scope = mock(IScope.class); when(scope.getName()).thenReturn("junit"); @@ -1827,11 +1822,9 @@ public void testStartStreamingForwardToOrigin(){ AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); IContext context = mock(IContext.class); - when(context.getBean(spyAdapter.VERTX_BEAN_NAME)).thenReturn(vertx); - + when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); when(context.getBean(IDataStoreFactory.BEAN_NAME)).thenReturn(dsf); - ApplicationContext appContext = Mockito.mock(ApplicationContext.class); when(context.getApplicationContext()).thenReturn(appContext); when(context.getResource(Mockito.anyString())).thenReturn(Mockito.mock(org.springframework.core.io.Resource.class)); @@ -1840,26 +1833,22 @@ public void testStartStreamingForwardToOrigin(){ ServerSettings serverSettings = Mockito.mock(ServerSettings.class); when(spyAdapter.getScope()).thenReturn(scope); when(scope.getContext()).thenReturn(context); - when(spyAdapter.getServerSettings()).thenReturn(serverSettings); - when(serverSettings.getHostAddress()).thenReturn("1.1.1.2"); AppSettings appSettings = new AppSettings(); appSettings.setAppName("WebRTCAppEE"); - + appSettings.setWebhookRetryDelay(1000); spyAdapter.setAppSettings(appSettings); spyAdapter.setDataStore(dataStore); when(appContext.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(appAdaptor); - when(appContext.containsBean(AppSettings.BEAN_NAME)).thenReturn(true); when(appContext.containsBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(true); when(appContext.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); - when(appContext.getBean(AppSettings.BEAN_NAME)).thenReturn(new AppSettings()); - + when(appContext.getBean(AppSettings.BEAN_NAME)).thenReturn(appSettings); when(scope.getContext()).thenReturn(context); - spyAdapter.setDataStoreFactory(dsf); + spyAdapter.setDataStoreFactory(dsf); Mockito.doReturn(dataStore).when(spyAdapter).getDataStore(); spyAdapter.setScope(scope); @@ -1867,32 +1856,296 @@ public void testStartStreamingForwardToOrigin(){ Mockito.when(context.getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString())).thenReturn(licenseService); when(licenseService.isLicenceSuspended()).thenReturn(false); + StreamFetcherManager streamFetcherManager = mock(StreamFetcherManager.class); + when(spyAdapter.getStreamFetcherManager()).thenReturn(streamFetcherManager); - Broadcast broadcast = new Broadcast(); - broadcast.setType(AntMediaApplicationAdapter.STREAM_SOURCE); - broadcast.setOriginAdress("1.1.1.2"); - broadcast.setStreamUrl("https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerMeltdowns.mp4"); - dataStore.save(broadcast); + // Test Case 1: Local server streaming (same origin address) + Broadcast broadcast1 = new Broadcast(); + broadcast1.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast1.setOriginAdress("1.1.1.2"); + broadcast1.setStreamUrl("https://example.com/stream1.mp4"); + dataStore.save(broadcast1); when(spyAdapter.isClusterMode()).thenReturn(true); + when(serverSettings.getHostAddress()).thenReturn("1.1.1.2"); + when(streamFetcherManager.startStreaming(broadcast1)).thenReturn(new Result(true)); - boolean startStreaming = spyAdapter.startStreaming(broadcast).isSuccess(); - assertTrue(startStreaming); + Result result1 = spyAdapter.startStreaming(broadcast1); + assertTrue(result1.isSuccess()); + verify(streamFetcherManager, times(1)).startStreaming(broadcast1); + verify(spyAdapter, never()).forwardStartStreaming(broadcast1); + + // Test Case 2: successful origin forward + Broadcast broadcast2 = new Broadcast(); + broadcast2.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast2.setOriginAdress("1.1.1.2"); + broadcast2.setStreamUrl("https://example.com/stream2.mp4"); + dataStore.save(broadcast2); when(serverSettings.getHostAddress()).thenReturn("1.1.1.3"); - when(serverSettings.getDefaultHttpPort()).thenReturn(Integer.valueOf("5080")); + when(serverSettings.getDefaultHttpPort()).thenReturn(5080); - String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + "5080" + File.separator + appSettings.getAppName() + File.separator+ "rest" + - File.separator +"v2" + File.separator +"broadcasts" + - File.separator + broadcast.getStreamId() + File.separator + "start"; + String expectedRestRoute = "http://" + broadcast2.getOriginAdress() + ":5080" + + File.separator + appSettings.getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + + File.separator + broadcast2.getStreamId() + File.separator + "start"; - startStreaming = spyAdapter.startStreaming(broadcast).isSuccess(); - assertTrue(startStreaming); + Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + Result result2 = spyAdapter.startStreaming(broadcast2); + assertTrue(result2.isSuccess()); + verify(spyAdapter, times(1)).forwardStartStreaming(broadcast2); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Test Case 3: failed forward to origin, fallback to local edge streaming + Broadcast broadcast3 = new Broadcast(); + broadcast3.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast3.setOriginAdress("1.1.1.2"); + broadcast3.setStreamUrl("https://example.com/stream3.mp4"); + dataStore.save(broadcast3); + + Mockito.doReturn(false).when(spyAdapter).sendClusterPost(anyString(), anyString(), anyInt()); + when(streamFetcherManager.startStreaming(broadcast3)).thenReturn(new Result(true)); + + Result result3 = spyAdapter.startStreaming(broadcast3); + assertTrue(result3.isSuccess()); + verify(streamFetcherManager, times(1)).startStreaming(broadcast3); + + // Test Case 4: Non-cluster mode streaming + when(spyAdapter.isClusterMode()).thenReturn(false); + Broadcast broadcast4 = new Broadcast(); + broadcast4.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast4.setStreamUrl("https://example.com/stream4.mp4"); + dataStore.save(broadcast4); + + when(streamFetcherManager.startStreaming(broadcast4)).thenReturn(new Result(true)); + + Result result4 = spyAdapter.startStreaming(broadcast4); + assertTrue(result4.isSuccess()); + verify(spyAdapter, never()).forwardStartStreaming(broadcast4); + verify(streamFetcherManager, times(1)).startStreaming(broadcast4); + } + + @Test + public void testForwardStartStreaming() throws Exception { + IScope scope = mock(IScope.class); + when(scope.getName()).thenReturn("junit"); + + DataStore dataStore = new InMemoryDataStore("dbname"); + DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); + Mockito.when(dsf.getDataStore()).thenReturn(dataStore); + + AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); + IContext context = mock(IContext.class); + when(context.getBean(spyAdapter.VERTX_BEAN_NAME)).thenReturn(vertx); + when(context.getBean(IDataStoreFactory.BEAN_NAME)).thenReturn(dsf); + + ApplicationContext appContext = Mockito.mock(ApplicationContext.class); + when(context.getApplicationContext()).thenReturn(appContext); + when(context.getResource(Mockito.anyString())).thenReturn(Mockito.mock(org.springframework.core.io.Resource.class)); + + ServerSettings serverSettings = Mockito.mock(ServerSettings.class); + when(spyAdapter.getScope()).thenReturn(scope); + when(scope.getContext()).thenReturn(context); + when(spyAdapter.getServerSettings()).thenReturn(serverSettings); + + AppSettings appSettings = new AppSettings(); + appSettings.setAppName("WebRTCAppEE"); + appSettings.setWebhookRetryDelay(1000); + appSettings.setClusterCommunicationKey("test-key"); + spyAdapter.setAppSettings(appSettings); + + when(serverSettings.getDefaultHttpPort()).thenReturn(5080); + + // Test Case 1: Successful forward + Broadcast broadcast1 = new Broadcast(); + broadcast1.setOriginAdress("127.0.0.1"); + broadcast1.setStreamId("test-stream-1"); + + String expectedRestRoute = "http://127.0.0.1:5080" + + File.separator + appSettings.getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + + File.separator + broadcast1.getStreamId() + File.separator + "start"; + + Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Test successful forward + boolean result1 = spyAdapter.forwardStartStreaming(broadcast1); + assertTrue(result1); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Test Case 2: Failed forward + Broadcast broadcast2 = new Broadcast(); + broadcast2.setOriginAdress("127.0.0.2"); + broadcast2.setStreamId("test-stream-2"); + + String expectedRestRoute2 = "http://127.0.0.2:5080" + + File.separator + appSettings.getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + + File.separator + broadcast2.getStreamId() + File.separator + "start"; + + // Mock sendClusterPost to return false + Mockito.doReturn(false).when(spyAdapter).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + boolean result2 = spyAdapter.forwardStartStreaming(broadcast2); + assertFalse(result2); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Test Case 3: Exception during execution + Broadcast broadcast3 = new Broadcast(); + broadcast3.setOriginAdress("127.0.0.3"); + broadcast3.setStreamId("test-stream-3"); + + String expectedRestRoute3 = "http://127.0.0.3:5080" + + File.separator + appSettings.getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + + File.separator + broadcast3.getStreamId() + File.separator + "start"; + + // Mock sendClusterPost to throw exception + Mockito.doThrow(new RuntimeException("Test exception")).when(spyAdapter) + .sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + boolean result3 = spyAdapter.forwardStartStreaming(broadcast3); + assertFalse(result3); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Test Case 4: Timeout scenario + Broadcast broadcast4 = new Broadcast(); + broadcast4.setOriginAdress("127.0.0.4"); + broadcast4.setStreamId("test-stream-4"); + + String expectedRestRoute4 = "http://127.0.0.4:5080" + + File.separator + appSettings.getAppName() + File.separator + "rest" + + File.separator + "v2" + File.separator + "broadcasts" + + File.separator + broadcast4.getStreamId() + File.separator + "start"; + + // Mock sendClusterPost to simulate timeout by sleeping + Mockito.doAnswer(invocation -> { + Thread.sleep(5000); // Sleep longer than the timeout + return true; + }).when(spyAdapter).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Reduce timeout for test purposes + appSettings.setWebhookRetryDelay(100); + + boolean result4 = spyAdapter.forwardStartStreaming(broadcast4); + assertFalse(result4); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + + // Verify JWT token format (at least check it's not empty and properly formatted) + ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); + verify(spyAdapter, atLeastOnce()).sendClusterPost(anyString(), tokenCaptor.capture(), anyInt()); + String capturedToken = tokenCaptor.getValue(); + assertNotNull(capturedToken); + assertFalse(capturedToken.isEmpty()); + } + + @Test + public void testSendClusterPost() throws Exception { + IScope scope = mock(IScope.class); + when(scope.getName()).thenReturn("junit"); - verify(spyAdapter,times(1)).forwardStartStreaming(broadcast); - verify(spyAdapter,times(1)).sendPOST(eq(restRouteOfNode), argThat(map -> map.containsKey(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION)), anyInt(), anyString()); + AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); + IContext context = mock(IContext.class); + when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); + ApplicationContext appContext = Mockito.mock(ApplicationContext.class); + when(context.getApplicationContext()).thenReturn(appContext); + when(appContext.containsBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(true); + when(appContext.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); + + AppSettings appSettings = new AppSettings(); + appSettings.setWebhookRetryDelay(100); // Small delay for testing + spyAdapter.setAppSettings(appSettings); + + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + Mockito.doReturn(httpClient).when(spyAdapter).getHttpClient(); + + String testUrl = "http://localhost:5080/test"; + String testToken = "test-token"; + + // Test Case 1: Successful request with 200 response + CloseableHttpResponse successResponse = mock(CloseableHttpResponse.class); + StatusLine successStatusLine = mock(StatusLine.class); + when(successStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(successResponse.getStatusLine()).thenReturn(successStatusLine); + when(httpClient.execute(any(HttpPost.class))).thenReturn(successResponse); + + boolean result1 = spyAdapter.sendClusterPost(testUrl, testToken, 3); + assertTrue(result1); + + // Verify request configuration + ArgumentCaptor httpPostCaptor = ArgumentCaptor.forClass(HttpPost.class); + verify(httpClient).execute(httpPostCaptor.capture()); + HttpPost capturedPost = httpPostCaptor.getValue(); + assertEquals(testUrl, capturedPost.getURI().toString()); + assertEquals(testToken, capturedPost.getFirstHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION).getValue()); + RequestConfig capturedConfig = capturedPost.getConfig(); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectTimeout()); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectionRequestTimeout()); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getSocketTimeout()); + + // Test Case 2: Failed request with retry + CloseableHttpResponse failResponse = mock(CloseableHttpResponse.class); + StatusLine failStatusLine = mock(StatusLine.class); + when(failStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + when(failResponse.getStatusLine()).thenReturn(failStatusLine); + + // First call returns 500, second call returns 200 + when(httpClient.execute(any(HttpPost.class))) + .thenReturn(failResponse) + .thenReturn(successResponse); + + boolean result2 = spyAdapter.sendClusterPost(testUrl, testToken, 2); + assertTrue(result2); + verify(spyAdapter, times(1)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), eq(1)); + + // Test Case 3: Failed request with no more retries + when(httpClient.execute(any(HttpPost.class))).thenReturn(failResponse); + + boolean result3 = spyAdapter.sendClusterPost(testUrl, testToken, 0); + assertFalse(result3); + verify(spyAdapter, times(1)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), anyInt()); + + // Test Case 4: IOException with retry + when(httpClient.execute(any(HttpPost.class))) + .thenThrow(new IOException("Test exception")) + .thenReturn(successResponse); + + boolean result4 = spyAdapter.sendClusterPost(testUrl, testToken, 2); + assertTrue(result4); + verify(spyAdapter, times(2)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), eq(1)); + + // Test Case 5: IOException with no more retries + when(httpClient.execute(any(HttpPost.class))).thenThrow(new IOException("Test exception")); + + boolean result5 = spyAdapter.sendClusterPost(testUrl, testToken, 0); + assertFalse(result5); + + // Test Case 6: Test retrySendClusterPostWithDelay timeout + CompletableFuture timeoutFuture = new CompletableFuture<>(); + Mockito.doAnswer(invocation -> { + Thread.sleep(2000); // Sleep longer than timeout + timeoutFuture.complete(true); + return true; + }).when(spyAdapter).sendClusterPost(eq(testUrl), eq(testToken), eq(1)); + + boolean result6 = spyAdapter.retrySendClusterPostWithDelay(testUrl, testToken, 1); + assertFalse(result6); + + // Test Case 7: Test retrySendClusterPostWithDelay success + CompletableFuture successFuture = new CompletableFuture<>(); + Mockito.doAnswer(invocation -> { + successFuture.complete(true); + return true; + }).when(spyAdapter).sendClusterPost(eq(testUrl), eq(testToken), eq(1)); + + boolean result7 = spyAdapter.retrySendClusterPostWithDelay(testUrl, testToken, 1); + assertTrue(result7); + + verify(httpClient, atLeastOnce()).close(); } + @Test public void testStreamFetcherNotStartAutomatically() { From 3b293853aaddbb2ff02f27fef20021e04de13c0d Mon Sep 17 00:00:00 2001 From: lastpeony Date: Mon, 28 Oct 2024 13:47:28 +0300 Subject: [PATCH 06/12] thread interrupted --- .../java/io/antmedia/AntMediaApplicationAdapter.java | 6 ++++-- .../test/AntMediaApplicationAdaptorUnitTest.java | 12 ------------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index fb54038b8..0a2dc119b 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -1229,7 +1229,8 @@ public boolean retrySendClusterPostWithDelay(String url, String clusterCommunica // Timeout for each retry = retry delay + request timeout + buffer return future.get(appSettings.getWebhookRetryDelay() + CLUSTER_POST_TIMEOUT_MS + 500, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.error("Error waiting for retry POST request completion", e); + logger.error(ExceptionUtils.getStackTrace(e)); + Thread.currentThread().interrupt(); return false; } } @@ -1415,7 +1416,8 @@ public boolean forwardStartStreaming(Broadcast broadcast) { long totalTimeout = CLUSTER_POST_RETRY_ATTEMPT * (appSettings.getWebhookRetryDelay() + 1000) + 1000; return future.get(totalTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.error("Error waiting for Cluster POST request completion", e); + logger.error(ExceptionUtils.getStackTrace(e)); + Thread.currentThread().interrupt(); return false; } diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 58f7642fc..787a2de8b 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -2132,18 +2132,6 @@ public void testSendClusterPost() throws Exception { boolean result6 = spyAdapter.retrySendClusterPostWithDelay(testUrl, testToken, 1); assertFalse(result6); - - // Test Case 7: Test retrySendClusterPostWithDelay success - CompletableFuture successFuture = new CompletableFuture<>(); - Mockito.doAnswer(invocation -> { - successFuture.complete(true); - return true; - }).when(spyAdapter).sendClusterPost(eq(testUrl), eq(testToken), eq(1)); - - boolean result7 = spyAdapter.retrySendClusterPostWithDelay(testUrl, testToken, 1); - assertTrue(result7); - - verify(httpClient, atLeastOnce()).close(); } @Test From ecb8e8f5f5c9f11c9be23b6b0cdaec3ae4e0a8da Mon Sep 17 00:00:00 2001 From: lastpeony Date: Tue, 19 Nov 2024 00:29:02 +0300 Subject: [PATCH 07/12] refactor non blocking --- .../antmedia/AntMediaApplicationAdapter.java | 170 +++++++++--------- .../AntMediaApplicationAdaptorUnitTest.java | 20 +-- 2 files changed, 100 insertions(+), 90 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 0a2dc119b..5f2f9c445 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -28,7 +28,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.HttpEntity; -import org.apache.http.HttpRequest; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; @@ -170,7 +169,7 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter public static final String STREAM_SOURCE = "streamSource"; public static final String PLAY_LIST = "playlist"; protected static final int END_POINT_LIMIT = 20; - public static final int CLUSTER_POST_RETRY_ATTEMPT = 3; + public static final int CLUSTER_POST_RETRY_ATTEMPT_COUNT = 3; public static final int CLUSTER_POST_TIMEOUT_MS = 1000; //Allow any sub directory under / @@ -1177,62 +1176,66 @@ private void putToMap(String keyName, Object keyValue, Map map) } - public boolean sendClusterPost(String url, String clusterCommunicationToken, int retryAttempts) { - logger.info("Sending cluster POST request to {}", url); - try (CloseableHttpClient httpClient = getHttpClient()) { - HttpPost httpPost = new HttpPost(url); - RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout(CLUSTER_POST_TIMEOUT_MS) - .setConnectionRequestTimeout(CLUSTER_POST_TIMEOUT_MS) - .setSocketTimeout(CLUSTER_POST_TIMEOUT_MS) - .build(); - httpPost.setConfig(requestConfig); - - httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterCommunicationToken); + public CompletableFuture sendClusterPost(String url, String clusterCommunicationToken, int retryAttempts) { + CompletableFuture future = new CompletableFuture<>(); - try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) { - int statusCode = httpResponse.getStatusLine().getStatusCode(); - logger.info("Cluster POST Response Status: {}", statusCode); - if (statusCode != HttpStatus.SC_OK) { - if (retryAttempts >= 1) { - logger.info("Retry attempt for Cluster POST in {} milliseconds due to non-200 response: {}", - appSettings.getWebhookRetryDelay(), statusCode); - return retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1); + vertx.executeBlocking(promise -> { + try (CloseableHttpClient httpClient = getHttpClient()) { + HttpPost httpPost = new HttpPost(url); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(CLUSTER_POST_TIMEOUT_MS) + .setConnectionRequestTimeout(CLUSTER_POST_TIMEOUT_MS) + .setSocketTimeout(CLUSTER_POST_TIMEOUT_MS) + .build(); + httpPost.setConfig(requestConfig); + + httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterCommunicationToken); + + try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) { + int statusCode = httpResponse.getStatusLine().getStatusCode(); + logger.info("Cluster POST Response Status: {}", statusCode); + if (statusCode == HttpStatus.SC_OK) { + promise.complete(true); } else { - logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); - return false; + if (retryAttempts >= 1) { + logger.info("Retry attempt for Cluster POST in {} milliseconds due to non-200 response: {}", + appSettings.getWebhookRetryDelay(), statusCode); + retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1) + .thenAccept(promise::complete); // Chain retry result + } else { + logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); + promise.complete(false); + } } } - return true; + } catch (IOException e) { + if (retryAttempts >= 1) { + logger.info("Retry attempt for Cluster POST in {} milliseconds due to IO exception: {}", + appSettings.getWebhookRetryDelay(), ExceptionUtils.getStackTrace(e)); + retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1) + .thenAccept(promise::complete); // Chain retry result + } else { + logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); + promise.complete(false); + } } - } catch (IOException e) { - if (retryAttempts >= 1) { - logger.info("Retry attempt for Cluster POST in {} milliseconds due to IO exception: {}", - appSettings.getWebhookRetryDelay(), ExceptionUtils.getStackTrace(e)); - return retrySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1); + }, result -> { + if (result.succeeded()) { + future.complete((Boolean) result.result()); } else { - logger.info("Stopping sending Cluster POST because no more retry attempts left. Giving up."); - return false; + future.completeExceptionally(result.cause()); } - } + }); + + return future; } - public boolean retrySendClusterPostWithDelay(String url, String clusterCommunicationToken, int retryAttempts) { + public CompletableFuture retrySendClusterPostWithDelay(String url, String clusterCommunicationToken, int retryAttempts) { CompletableFuture future = new CompletableFuture<>(); - vertx.setTimer(appSettings.getWebhookRetryDelay(), timerId -> { - boolean result = sendClusterPost(url, clusterCommunicationToken, retryAttempts); - future.complete(result); + sendClusterPost(url, clusterCommunicationToken, retryAttempts).thenAccept(future::complete); }); - - try { - // Timeout for each retry = retry delay + request timeout + buffer - return future.get(appSettings.getWebhookRetryDelay() + CLUSTER_POST_TIMEOUT_MS + 500, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.error(ExceptionUtils.getStackTrace(e)); - Thread.currentThread().interrupt(); - return false; - } + return future; } /** @@ -1363,18 +1366,26 @@ public Result startStreaming(Broadcast broadcast) if(isClusterMode()){ String broadcastOriginAddress = broadcast.getOriginAdress(); - if(broadcastOriginAddress.equals(getServerSettings().getHostAddress())){ + if (broadcastOriginAddress == null || broadcastOriginAddress.isEmpty()) { result = getStreamFetcherManager().startStreaming(broadcast); + result.setMessage("Broadcasts origin address is not set. "+ getServerSettings().getHostAddress()+ " will fetch the stream."); return result; } - if(forwardStartStreaming(broadcast)){ - result.setSuccess(true); + if(broadcastOriginAddress.equals(getServerSettings().getHostAddress())){ + result = getStreamFetcherManager().startStreaming(broadcast); return result; } - result = getStreamFetcherManager().startStreaming(broadcast); + // Forward start streaming request to origin server + forwardStartStreaming(broadcast); + // Immediately return a success result with additional context + result.setSuccess(true); + result.setMessage("Request forwarded to origin server for fetching. " + + "Check broadcast status for final confirmation."); + + return result; }else{ result = getStreamFetcherManager().startStreaming(broadcast); @@ -1391,36 +1402,35 @@ else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { return result; } - public boolean forwardStartStreaming(Broadcast broadcast) { - String jwtToken = JWTFilter.generateJwtToken(getAppSettings().getClusterCommunicationKey(), System.currentTimeMillis() + 5000); - String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + getServerSettings().getDefaultHttpPort() + - File.separator + getAppSettings().getAppName() + File.separator + "rest" + - File.separator + "v2" + File.separator + "broadcasts" + - File.separator + broadcast.getStreamId() + File.separator + "start"; - - CompletableFuture future = new CompletableFuture<>(); - - vertx.executeBlocking(() -> { - try { - boolean result = sendClusterPost(restRouteOfNode, jwtToken, CLUSTER_POST_RETRY_ATTEMPT); - future.complete(result); - } catch (Exception exception) { - logger.error(ExceptionUtils.getStackTrace(exception)); - future.complete(false); - } - return null; - }, false); - - try { - // Total timeout = retry attempts * (retry delay + request timeout) + buffer - long totalTimeout = CLUSTER_POST_RETRY_ATTEMPT * (appSettings.getWebhookRetryDelay() + 1000) + 1000; - return future.get(totalTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.error(ExceptionUtils.getStackTrace(e)); - Thread.currentThread().interrupt(); - return false; - } - + public void forwardStartStreaming(Broadcast broadcast) { + String jwtToken = JWTFilter.generateJwtToken( + getAppSettings().getClusterCommunicationKey(), + System.currentTimeMillis() + 5000 + ); + + String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" + + getServerSettings().getDefaultHttpPort() + + File.separator + getAppSettings().getAppName() + + File.separator + "rest" + + File.separator + "v2" + + File.separator + "broadcasts" + + File.separator + broadcast.getStreamId() + + File.separator + "start"; + + sendClusterPost(restRouteOfNode, jwtToken, CLUSTER_POST_RETRY_ATTEMPT_COUNT) + .thenAccept(success -> { + if (success) { + logger.info("Cluster POST redirection to {} succeeded", restRouteOfNode); + } else { + logger.info("Cluster POST redirection to {} failed. Local node {} will fetch the stream. ", restRouteOfNode, getServerSettings().getHostAddress()); + getStreamFetcherManager().startStreaming(broadcast); + } + }) + .exceptionally(ex -> { + logger.error("Cluster POST encountered an exception: {}", ExceptionUtils.getStackTrace(ex)); + getStreamFetcherManager().startStreaming(broadcast); + return null; + }); } public Result stopStreaming(Broadcast broadcast) diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 787a2de8b..3e216e514 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -1890,12 +1890,12 @@ public void testStartStreamingForwardToOrigin() { File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast2.getStreamId() + File.separator + "start"; - Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); Result result2 = spyAdapter.startStreaming(broadcast2); assertTrue(result2.isSuccess()); verify(spyAdapter, times(1)).forwardStartStreaming(broadcast2); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Test Case 3: failed forward to origin, fallback to local edge streaming Broadcast broadcast3 = new Broadcast(); @@ -1967,12 +1967,12 @@ public void testForwardStartStreaming() throws Exception { File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast1.getStreamId() + File.separator + "start"; - Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Test successful forward boolean result1 = spyAdapter.forwardStartStreaming(broadcast1); assertTrue(result1); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Test Case 2: Failed forward Broadcast broadcast2 = new Broadcast(); @@ -1985,11 +1985,11 @@ public void testForwardStartStreaming() throws Exception { File.separator + broadcast2.getStreamId() + File.separator + "start"; // Mock sendClusterPost to return false - Mockito.doReturn(false).when(spyAdapter).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + Mockito.doReturn(false).when(spyAdapter).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); boolean result2 = spyAdapter.forwardStartStreaming(broadcast2); assertFalse(result2); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Test Case 3: Exception during execution Broadcast broadcast3 = new Broadcast(); @@ -2003,11 +2003,11 @@ public void testForwardStartStreaming() throws Exception { // Mock sendClusterPost to throw exception Mockito.doThrow(new RuntimeException("Test exception")).when(spyAdapter) - .sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + .sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); boolean result3 = spyAdapter.forwardStartStreaming(broadcast3); assertFalse(result3); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Test Case 4: Timeout scenario Broadcast broadcast4 = new Broadcast(); @@ -2023,14 +2023,14 @@ public void testForwardStartStreaming() throws Exception { Mockito.doAnswer(invocation -> { Thread.sleep(5000); // Sleep longer than the timeout return true; - }).when(spyAdapter).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + }).when(spyAdapter).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Reduce timeout for test purposes appSettings.setWebhookRetryDelay(100); boolean result4 = spyAdapter.forwardStartStreaming(broadcast4); assertFalse(result4); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT)); + verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); // Verify JWT token format (at least check it's not empty and properly formatted) ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); From d173f1da4886b72928338e169f2e279a2f1866cd Mon Sep 17 00:00:00 2001 From: lastpeony Date: Tue, 19 Nov 2024 01:44:27 +0300 Subject: [PATCH 08/12] refactor unit tests --- .../antmedia/AntMediaApplicationAdapter.java | 64 ++-- .../io/antmedia/rest/RestServiceBase.java | 1 + .../AntMediaApplicationAdaptorUnitTest.java | 321 ++++++++++-------- 3 files changed, 226 insertions(+), 160 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 8f4e0bcc0..1f68167a5 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -1,5 +1,6 @@ package io.antmedia; +import static io.antmedia.rest.RestServiceBase.FETCH_REQUEST_REDIRECTED_TO_ORIGIN; import static org.bytedeco.ffmpeg.global.avcodec.avcodec_get_name; import java.io.File; @@ -23,6 +24,7 @@ import io.antmedia.filter.JWTFilter; import io.antmedia.filter.TokenFilterManager; +import io.antmedia.statistic.*; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -82,9 +84,6 @@ import io.antmedia.settings.ServerSettings; import io.antmedia.shutdown.AMSShutdownManager; import io.antmedia.shutdown.IShutdownListener; -import io.antmedia.statistic.DashViewerStats; -import io.antmedia.statistic.HlsViewerStats; -import io.antmedia.statistic.ViewerStats; import io.antmedia.statistic.type.RTMPToWebRTCStats; import io.antmedia.statistic.type.WebRTCAudioReceiveStats; import io.antmedia.statistic.type.WebRTCAudioSendStats; @@ -103,6 +102,8 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.dropwizard.MetricsService; import jakarta.validation.constraints.NotNull; +import org.springframework.web.context.WebApplicationContext; + public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener { public static final String BEAN_NAME = "web.handler"; @@ -218,6 +219,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter private Random random = new Random(); + private IStatsCollector statsCollector; + @Override public boolean appStart(IScope app) { setScope(app); @@ -1176,7 +1179,6 @@ private void putToMap(String keyName, Object keyValue, Map map) } } - public CompletableFuture sendClusterPost(String url, String clusterCommunicationToken, int retryAttempts) { CompletableFuture future = new CompletableFuture<>(); @@ -1355,51 +1357,59 @@ public static final boolean isStreaming(Broadcast broadcast) { || IAntMediaStreamHandler.BROADCAST_STATUS_PREPARING.equals(broadcast.getStatus())); } - public Result startStreaming(Broadcast broadcast) - { + public Result startStreaming(Broadcast broadcast) { Result result = new Result(false); - if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || + // Check resource availability first + if (!getStatsCollector().enoughResource()) { + result.setMessage("Not enough resource on server to start streaming."); + return result; + } + + // Handle streaming for IP camera, stream source, and VOD + if (broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) || broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE) || - broadcast.getType().equals(AntMediaApplicationAdapter.VOD) - ) { + broadcast.getType().equals(AntMediaApplicationAdapter.VOD)) { - if(isClusterMode()){ + if (isClusterMode()) { String broadcastOriginAddress = broadcast.getOriginAdress(); + // Handle null or empty origin address if (broadcastOriginAddress == null || broadcastOriginAddress.isEmpty()) { result = getStreamFetcherManager().startStreaming(broadcast); - result.setMessage("Broadcasts origin address is not set. "+ getServerSettings().getHostAddress()+ " will fetch the stream."); + result.setMessage("Broadcasts origin address is not set. " + + getServerSettings().getHostAddress() + " will fetch the stream."); return result; } - if(broadcastOriginAddress.equals(getServerSettings().getHostAddress())){ + // Handle matching origin address + if (broadcastOriginAddress.equals(getServerSettings().getHostAddress())) { result = getStreamFetcherManager().startStreaming(broadcast); return result; } - // Forward start streaming request to origin server + // Forward request to origin server forwardStartStreaming(broadcast); - - // Immediately return a success result with additional context result.setSuccess(true); + result.setErrorId(FETCH_REQUEST_REDIRECTED_TO_ORIGIN); result.setMessage("Request forwarded to origin server for fetching. " + "Check broadcast status for final confirmation."); - return result; - - }else{ + } else { result = getStreamFetcherManager().startStreaming(broadcast); } - } + // Handle playlist type else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) { result = getStreamFetcherManager().startPlaylist(broadcast); - } + // Handle unsupported broadcast types else { - logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}", broadcast.getType(), broadcast.getStreamId()); + logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}", + broadcast.getType(), broadcast.getStreamId()); + result.setMessage("Broadcast type is not supported. It can be StreamSource, IP Camera, VOD, Playlist"); } + return result; } @@ -2390,4 +2400,16 @@ public Map getPlayListSchedulerTimer() { return playListSchedulerTimer; } + public IStatsCollector getStatsCollector() { + if(statsCollector == null) + { + statsCollector = (IStatsCollector)getScope().getContext().getApplicationContext().getBean(StatsCollector.BEAN_NAME); + } + return statsCollector; + } + + public void setStatsCollector(IStatsCollector statsCollector) { + this.statsCollector = statsCollector; + } + } diff --git a/src/main/java/io/antmedia/rest/RestServiceBase.java b/src/main/java/io/antmedia/rest/RestServiceBase.java index d1e36a9b8..657f457b5 100755 --- a/src/main/java/io/antmedia/rest/RestServiceBase.java +++ b/src/main/java/io/antmedia/rest/RestServiceBase.java @@ -144,6 +144,7 @@ public interface ProcessBuilderFactory { public static final int HIGH_CPU_ERROR = -3; public static final int FETCHER_NOT_STARTED_ERROR = -4; public static final int INVALID_STREAM_NAME_ERROR = -5; + public static final int FETCH_REQUEST_REDIRECTED_TO_ORIGIN = -6; public static final String HTTP = "http://"; public static final String RTSP = "rtsp://"; diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 3e216e514..e4a73c891 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -1,5 +1,6 @@ package io.antmedia.test; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -28,6 +29,8 @@ import java.util.concurrent.TimeUnit; import io.antmedia.filter.TokenFilterManager; +import io.antmedia.statistic.IStatsCollector; +import io.antmedia.statistic.StatsCollector; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.http.HttpEntity; @@ -636,7 +639,7 @@ public void testMuxingFinishedWithPreview(){ adapter.muxingFinished("streamId", anyFile, 0, 100, 480, "src/test/resources/preview.png", null); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); + await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); try { Files.delete(f.toPath()); @@ -671,7 +674,7 @@ public void testMuxingFinished() { adapter.muxingFinished("streamId", anyFile, 0, 100, 480, null, null); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); + await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); try { Files.delete(f.toPath()); @@ -689,7 +692,7 @@ public void testMuxingFinished() { adapter.muxingFinished("streamId", anyFile, 0, 100, 480, "", null); - Awaitility.await().pollDelay(3, TimeUnit.SECONDS).atMost(4, TimeUnit.SECONDS).until(()-> !f.exists()); + await().pollDelay(3, TimeUnit.SECONDS).atMost(4, TimeUnit.SECONDS).until(()-> !f.exists()); } } @@ -702,7 +705,7 @@ public void testRunMuxerScript() { adapter.setVertx(vertx); adapter.runScript("src/test/resources/echo.sh"); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); + await().atMost(5, TimeUnit.SECONDS).until(()-> f.exists()); try { Files.delete(f.toPath()); @@ -988,7 +991,7 @@ public void testNotifyHookErrors(){ spyAdaptor.publishTimeoutError(broadcast.getStreamId(), ""); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1014,7 +1017,7 @@ public void testNotifyHookErrors(){ spyAdaptor.incrementEncoderNotOpenedError(broadcast.getStreamId()); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1039,7 +1042,7 @@ public void testNotifyHookErrors(){ String rtmpUrl = "rtmp://localhost/test/stream123"; spyAdaptor.endpointFailedUpdate(broadcast.getStreamId(), rtmpUrl); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1135,7 +1138,7 @@ public void testNotifyHookFromMuxingFinished() { //call muxingFinished function spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1169,7 +1172,7 @@ public void testNotifyHookFromMuxingFinished() { //call muxingFinished function spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1198,7 +1201,7 @@ public void testNotifyHookFromMuxingFinished() { //call muxingFinished function spyAdaptor.muxingFinished(streamId, anyFile, 0, 100, 480, null, null); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + await().atMost(10, TimeUnit.SECONDS).until(()-> { boolean called = false; try { @@ -1680,7 +1683,7 @@ public void testVertexThreadWait() { }, r->{}); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> threadStarted); + await().atMost(10, TimeUnit.SECONDS).until(() -> threadStarted); long t0 = System.currentTimeMillis(); antMediaApplicationAdapter.waitUntilThreadsStop(); @@ -1733,7 +1736,7 @@ public void testStreamFetcherStartAutomatically() spyAdapter.appStart(scope); - Awaitility.await().pollInterval(2,TimeUnit.SECONDS).atMost(3, TimeUnit.SECONDS).until(()-> true); + await().pollInterval(2,TimeUnit.SECONDS).atMost(3, TimeUnit.SECONDS).until(()-> true); ArgumentCaptor broadcastListCaptor = ArgumentCaptor.forClass(Broadcast.class); verify(streamFetcherManager, times(1)).startStreaming(broadcastListCaptor.capture()); @@ -1747,6 +1750,8 @@ public void testStreamFetcherStartAutomatically() @Test public void testStartStreaming() { IScope scope = mock(IScope.class); + IStatsCollector statsCollector = mock(IStatsCollector.class); + when(scope.getName()).thenReturn("junit"); DataStore dataStore = new InMemoryDataStore("dbname"); @@ -1755,8 +1760,10 @@ public void testStartStreaming() { AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); IContext context = mock(IContext.class); + when(scope.getContext()).thenReturn(context); when(context.getBean(spyAdapter.VERTX_BEAN_NAME)).thenReturn(vertx); + when(context.getBean(IDataStoreFactory.BEAN_NAME)).thenReturn(dsf); @@ -1786,6 +1793,8 @@ public void testStartStreaming() { Mockito.when(context.getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString())).thenReturn(licenseService); when(licenseService.isLicenceSuspended()).thenReturn(false); + when(appContext.getBean(StatsCollector.BEAN_NAME)).thenReturn(statsCollector); + when(statsCollector.enoughResource()).thenReturn(true); Broadcast broadcast = new Broadcast(); @@ -1798,10 +1807,10 @@ public void testStartStreaming() { assertTrue(spyAdapter.getStreamFetcherManager().isStreamRunning(broadcast)); StreamFetcher streamFetcher = spyAdapter.getStreamFetcherManager().getStreamFetcher(broadcast.getStreamId()); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> streamFetcher.isThreadActive()); + await().atMost(5, TimeUnit.SECONDS).until(() -> streamFetcher.isThreadActive()); spyAdapter.getStreamFetcherManager().stopStreaming(broadcast.getStreamId()); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> !streamFetcher.isThreadActive()); + await().atMost(5, TimeUnit.SECONDS).until(() -> !streamFetcher.isThreadActive()); @@ -1812,8 +1821,9 @@ public void testStartStreaming() { } @Test - public void testStartStreamingForwardToOrigin() { + public void testStartStreamingForwardToOrigin() throws Exception { IScope scope = mock(IScope.class); + IStatsCollector statsCollector = mock(IStatsCollector.class); when(scope.getName()).thenReturn("junit"); DataStore dataStore = new InMemoryDataStore("dbname"); @@ -1838,6 +1848,7 @@ public void testStartStreamingForwardToOrigin() { AppSettings appSettings = new AppSettings(); appSettings.setAppName("WebRTCAppEE"); appSettings.setWebhookRetryDelay(1000); + appSettings.setClusterCommunicationKey("test-key"); spyAdapter.setAppSettings(appSettings); spyAdapter.setDataStore(dataStore); @@ -1856,74 +1867,102 @@ public void testStartStreamingForwardToOrigin() { Mockito.when(context.getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString())).thenReturn(licenseService); when(licenseService.isLicenceSuspended()).thenReturn(false); + when(appContext.getBean(StatsCollector.BEAN_NAME)).thenReturn(statsCollector); + when(statsCollector.enoughResource()).thenReturn(true); + StreamFetcherManager streamFetcherManager = mock(StreamFetcherManager.class); when(spyAdapter.getStreamFetcherManager()).thenReturn(streamFetcherManager); + when(spyAdapter.isClusterMode()).thenReturn(true); + when(serverSettings.getDefaultHttpPort()).thenReturn(5080); - // Test Case 1: Local server streaming (same origin address) + // Test Case 1: Null origin address in cluster mode Broadcast broadcast1 = new Broadcast(); broadcast1.setType(AntMediaApplicationAdapter.STREAM_SOURCE); - broadcast1.setOriginAdress("1.1.1.2"); - broadcast1.setStreamUrl("https://example.com/stream1.mp4"); + broadcast1.setStreamUrl("rtsp"); + broadcast1.setOriginAdress(null); dataStore.save(broadcast1); - when(spyAdapter.isClusterMode()).thenReturn(true); when(serverSettings.getHostAddress()).thenReturn("1.1.1.2"); when(streamFetcherManager.startStreaming(broadcast1)).thenReturn(new Result(true)); Result result1 = spyAdapter.startStreaming(broadcast1); assertTrue(result1.isSuccess()); + assertEquals("Broadcasts origin address is not set. 1.1.1.2 will fetch the stream.", result1.getMessage()); verify(streamFetcherManager, times(1)).startStreaming(broadcast1); - verify(spyAdapter, never()).forwardStartStreaming(broadcast1); - // Test Case 2: successful origin forward + // Test Case 2: Local server streaming (same origin address) Broadcast broadcast2 = new Broadcast(); broadcast2.setType(AntMediaApplicationAdapter.STREAM_SOURCE); broadcast2.setOriginAdress("1.1.1.2"); - broadcast2.setStreamUrl("https://example.com/stream2.mp4"); + broadcast2.setStreamUrl("rtsp"); dataStore.save(broadcast2); - when(serverSettings.getHostAddress()).thenReturn("1.1.1.3"); - when(serverSettings.getDefaultHttpPort()).thenReturn(5080); - - String expectedRestRoute = "http://" + broadcast2.getOriginAdress() + ":5080" + - File.separator + appSettings.getAppName() + File.separator + "rest" + - File.separator + "v2" + File.separator + "broadcasts" + - File.separator + broadcast2.getStreamId() + File.separator + "start"; - - Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + when(streamFetcherManager.startStreaming(broadcast2)).thenReturn(new Result(true)); Result result2 = spyAdapter.startStreaming(broadcast2); assertTrue(result2.isSuccess()); - verify(spyAdapter, times(1)).forwardStartStreaming(broadcast2); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + verify(streamFetcherManager, times(1)).startStreaming(broadcast2); - // Test Case 3: failed forward to origin, fallback to local edge streaming + // Test Case 3: Forward to different origin Broadcast broadcast3 = new Broadcast(); broadcast3.setType(AntMediaApplicationAdapter.STREAM_SOURCE); - broadcast3.setOriginAdress("1.1.1.2"); - broadcast3.setStreamUrl("https://example.com/stream3.mp4"); + broadcast3.setOriginAdress("1.1.1.3"); + broadcast3.setStreamUrl("rtsp"); + broadcast3.setStreamId("test-stream-id"); dataStore.save(broadcast3); - Mockito.doReturn(false).when(spyAdapter).sendClusterPost(anyString(), anyString(), anyInt()); - when(streamFetcherManager.startStreaming(broadcast3)).thenReturn(new Result(true)); + when(serverSettings.getHostAddress()).thenReturn("1.1.1.2"); + + CompletableFuture successFuture = CompletableFuture.completedFuture(true); + String expectedRestRoute = "http://1.1.1.3:5080/WebRTCAppEE/rest/v2/broadcasts/test-stream-id/start"; + Mockito.doReturn(successFuture).when(spyAdapter).sendClusterPost( + eq(expectedRestRoute), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); Result result3 = spyAdapter.startStreaming(broadcast3); assertTrue(result3.isSuccess()); - verify(streamFetcherManager, times(1)).startStreaming(broadcast3); + assertEquals("Request forwarded to origin server for fetching. Check broadcast status for final confirmation.", + result3.getMessage()); - // Test Case 4: Non-cluster mode streaming - when(spyAdapter.isClusterMode()).thenReturn(false); + // Test Case 4: Forward fails, fallback to local streaming Broadcast broadcast4 = new Broadcast(); broadcast4.setType(AntMediaApplicationAdapter.STREAM_SOURCE); - broadcast4.setStreamUrl("https://example.com/stream4.mp4"); + broadcast4.setOriginAdress("1.1.1.4"); + broadcast4.setStreamUrl("rtsp"); + broadcast4.setStreamId("test-stream-id-2"); dataStore.save(broadcast4); + CompletableFuture failureFuture = CompletableFuture.completedFuture(false); + String expectedRestRoute2 = "http://1.1.1.4:5080/WebRTCAppEE/rest/v2/broadcasts/test-stream-id-2/start"; + Mockito.doReturn(failureFuture).when(spyAdapter).sendClusterPost( + eq(expectedRestRoute2), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); when(streamFetcherManager.startStreaming(broadcast4)).thenReturn(new Result(true)); Result result4 = spyAdapter.startStreaming(broadcast4); assertTrue(result4.isSuccess()); - verify(spyAdapter, never()).forwardStartStreaming(broadcast4); - verify(streamFetcherManager, times(1)).startStreaming(broadcast4); + + // Verify the async callback executed + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + verify(streamFetcherManager, times(1)).startStreaming(broadcast4); + }); + + // Test Case 5: Non-cluster mode streaming + when(spyAdapter.isClusterMode()).thenReturn(false); + Broadcast broadcast5 = new Broadcast(); + broadcast5.setType(AntMediaApplicationAdapter.STREAM_SOURCE); + broadcast5.setStreamUrl("https://example.com/stream5.mp4"); + dataStore.save(broadcast5); + + when(streamFetcherManager.startStreaming(broadcast5)).thenReturn(new Result(true)); + + Result result5 = spyAdapter.startStreaming(broadcast5); + assertTrue(result5.isSuccess()); + verify(streamFetcherManager, times(1)).startStreaming(broadcast5); } @Test @@ -1951,30 +1990,46 @@ public void testForwardStartStreaming() throws Exception { AppSettings appSettings = new AppSettings(); appSettings.setAppName("WebRTCAppEE"); - appSettings.setWebhookRetryDelay(1000); + appSettings.setWebhookRetryDelay(100); // Small delay for testing appSettings.setClusterCommunicationKey("test-key"); spyAdapter.setAppSettings(appSettings); + spyAdapter.setDataStore(dataStore); when(serverSettings.getDefaultHttpPort()).thenReturn(5080); + StreamFetcherManager streamFetcherManager = mock(StreamFetcherManager.class); + + when(spyAdapter.getStreamFetcherManager()).thenReturn(streamFetcherManager); // Test Case 1: Successful forward Broadcast broadcast1 = new Broadcast(); broadcast1.setOriginAdress("127.0.0.1"); broadcast1.setStreamId("test-stream-1"); - String expectedRestRoute = "http://127.0.0.1:5080" + + String expectedRestRoute1 = "http://127.0.0.1:5080" + File.separator + appSettings.getAppName() + File.separator + "rest" + File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast1.getStreamId() + File.separator + "start"; - Mockito.doReturn(true).when(spyAdapter).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); - - // Test successful forward - boolean result1 = spyAdapter.forwardStartStreaming(broadcast1); - assertTrue(result1); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + CompletableFuture successFuture = CompletableFuture.completedFuture(true); + Mockito.doReturn(successFuture).when(spyAdapter).sendClusterPost( + eq(expectedRestRoute1), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + + spyAdapter.forwardStartStreaming(broadcast1); + + // Verify successful case + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + verify(spyAdapter, times(1)).sendClusterPost( + eq(expectedRestRoute1), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + verify(streamFetcherManager, never()).startStreaming(broadcast1); + }); - // Test Case 2: Failed forward + // Test Case 2: Failed forward with retry Broadcast broadcast2 = new Broadcast(); broadcast2.setOriginAdress("127.0.0.2"); broadcast2.setStreamId("test-stream-2"); @@ -1984,12 +2039,25 @@ public void testForwardStartStreaming() throws Exception { File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast2.getStreamId() + File.separator + "start"; - // Mock sendClusterPost to return false - Mockito.doReturn(false).when(spyAdapter).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); - - boolean result2 = spyAdapter.forwardStartStreaming(broadcast2); - assertFalse(result2); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute2), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + CompletableFuture failureFuture = CompletableFuture.completedFuture(false); + Mockito.doReturn(failureFuture).when(spyAdapter).sendClusterPost( + eq(expectedRestRoute2), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + when(streamFetcherManager.startStreaming(broadcast2)).thenReturn(new Result(true)); + + spyAdapter.forwardStartStreaming(broadcast2); + + // Verify failure case with local fallback + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + verify(spyAdapter, times(1)).sendClusterPost( + eq(expectedRestRoute2), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + verify(streamFetcherManager, times(1)).startStreaming(broadcast2); + }); // Test Case 3: Exception during execution Broadcast broadcast3 = new Broadcast(); @@ -2001,43 +2069,35 @@ public void testForwardStartStreaming() throws Exception { File.separator + "v2" + File.separator + "broadcasts" + File.separator + broadcast3.getStreamId() + File.separator + "start"; - // Mock sendClusterPost to throw exception - Mockito.doThrow(new RuntimeException("Test exception")).when(spyAdapter) - .sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); - - boolean result3 = spyAdapter.forwardStartStreaming(broadcast3); - assertFalse(result3); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute3), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); - - // Test Case 4: Timeout scenario - Broadcast broadcast4 = new Broadcast(); - broadcast4.setOriginAdress("127.0.0.4"); - broadcast4.setStreamId("test-stream-4"); - - String expectedRestRoute4 = "http://127.0.0.4:5080" + - File.separator + appSettings.getAppName() + File.separator + "rest" + - File.separator + "v2" + File.separator + "broadcasts" + - File.separator + broadcast4.getStreamId() + File.separator + "start"; - - // Mock sendClusterPost to simulate timeout by sleeping - Mockito.doAnswer(invocation -> { - Thread.sleep(5000); // Sleep longer than the timeout - return true; - }).when(spyAdapter).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + CompletableFuture exceptionFuture = new CompletableFuture<>(); + exceptionFuture.completeExceptionally(new RuntimeException("Test exception")); + Mockito.doReturn(exceptionFuture).when(spyAdapter).sendClusterPost( + eq(expectedRestRoute3), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + when(streamFetcherManager.startStreaming(broadcast3)).thenReturn(new Result(true)); - // Reduce timeout for test purposes - appSettings.setWebhookRetryDelay(100); + spyAdapter.forwardStartStreaming(broadcast3); - boolean result4 = spyAdapter.forwardStartStreaming(broadcast4); - assertFalse(result4); - verify(spyAdapter, times(1)).sendClusterPost(eq(expectedRestRoute4), anyString(), eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT)); + // Verify exception case with local fallback + await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + verify(spyAdapter, times(1)).sendClusterPost( + eq(expectedRestRoute3), + anyString(), + eq(AntMediaApplicationAdapter.CLUSTER_POST_RETRY_ATTEMPT_COUNT) + ); + verify(streamFetcherManager, times(1)).startStreaming(broadcast3); + }); - // Verify JWT token format (at least check it's not empty and properly formatted) + // Test Case 4: Verify JWT token format ArgumentCaptor tokenCaptor = ArgumentCaptor.forClass(String.class); - verify(spyAdapter, atLeastOnce()).sendClusterPost(anyString(), tokenCaptor.capture(), anyInt()); - String capturedToken = tokenCaptor.getValue(); - assertNotNull(capturedToken); - assertFalse(capturedToken.isEmpty()); + verify(spyAdapter, atLeast(3)).sendClusterPost(anyString(), tokenCaptor.capture(), anyInt()); + + for (String capturedToken : tokenCaptor.getAllValues()) { + assertNotNull(capturedToken); + assertFalse(capturedToken.isEmpty()); + } } @Test @@ -2064,74 +2124,57 @@ public void testSendClusterPost() throws Exception { String testUrl = "http://localhost:5080/test"; String testToken = "test-token"; - // Test Case 1: Successful request with 200 response + // Mock responses CloseableHttpResponse successResponse = mock(CloseableHttpResponse.class); StatusLine successStatusLine = mock(StatusLine.class); when(successStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); when(successResponse.getStatusLine()).thenReturn(successStatusLine); - when(httpClient.execute(any(HttpPost.class))).thenReturn(successResponse); - - boolean result1 = spyAdapter.sendClusterPost(testUrl, testToken, 3); - assertTrue(result1); - - // Verify request configuration - ArgumentCaptor httpPostCaptor = ArgumentCaptor.forClass(HttpPost.class); - verify(httpClient).execute(httpPostCaptor.capture()); - HttpPost capturedPost = httpPostCaptor.getValue(); - assertEquals(testUrl, capturedPost.getURI().toString()); - assertEquals(testToken, capturedPost.getFirstHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION).getValue()); - RequestConfig capturedConfig = capturedPost.getConfig(); - assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectTimeout()); - assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectionRequestTimeout()); - assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getSocketTimeout()); - // Test Case 2: Failed request with retry CloseableHttpResponse failResponse = mock(CloseableHttpResponse.class); StatusLine failStatusLine = mock(StatusLine.class); when(failStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); when(failResponse.getStatusLine()).thenReturn(failStatusLine); - // First call returns 500, second call returns 200 + // First request fails, second succeeds when(httpClient.execute(any(HttpPost.class))) .thenReturn(failResponse) .thenReturn(successResponse); - boolean result2 = spyAdapter.sendClusterPost(testUrl, testToken, 2); - assertTrue(result2); - verify(spyAdapter, times(1)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), eq(1)); + // Test asynchronous behavior + CompletableFuture result = spyAdapter.sendClusterPost(testUrl, testToken, 1); - // Test Case 3: Failed request with no more retries - when(httpClient.execute(any(HttpPost.class))).thenReturn(failResponse); + // Wait for the CompletableFuture to complete + assertTrue(result.get(2, TimeUnit.SECONDS)); - boolean result3 = spyAdapter.sendClusterPost(testUrl, testToken, 0); - assertFalse(result3); - verify(spyAdapter, times(1)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), anyInt()); + // Verify request configuration + ArgumentCaptor httpPostCaptor = ArgumentCaptor.forClass(HttpPost.class); + verify(httpClient, times(2)).execute(httpPostCaptor.capture()); - // Test Case 4: IOException with retry - when(httpClient.execute(any(HttpPost.class))) - .thenThrow(new IOException("Test exception")) - .thenReturn(successResponse); + List capturedPosts = httpPostCaptor.getAllValues(); + assertEquals(testUrl, capturedPosts.get(0).getURI().toString()); + assertEquals(testToken, capturedPosts.get(0) + .getFirstHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION).getValue()); - boolean result4 = spyAdapter.sendClusterPost(testUrl, testToken, 2); - assertTrue(result4); - verify(spyAdapter, times(2)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), eq(1)); + RequestConfig capturedConfig = capturedPosts.get(0).getConfig(); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectTimeout()); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getConnectionRequestTimeout()); + assertEquals(AntMediaApplicationAdapter.CLUSTER_POST_TIMEOUT_MS, capturedConfig.getSocketTimeout()); - // Test Case 5: IOException with no more retries - when(httpClient.execute(any(HttpPost.class))).thenThrow(new IOException("Test exception")); + // Test no retries left after failure + when(httpClient.execute(any(HttpPost.class))).thenReturn(failResponse); + CompletableFuture noRetryResult = spyAdapter.sendClusterPost(testUrl, testToken, 0); + assertFalse(noRetryResult.get(2, TimeUnit.SECONDS)); - boolean result5 = spyAdapter.sendClusterPost(testUrl, testToken, 0); - assertFalse(result5); + // Test IOException handling with retries + when(httpClient.execute(any(HttpPost.class))) + .thenThrow(new IOException("Test exception")) + .thenReturn(successResponse); - // Test Case 6: Test retrySendClusterPostWithDelay timeout - CompletableFuture timeoutFuture = new CompletableFuture<>(); - Mockito.doAnswer(invocation -> { - Thread.sleep(2000); // Sleep longer than timeout - timeoutFuture.complete(true); - return true; - }).when(spyAdapter).sendClusterPost(eq(testUrl), eq(testToken), eq(1)); + CompletableFuture ioRetryResult = spyAdapter.sendClusterPost(testUrl, testToken, 1); + assertTrue(ioRetryResult.get(2, TimeUnit.SECONDS)); - boolean result6 = spyAdapter.retrySendClusterPostWithDelay(testUrl, testToken, 1); - assertFalse(result6); + // Verify the retry logic was triggered + verify(spyAdapter, times(2)).retrySendClusterPostWithDelay(eq(testUrl), eq(testToken), eq(0)); } @Test @@ -2177,7 +2220,7 @@ public void testStreamFetcherNotStartAutomatically() spyAdapter.appStart(scope); - Awaitility.await().pollInterval(2,TimeUnit.SECONDS).atMost(3, TimeUnit.SECONDS).until(()-> true); + await().pollInterval(2,TimeUnit.SECONDS).atMost(3, TimeUnit.SECONDS).until(()-> true); ArgumentCaptor broadcastListCaptor = ArgumentCaptor.forClass(Broadcast.class); verify(streamFetcherManager, never()).startStreaming(broadcastListCaptor.capture()); From 43ef4837395d709960c75236b1cb32d97fa78a87 Mon Sep 17 00:00:00 2001 From: lastpeony Date: Tue, 19 Nov 2024 02:27:33 +0300 Subject: [PATCH 09/12] fix unit test --- .../test/rest/PlaylistRestServiceV2UnitTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/antmedia/test/rest/PlaylistRestServiceV2UnitTest.java b/src/test/java/io/antmedia/test/rest/PlaylistRestServiceV2UnitTest.java index d93212d59..0419be6e0 100644 --- a/src/test/java/io/antmedia/test/rest/PlaylistRestServiceV2UnitTest.java +++ b/src/test/java/io/antmedia/test/rest/PlaylistRestServiceV2UnitTest.java @@ -560,12 +560,14 @@ public void testStartPlaylist() { restServiceReal.setScope(scope); - ApplicationContext context = mock(ApplicationContext.class); + ApplicationContext applicationContext = mock(ApplicationContext.class); + + when(icontext.getApplicationContext()).thenReturn(applicationContext); InMemoryDataStore dataStore = new InMemoryDataStore("testdb"); restServiceReal.setDataStore(dataStore); - restServiceReal.setAppCtx(context); + restServiceReal.setAppCtx(applicationContext); AntMediaApplicationAdapter app = Mockito.spy(new AntMediaApplicationAdapter()); @@ -574,10 +576,12 @@ public void testStartPlaylist() { //init stream fetcher app.getStreamFetcherManager(); - when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(app); + when(applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(app); IStatsCollector collector = mock(IStatsCollector.class); when(collector.enoughResource()).thenReturn(true); - when(context.getBean(IStatsCollector.BEAN_NAME)).thenReturn(collector); + when(applicationContext.getBean(IStatsCollector.BEAN_NAME)).thenReturn(collector); + + when(app.getStatsCollector()).thenReturn(collector); dataStore.save(playlist); From ea3e78b9346d668470c012c5c883faffcbca640c Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 16 Dec 2024 17:53:38 +0300 Subject: [PATCH 10/12] Fix failing test case --- src/main/java/io/antmedia/AntMediaApplicationAdapter.java | 3 ++- .../io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 6f04be9ba..c1d357432 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -1467,7 +1467,8 @@ public Result startStreaming(Broadcast broadcast) { result.setMessage("Request forwarded to origin server for fetching. " + "Check broadcast status for final confirmation."); return result; - } else { + } + else { result = getStreamFetcherManager().startStreaming(broadcast); } } diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 4968364bf..7b386d302 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -1970,7 +1970,7 @@ public void testStartStreamingForwardToOrigin() throws Exception { assertTrue(result4.isSuccess()); // Verify the async callback executed - await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { verify(streamFetcherManager, times(1)).startStreaming(broadcast4); }); From c37e91678d96630cad20962a1d7670ad8c3af431 Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 16 Dec 2024 18:06:44 +0300 Subject: [PATCH 11/12] Remove the subscriber session map --- src/main/java/io/antmedia/filter/TokenSessionFilter.java | 1 - src/main/java/io/antmedia/security/ITokenService.java | 7 ------- src/main/java/io/antmedia/security/MockTokenService.java | 6 ------ 3 files changed, 14 deletions(-) diff --git a/src/main/java/io/antmedia/filter/TokenSessionFilter.java b/src/main/java/io/antmedia/filter/TokenSessionFilter.java index d7d569c27..de712c93f 100644 --- a/src/main/java/io/antmedia/filter/TokenSessionFilter.java +++ b/src/main/java/io/antmedia/filter/TokenSessionFilter.java @@ -29,7 +29,6 @@ public void sessionDestroyed(HttpSessionEvent se) { ITokenService tokenServiceTmp = getTokenService(); if (tokenServiceTmp != null) { tokenServiceTmp.getAuthenticatedMap().remove(se.getSession().getId()); - tokenServiceTmp.getSubscriberAuthenticatedMap().remove(se.getSession().getId()); } } diff --git a/src/main/java/io/antmedia/security/ITokenService.java b/src/main/java/io/antmedia/security/ITokenService.java index 1079044ea..9bae2bf6c 100644 --- a/src/main/java/io/antmedia/security/ITokenService.java +++ b/src/main/java/io/antmedia/security/ITokenService.java @@ -116,12 +116,5 @@ public String toString() { */ Map getAuthenticatedMap(); - - /** - * gets map of authenticated subscriber sessions - * @return list - */ - - Map getSubscriberAuthenticatedMap(); } diff --git a/src/main/java/io/antmedia/security/MockTokenService.java b/src/main/java/io/antmedia/security/MockTokenService.java index 17f5b1ceb..9ca46dbd1 100644 --- a/src/main/java/io/antmedia/security/MockTokenService.java +++ b/src/main/java/io/antmedia/security/MockTokenService.java @@ -12,7 +12,6 @@ public class MockTokenService implements IStreamPublishSecurity, ITokenService{ Map authenticatedMap = new ConcurrentHashMap<>(); - Map subscriberAuthenticatedMap = new ConcurrentHashMap<>(); public boolean checkToken(String tokenId, String streamId, String sessionId, String type) { return true; @@ -38,11 +37,6 @@ public Map getAuthenticatedMap() { return authenticatedMap; } - @Override - public Map getSubscriberAuthenticatedMap() { - return subscriberAuthenticatedMap; - } - @Override public boolean checkHash(String hash, String streamId, String sessionId, String type) { return true; From b2b6d92aa89d6605c694312566afe6d7798e3b2d Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 16 Dec 2024 19:21:16 +0300 Subject: [PATCH 12/12] Revert "Remove the subscriber session map" This reverts commit c37e91678d96630cad20962a1d7670ad8c3af431. --- src/main/java/io/antmedia/filter/TokenSessionFilter.java | 1 + src/main/java/io/antmedia/security/ITokenService.java | 7 +++++++ src/main/java/io/antmedia/security/MockTokenService.java | 6 ++++++ 3 files changed, 14 insertions(+) diff --git a/src/main/java/io/antmedia/filter/TokenSessionFilter.java b/src/main/java/io/antmedia/filter/TokenSessionFilter.java index de712c93f..d7d569c27 100644 --- a/src/main/java/io/antmedia/filter/TokenSessionFilter.java +++ b/src/main/java/io/antmedia/filter/TokenSessionFilter.java @@ -29,6 +29,7 @@ public void sessionDestroyed(HttpSessionEvent se) { ITokenService tokenServiceTmp = getTokenService(); if (tokenServiceTmp != null) { tokenServiceTmp.getAuthenticatedMap().remove(se.getSession().getId()); + tokenServiceTmp.getSubscriberAuthenticatedMap().remove(se.getSession().getId()); } } diff --git a/src/main/java/io/antmedia/security/ITokenService.java b/src/main/java/io/antmedia/security/ITokenService.java index 9bae2bf6c..1079044ea 100644 --- a/src/main/java/io/antmedia/security/ITokenService.java +++ b/src/main/java/io/antmedia/security/ITokenService.java @@ -116,5 +116,12 @@ public String toString() { */ Map getAuthenticatedMap(); + + /** + * gets map of authenticated subscriber sessions + * @return list + */ + + Map getSubscriberAuthenticatedMap(); } diff --git a/src/main/java/io/antmedia/security/MockTokenService.java b/src/main/java/io/antmedia/security/MockTokenService.java index 9ca46dbd1..17f5b1ceb 100644 --- a/src/main/java/io/antmedia/security/MockTokenService.java +++ b/src/main/java/io/antmedia/security/MockTokenService.java @@ -12,6 +12,7 @@ public class MockTokenService implements IStreamPublishSecurity, ITokenService{ Map authenticatedMap = new ConcurrentHashMap<>(); + Map subscriberAuthenticatedMap = new ConcurrentHashMap<>(); public boolean checkToken(String tokenId, String streamId, String sessionId, String type) { return true; @@ -37,6 +38,11 @@ public Map getAuthenticatedMap() { return authenticatedMap; } + @Override + public Map getSubscriberAuthenticatedMap() { + return subscriberAuthenticatedMap; + } + @Override public boolean checkHash(String hash, String streamId, String sessionId, String type) { return true;