Skip to content

Commit

Permalink
Merge pull request #6086 from ant-media/firebase-cloud-messaging
Browse files Browse the repository at this point in the history
Support Push Notification Rest REST Service
  • Loading branch information
mekya authored Feb 8, 2024
2 parents 538589e + 1331161 commit 8430b6f
Show file tree
Hide file tree
Showing 25 changed files with 943 additions and 25 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@
<version>${redisson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.firebase</groupId>
<artifactId>firebase-admin</artifactId>
<version>9.2.0</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
35 changes: 34 additions & 1 deletion src/main/java/io/antmedia/AppSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,23 @@ public boolean isWriteStatsToDatastore() {
*/
@Value("${sendAudioLevelToViewers:true}")
private boolean sendAudioLevelToViewers = true;


/**
* Firebase Service Account Key JSON to send push notification
* through Firebase Cloud Messaging
*/
@Value("${firebaseAccountKeyJSON:#{null}}")
private String firebaseAccountKeyJSON = null;

/**
* This is JWT Secret to authenticate the user for push notifications.
*
* JWT token should be generated with the following secret: subscriberId(username, email, etc.) + subscriberAuthenticationKey
*
*/
@Value("${subscriberAuthenticationKey:#{ T(org.apache.commons.lang3.RandomStringUtils).randomAlphanumeric(32)}}")
private String subscriberAuthenticationKey = RandomStringUtils.randomAlphanumeric(32);

public void setWriteStatsToDatastore(boolean writeStatsToDatastore) {
this.writeStatsToDatastore = writeStatsToDatastore;
}
Expand Down Expand Up @@ -3532,4 +3548,21 @@ public String getTimeTokenSecretForPlay() {
public void setTimeTokenSecretForPlay(String timeTokenSecretForPlay) {
this.timeTokenSecretForPlay = timeTokenSecretForPlay;
}

public String getFirebaseAccountKeyJSON() {
return firebaseAccountKeyJSON;
}

public void setFirebaseAccountKeyJSON(String firebaseAccountKeyJSON) {
this.firebaseAccountKeyJSON = firebaseAccountKeyJSON;
}

public String getSubscriberAuthenticationKey() {
return subscriberAuthenticationKey;
}

public void setSubscriberAuthenticationKey(String subscriberAuthenticationKey) {
this.subscriberAuthenticationKey = subscriberAuthenticationKey;
}

}
19 changes: 19 additions & 0 deletions src/main/java/io/antmedia/datastore/db/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.antmedia.datastore.db.types.ConnectionEvent;
import io.antmedia.datastore.db.types.Endpoint;
import io.antmedia.datastore.db.types.P2PConnection;
import io.antmedia.datastore.db.types.PushNotificationToken;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.Subscriber;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.datastore.db.types.SubscriberStats;
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
Expand Down Expand Up @@ -1404,6 +1406,23 @@ public List<WebRTCViewerInfo> getWebRTCViewerList(Map<String, String> webRTCView
* @param metaData new meta data
*/
public abstract boolean updateStreamMetaData(String streamId, String metaData);

/**
* Put subscriber metadata. It overwrites the metadata, if you need to update something,
* first get the {@link #getSubscriberMetaData(String)} , update it and put it
*
* @param subscriberId
* @param SubscriberMetadata
* @return
*/
public abstract void putSubscriberMetaData(String subscriberId, SubscriberMetadata metadata);

/**
* Get subscriber metadata
* @param subscriberId
* @return
*/
public abstract SubscriberMetadata getSubscriberMetaData(String subscriberId);


//**************************************
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.ConferenceRoom;
import io.antmedia.datastore.db.types.Endpoint;
import io.antmedia.datastore.db.types.P2PConnection;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.Subscriber;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
Expand All @@ -38,6 +41,7 @@ public class InMemoryDataStore extends DataStore {
private Map<String, List<TensorFlowObject>> detectionMap = new LinkedHashMap<>();
private Map<String, Token> tokenMap = new LinkedHashMap<>();
private Map<String, Subscriber> subscriberMap = new LinkedHashMap<>();
private Map<String, SubscriberMetadata> subscriberMetadataMap = new LinkedHashMap<>();
private Map<String, ConferenceRoom> roomMap = new LinkedHashMap<>();
private Map<String, WebRTCViewerInfo> webRTCViewerMap = new LinkedHashMap<>();

Expand Down Expand Up @@ -1085,4 +1089,15 @@ public boolean updateStreamMetaData(String streamId, String metaData) {
}
return result;
}

@Override
public SubscriberMetadata getSubscriberMetaData(String subscriberId) {
return subscriberMetadataMap.get(subscriberId);
}

@Override
public void putSubscriberMetaData(String subscriberId, SubscriberMetadata subscriberMetadata) {
subscriberMetadata.setSubscriberId(subscriberId);
subscriberMetadataMap.put(subscriberId, subscriberMetadata);
}
}
17 changes: 17 additions & 0 deletions src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.antmedia.datastore.db.types.P2PConnection;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.Subscriber;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
Expand All @@ -44,6 +45,7 @@ public abstract class MapBasedDataStore extends DataStore {
protected Map<String, String> subscriberMap;
protected Map<String, String> conferenceRoomMap;
protected Map<String, String> webRTCViewerMap;
protected Map<String, String> subscriberMetadataMap;

public static final String REPLACE_CHARS_REGEX = "[\n|\r|\t]";

Expand Down Expand Up @@ -1106,5 +1108,20 @@ public Broadcast getBroadcastFromMap(String streamId)
}
return null;
}

@Override
public void putSubscriberMetaData(String subscriberId, SubscriberMetadata metadata) {
metadata.setSubscriberId(subscriberId);
subscriberMetadataMap.put(subscriberId, gson.toJson(metadata));
}

@Override
public SubscriberMetadata getSubscriberMetaData(String subscriberId) {
String jsonString = subscriberMetadataMap.get(subscriberId);
if(jsonString != null) {
return gson.fromJson(jsonString, SubscriberMetadata.class);
}
return null;
}

}
49 changes: 36 additions & 13 deletions src/main/java/io/antmedia/datastore/db/MapDBStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Map.Entry;

import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -18,7 +19,9 @@
import org.slf4j.LoggerFactory;

import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.PushNotificationToken;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.vertx.core.Vertx;

Expand All @@ -29,6 +32,8 @@ public class MapDBStore extends MapBasedDataStore {

private Vertx vertx;
private long timerId;

private AtomicBoolean committing = new AtomicBoolean(false);
protected static Logger logger = LoggerFactory.getLogger(MapDBStore.class);
private static final String MAP_NAME = "BROADCAST";
private static final String VOD_MAP_NAME = "VOD";
Expand All @@ -37,6 +42,8 @@ public class MapDBStore extends MapBasedDataStore {
private static final String SUBSCRIBER = "SUBSCRIBER";
private static final String CONFERENCE_ROOM_MAP_NAME = "CONFERENCE_ROOM";
private static final String WEBRTC_VIEWER = "WEBRTC_VIEWER";
private static final String SUBSCRIBER_METADATA = "SUBSCRIBER_METADATA";



public MapDBStore(String dbName, Vertx vertx) {
Expand Down Expand Up @@ -73,20 +80,34 @@ public MapDBStore(String dbName, Vertx vertx) {

webRTCViewerMap = db.treeMap(WEBRTC_VIEWER).keySerializer(Serializer.STRING).valueSerializer(Serializer.STRING)
.counterEnable().createOrOpen();

subscriberMetadataMap = db.treeMap(SUBSCRIBER_METADATA).keySerializer(Serializer.STRING).valueSerializer(Serializer.STRING)
.counterEnable().createOrOpen();

timerId = vertx.setPeriodic(5000, id ->

vertx.executeBlocking(b -> {

synchronized (this)
{
if (available) {
db.commit();
}
}

}, false, null)
);
timerId = vertx.setPeriodic(5000,
id ->
vertx.executeBlocking(() -> {

//if it's committing, just let the thread proceed here and become free immediately for other jobs
if (committing.compareAndSet(false, true))
{
try {
synchronized (this)
{
if (available) {
db.commit();
}
}
}
finally {
committing.compareAndSet(true, false);
}
}

return null;

},
false));

available = true;
}
Expand Down Expand Up @@ -146,5 +167,7 @@ public List<StreamInfo> getStreamInfoList(String streamId) {
public void saveStreamInfo(StreamInfo streamInfo) {
//no need to implement this method, it is used in cluster mode
}



}
37 changes: 36 additions & 1 deletion src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import io.antmedia.datastore.db.types.ConferenceRoom;
import io.antmedia.datastore.db.types.Endpoint;
import io.antmedia.datastore.db.types.P2PConnection;
import io.antmedia.datastore.db.types.PushNotificationToken;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.Subscriber;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
Expand All @@ -57,7 +59,8 @@ public class MongoStore extends DataStore {
public static final String VOD_ID = "vodId";
private static final String VIEWER_ID = "viewerId";
private static final String TOKEN_ID = "tokenId";
public static final String STREAM_ID = "streamId";
public static final String STREAM_ID = "streamId";
public static final String SUBSCRIBER_ID = "subscriberId";
private Datastore datastore;
private Datastore vodDatastore;
private Datastore tokenDatastore;
Expand Down Expand Up @@ -1529,4 +1532,36 @@ public boolean updateStreamMetaData(String streamId, String metaData) {
public Datastore getSubscriberDatastore() {
return subscriberDatastore;
}

@Override
public SubscriberMetadata getSubscriberMetaData(String subscriberId) {
synchronized(this) {
try {
return datastore.find(SubscriberMetadata.class).filter(Filters.eq(SUBSCRIBER_ID, subscriberId)).first();
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
}
return null;
}

@Override
public void putSubscriberMetaData(String subscriberId, SubscriberMetadata metadata) {

try {
//delete the subscriberId if exists to make it compatible with all datastores
Query<SubscriberMetadata> query = datastore.find(SubscriberMetadata.class).filter(Filters.eq(SUBSCRIBER_ID, subscriberId));
long deletedCount = query.delete().getDeletedCount();
if (deletedCount > 0) {
logger.info("There is a SubsriberMetadata exists in database. It's deleted(deletedCount:{}) and it'll put to make it easy and compatible.", deletedCount);
}

metadata.setSubscriberId(subscriberId);
synchronized(this) {
datastore.save(metadata);
}
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
}
}
5 changes: 3 additions & 2 deletions src/main/java/io/antmedia/datastore/db/RedisStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.P2PConnection;
import io.antmedia.datastore.db.types.PushNotificationToken;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.datastore.db.types.SubscriberMetadata;
import io.antmedia.muxer.IAntMediaStreamHandler;

public class RedisStore extends MapBasedDataStore {
Expand Down Expand Up @@ -61,6 +63,7 @@ public RedisStore(String redisConnectionUrl, String dbName) {
webRTCViewerMap = redisson.getMap(dbName+"WebRTCViewers");
streamInfoMap = redisson.getMap(dbName+"StreamInfo");
p2pMap = redisson.getMap(dbName+"P2P");
subscriberMetadataMap = redisson.getMap(dbName+"SubscriberMetaData");

available = true;
}
Expand Down Expand Up @@ -181,6 +184,4 @@ public List<Broadcast> getLocalLiveBroadcasts(String hostAddress)
return getActiveBroadcastList(hostAddress);
}



}
Loading

0 comments on commit 8430b6f

Please sign in to comment.