Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into filteringOnHW
Browse files Browse the repository at this point in the history
  • Loading branch information
burak-58 committed Dec 20, 2024
2 parents d9db647 + 44d6eed commit 51d5afc
Show file tree
Hide file tree
Showing 8 changed files with 568 additions and 59 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ jobs:
GPG_TTY: ${{ secrets.GPG_TTY }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: https://sonarcloud.io
CI_DEPLOY_USERNAME: ${{ secrets.CI_DEPLOY_USERNAME }}
CI_DEPLOY_PASSWORD: ${{ secrets.CI_DEPLOY_PASSWORD }}
GPG_KEY_NAME: ${{ secrets.GPG_KEY_NAME }}
GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

steps:
- name: Checkout code
Expand Down
200 changes: 178 additions & 22 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.antmedia;

import static io.antmedia.rest.RestServiceBase.FETCH_REQUEST_REDIRECTED_TO_ORIGIN;
import static io.antmedia.muxer.IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING;
import static org.bytedeco.ffmpeg.global.avcodec.avcodec_get_name;

Expand All @@ -26,10 +27,11 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.*;

import io.antmedia.filter.JWTFilter;
import io.antmedia.filter.TokenFilterManager;
import io.antmedia.statistic.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -90,9 +92,6 @@
import io.antmedia.settings.ServerSettings;
import io.antmedia.shutdown.AMSShutdownManager;
import io.antmedia.shutdown.IShutdownListener;
import io.antmedia.statistic.DashViewerStats;
import io.antmedia.statistic.HlsViewerStats;
import io.antmedia.statistic.ViewerStats;
import io.antmedia.statistic.type.RTMPToWebRTCStats;
import io.antmedia.statistic.type.WebRTCAudioReceiveStats;
import io.antmedia.statistic.type.WebRTCAudioSendStats;
Expand All @@ -110,6 +109,8 @@
import io.vertx.core.json.JsonObject;
import io.vertx.ext.dropwizard.MetricsService;
import jakarta.validation.constraints.NotNull;
import org.springframework.web.context.WebApplicationContext;

public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter implements IAntMediaStreamHandler, IShutdownListener {

public static final String BEAN_NAME = "web.handler";
Expand Down Expand Up @@ -176,6 +177,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter
public static final String STREAM_SOURCE = "streamSource";
public static final String PLAY_LIST = "playlist";
protected static final int END_POINT_LIMIT = 20;
public static final int CLUSTER_POST_RETRY_ATTEMPT_COUNT = 3;
public static final int CLUSTER_POST_TIMEOUT_MS = 1000;

//Allow any sub directory under /
private static final String VOD_IMPORT_ALLOWED_DIRECTORY = "/";
Expand Down Expand Up @@ -223,6 +226,8 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter

private Random random = new Random();

private IStatsCollector statsCollector;

@Override
public boolean appStart(IScope app) {
setScope(app);
Expand Down Expand Up @@ -253,17 +258,17 @@ public boolean appStart(IScope app) {
//which means it's in cluster mode
clusterNotifier = (IClusterNotifier) app.getContext().getBean(IClusterNotifier.BEAN_NAME);
logger.info("Registering settings listener to the cluster notifier for app: {}", app.getName());

clusterNotifier.registerSettingUpdateListener(getAppSettings().getAppName(), new IAppSettingsUpdateListener() {

@Override
public boolean settingsUpdated(AppSettings settings) {
return updateSettings(settings, false, true);
}

@Override
public AppSettings getCurrentSettings() {

return getAppSettings();
}
});
Expand Down Expand Up @@ -977,7 +982,7 @@ public Broadcast updateBroadcastStatus(String streamId, long absoluteStartTimeMs
public ServerSettings getServerSettings()
{
if (serverSettings == null) {
serverSettings = (ServerSettings)scope.getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME);
serverSettings = (ServerSettings)getScope().getContext().getApplicationContext().getBean(ServerSettings.BEAN_NAME);
}
return serverSettings;
}
Expand Down Expand Up @@ -1247,6 +1252,67 @@ private void putToMap(String keyName, Object keyValue, Map<String, Object> map)
}
}

public boolean sendClusterPost(String url, String clusterCommunicationToken)
{

boolean result = false;
try (CloseableHttpClient httpClient = getHttpClient())
{
HttpPost httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CLUSTER_POST_TIMEOUT_MS)
.setConnectionRequestTimeout(CLUSTER_POST_TIMEOUT_MS)
.setSocketTimeout(CLUSTER_POST_TIMEOUT_MS)
.build();
httpPost.setConfig(requestConfig);

httpPost.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, clusterCommunicationToken);

try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost))
{
int statusCode = httpResponse.getStatusLine().getStatusCode();
logger.info("Cluster POST Response Status: {}", statusCode);
if (statusCode == HttpStatus.SC_OK) {
result = true;
}
}
}
catch (IOException e)
{
logger.error(ExceptionUtils.getStackTrace(e));
}
return result;
}

public void trySendClusterPostWithDelay(String url, String clusterCommunicationToken, int retryAttempts, CompletableFuture<Boolean> future) {
vertx.setTimer(appSettings.getWebhookRetryDelay(), timerId -> {

vertx.executeBlocking(() -> {

boolean result = sendClusterPost(url, clusterCommunicationToken);

if (!result && retryAttempts >= 1) {
trySendClusterPostWithDelay(url, clusterCommunicationToken, retryAttempts - 1, future);
}
else {

future.complete(result);
if (result) {
logger.info("Cluster POST is successful another node for url:{}", url);
}
else {
logger.info("Cluster POST is not successful to another node for url:{} and no more retry attempts left",
url);
}
}
return null;

});


});
}

/**
*
* @param url
Expand All @@ -1265,6 +1331,7 @@ public void sendPOST(String url, Map<String, Object> variables, int retryAttempt
.build();
httpPost.setConfig(requestConfig);


if (ContentType.APPLICATION_FORM_URLENCODED.getMimeType().equals(contentType))
{
List<NameValuePair> urlParameters = new ArrayList<>();
Expand Down Expand Up @@ -1362,26 +1429,99 @@ public static final boolean isStreaming(Broadcast broadcast) {
|| IAntMediaStreamHandler.BROADCAST_STATUS_PREPARING.equals(broadcast.getStatus()));
}

public Result startStreaming(Broadcast broadcast)
{
public Result startStreaming(Broadcast broadcast) {
Result result = new Result(false);

if(broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) ||
// Check resource availability first
if (!getStatsCollector().enoughResource()) {
result.setMessage("Not enough resource on server to start streaming.");
return result;
}

// Handle streaming for IP camera, stream source, and VOD
if (broadcast.getType().equals(AntMediaApplicationAdapter.IP_CAMERA) ||
broadcast.getType().equals(AntMediaApplicationAdapter.STREAM_SOURCE) ||
broadcast.getType().equals(AntMediaApplicationAdapter.VOD)
) {
result = getStreamFetcherManager().startStreaming(broadcast);
broadcast.getType().equals(AntMediaApplicationAdapter.VOD)) {

if (isClusterMode()) {
String broadcastOriginAddress = broadcast.getOriginAdress();

// Handle null or empty origin address
if (StringUtils.isBlank(broadcastOriginAddress)) {
result = getStreamFetcherManager().startStreaming(broadcast);
result.setMessage("Broadcasts origin address is not set. " +
getServerSettings().getHostAddress() + " will fetch the stream.");
return result;
}

// Handle matching origin address
if (broadcastOriginAddress.equals(getServerSettings().getHostAddress())) {
result = getStreamFetcherManager().startStreaming(broadcast);
return result;
}

// Forward request to origin server
forwardStartStreaming(broadcast);
result.setSuccess(true);
result.setErrorId(FETCH_REQUEST_REDIRECTED_TO_ORIGIN);
result.setMessage("Request forwarded to origin server for fetching. " +
"Check broadcast status for final confirmation.");
return result;
}
else {
result = getStreamFetcherManager().startStreaming(broadcast);
}
}
// Handle playlist type
else if (broadcast.getType().equals(AntMediaApplicationAdapter.PLAY_LIST)) {
result = getStreamFetcherManager().startPlaylist(broadcast);

}
// Handle unsupported broadcast types
else {
logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}", broadcast.getType(), broadcast.getStreamId());
logger.info("Broadcast type is not supported for startStreaming:{} streamId:{}",
broadcast.getType(), broadcast.getStreamId());
result.setMessage("Broadcast type is not supported. It can be StreamSource, IP Camera, VOD, Playlist");
}

return result;
}

public void forwardStartStreaming(Broadcast broadcast) {
String jwtToken = JWTFilter.generateJwtToken(
getAppSettings().getClusterCommunicationKey(),
System.currentTimeMillis() + 5000
);

String restRouteOfNode = "http://" + broadcast.getOriginAdress() + ":" +
getServerSettings().getDefaultHttpPort() +
File.separator + getAppSettings().getAppName() +
File.separator + "rest" +
File.separator + "v2" +
File.separator + "broadcasts" +
File.separator + broadcast.getStreamId() +
File.separator + "start";


CompletableFuture<Boolean> future = new CompletableFuture<>();

trySendClusterPostWithDelay(restRouteOfNode, jwtToken, CLUSTER_POST_RETRY_ATTEMPT_COUNT, future);


future.thenAccept(success -> {
if (success) {
logger.info("Cluster POST redirection to {} succeeded", restRouteOfNode);
} else {
logger.info("Cluster POST redirection to {} failed. Local node {} will fetch the stream. ", restRouteOfNode, getServerSettings().getHostAddress());
getStreamFetcherManager().startStreaming(broadcast);
}
})
.exceptionally(ex -> {
logger.error("Cluster POST encountered an exception: {}", ExceptionUtils.getStackTrace(ex));
getStreamFetcherManager().startStreaming(broadcast);
return null;
});
}

public Result stopStreaming(Broadcast broadcast)
{
Result result = new Result(false);
Expand Down Expand Up @@ -1668,8 +1808,8 @@ public void stopApplication(boolean deleteDB) {
}

public void closeDB(boolean deleteDB) {
boolean isClusterMode = getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME);
if (deleteDB && isClusterMode)
boolean clusterMode = isClusterMode();
if (deleteDB && clusterMode)
{
//let the other nodes have enough time to synch
getVertx().setTimer(ClusterNode.NODE_UPDATE_PERIOD + 1000, l->
Expand Down Expand Up @@ -1956,7 +2096,7 @@ public synchronized boolean updateSettings(AppSettings newSettings, boolean noti
appSettings.setToBeDeleted(newSettings.isToBeDeleted());

appSettings.setAppStatus(newSettings.getAppStatus());


boolean saveSettings = clusterNotifier.getClusterStore().saveSettings(appSettings);
logger.info("Saving settings to cluster db -> {} for app: {} and updateTime:{}", saveSettings, getScope().getName(), appSettings.getUpdateTime());
Expand Down Expand Up @@ -2314,6 +2454,10 @@ public void stopPublish(String streamId) {
});
}

public boolean isClusterMode() {
return getScope().getContext().hasBean(IClusterNotifier.BEAN_NAME);
}

public void joinedTheRoom(String roomId, String streamId) {
//No need to implement here.
}
Expand Down Expand Up @@ -2342,4 +2486,16 @@ public Map<String, Long> getPlayListSchedulerTimer() {
return playListSchedulerTimer;
}

public IStatsCollector getStatsCollector() {
if(statsCollector == null)
{
statsCollector = (IStatsCollector)getScope().getContext().getApplicationContext().getBean(StatsCollector.BEAN_NAME);
}
return statsCollector;
}

public void setStatsCollector(IStatsCollector statsCollector) {
this.statsCollector = statsCollector;
}

}
2 changes: 1 addition & 1 deletion src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import com.github.benmanes.caffeine.cache.Caffeine;
import dev.morphia.query.filters.Filter;
import dev.morphia.query.filters.LogicalFilter;
import org.apache.commons.io.FilenameUtils;
Expand All @@ -26,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/antmedia/filter/RestProxyFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ else if (broadcast != null && AntMediaApplicationAdapter.isStreaming(broadcast)
&& !isRequestDestinedForThisNode(request.getRemoteAddr(), broadcast.getOriginAdress())
&& isHostRunning(broadcast.getOriginAdress(), getServerSettings().getDefaultHttpPort()))
{


forwardRequestToNode(request, response, broadcast.getOriginAdress());
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/filter/TokenFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
*/


if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method))
if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method))
{
if (streamId == null) {
logger.warn("No streamId found in the request: {}", httpRequest.getRequestURI());
Expand All @@ -98,7 +98,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha

if (jwtInternalCommunicationToken != null)
{
//if jwtInternalCommunicationToken is not null,
//if jwtInternalCommunicationToken is not null,
//it means that this is the origin instance and receiving request from the edge node directly

boolean checkJwtToken = false;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/antmedia/rest/RestServiceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public interface ProcessBuilderFactory {
public static final int HIGH_CPU_ERROR = -3;
public static final int FETCHER_NOT_STARTED_ERROR = -4;
public static final int INVALID_STREAM_NAME_ERROR = -5;
public static final int FETCH_REQUEST_REDIRECTED_TO_ORIGIN = -6;

public static final String HTTP = "http://";
public static final String RTSP = "rtsp://";
Expand Down
Loading

0 comments on commit 51d5afc

Please sign in to comment.