Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements an ArrowRootAllocationProvider SPI #1040

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<EMISSARY_VERSION>${project.version}</EMISSARY_VERSION>
<argLine />
<checkstyleFormatter>${project.basedir}/contrib/checkstyle.xml</checkstyleFormatter>
<dep.arrow.version>16.1.0</dep.arrow.version>
<dep.commons-codec.version>1.16.0</dep.commons-codec.version>
<dep.commons-collections.version>4.4</dep.commons-collections.version>
<dep.commons-compress.version>1.27.1</dep.commons-compress.version>
Expand Down Expand Up @@ -197,6 +198,23 @@
<artifactId>spymemcached</artifactId>
<version>${dep.spymemcached.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${dep.arrow.version}</version>
<exclusions>
<!-- conflicts with guava's dependency, which is an older version of the checker framework -->
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${dep.arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand Down Expand Up @@ -401,6 +419,16 @@
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<!-- consumers of emissary can use either arrow-memory-netty or arrow-memory-unsafe to provide an allocator -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand Down Expand Up @@ -1018,6 +1046,10 @@
</resources>
</exception>
</exceptions>
<ignoredResourcePatterns>
<!-- arrow puts this in every jar -->
<ignoredResourcePattern>arrow-git.properties</ignoredResourcePattern>
</ignoredResourcePatterns>
<printEqualFiles>false</printEqualFiles>
<failBuildInCaseOfDifferentContentConflict>true</failBuildInCaseOfDifferentContentConflict>
<failBuildInCaseOfEqualContentConflict>true</failBuildInCaseOfEqualContentConflict>
Expand Down
116 changes: 116 additions & 0 deletions src/main/java/emissary/spi/ArrowRootAllocatorProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package emissary.spi;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;

/**
* Provides a central class for obtaining references to the Arrow root memory allocator. Activate this by including it
* in the list of classes in
*
* <pre>
* META - INF / services / emissary.spi.InitializationProvider
* </pre>
*
* Classes wishing to get a reference to the Arrow root allocator should call the {@link #getArrowRootAllocator()}. They
* are free to create child allocators as needed, but they are responsible for managing any buffers created using either
* the root or a chile allocator and calling
*
* <pre>
* close()
* </pre>
*
* on them when they are no longer needed. The {@link #shutdown()} method will automatically close any child allocators
* created, but will throw an {@link java.lang.IllegalStateException} if there are allocated buffers that have not been
* closed (potentially leaking memory). Provides debug and trace level logging for detailed behavior.
*/
public class ArrowRootAllocatorProvider implements InitializationProvider {
private static final Logger logger = LoggerFactory.getLogger(ArrowRootAllocatorProvider.class);

private static final Object allocatorLock = new Object();
private static BufferAllocator arrowRootAllocator = null;

@Override
public void initialize() {
logger.trace("Waiting for allocator lock in initialize()");
synchronized (allocatorLock) {
logger.debug("Creating new Arrow root allocator");

// creates a RootAllocator with the default memory settings, we may consider implementing a limit here
// that is set via a system property here instead.
arrowRootAllocator = new RootAllocator();

logger.trace("Releasing allocator lock in initialize()");
}
}

/** Shuts down the root allocator and any child allocators */
@Override
public void shutdown() {
logger.trace("Waiting for allocator lock in shutdown()");
synchronized (allocatorLock) {
logger.trace("Closing Arrow allocators");
Collection<BufferAllocator> children = arrowRootAllocator.getChildAllocators();
if (children.isEmpty()) {
logger.trace("Root allocator has no children to close");
} else {
if (logger.isTraceEnabled()) {
logger.trace("Attempting to clode {} child allocators", children.size());
}
for (BufferAllocator child : children) {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down child allocator: {}", child.getName());
}
try {
child.close();
if (logger.isTraceEnabled()) {
logger.trace("Successfully closed child allocator {}", child.getName());
}
} catch (IllegalStateException e) {
// it's ok to catch this, another IllegalStateException will be thrown when closing the root allocator.
logger.warn("IllegalStateException when closing child allocator {}, message: {}", child.getName(), e.getMessage());
}
}
}

logger.trace("Closing root allocator");
arrowRootAllocator.close();
logger.debug("Successfully closed root allocator");
arrowRootAllocator = null;
logger.trace("Releasing allocator lock in shutdown()");
}
InitializationProvider.super.shutdown();
}

/**
* Obtain a reference to the arrow root allocator. Any buffers or child allocators allocated using this instance must be
*
* <pre>
* close()
* </pre>
*
* 'd once they are no longer used.
*
* @return the Arrow root allocator
*/
public static BufferAllocator getArrowRootAllocator() {
logger.trace("Waiting for allocator lock in getArrowRootAllocator()");
synchronized (allocatorLock) {
try {
if (arrowRootAllocator == null) {
throw new IllegalStateException("Arrow Root Allocator has not been initialized by the " +
"ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " +
"listed in META-INF/services/emissary.spi.InitializationProvider?");
} else {
logger.trace("Returning root allocator");
return arrowRootAllocator;
}
} finally {
logger.trace("Releasing allocator lock in getArrowRootAllocator()");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
emissary.spi.JavaCharSetInitializationProvider
emissary.spi.ClassLocationVerificationProvider
emissary.spi.ArrowRootAllocatorProvider
108 changes: 108 additions & 0 deletions src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package emissary.spi;

import emissary.test.core.junit5.UnitTest;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Tests various ArrowRootAllocatorProvider scenarios and demonstrates expected behavior when conditions related to
* failing to close various Arrow resources occurs.
*/
public class ArrowRootAllocatorProviderTest extends UnitTest {
/** shutdown is clean if no memory has been allocated and no child allocators have been created */
@Test
public void testArrowRootAllocatorProvider() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
}

/** creating a buffer and not closing it will cause a leak */
@Test
public void testArrowRootAllocatorShutdownLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
ArrowBuf buffer = allocatorOne.buffer(1024);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with allocated buffer open");
}

/**
* creating a child allocator and not closing it before the root allocator provider is shutdown is OK, as long as that
* child allocator doesn't have any open buffers. The root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildClean() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
assertNotNull(allocatorChild);
}

/**
* creating a child allocator and not closing its buffers before the root allocator provider is shutdown should fail
* when the root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
allocatorChild.buffer(1024);
assertNotNull(allocatorChild);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with child allocator open");
}

/** both allocated buffers and child allocators must be closed before the root allocator can be shutdown cleanly */
@Test
public void testArrowRootAllocatorShutdownAfterProperClose() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
ArrowBuf buffer = allocatorChild.buffer(1024);
buffer.close();
allocatorChild.close();
provider.shutdown();
}

/** the root allocator can't be obtained after shutdown */
@Test()
public void testArrowRootAllocatorProviderAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
provider.shutdown();
assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator,
"expected IllegalStateException attempting to get an allocator after shutdown");
}

/** the root allocator won't allocate after shutdown */
@Test
public void testArrowRootAllocatorProviderAllocateAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
assertThrows(IllegalStateException.class, () -> allocator.buffer(1024),
"expected IllegalStateException attempting to allocate after provider is shutdown");
}
}

Loading