Skip to content

Commit

Permalink
client proxy update
Browse files Browse the repository at this point in the history
  • Loading branch information
pkriens committed Jan 16, 2024
1 parent eca2f8c commit 893004e
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.eclipse.ot.rsa.distribution.provider.message;

import org.eclipse.ot.rsa.distribution.provider.message.AbstractRSAMessage.CacheKey;

import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Promise;

public interface RSAMessage {
CacheKey getKey();

interface MsgHandler {

void fire(ByteBuf data);

Promise<?> send(ByteBuf data);
}

boolean received(ByteBuf buffer);

void send(MsgHandler msgHandler);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.eclipse.ot.rsa.distribution.provider.proxy;

import java.lang.reflect.Proxy;

import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceException;
import org.osgi.framework.ServiceFactory;
import org.osgi.framework.ServiceRegistration;

public class ClientServiceFactory2 implements ServiceFactory<Object> {
final RSAImportedContext imported;

public ClientServiceFactory2(RSAImportedContext imported) {
this.imported = imported;
}

@Override
public Object getService(Bundle bundle, ServiceRegistration<Object> registration) {
try {
RSAProxyContext context = imported.create(bundle::loadClass);
return context.proxy();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new ServiceException("failed to create proxy", ServiceException.REMOTE, e);
}
}

@Override
public void ungetService(Bundle bundle, ServiceRegistration<Object> registration, Object service) {
RSAProxyContext context = (RSAProxyContext) Proxy.getInvocationHandler(service);
context.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.eclipse.ot.rsa.distribution.provider.proxy;

public interface LoadClass {
Class<?> loadClass(String name) throws ClassNotFoundException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.eclipse.ot.rsa.distribution.provider.proxy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.ot.rsa.distribution.provider.wireformat.Protocol;

import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;

public class RSAContext {

final AtomicInteger callId = new AtomicInteger(1000);
final Timer timer;
final EventExecutorGroup executor;
final Future<Boolean> TRUE;
final Future<Boolean> FALSE;
final List<SpecialHandler> specialHandlers = new ArrayList<>();

public interface SpecialHandler {
void newProxy(RSAProxyContext imported);

void newImported(RSAImportedContext imported);

}

public RSAContext(EventExecutorGroup executor, Timer timer) {
this.executor = executor;
this.timer = timer;
this.TRUE = executor.next()
.newSucceededFuture(true);
this.FALSE = executor.next()
.newSucceededFuture(false);
}

public RSAImportedContext newImported(UUID id, List<String> interfaces, Map<Integer, String> methodMappings,
Protocol protocol, Channel channel) {
RSAImportedContext imported = new RSAImportedContext(this, channel, id, interfaces, methodMappings, protocol,
0);
specialHandlers.forEach(h -> h.newImported(imported));
return imported;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.eclipse.ot.rsa.distribution.provider.proxy;

import static org.eclipse.ot.rsa.distribution.util.Utils.invert;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.ot.rsa.distribution.provider.proxy.RSAProxyContext.Action;
import org.eclipse.ot.rsa.distribution.provider.wireformat.Protocol;

import io.netty.channel.Channel;

public class RSAImportedContext {
final RSAContext context;
final UUID uuid;
final List<String> interfaces;
final Protocol protocol;
final AtomicLong timeout = new AtomicLong();
final Map<String, Integer> signatureToIndex;
final Channel channel;

RSAImportedContext(RSAContext context, Channel channel, UUID uuid, List<String> interfaces,
Map<Integer, String> methodMapping,
Protocol protocol, long timeout) {
this.channel = channel;
this.signatureToIndex = invert(methodMapping);
this.interfaces = new ArrayList<>(interfaces);
this.context = context;
this.uuid = uuid;
this.protocol = protocol;
this.timeout.set(timeout);
}

public RSAProxyContext create(LoadClass loader) throws ClassNotFoundException {
RSAProxyContext pc = new RSAProxyContext(this, loader);
context.specialHandlers.forEach(h -> h.newProxy(pc));
return pc;
}

public Action getAction(Method m, Integer index) {
Action distribute = (p,a) -> {
int callId = context.callId.incrementAndGet();
protocol.callWithReturn(uuid, callId, index, a);
return null;
};
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.eclipse.ot.rsa.distribution.provider.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import org.eclipse.ot.rsa.distribution.provider.wireformat.MethodIndexes;
import org.osgi.framework.ServiceException;

/**
* For each bundle that gets this service, we create a proxy. When the service
* is returned, we close this object
*/
@SuppressWarnings({
"rawtypes", "unchecked"
})
public class RSAProxyContext implements AutoCloseable {
final static Map<Method, Action> defaultMethods = new HashMap<>();
final static Method OBJECT_TOSTRING;

static {
try {
OBJECT_TOSTRING = Object.class.getDeclaredMethod("toString");
defaultMethods.put(Object.class.getDeclaredMethod("equals", Object.class), (p, args) -> p == args[0]);
defaultMethods.put(Object.class.getDeclaredMethod("hashCode"), (p, args) -> System.identityHashCode(p));
} catch (Exception e) {
throw new Error("Cannot initialize fields from object");
}
}

final RSAImportedContext imported;
final Object proxy;
final Map<String, Class> substitutions = new HashMap<>();
final Class[] interfaces;
final Class proxyClass;
final ClassLoader[] loaders;
final LoadClass loader;
final Map<Method, Action> actions = new HashMap<>();

interface Action {
Object call(Object proxy, Object[] args);
}

class ProxyLoader extends ClassLoader {
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
Class clazz = substitutions.get(name);
if (clazz != null)
return clazz;

for (ClassLoader l : loaders) {
try {
return l.loadClass(name);
} catch (ClassNotFoundException cnfe) {}
}

throw new ClassNotFoundException(name);
}
}

RSAProxyContext(RSAImportedContext imported, LoadClass loader) throws ClassNotFoundException {
this.imported = imported;
this.loader = loader;
this.interfaces = getInterfaces(imported, loader);
if (interfaces.length == 0)
throw new ClassNotFoundException("any of: " + imported.interfaces);

this.loaders = getUniqueLoaders(this.interfaces);
ProxyLoader proxyLoader = new ProxyLoader();
this.proxyClass = Proxy.getProxyClass(proxyLoader, this.interfaces);
InvocationHandler ih = this::invoke;

actions.putAll(defaultMethods);
remote(imported, proxyClass, actions, interfaces);
actions.put(OBJECT_TOSTRING, this::toString);

this.proxy = Proxy.newProxyInstance(proxyLoader, this.interfaces, this::invoke);

}

private String toString(Object proxy, Object[] args) {
StringBuilder sb = new StringBuilder(80);
sb.append("ActualTypeName");

String del = "";
for (Class<?> iface : interfaces) {
sb.append(del)
.append(iface.getName());
}

sb.append('@');
sb.append(Integer.toHexString(System.identityHashCode(proxy)));
return sb.toString();
}

private static void remote(RSAImportedContext imported, Class proxyClass, Map<Method, Action> actions,
Class[] interfaces) {
for (Class interf : interfaces) {
for (Method m : interf.getMethods()) {
String signature = MethodIndexes.toSignature(m);
Integer index = imported.signatureToIndex.get(signature);
if (index == null) {
// TODO debug log
continue;
}
Action action = imported.getAction(m, index);
actions.put(m, action);

try {
Method direct = proxyClass.getDeclaredMethod(m.getName(), m.getParameterTypes());
actions.put(direct, action);
} catch (Exception e) {
// ignore
}
}
}
}

private Object invoke(Object proxy, Method m, Object[] args) throws Exception {
try {
return actions.getOrDefault(m, this::unknown)
.call(proxy, args);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
if (isChecked(m, e)) {
throw e;
}
throw new ServiceException("unchecked exception from RSA", ServiceException.REMOTE, e);
}
}

private static boolean isChecked(Method m, Exception e) {
Class type = e.getClass();
for (Class t : m.getExceptionTypes()) {
if (t.isAssignableFrom(type))
return true;
}
return false;
}

private Object unknown(Object proxy, Object[] args) {
return null;
}

public Object proxy() {
return proxy;
}

@Override
public void close() {

}

private static Class[] getInterfaces(RSAImportedContext imported, LoadClass loader) {
return imported.interfaces.stream()
.map(name -> load(loader, name))
.filter(Objects::nonNull)
.sorted((a, b) -> a.getName()
.compareTo(b.getName()))
.toArray(Class[]::new);
}

private static Class<?> load(LoadClass loader, String name) {
// TODO Auto-generated method stub
return null;
}

private static ClassLoader[] getUniqueLoaders(Class... interfaces) {
return Stream.of(interfaces)
.map(Class::getClassLoader)
.distinct()
.toArray(ClassLoader[]::new);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ public Method[] getMappings() {
/**
* Converts a java.lang.reflect.Method into a canonicalised String
*
* @param m the method
* @return A string identifier for the method
* @param name the name of the method
* @param parameters the parameter types
* @return a signature string
*/
static String toSignature(String name, Class<?>... parameters) {
public static String toSignature(String name, Class... parameters) {
StringBuilder sb = new StringBuilder(name);
sb.append('[');
String del = "";
Expand All @@ -69,4 +70,13 @@ static String toSignature(String name, Class<?>... parameters) {
.toString();
}

/**
* Converts a java.lang.reflect.Method into a canonicalised String
*
* @param m the method
* @return A string identifier for the method
*/
public static String toSignature(Method m) {
return toSignature(m.getName(), m.getParameterTypes());
}
}
Loading

0 comments on commit 893004e

Please sign in to comment.