Skip to content

Commit

Permalink
Merge pull request #73 from AndriiLandiak/feature/edge-resource-support
Browse files Browse the repository at this point in the history
Feature/edge resource support
  • Loading branch information
volodymyr-babak authored Nov 2, 2023
2 parents 135bb1a + 3000d6f commit bf7f0a5
Show file tree
Hide file tree
Showing 19 changed files with 266 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void performInstall() {
// log.info("Updating system data...");
// dataUpdateService.upgradeRuleNodes();
// systemDataLoaderService.updateSystemWidgets();
installScripts.loadSystemLwm2mResources();
// installScripts.loadSystemLwm2mResources();
}

log.info("Upgrade finished successfully!");
Expand Down Expand Up @@ -330,9 +330,9 @@ public void performInstall() {
// systemDataLoaderService.createQueues();
// systemDataLoaderService.createDefaultNotificationConfigs();

// systemDataLoaderService.loadSystemPlugins();
// systemDataLoaderService.loadSystemRules();
installScripts.loadSystemLwm2mResources();
// systemDataLoaderService.loadSystemPlugins();
// systemDataLoaderService.loadSystemRules();
// installScripts.loadSystemLwm2mResources();

if (loadDemo) {
// log.info("Loading demo data...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public class CloudEventSourcingListener {
EntityType.ENTITY_VIEW,
EntityType.ASSET,
EntityType.ASSET_PROFILE,
EntityType.DASHBOARD);
EntityType.DASHBOARD,
EntityType.TB_RESOURCE);

private final List<EntityType> supportableEntityTypes = new ArrayList<>(COMMON_ENTITY_TYPES) {{
add(EntityType.ALARM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.thingsboard.server.service.cloud.rpc.processor.EntityCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.EntityViewCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.RelationCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.ResourceCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.RuleChainCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.TelemetryCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.TenantCloudProcessor;
Expand Down Expand Up @@ -185,6 +186,9 @@ public class CloudManagerService {
@Autowired
private CustomerCloudProcessor customerProcessor;

@Autowired
private ResourceCloudProcessor resourceCloudProcessor;

@Autowired
private CloudEventService cloudEventService;

Expand Down Expand Up @@ -461,6 +465,8 @@ private UplinkMsg convertEntityEventToUplink(TenantId tenantId, CloudEvent cloud
return entityViewProcessor.convertEntityViewEventToUplink(cloudEvent);
case RELATION:
return relationProcessor.convertRelationEventToUplink(cloudEvent);
case TB_RESOURCE:
return resourceCloudProcessor.convertResourceEventToUplink(cloudEvent);
default:
log.warn("Unsupported cloud event type [{}]", cloudEvent);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void pushNotificationToCloud(TransportProtos.CloudNotificationMsgProto cl
case ENTITY_VIEW:
case DASHBOARD:
case RULE_CHAIN:
case TB_RESOURCE:
future = processEntity(tenantId, cloudNotificationMsg);
break;
case ALARM:
Expand Down Expand Up @@ -179,5 +180,3 @@ private ListenableFuture<Void> processRelation(TenantId tenantId, TransportProto
0L);
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.thingsboard.server.gen.edge.v1.OtaPackageUpdateMsg;
import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg;
Expand All @@ -66,6 +67,7 @@
import org.thingsboard.server.service.cloud.rpc.processor.OtaPackageCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.QueueCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.RelationCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.ResourceCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.RuleChainCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.TelemetryCloudProcessor;
import org.thingsboard.server.service.cloud.rpc.processor.TenantCloudProcessor;
Expand Down Expand Up @@ -150,6 +152,9 @@ public class DefaultDownlinkMessageService implements DownlinkMessageService {
@Autowired
private TenantProfileCloudProcessor tenantProfileCloudProcessor;

@Autowired
private ResourceCloudProcessor tbResourceCloudProcessor;

@Autowired
private DbCallbackExecutorService dbCallbackExecutorService;

Expand Down Expand Up @@ -299,6 +304,11 @@ public ListenableFuture<List<Void>> processDownlinkMsg(TenantId tenantId,
result.add(tenantCloudProcessor.processTenantMsgFromCloud(tenantUpdateMsg));
}
}
if (downlinkMsg.getResourceUpdateMsgCount() > 0) {
for (ResourceUpdateMsg resourceUpdateMsg : downlinkMsg.getResourceUpdateMsgList()) {
result.add(tbResourceCloudProcessor.processResourceMsgFromCloud(tenantId, resourceUpdateMsg));
}
}
log.trace("Finished processing DownlinkMsg {}", downlinkMsg.getDownlinkMsgId());
} catch (Exception e) {
log.error("Can't process downlink message [{}]", downlinkMsg, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cloud.rpc.processor;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.cloud.CloudEvent;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.service.edge.rpc.processor.resource.BaseResourceProcessor;

import java.util.UUID;

@Component
@Slf4j
public class ResourceCloudProcessor extends BaseResourceProcessor {

public ListenableFuture<Void> processResourceMsgFromCloud(TenantId tenantId, ResourceUpdateMsg resourceUpdateMsg) {
TbResourceId tbResourceId = new TbResourceId(new UUID(resourceUpdateMsg.getIdMSB(), resourceUpdateMsg.getIdLSB()));
try {
edgeSynchronizationManager.getSync().set(true);
switch (resourceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
deleteSystemResourceIfAlreadyExists(tbResourceId, ResourceType.valueOf(resourceUpdateMsg.getResourceType()), resourceUpdateMsg.getResourceKey());
super.saveOrUpdateTbResource(tenantId, tbResourceId, resourceUpdateMsg);
break;
case ENTITY_DELETED_RPC_MESSAGE:
TbResource tbResourceToDelete = resourceService.findResourceById(tenantId, tbResourceId);
if (tbResourceToDelete != null) {
resourceService.deleteResource(tenantId, tbResourceId);
}
break;
case UNRECOGNIZED:
return handleUnsupportedMsgType(resourceUpdateMsg.getMsgType());
}
} finally {
edgeSynchronizationManager.getSync().remove();
}
return Futures.immediateFuture(null);
}

private void deleteSystemResourceIfAlreadyExists(TbResourceId tbResourceId, ResourceType resourceType, String resourceKey) {
PageDataIterable<TbResource> entityIdsIterator = new PageDataIterable<>(
link -> resourceService.findAllTenantResources(TenantId.SYS_TENANT_ID, link), 1024);
for (TbResource resource : entityIdsIterator) {
if (resource.getResourceType().equals(resourceType)
&& resource.getResourceKey().equals(resourceKey)
&& !resource.getId().equals(tbResourceId)) {
resourceService.deleteResource(TenantId.SYS_TENANT_ID, resource.getId());
break;
}
}
}

public UplinkMsg convertResourceEventToUplink(CloudEvent cloudEvent) {
TbResourceId tbResourceId = new TbResourceId(cloudEvent.getEntityId());
UplinkMsg msg = null;
switch (cloudEvent.getAction()) {
case ADDED:
case UPDATED:
TbResource tbResource = resourceService.findResourceById(cloudEvent.getTenantId(), tbResourceId);
if (tbResource != null) {
UpdateMsgType msgType = getUpdateMsgType(cloudEvent.getAction());
ResourceUpdateMsg resourceUpdateMsg =
resourceMsgConstructor.constructResourceUpdatedMsg(msgType, tbResource);
msg = UplinkMsg.newBuilder()
.setUplinkMsgId(EdgeUtils.nextPositiveInt())
.addResourceUpdateMsg(resourceUpdateMsg)
.build();
}
break;
case DELETED:
ResourceUpdateMsg resourceUpdateMsg =
resourceMsgConstructor.constructResourceDeleteMsg(tbResourceId);
msg = UplinkMsg.newBuilder()
.setUplinkMsgId(EdgeUtils.nextPositiveInt())
.addResourceUpdateMsg(resourceUpdateMsg)
.build();
break;
}
return msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ListenableFuture<Void> processWidgetsBundleMsgFromCloud(TenantId tenantId
case ENTITY_UPDATED_RPC_MESSAGE:
widgetCreationLock.lock();
try {
deleteSystemWidgetBundleIfAlreadyExists(tenantId, widgetsBundleUpdateMsg.getAlias(), widgetsBundleId);
deleteSystemWidgetBundleIfAlreadyExists(widgetsBundleUpdateMsg.getAlias(), widgetsBundleId);
WidgetsBundle widgetsBundle = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId);
if (widgetsBundle == null) {
widgetsBundle = new WidgetsBundle();
Expand Down Expand Up @@ -95,7 +95,7 @@ public ListenableFuture<Void> processWidgetsBundleMsgFromCloud(TenantId tenantId
return Futures.immediateFuture(null);
}

private void deleteSystemWidgetBundleIfAlreadyExists(TenantId tenantId, String bundleAlias, WidgetsBundleId widgetsBundleId) {
private void deleteSystemWidgetBundleIfAlreadyExists(String bundleAlias, WidgetsBundleId widgetsBundleId) {
try {
WidgetsBundle widgetsBundle = widgetsBundleService.findWidgetsBundleByTenantIdAndAlias(TenantId.SYS_TENANT_ID, bundleAlias);
if (widgetsBundle != null && !widgetsBundleId.equals(widgetsBundle.getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
import org.thingsboard.server.service.edge.rpc.constructor.OtaPackageMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.QueueMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.ResourceMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.TenantMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.TenantProfileMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,10 @@ public void upgradeDatabase(String fromVersion) throws Exception {
log.info("Updating schema ...");
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.6.0", SCHEMA_UPDATE_SQL);
loadSql(schemaUpdateFile, conn);
try {
conn.createStatement().execute("DELETE FROM resource");
} catch (Exception e) {
}
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3006001;");
log.info("Schema updated to version 3.6.1.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.protobuf.AbstractMessage;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.StringUtils;
Expand All @@ -39,6 +40,7 @@ public class ResourceEdgeTest extends AbstractEdgeTest {
private static final String FILE_NAME = "test.jks";

@Test
@Ignore
public void testResources_create_update_delete() throws Exception {
// create resource
TbResource resource = new TbResource();
Expand Down Expand Up @@ -89,6 +91,7 @@ public void testResources_create_update_delete() throws Exception {
}

@Test
@Ignore
public void testSendResourceToCloud() throws Exception {
UUID uuid = Uuids.timeBased();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public static CloudEventType getCloudEventTypeByEntityType(EntityType entityType
return CloudEventType.CUSTOMER;
case EDGE:
return CloudEventType.EDGE;
case TB_RESOURCE:
return CloudEventType.TB_RESOURCE;
default:
log.warn("Unsupported entity type: [{}]", entityType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ public enum CloudEventType {
ENTITY_GROUP,
WIDGETS_BUNDLE,
WIDGET_TYPE,
EDGE
EDGE,
TB_RESOURCE
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ public static EntityId getByCloudEventTypeAndUuid(CloudEventType cloudEventType,
return new WidgetsBundleId(uuid);
case WIDGET_TYPE:
return new WidgetTypeId(uuid);
case TB_RESOURCE:
return new TbResourceId(uuid);
}
throw new IllegalArgumentException("CloudEventType " + cloudEventType + " is not supported!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static void verifyWidgetBundles() {
.atMost(30, TimeUnit.SECONDS).
until(() -> {
try {
return edgeRestClient.getWidgetsBundles(new PageLink(100)).getTotalElements() == 20;
return edgeRestClient.getWidgetsBundles(new PageLink(100)).getTotalElements() == 22;
} catch (Throwable e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
Expand Down
Loading

0 comments on commit bf7f0a5

Please sign in to comment.