diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java index 492017c6..8bfbae44 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java @@ -39,6 +39,7 @@ import com.expediagroup.beekeeper.core.model.LifecycleEventType; import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter; +import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.LocationOnlyUpdateListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.TableParameterListenerEventFilter; @@ -96,7 +97,8 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler( new EventTypeListenerEventFilter(eventClasses), new LocationOnlyUpdateListenerEventFilter(), new TableParameterListenerEventFilter(), - new WhitelistedListenerEventFilter() + new WhitelistedListenerEventFilter(), + new IcebergTableListenerEventFilter() ); return new MessageEventHandler(generator, filters); @@ -120,7 +122,8 @@ public MessageEventHandler expiredHousekeepingMetadataMessageEventHandler( List filters = List.of( new EventTypeListenerEventFilter(eventClasses), - new TableParameterListenerEventFilter() + new TableParameterListenerEventFilter(), + new IcebergTableListenerEventFilter() ); return new MessageEventHandler(generator, filters); diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java 
b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java
new file mode 100644
index 00000000..8edeb381
--- /dev/null
+++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2019-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.beekeeper.scheduler.apiary.filter;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.stereotype.Component;
+
+import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
+import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Filters out listener events for Iceberg tables, which Beekeeper does not currently support.
+ *
+ * <p>An event is treated as belonging to an Iceberg table when its table parameters contain a
+ * {@code metadata_location} that is non-empty after trimming, or a {@code table_type} whose value
+ * contains {@code iceberg} in any letter case.
+ */
+@Component
+public class IcebergTableListenerEventFilter implements ListenerEventFilter {
+
+  private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class);
+
+  private static final String METADATA_LOCATION_KEY = "metadata_location";
+  private static final String TABLE_TYPE_KEY = "table_type";
+  private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg";
+
+  /**
+   * Reports whether the event must be skipped because its table looks like an Iceberg table.
+   *
+   * @param event listener event whose table parameters are inspected
+   * @param type lifecycle event type; not consulted by this filter
+   * @return {@code true} if the table parameters identify an Iceberg table; {@code false}
+   *     otherwise, including when the event has no table parameters
+   */
+  @Override
+  public boolean isFiltered(ListenerEvent event, LifecycleEventType type) {
+    Map<String, String> tableParameters = event.getTableParameters();
+    if (tableParameters == null) {
+      return false;
+    }
+
+    String metadataLocation = tableParameters.get(METADATA_LOCATION_KEY);
+    String tableType = tableParameters.get(TABLE_TYPE_KEY);
+
+    boolean hasMetadataLocation = metadataLocation != null && !metadataLocation.trim().isEmpty();
+    // Lower-case with Locale.ROOT so the match does not depend on the JVM default locale
+    // (under a Turkish locale "ICEBERG".toLowerCase() yields a dotless i and would not match).
+    boolean isIcebergType =
+        tableType != null && tableType.toLowerCase(Locale.ROOT).contains(TABLE_TYPE_ICEBERG_VALUE);
+
+    if (hasMetadataLocation || isIcebergType) {
+      log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.",
+          event.getDbName(), event.getTableName());
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
index 44f0e2f2..d07912b2 100644
--- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
+++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
@@ -58,4 +58,9 @@ private boolean shouldFilterMessage(ListenerEvent listenerEvent) {
   private List generateHousekeepingEntities(ListenerEvent listenerEvent) {
     return generator.generate(listenerEvent, CLIENT_ID);
   }
+
+  /** Returns the listener event filters held by this handler. */
+  public List<ListenerEventFilter> getFilters() {
+    return filters;
+  }
 }
diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
index 2aa5f17b..cd1f776a 100644
--- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
+++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
@@ -20,6 +20,7 @@
 
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.List;
 
 import 
org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -32,6 +33,8 @@
 import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader;
 
 import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
+import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.ExpiredHousekeepingMetadataGenerator;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.HousekeepingEntityGenerator;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.UnreferencedHousekeepingPathGenerator;
@@ -117,4 +120,22 @@ public void validatePathEventReader() {
         mock(MessageEventHandler.class));
     assertThat(reader).isInstanceOf(BeekeeperEventReader.class);
   }
+
+  // The unreferenced-path handler bean must be wired with the Iceberg table filter.
+  @Test
+  public void validateUnreferencedHousekeepingPathMessageEventHandlerIncludesIcebergFilter() {
+    MessageEventHandler handler =
+        commonBeans.unreferencedHousekeepingPathMessageEventHandler(unreferencedHousekeepingPathGenerator);
+    List<ListenerEventFilter> filters = handler.getFilters();
+    assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
+  }
+
+  // The expired-metadata handler bean must be wired with the Iceberg table filter.
+  @Test
+  public void validateExpiredHousekeepingMetadataMessageEventHandlerIncludesIcebergFilter() {
+    MessageEventHandler handler =
+        commonBeans.expiredHousekeepingMetadataMessageEventHandler(expiredHousekeepingMetadataGenerator);
+    List<ListenerEventFilter> filters = handler.getFilters();
+    assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
+  }
 }
diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java
new file mode 100644
index 00000000..98be98f5
--- /dev/null
+++ 
b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2019-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.beekeeper.scheduler.apiary.filter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
+import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+
+/**
+ * Unit tests for {@link IcebergTableListenerEventFilter}, covering detection through the
+ * {@code table_type} and {@code metadata_location} table parameters.
+ */
+public class IcebergTableListenerEventFilterTest {
+
+  private final IcebergTableListenerEventFilter filter = new IcebergTableListenerEventFilter();
+
+  @Test
+  public void shouldFilterWhenTableTypeIsIceberg() {
+    ListenerEvent event = createListenerEventWithTableType("ICEBERG");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldNotFilterWhenTableTypeIsNotIceberg() {
+    ListenerEvent event = createListenerEventWithTableType("HIVE");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldFilterWhenTableTypeIsIcebergIgnoreCase() {
+    ListenerEvent event = createListenerEventWithTableType("iceberg");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldFilterWhenMetadataLocationIsPresent() {
+    ListenerEvent event = createListenerEventWithMetadataLocation("s3://example/path/to/metadata");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldNotFilterWhenMetadataLocationIsEmpty() {
+    ListenerEvent event = createListenerEventWithMetadataLocation("");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldNotFilterWhenMetadataLocationIsBlank() {
+    ListenerEvent event = createListenerEventWithMetadataLocation("   ");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldNotFilterWhenMetadataLocationIsNull() {
+    ListenerEvent event = createListenerEventWithMetadataLocation(null);
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldHandleNullTableParameters() {
+    ListenerEvent event = createListenerEventWithTableParameters(null);
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  /** Builds an event whose only table parameter is {@code table_type}. */
+  private ListenerEvent createListenerEventWithTableType(String tableType) {
+    Map<String, String> tableParameters = new HashMap<>();
+    tableParameters.put("table_type", tableType);
+    return createListenerEventWithTableParameters(tableParameters);
+  }
+
+  /** Builds an event whose only table parameter is {@code metadata_location}. */
+  private ListenerEvent createListenerEventWithMetadataLocation(String metadataLocation) {
+    Map<String, String> tableParameters = new HashMap<>();
+    tableParameters.put("metadata_location", metadataLocation);
+    return createListenerEventWithTableParameters(tableParameters);
+  }
+
+  /** Builds a stub event for {@code test_db.test_table} that returns the given parameters as-is. */
+  private ListenerEvent createListenerEventWithTableParameters(Map<String, String> tableParameters) {
+    return new ListenerEvent() {
+      @Override
+      public String getDbName() {
+        return "test_db";
+      }
+
+      @Override
+      public String getTableName() {
+        return "test_table";
+      }
+
+      @Override
+      public Map<String, String> getTableParameters() {
+        return tableParameters;
+      }
+    };
+  }
+}