Skip to content

Commit

Permalink
Phi Accrual Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
pfouto committed Dec 6, 2021
1 parent 451d8b1 commit d2e7483
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>pt.unl.fct.di.novasys</groupId>
<artifactId>babel-core</artifactId>
<version>0.4.46</version>
<version>0.4.47</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -34,7 +34,7 @@
<dependency>
<groupId>com.github.pfouto</groupId>
<artifactId>network-layer</artifactId>
<version>2.0.40</version>
<version>2.0.41</version>
</dependency>
</dependencies>

Expand Down
7 changes: 3 additions & 4 deletions src/main/java/pt/unl/fct/di/novasys/babel/core/Babel.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pt.unl.fct.di.novasys.babel.core;

import pt.unl.fct.di.novasys.babel.initializers.*;
import pt.unl.fct.di.novasys.babel.internal.BabelMessage;
import pt.unl.fct.di.novasys.babel.internal.IPCEvent;
import pt.unl.fct.di.novasys.babel.internal.NotificationEvent;
Expand All @@ -10,11 +11,8 @@
import pt.unl.fct.di.novasys.babel.metrics.MetricsManager;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.generic.ProtoTimer;
import pt.unl.fct.di.novasys.babel.initializers.ChannelInitializer;
import pt.unl.fct.di.novasys.babel.initializers.SimpleClientChannelInitializer;
import pt.unl.fct.di.novasys.babel.initializers.SimpleServerChannelInitializer;
import pt.unl.fct.di.novasys.babel.initializers.TCPChannelInitializer;
import pt.unl.fct.di.novasys.channel.IChannel;
import pt.unl.fct.di.novasys.channel.accrual.AccrualChannel;
import pt.unl.fct.di.novasys.channel.simpleclientserver.SimpleClientChannel;
import pt.unl.fct.di.novasys.channel.simpleclientserver.SimpleServerChannel;
import pt.unl.fct.di.novasys.channel.tcp.TCPChannel;
Expand Down Expand Up @@ -122,6 +120,7 @@ private Babel() {
registerChannelInitializer(SimpleClientChannel.NAME, new SimpleClientChannelInitializer());
registerChannelInitializer(SimpleServerChannel.NAME, new SimpleServerChannelInitializer());
registerChannelInitializer(TCPChannel.NAME, new TCPChannelInitializer());
registerChannelInitializer(AccrualChannel.NAME, new AccrualChannelInitializer());

//registerChannelInitializer("Ackos", new AckosChannelInitializer());
//registerChannelInitializer(MultithreadedTCPChannel.NAME, new MultithreadedTCPChannelInitializer());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pt.unl.fct.di.novasys.babel.initializers;

import pt.unl.fct.di.novasys.babel.internal.BabelMessage;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.accrual.AccrualChannel;
import pt.unl.fct.di.novasys.channel.ackos.AckosChannel;
import pt.unl.fct.di.novasys.network.ISerializer;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Properties;

public class AccrualChannelInitializer implements ChannelInitializer<AccrualChannel<BabelMessage>> {

@Override
public AccrualChannel<BabelMessage> initialize(ISerializer<BabelMessage> serializer,
ChannelListener<BabelMessage> list,
Properties properties, short protoId) throws IOException {
return new AccrualChannel<>(serializer, list, properties);
}
}
93 changes: 93 additions & 0 deletions src/test/java/PhiTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pt.unl.fct.di.novasys.babel.core.Babel;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.exceptions.InvalidParameterException;
import pt.unl.fct.di.novasys.babel.exceptions.ProtocolAlreadyExistsException;
import pt.unl.fct.di.novasys.channel.accrual.AccrualChannel;
import pt.unl.fct.di.novasys.channel.accrual.events.PhiEvent;
import pt.unl.fct.di.novasys.channel.tcp.events.*;
import pt.unl.fct.di.novasys.network.data.Host;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Properties;

public class PhiTest {

public static void main(String[] args) throws InvalidParameterException, IOException, ProtocolAlreadyExistsException, HandlerRegistrationException {
Properties configProps = Babel.loadConfig(Arrays.copyOfRange(args, 0, args.length), null);
Babel babel = Babel.getInstance();
PhiProto phiProto = new PhiProto(configProps);
babel.registerProtocol(phiProto);
phiProto.init(configProps);
babel.start();
}


public static class PhiProto extends GenericProtocol{

private final String listenPort;

public PhiProto(Properties configProps) throws IOException, HandlerRegistrationException {
super("Phi", (short) 10);

listenPort = configProps.getProperty("listen_port");
Properties peerProps = new Properties();
peerProps.setProperty(AccrualChannel.ADDRESS_KEY, "0.0.0.0");
peerProps.setProperty(AccrualChannel.PORT_KEY, listenPort);
peerProps.setProperty(AccrualChannel.WINDOW_SIZE_KEY, "1000");
peerProps.setProperty(AccrualChannel.HB_INTERVAL_KEY, "50");
peerProps.setProperty(AccrualChannel.MIN_STD_DEVIATION_KEY, "20");
peerProps.setProperty(AccrualChannel.ACCEPTABLE_HB_PAUSE_KEY, "200");
peerProps.setProperty(AccrualChannel.THRESHOLD_KEY, "3");
peerProps.setProperty(AccrualChannel.PREDICT_INTERVAL_KEY, "300");
//peerProps.put(TCPChannel.DEBUG_INTERVAL_KEY, 10000);
int peerChannel = createChannel(AccrualChannel.NAME, peerProps);
registerChannelEventHandler(peerChannel, InConnectionDown.EVENT_ID, this::onInConnectionDown);
registerChannelEventHandler(peerChannel, InConnectionUp.EVENT_ID, this::onInConnectionUp);
registerChannelEventHandler(peerChannel, OutConnectionDown.EVENT_ID, this::onOutConnectionDown);
registerChannelEventHandler(peerChannel, OutConnectionUp.EVENT_ID, this::onOutConnectionUp);
registerChannelEventHandler(peerChannel, OutConnectionFailed.EVENT_ID, this::onOutConnectionFailed);
registerChannelEventHandler(peerChannel, PhiEvent.EVENT_ID, this::onPhiEvent);


}

@Override
public void init(Properties configProps) throws HandlerRegistrationException, IOException {
String mode = configProps.getProperty("mode");
if(mode.equals("client")){
String serverPort = configProps.getProperty("server_port");
String serverAddr = configProps.getProperty("server_addr");
openConnection(new Host(InetAddress.getByName(serverAddr), Integer.parseInt(serverPort)));
}
}

private void onPhiEvent(PhiEvent event, int channel) {
System.out.println(event.getValues());
}

protected void onOutConnectionUp(OutConnectionUp event, int channel){
System.out.println(event);
}

protected void onOutConnectionDown(OutConnectionDown event, int channel){
System.out.println(event);
}

protected void onOutConnectionFailed(OutConnectionFailed<Void> event, int channel){
System.out.println(event);
}

private void onInConnectionDown(InConnectionDown event, int channel) {
System.out.println(event);
}

private void onInConnectionUp(InConnectionUp event, int channel) {
System.out.println(event);;
}

}
}

0 comments on commit d2e7483

Please sign in to comment.