From 893004e92a1000602a9b263564dcd615410b05fd Mon Sep 17 00:00:00 2001 From: Peter Kriens Date: Tue, 16 Jan 2024 17:25:24 +0100 Subject: [PATCH] client proxy update --- .../provider/message/RSAMessage.java | 22 +++ .../provider/proxy/ClientServiceFactory2.java | 35 ++++ .../provider/proxy/LoadClass.java | 5 + .../provider/proxy/RSAContext.java | 48 +++++ .../provider/proxy/RSAImportedContext.java | 53 ++++++ .../provider/proxy/RSAProxyContext.java | 180 ++++++++++++++++++ .../provider/wireformat/MethodIndexes.java | 16 +- .../ot/rsa/distribution/util/Utils.java | 17 ++ 8 files changed, 373 insertions(+), 3 deletions(-) create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/message/RSAMessage.java create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/ClientServiceFactory2.java create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/LoadClass.java create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAContext.java create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAImportedContext.java create mode 100644 org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAProxyContext.java diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/message/RSAMessage.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/message/RSAMessage.java new file mode 100644 index 0000000..cc1d3e3 --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/message/RSAMessage.java @@ -0,0 +1,22 @@ +package org.eclipse.ot.rsa.distribution.provider.message; + +import org.eclipse.ot.rsa.distribution.provider.message.AbstractRSAMessage.CacheKey; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.Promise; + +public interface RSAMessage { + CacheKey getKey(); + + interface MsgHandler { + + void fire(ByteBuf data); + + Promise send(ByteBuf data); + } + + boolean received(ByteBuf buffer); + + void send(MsgHandler msgHandler); + +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/ClientServiceFactory2.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/ClientServiceFactory2.java new file mode 100644 index 0000000..aeef437 --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/ClientServiceFactory2.java @@ -0,0 +1,35 @@ +package org.eclipse.ot.rsa.distribution.provider.proxy; + +import java.lang.reflect.Proxy; + +import org.osgi.framework.Bundle; +import org.osgi.framework.ServiceException; +import org.osgi.framework.ServiceFactory; +import org.osgi.framework.ServiceRegistration; + +public class ClientServiceFactory2 implements ServiceFactory { + final RSAImportedContext imported; + + public ClientServiceFactory2(RSAImportedContext imported) { + this.imported = imported; + } + + @Override + public Object getService(Bundle bundle, ServiceRegistration registration) { + try { + RSAProxyContext context = imported.create(bundle::loadClass); + return context.proxy(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new ServiceException("failed to create proxy", ServiceException.REMOTE, e); + } + } + + @Override + public void ungetService(Bundle bundle, ServiceRegistration registration, Object service) { + RSAProxyContext context = (RSAProxyContext) Proxy.getInvocationHandler(service); + context.close(); + } + +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/LoadClass.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/LoadClass.java new file mode 100644 index 0000000..42d1d2c --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/LoadClass.java @@ -0,0 +1,5 @@ +package org.eclipse.ot.rsa.distribution.provider.proxy; + +public interface LoadClass { + Class loadClass(String name) throws ClassNotFoundException; +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAContext.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAContext.java new file mode 100644 index 0000000..4bdf2eb --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAContext.java @@ -0,0 +1,48 @@ +package org.eclipse.ot.rsa.distribution.provider.proxy; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.ot.rsa.distribution.provider.wireformat.Protocol; + +import io.netty.channel.Channel; +import io.netty.util.Timer; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; + +public class RSAContext { + + final AtomicInteger callId = new AtomicInteger(1000); + final Timer timer; + final EventExecutorGroup executor; + final Future TRUE; + final Future FALSE; + final List specialHandlers = new ArrayList<>(); + + public interface SpecialHandler { + void newProxy(RSAProxyContext imported); + + void newImported(RSAImportedContext imported); + + } + + public RSAContext(EventExecutorGroup executor, Timer timer) { + this.executor = executor; + this.timer = timer; + this.TRUE = executor.next() + .newSucceededFuture(true); + this.FALSE = executor.next() + .newSucceededFuture(false); + } + + public RSAImportedContext newImported(UUID id, List interfaces, Map methodMappings, + Protocol protocol, Channel channel) { + RSAImportedContext imported = new RSAImportedContext(this, channel, id, interfaces, methodMappings, protocol, + 0); + specialHandlers.forEach(h -> h.newImported(imported)); + return imported; + } +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAImportedContext.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAImportedContext.java new file mode 100644 index 0000000..c46ea33 --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAImportedContext.java @@ -0,0 +1,53 @@ +package org.eclipse.ot.rsa.distribution.provider.proxy; + +import static org.eclipse.ot.rsa.distribution.util.Utils.invert; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.ot.rsa.distribution.provider.proxy.RSAProxyContext.Action; +import org.eclipse.ot.rsa.distribution.provider.wireformat.Protocol; + +import io.netty.channel.Channel; + +public class RSAImportedContext { + final RSAContext context; + final UUID uuid; + final List interfaces; + final Protocol protocol; + final AtomicLong timeout = new AtomicLong(); + final Map signatureToIndex; + final Channel channel; + + RSAImportedContext(RSAContext context, Channel channel, UUID uuid, List interfaces, + Map methodMapping, + Protocol protocol, long timeout) { + this.channel = channel; + this.signatureToIndex = invert(methodMapping); + this.interfaces = new ArrayList<>(interfaces); + this.context = context; + this.uuid = uuid; + this.protocol = protocol; + this.timeout.set(timeout); + } + + public RSAProxyContext create(LoadClass loader) throws ClassNotFoundException { + RSAProxyContext pc = new RSAProxyContext(this, loader); + context.specialHandlers.forEach(h -> h.newProxy(pc)); + return pc; + } + + public Action getAction(Method m, Integer index) { + Action distribute = (p,a) -> { + int callId = context.callId.incrementAndGet(); + protocol.callWithReturn(uuid, callId, index, a); + return null; + }; + return null; + } + +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAProxyContext.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAProxyContext.java new file mode 100644 index 0000000..3a7349c --- /dev/null +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/proxy/RSAProxyContext.java @@ -0,0 +1,180 @@ +package org.eclipse.ot.rsa.distribution.provider.proxy; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import org.eclipse.ot.rsa.distribution.provider.wireformat.MethodIndexes; +import org.osgi.framework.ServiceException; + +/** + * For each bundle that gets this service, we create a proxy. When the service + * is returned, we close this object + */ +@SuppressWarnings({ + "rawtypes", "unchecked" +}) +public class RSAProxyContext implements AutoCloseable { + final static Map defaultMethods = new HashMap<>(); + final static Method OBJECT_TOSTRING; + + static { + try { + OBJECT_TOSTRING = Object.class.getDeclaredMethod("toString"); + defaultMethods.put(Object.class.getDeclaredMethod("equals", Object.class), (p, args) -> p == args[0]); + defaultMethods.put(Object.class.getDeclaredMethod("hashCode"), (p, args) -> System.identityHashCode(p)); + } catch (Exception e) { + throw new Error("Cannot initialize fields from object"); + } + } + + final RSAImportedContext imported; + final Object proxy; + final Map substitutions = new HashMap<>(); + final Class[] interfaces; + final Class proxyClass; + final ClassLoader[] loaders; + final LoadClass loader; + final Map actions = new HashMap<>(); + + interface Action { + Object call(Object proxy, Object[] args); + } + + class ProxyLoader extends ClassLoader { + @Override + protected Class findClass(String name) throws ClassNotFoundException { + Class clazz = substitutions.get(name); + if (clazz != null) + return clazz; + + for (ClassLoader l : loaders) { + try { + return l.loadClass(name); + } catch (ClassNotFoundException cnfe) {} + } + + throw new ClassNotFoundException(name); + } + } + + RSAProxyContext(RSAImportedContext imported, LoadClass loader) throws ClassNotFoundException { + this.imported = imported; + this.loader = loader; + this.interfaces = getInterfaces(imported, loader); + if (interfaces.length == 0) + throw new ClassNotFoundException("any of: " + imported.interfaces); + + this.loaders = getUniqueLoaders(this.interfaces); + ProxyLoader proxyLoader = new ProxyLoader(); + this.proxyClass = Proxy.getProxyClass(proxyLoader, this.interfaces); + InvocationHandler ih = this::invoke; + + actions.putAll(defaultMethods); + remote(imported, proxyClass, actions, interfaces); + actions.put(OBJECT_TOSTRING, this::toString); + + this.proxy = Proxy.newProxyInstance(proxyLoader, this.interfaces, this::invoke); + + } + + private String toString(Object proxy, Object[] args) { + StringBuilder sb = new StringBuilder(80); + sb.append("ActualTypeName"); + + String del = ""; + for (Class iface : interfaces) { + sb.append(del) + .append(iface.getName()); + } + + sb.append('@'); + sb.append(Integer.toHexString(System.identityHashCode(proxy))); + return sb.toString(); + } + + private static void remote(RSAImportedContext imported, Class proxyClass, Map actions, + Class[] interfaces) { + for (Class interf : interfaces) { + for (Method m : interf.getMethods()) { + String signature = MethodIndexes.toSignature(m); + Integer index = imported.signatureToIndex.get(signature); + if (index == null) { + // TODO debug log + continue; + } + Action action = imported.getAction(m, index); + actions.put(m, action); + + try { + Method direct = proxyClass.getDeclaredMethod(m.getName(), m.getParameterTypes()); + actions.put(direct, action); + } catch (Exception e) { + // ignore + } + } + } + } + + private Object invoke(Object proxy, Method m, Object[] args) throws Exception { + try { + return actions.getOrDefault(m, this::unknown) + .call(proxy, args); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + if (isChecked(m, e)) { + throw e; + } + throw new ServiceException("unchecked exception from RSA", ServiceException.REMOTE, e); + } + } + + private static boolean isChecked(Method m, Exception e) { + Class type = e.getClass(); + for (Class t : m.getExceptionTypes()) { + if (t.isAssignableFrom(type)) + return true; + } + return false; + } + + private Object unknown(Object proxy, Object[] args) { + return null; + } + + public Object proxy() { + return proxy; + } + + @Override + public void close() { + + } + + private static Class[] getInterfaces(RSAImportedContext imported, LoadClass loader) { + return imported.interfaces.stream() + .map(name -> load(loader, name)) + .filter(Objects::nonNull) + .sorted((a, b) -> a.getName() + .compareTo(b.getName())) + .toArray(Class[]::new); + } + + private static Class load(LoadClass loader, String name) { + // TODO Auto-generated method stub + return null; + } + + private static ClassLoader[] getUniqueLoaders(Class... interfaces) { + return Stream.of(interfaces) + .map(Class::getClassLoader) + .distinct() + .toArray(ClassLoader[]::new); + } + +} diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/wireformat/MethodIndexes.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/wireformat/MethodIndexes.java index c90722a..4cbbda7 100644 --- a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/wireformat/MethodIndexes.java +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/provider/wireformat/MethodIndexes.java @@ -53,10 +53,11 @@ public Method[] getMappings() { /** * Converts a java.lang.reflect.Method into a canonicalised String * - * @param m the method - * @return A string identifier for the method + * @param name the name of the method + * @param parameters the parameter types + * @return a signature string */ - static String toSignature(String name, Class... parameters) { + public static String toSignature(String name, Class... parameters) { StringBuilder sb = new StringBuilder(name); sb.append('['); String del = ""; @@ -69,4 +70,13 @@ static String toSignature(String name, Class... parameters) { .toString(); } + /** + * Converts a java.lang.reflect.Method into a canonicalised String + * + * @param m the method + * @return A string identifier for the method + */ + public static String toSignature(Method m) { + return toSignature(m.getName(), m.getParameterTypes()); + } } diff --git a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/util/Utils.java b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/util/Utils.java index 539cdde..dd92bd6 100644 --- a/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/util/Utils.java +++ b/org.eclipse.ot.rsa.distribution.provider/src/main/java/org/eclipse/ot/rsa/distribution/util/Utils.java @@ -8,6 +8,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class Utils { @@ -391,4 +392,20 @@ public static T printer(Class type, Appendable out) { }); } + public static void close(AutoCloseable c) { + if (c == null) + return; + try { + c.close(); + } catch (Exception e) { + // ignore + } + } + + public static Map invert(Map map) { + return map.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); + } + }