Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward auto start request to origin #6730

Merged
merged 12 commits into from
Dec 17, 2024
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ jobs:
GPG_TTY: ${{ secrets.GPG_TTY }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: https://sonarcloud.io
CI_DEPLOY_USERNAME: ${{ secrets.CI_DEPLOY_USERNAME }}
CI_DEPLOY_PASSWORD: ${{ secrets.CI_DEPLOY_PASSWORD }}
GPG_KEY_NAME: ${{ secrets.GPG_KEY_NAME }}
GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

steps:
- name: Checkout code
Expand Down
199 changes: 177 additions & 22 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.antmedia;

import static io.antmedia.rest.RestServiceBase.FETCH_REQUEST_REDIRECTED_TO_ORIGIN;
import static io.antmedia.muxer.IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING;
import static org.bytedeco.ffmpeg.global.avcodec.avcodec_get_name;

Expand All @@ -26,10 +27,11 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.*;

import io.antmedia.filter.JWTFilter;
import io.antmedia.filter.TokenFilterManager;
import io.antmedia.statistic.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -90,9 +92,6 @@
import io.antmedia.settings.ServerSettings;
import io.antmedia.shutdown.AMSShutdownManager;
import io.antmedia.shutdown.IShutdownListener;
import io.antmedia.statistic.DashViewerStats;
import io.antmedia.statistic.HlsViewerStats;
import io.antmedia.statistic.ViewerStats;
import io.antmedia.statistic.type.RTMPToWebRTCStats;
import io.antmedia.statistic.type.WebRTCAudioReceiveStats;
import io.antmedia.statistic.type.WebRTCAudioSendStats;
Expand All @@ -110,6 +109,8 @@
import io.vertx.core.json.JsonObject;
import io.vertx.ext.dropwizard.MetricsService;
import jakarta.validation.constraints.NotNull;
import org.springframework.web.context.WebApplicationContext;

public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener {

public static final String BEAN_NAME = "web.handler";
Expand Down Expand Up @@ -176,6 +177,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter
public static final String STREAM_SOURCE = "streamSource";
public static final String PLAY_LIST = "playlist";
protected static final int END_POINT_LIMIT = 20;
public static final int CLUSTER_POST_RETRY_ATTEMPT_COUNT = 3;
public static final int CLUSTER_POST_TIMEOUT_MS = 1000;

//Allow any sub directory under /
private static final String VOD_IMPORT_ALLOWED_DIRECTORY = "/";
Expand Down Expand Up @@ -223,6 +226,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter

private Random random = new Random();

private IStatsCollector statsCollector;

@Override
public boolean appStart(IScope app) {
setScope(app);
Expand Down Expand Up @@ -253,17 +258,17 @@ public boolean appStart(IScope app) {
//which means it's in cluster mode
clusterNotifier = (IClusterNotifier) app.getContext().getBean(IClusterNotifier.BEAN_NAME);
logger.info("Registering settings listener to the cluster notifier for app: {}", app.getName());

clusterNotifier.registerSettingUpdateListener(getAppSettings().getAppName(), new IAppSettingsUpdateListener() {

@Override
public boolean settingsUpdated(AppSettings settings) {
return updateSettings(settings, false, true);
}

@Override
public AppSettings getCurrentSettings() {

return getAppSettings();
}
});
Expand Down Expand Up @@ -977,7 +982,7 @@ public Broadcast updateBroadcastStatus(String streamId, long absoluteStartTimeMs
public ServerSettings getServerSettings()
{
if (serverSettings == null) {
serverSettings = (ServerSettings)scope.getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME);
serverSettings = (ServerSettings)getScope().getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME);
}
return serverSettings;
}
Expand Down Expand Up @@ -1247,6 +1252,67 @@ private void putToMap(String keyName, Object keyValue, Map<String, Object> map)
}
}

public boolean sendClusterPost(String url, String clusterCommunicationToken)
{

boolean result = false;
try (CloseableHttpClient httpClient = getHttpClient())
{
HttpPost httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CLUSTER_POST_TIMEOUT_MS)
.setConnectionRequestTimeout(CLUSTER_POST_TIMEOUT_MS)
.setSocketTimeout(CLUSTER_POST_TIMEOUT_MS)
.build();
httpPost.setConfig(requestConfig);

httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterCommunicationToken);

try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost))
{
int statusCode = httpResponse.getStatusLine().getStatusCode();
logger.info("Cluster POST Response Status: {}", statusCode);
if (statusCode == HttpStatus.SC_OK) {
result = true;
}
}
}
catch (IOException e)
{
logger.error(ExceptionUtils.getStackTrace(e));
}
return result;
}

public void trySendClusterPostWithDelay(String url, String clusterCommunicationToken, int retryAttempts, CompletableFuture<Boolean> future) {
vertx.setTimer(appSettings.getWebhookRetryDelay(), timerId -> {

vertx.executeBlocking(() -> {

boolean result = sendClusterPost(url, clusterCommunicationToken);

if (!result && retryAttempts >= 1) {
trySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1, future);
}
else {

future.complete(result);
if (result) {
logger.info("Cluster POST is successful another node for url:{}", url);
}
else {
logger.info("Cluster POST is not successful to another node for url:{} and no more retry attempts left",
url);
}
}
return null;

});


});
}

/**
*
* @param url
Expand All @@ -1265,6 +1331,7 @@ public void sendPOST(String url, Map<String, Object> variables, int retryAttempt
.build();
httpPost.setConfig(requestConfig);


if (ContentType.APPLICATION_FORM_URLENCODED.getMimeType().equals(contentType))
{
List<NameValuePair> urlParameters = new ArrayList<>();
Expand Down Expand Up @@ -1362,26 +1429,98 @@ public static final boolean isStreaming(Broadcast broadcast) {
|| IAntMediaStreamHandler.BROADCAST_STATUS_PREPARING.equals(broadcast.getStatus()));
}

public Result startStreaming(Broadcast broadcast)
{
public Result startStreaming(Broadcast broadcast) {
Result result = new Result(false);

if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) ||
// Check resource availability first
if (!getStatsCollector().enoughResource()) {
result.setMessage("Not enough resource on server to start streaming.");
return result;
}

// Handle streaming for IP camera, stream source, and VOD
if (broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) ||
broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE) ||
broadcast.getType().equals(AntMediaApplicationAdapter.VOD)
) {
result = getStreamFetcherManager().startStreaming(broadcast);
broadcast.getType().equals(AntMediaApplicationAdapter.VOD)) {

if (isClusterMode()) {
String broadcastOriginAddress = broadcast.getOriginAdress();

// Handle null or empty origin address
if (StringUtils.isBlank(broadcastOriginAddress)) {
result = getStreamFetcherManager().startStreaming(broadcast);
result.setMessage("Broadcasts origin address is not set. " +
getServerSettings().getHostAddress() + " will fetch the stream.");
return result;
}

// Handle matching origin address
if (broadcastOriginAddress.equals(getServerSettings().getHostAddress())) {
result = getStreamFetcherManager().startStreaming(broadcast);
return result;
}

// Forward request to origin server
forwardStartStreaming(broadcast);
result.setSuccess(true);
result.setErrorId(FETCH_REQUEST_REDIRECTED_TO_ORIGIN);
result.setMessage("Request forwarded to origin server for fetching. " +
"Check broadcast status for final confirmation.");
return result;
} else {
result = getStreamFetcherManager().startStreaming(broadcast);
}
}
// Handle playlist type
else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) {
result = getStreamFetcherManager().startPlaylist(broadcast);

}
// Handle unsupported broadcast types
else {
logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}", broadcast.getType(), broadcast.getStreamId());
logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}",
broadcast.getType(), broadcast.getStreamId());
result.setMessage("Broadcast type is not supported. It can be StreamSource, IP Camera, VOD, Playlist");
}

return result;
}

public void forwardStartStreaming(Broadcast broadcast) {
String jwtToken = JWTFilter.generateJwtToken(
getAppSettings().getClusterCommunicationKey(),
System.currentTimeMillis() + 5000
);

String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" +
getServerSettings().getDefaultHttpPort() +
File.separator + getAppSettings().getAppName() +
File.separator + "rest" +
File.separator + "v2" +
File.separator + "broadcasts" +
File.separator + broadcast.getStreamId() +
File.separator + "start";


CompletableFuture<Boolean> future = new CompletableFuture<>();

trySendClusterPostWithDelay(restRouteOfNode, jwtToken, CLUSTER_POST_RETRY_ATTEMPT_COUNT, future);


future.thenAccept(success -> {
if (success) {
logger.info("Cluster POST redirection to {} succeeded", restRouteOfNode);
} else {
logger.info("Cluster POST redirection to {} failed. Local node {} will fetch the stream. ", restRouteOfNode, getServerSettings().getHostAddress());
getStreamFetcherManager().startStreaming(broadcast);
}
})
.exceptionally(ex -> {
logger.error("Cluster POST encountered an exception: {}", ExceptionUtils.getStackTrace(ex));
getStreamFetcherManager().startStreaming(broadcast);
return null;
});
}

public Result stopStreaming(Broadcast broadcast)
{
Result result = new Result(false);
Expand Down Expand Up @@ -1668,8 +1807,8 @@ public void stopApplication(boolean deleteDB) {
}

public void closeDB(boolean deleteDB) {
boolean isClusterMode = getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME);
if (deleteDB && isClusterMode)
boolean clusterMode = isClusterMode();
if (deleteDB && clusterMode)
{
//let the other nodes have enough time to synch
getVertx().setTimer(ClusterNode.NODE_UPDATE_PERIOD + 1000, l->
Expand Down Expand Up @@ -1956,7 +2095,7 @@ public synchronized boolean updateSettings(AppSettings newSettings, boolean noti
appSettings.setToBeDeleted(newSettings.isToBeDeleted());

appSettings.setAppStatus(newSettings.getAppStatus());


boolean saveSettings = clusterNotifier.getClusterStore().saveSettings(appSettings);
logger.info("Saving settings to cluster db -> {} for app: {} and updateTime:{}", saveSettings, getScope().getName(), appSettings.getUpdateTime());
Expand Down Expand Up @@ -2310,6 +2449,10 @@ public void stopPublish(String streamId) {
});
}

public boolean isClusterMode() {
return getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME);
}

public void joinedTheRoom(String roomId, String streamId) {
//No need to implement here.
}
Expand Down Expand Up @@ -2338,4 +2481,16 @@ public Map<String, Long> getPlayListSchedulerTimer() {
return playListSchedulerTimer;
}

public IStatsCollector getStatsCollector() {
if(statsCollector == null)
{
statsCollector = (IStatsCollector)getScope().getContext().getApplicationContext().getBean(StatsCollector.BEAN_NAME);
}
return statsCollector;
}

public void setStatsCollector(IStatsCollector statsCollector) {
this.statsCollector = statsCollector;
}

}
2 changes: 1 addition & 1 deletion src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import com.github.benmanes.caffeine.cache.Caffeine;
import dev.morphia.query.filters.Filter;
import dev.morphia.query.filters.LogicalFilter;
import org.apache.commons.io.FilenameUtils;
Expand All @@ -26,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/antmedia/filter/RestProxyFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ else if (broadcast != null && AntMediaApplicationAdapter.isStreaming(broadcast)
&& !isRequestDestinedForThisNode(request.getRemoteAddr(), broadcast.getOriginAdress())
&& isHostRunning(broadcast.getOriginAdress(), getServerSettings().getDefaultHttpPort()))
{


forwardRequestToNode(request, response, broadcast.getOriginAdress());
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/filter/TokenFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
*/


if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method))
if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method))
{
if (streamId == null) {
logger.warn("No streamId found in the request: {}", httpRequest.getRequestURI());
Expand All @@ -98,7 +98,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha

if (jwtInternalCommunicationToken != null)
{
//if jwtInternalCommunicationToken is not null,
//if jwtInternalCommunicationToken is not null,
//it means that this is the origin instance and receiving request from the edge node directly

boolean checkJwtToken = false;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/antmedia/rest/RestServiceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public interface ProcessBuilderFactory {
public static final int HIGH_CPU_ERROR = -3;
public static final int FETCHER_NOT_STARTED_ERROR = -4;
public static final int INVALID_STREAM_NAME_ERROR = -5;
public static final int FETCH_REQUEST_REDIRECTED_TO_ORIGIN = -6;

public static final String HTTP = "http://";
public static final String RTSP = "rtsp://";
Expand Down
Loading
Loading