// File FreezeLocal.java
// File List > behaviour > SKD > FreezeLocal.java
// Go to the documentation of this file
package skydata.behaviour.SKD;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;
import com.google.common.collect.Sets;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.core.behaviours.WakerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import skydata.internal.agents.SKAgent;
import skydata.internal.agents.SKD;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.message.RSBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;
/**
 * Behaviour that runs the "local freeze" protocol among the agents of a family that
 * share a harbour.
 *
 * <p>When {@link #action()} runs it creates a shared {@link Data} record on the agent and
 * installs seven sub-behaviours: a ticker that may start a freeze, a heartbeat ticker that
 * also detects crashed peers, and five message receivers (heartbeat, freeze request,
 * freeze answer, initiator change, end of freeze).
 *
 * <p>Every message of the protocol is an INFORM whose protocol is {@code "FREEZE_LOCAL"}
 * and whose ontology names the step ({@code FREEZING}, {@code FREEZING_ANSWER},
 * {@code ALIVE}, {@code CHANGE_INIT}, {@code FREEZING_ENDED}); the receive templates below
 * filter on exactly that triple.
 */
public class FreezeLocal extends SKAgentBehaviour {

    /**
     * Base period of the protocol, in milliseconds: heartbeat period, crash-detection
     * threshold and duration of the cancellation window. The freeze-initiation ticker
     * runs at half this period.
     */
    static long deltaLocal = 5000;

    /** Protocol name carried by every message of this behaviour. */
    private static final String PROTOCOL = "FREEZE_LOCAL";

    /** Key under which the shared {@link Data} record is stored on the agent. */
    private static final String STORAGE_KEY = "FREEZING_DATA";

    private static final String ONTOLOGY_FREEZING = "FREEZING";
    private static final String ONTOLOGY_FREEZING_ANSWER = "FREEZING_ANSWER";
    private static final String ONTOLOGY_ALIVE = "ALIVE";
    private static final String ONTOLOGY_CHANGE_INIT = "CHANGE_INIT";
    private static final String ONTOLOGY_FREEZING_ENDED = "FREEZING_ENDED";

    /** Verdict carried (as the left element of a {@code Pair}) by a freeze answer. */
    enum FREEZING_ANSWER {
        YES, NO, ALREADY
    }

    /** Mutable protocol state shared by all sub-behaviours through the agent's storage. */
    static class Data implements Serializable {

        /** Life cycle of a freeze round as seen by this agent. */
        enum Stat {
            NONE, IN_PROGRESS, ENDED, CANCELLED
        }

        /** Current state of the freeze on this agent. */
        Stat isFreezing = Stat.NONE;
        /** Agents taking part in the freeze; replaced by the content of FREEZING_ENDED. */
        Set<SKAID> listFreezing = new HashSet<>();
        /** Agents whose answer is awaited and to which heartbeats are broadcast. */
        Set<SKAID> listSended = new HashSet<>();
        /** Agents from which a freeze answer has been received. */
        Set<SKAID> listAnswer = new HashSet<>();
        /** Agents removed from {@link #listSended} by the heartbeat crash check. */
        Set<SKAID> listCrashed = new HashSet<>();
        /** Agent currently regarded as the initiator of the freeze. */
        SKAID initiator;
        /** Per-agent timestamp (epoch milliseconds) used by the crash check. */
        HashMap<SKAID, Long> lastAlive = new HashMap<>();
    }

    /**
     * Creates the behaviour for the given agent.
     *
     * @param agent the owning agent; {@link #action()} casts it to {@link SKD}
     */
    public FreezeLocal(SKAgent agent) {
        super(agent);
    }

    /**
     * Tells whether every agent in {@code data.listSended} is accounted for.
     *
     * @param data the protocol state to inspect
     * @return {@code true} when each awaited agent is this agent itself, has answered,
     *         or has been marked as crashed
     */
    public boolean isFinished(Data data) {
        SKAID self = agent.getSKAID();
        for (SKAID awaited : data.listSended) {
            boolean accountedFor = self.equals(awaited)
                    || data.listAnswer.contains(awaited)
                    || data.listCrashed.contains(awaited);
            if (!accountedFor) {
                return false;
            }
        }
        return true;
    }

    /**
     * Hook invoked once the freeze has ended (on the initiator after the last answer, on
     * the others when FREEZING_ENDED is received). The default only prints the members.
     *
     * @param data the protocol state at the end of the freeze
     */
    protected void afterFreeze(Data data) {
        agent.print("Freezing ended" + data.listFreezing);
    }

    /**
     * Cancels the freeze: the state becomes {@code CANCELLED} immediately and, after
     * {@link #deltaLocal} milliseconds, migration is unblocked and the state returns to
     * {@code NONE}.
     *
     * <p>NOTE(review): the membership sets ({@code listSended}, {@code listAnswer},
     * {@code listCrashed}, {@code listFreezing}) are not cleared here, so a later round
     * starts with the previous round's entries — confirm whether that is intended.
     *
     * @param data the protocol state to update
     */
    protected void unfreeze(Data data) {
        SKD agent = (SKD) this.agent;
        data.isFreezing = Data.Stat.CANCELLED;
        agent.addBehaviour(new WakerBehaviour(agent, deltaLocal) {
            @Override
            public void onWake() {
                agent.print("Je quitte !");
                agent.unblockMigrate();
                data.isFreezing = Data.Stat.NONE;
            }
        });
    }

    /**
     * Creates the shared {@link Data} record and installs all sub-behaviours of the
     * protocol on the agent. Reads the integer argument {@code "rg"} from the agent's
     * arguments; it is the family-size threshold above which a freeze is started.
     */
    @Override
    public void action() {
        SKD agent = (SKD) this.agent;
        int rg = (Integer) agent.args.get("rg");
        int rgLocal = 1;
        agent.createStorageObject(STORAGE_KEY, new Data());
        Data data = (Data) agent.getStorageObject(STORAGE_KEY);

        agent.addBehaviour(freezeInitiationTicker(agent, data, rg, rgLocal));
        agent.addBehaviour(heartbeatTicker(agent, data, rgLocal));
        agent.addBehaviour(aliveReceiver(agent, data));
        agent.addBehaviour(freezeRequestReceiver(agent, data));
        agent.addBehaviour(freezeAnswerReceiver(agent, data));
        agent.addBehaviour(initiatorChangeReceiver(agent, data));
        agent.addBehaviour(freezeEndReceiver(agent, data));
    }

    /** Builds the receive template for INFORM messages of this protocol with the given ontology. */
    private static MessageTemplate protocolTemplate(String ontology) {
        return MessageTemplate.and(
                MessageTemplate.MatchProtocol(PROTOCOL),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology(ontology),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
    }

    /**
     * Sends a FREEZING_ANSWER carrying {@code (verdict, members)} to {@code receiver}.
     * The message is built as (protocol, ontology) like the FREEZING, ALIVE and
     * FREEZING_ENDED messages, so that it matches the answer receive template.
     */
    private static void sendFreezingAnswer(SKD agent, SKAID receiver, FREEZING_ANSWER verdict,
            Set<SKAID> members) {
        SKLMessage answer = new SKLMessage(PROTOCOL, ONTOLOGY_FREEZING_ANSWER);
        answer.addReceiver(receiver);
        answer.setContent(Pair.of(verdict, members));
        agent.skSendReliable(answer);
    }

    /** Ticker (period {@code deltaLocal / 2}) that starts a freeze when enough agents are present. */
    private TickerBehaviour freezeInitiationTicker(SKD agent, Data data, int rg, int rgLocal) {
        return new TickerBehaviour(agent, deltaLocal / 2) {
            @Override
            public void onTick() {
                Set<SKAID> sameHarbour = agent.getFamilySameHarbour();
                boolean enoughAgents = sameHarbour.size() > rgLocal && agent.getFamilySize() > rg;
                if (!enoughAgents || data.isFreezing != Data.Stat.NONE) {
                    return;
                }
                agent.print("Je débute le freeze");
                agent.blockMigrate();

                // Everyone on the harbour except this agent is asked to freeze.
                data.listSended.addAll(sameHarbour);
                data.listSended.remove(agent.getSKAID());

                // Seed the timestamps one period in the future so that peers get a grace
                // period before the heartbeat check can declare them crashed.
                data.lastAlive.clear();
                long now = Instant.now().getMillis();
                for (SKAID peer : data.listSended) {
                    data.lastAlive.put(peer, now + deltaLocal);
                }

                data.isFreezing = Data.Stat.IN_PROGRESS;
                data.initiator = agent.getSKAID();
                data.listFreezing.add(agent.getSKAID());

                SKLMessage request = new SKLMessage(PROTOCOL, ONTOLOGY_FREEZING);
                agent.broadcast(new RSBroadcast(), request, data.listSended);
            }
        };
    }

    /**
     * Ticker (period {@code deltaLocal}) that, while a freeze is in progress or ended,
     * broadcasts an ALIVE message, marks silent peers as crashed and cancels the freeze
     * when too few participants remain.
     */
    private TickerBehaviour heartbeatTicker(SKD agent, Data data, int rgLocal) {
        return new TickerBehaviour(agent, deltaLocal) {
            @Override
            public void onTick() {
                if (data.isFreezing == Data.Stat.CANCELLED || data.isFreezing == Data.Stat.NONE) {
                    return;
                }
                SKLMessage alive = new SKLMessage(PROTOCOL, ONTOLOGY_ALIVE);
                alive.setContent(true);
                alive.addReceiver(data.initiator);
                agent.broadcast(new RSBroadcast(), alive, data.listSended);

                // Move every peer whose last timestamp is at least one period old from
                // the awaited set to the crashed set. Peers without a timestamp are skipped.
                long now = Instant.now().getMillis();
                Iterator<SKAID> awaited = data.listSended.iterator();
                while (awaited.hasNext()) {
                    SKAID peer = awaited.next();
                    Long lastSeen = data.lastAlive.get(peer);
                    if (lastSeen != null && now - lastSeen >= deltaLocal) {
                        agent.print(peer.getName() + " a crash");
                        data.listCrashed.add(peer);
                        awaited.remove();
                    }
                }

                boolean tooFewSurvivors = data.isFreezing == Data.Stat.ENDED
                        && Sets.difference(data.listFreezing, data.listCrashed).size() <= rgLocal;
                boolean nobodyLeftToWaitFor = data.listSended.isEmpty()
                        && data.isFreezing == Data.Stat.IN_PROGRESS;
                if (tooFewSurvivors || nobodyLeftToWaitFor) {
                    unfreeze(data);
                }
            }
        };
    }

    /** Receiver that records the arrival time of each ALIVE message per sender. */
    private CyclicBehaviour aliveReceiver(SKD agent, Data data) {
        MessageTemplate mtAlive = protocolTemplate(ONTOLOGY_ALIVE);
        return new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing == Data.Stat.CANCELLED) {
                    // Messages are left unread while cancelled; block instead of
                    // returning immediately, otherwise this cyclic behaviour spins.
                    block(deltaLocal);
                    return;
                }
                SKLMessage msg = agent.skReceive(mtAlive);
                if (msg == null) {
                    block();
                    return;
                }
                data.lastAlive.put(msg.getSender(), Instant.now().getMillis());
            }
        };
    }

    /**
     * Receiver for FREEZING requests. Answers YES when idle, resolves the conflict when
     * this agent is itself an initiator (by comparing the two agent identifiers), and
     * answers ALREADY otherwise.
     */
    private CyclicBehaviour freezeRequestReceiver(SKD agent, Data data) {
        MessageTemplate mtFreezing = protocolTemplate(ONTOLOGY_FREEZING);
        return new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing == Data.Stat.CANCELLED) {
                    // See aliveReceiver: block to avoid a busy loop while cancelled.
                    block(deltaLocal);
                    return;
                }
                SKLMessage msg = agent.skReceive(mtFreezing);
                if (msg == null) {
                    block();
                    return;
                }
                SKAID sender = msg.getSender();
                agent.print(sender.getName() + " veut débuter le freeze");
                agent.blockMigrate();

                if (data.isFreezing == Data.Stat.NONE) {
                    // Idle: join the sender's freeze and report who shares our harbour.
                    data.isFreezing = Data.Stat.IN_PROGRESS;
                    data.initiator = sender;
                    sendFreezingAnswer(agent, sender, FREEZING_ANSWER.YES, agent.getFamilySameHarbour());
                } else if (data.isFreezing == Data.Stat.IN_PROGRESS
                        && data.initiator.equals(agent.getSKAID())) {
                    // Two concurrent initiators: the identifier comparison picks the winner.
                    if (sender.compareTo(data.initiator) < 0) {
                        // This agent stays initiator; the sender becomes a participant.
                        sendFreezingAnswer(agent, sender, FREEZING_ANSWER.NO, null);
                        data.listSended.add(sender);
                        data.listFreezing.add(sender);
                    } else {
                        // Yield to the sender and hand it the peers already contacted.
                        data.initiator = sender;
                        sendFreezingAnswer(agent, sender, FREEZING_ANSWER.YES, data.listSended);
                        SKLMessage changeInit = new SKLMessage(PROTOCOL, ONTOLOGY_CHANGE_INIT);
                        changeInit.setContent(sender);
                        // NOTE(review): no receiver is set on this message; presumably it
                        // should go to the peers in listSended — confirm before relying on it.
                        agent.skSendReliable(changeInit);
                    }
                } else {
                    sendFreezingAnswer(agent, sender, FREEZING_ANSWER.ALREADY, null);
                }
            }
        };
    }

    /**
     * Receiver for FREEZING_ANSWER messages. Records the answer and, on YES, extends the
     * awaited set with the members reported by the sender; when every awaited agent is
     * accounted for, ends the freeze and broadcasts FREEZING_ENDED.
     */
    private CyclicBehaviour freezeAnswerReceiver(SKD agent, Data data) {
        MessageTemplate mtFreezingAnswer = protocolTemplate(ONTOLOGY_FREEZING_ANSWER);
        return new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing == Data.Stat.CANCELLED) {
                    // See aliveReceiver: block to avoid a busy loop while cancelled.
                    block(deltaLocal);
                    return;
                }
                SKLMessage msg = agent.skReceive(mtFreezingAnswer);
                if (msg == null) {
                    block();
                    return;
                }
                // Re-read the record from the agent storage, as the original code did.
                Data stored = (Data) agent.getStorageObject(STORAGE_KEY);
                SKAID sender = msg.getSender();
                // Answers are always sent as Pair<FREEZING_ANSWER, Set<SKAID>> by this class.
                @SuppressWarnings("unchecked")
                Pair<FREEZING_ANSWER, Set<SKAID>> answer =
                        (Pair<FREEZING_ANSWER, Set<SKAID>>) msg.getContent();
                agent.print(sender.getName() + " me répond " + answer.getLeft());
                stored.listAnswer.add(sender);

                if (answer.getLeft() != FREEZING_ANSWER.YES) {
                    return;
                }
                stored.listSended.addAll(Sets.difference(answer.getRight(), stored.listFreezing));
                stored.listFreezing.add(sender);
                if (isFinished(stored)) {
                    stored.isFreezing = Data.Stat.ENDED;
                    SKLMessage endFreeze = new SKLMessage(PROTOCOL, ONTOLOGY_FREEZING_ENDED);
                    endFreeze.setContent((Serializable) stored.listFreezing);
                    agent.broadcast(new RSBroadcast(), endFreeze, stored.listFreezing);
                    afterFreeze(stored);
                }
            }
        };
    }

    /**
     * Receiver for CHANGE_INIT messages: when the message comes from the agent currently
     * recorded as initiator, adopts the agent named in the content as the new initiator.
     */
    private CyclicBehaviour initiatorChangeReceiver(SKD agent, Data data) {
        MessageTemplate mtChangeInit = protocolTemplate(ONTOLOGY_CHANGE_INIT);
        return new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing == Data.Stat.CANCELLED) {
                    // See aliveReceiver: block to avoid a busy loop while cancelled.
                    block(deltaLocal);
                    return;
                }
                SKLMessage msg = agent.skReceive(mtChangeInit);
                if (msg == null) {
                    block();
                    return;
                }
                // Re-read the record from the agent storage, as the original code did.
                Data stored = (Data) agent.getStorageObject(STORAGE_KEY);
                SKAID sender = msg.getSender();
                // The initiator is null until a freeze has been started or joined.
                if (stored.initiator != null && stored.initiator.equals(sender)) {
                    stored.initiator = (SKAID) msg.getContent();
                    SKLMessage freezingAnswer = new SKLMessage(PROTOCOL, ONTOLOGY_FREEZING);
                    // NOTE(review): this message has neither receiver nor content; it looks
                    // like an unfinished notification to the new initiator — confirm intent.
                    agent.skSendReliable(freezingAnswer);
                }
            }
        };
    }

    /**
     * Receiver for FREEZING_ENDED messages: adopts the participant set carried by the
     * message, marks the freeze as ended and calls {@link #afterFreeze(Data)}.
     */
    private CyclicBehaviour freezeEndReceiver(SKD agent, Data data) {
        MessageTemplate mtFreezingEnded = protocolTemplate(ONTOLOGY_FREEZING_ENDED);
        return new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing == Data.Stat.CANCELLED) {
                    // See aliveReceiver: block to avoid a busy loop while cancelled.
                    block(deltaLocal);
                    return;
                }
                SKLMessage msg = agent.skReceive(mtFreezingEnded);
                if (msg == null) {
                    block();
                    return;
                }
                // Re-read the record from the agent storage, as the original code did.
                Data stored = (Data) agent.getStorageObject(STORAGE_KEY);
                // FREEZING_ENDED is always sent with the Set<SKAID> of participants.
                @SuppressWarnings("unchecked")
                Set<SKAID> participants = (Set<SKAID>) msg.getContent();
                stored.listFreezing = participants;
                stored.isFreezing = Data.Stat.ENDED;
                afterFreeze(stored);
            }
        };
    }
}