Skip to content

Commit

Permalink
Add logs OnWireReceived and dispatched for rd primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
Iliya-usov committed Nov 13, 2023
1 parent 63c9606 commit e4c1568
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ open class RdMap<K : Any, V : Any> private constructor(
val lifetime = dispatchHelper.lifetime
val definition = tryPreBindValue(lifetime, key, value, true)

logReceived.trace { "onWireReceived:: ${logmsg(op, version, key, value)}" }

dispatchHelper.dispatch {
if (msgVersioned || !master || !isPendingForAck(key)) {
logReceived.trace { logmsg(op, version, key, value) }
logReceived.trace { "dispatched:: ${logmsg(op, version, key, value)}" }

if (value != null) {
val definitions = tryGetBindDefinitions(lifetime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ abstract class RdPropertyBase<T>(val valueSerializer: ISerializer<T>) : RdReacti

val definition = tryPreBindValue(dispatchHelper.lifetime, v, true)

logReceived.trace { "onWireReceived:: ${getMessage(version, v)}" }

dispatchHelper.dispatch {
val rejected = master && version < masterVersion
logReceived.trace { "property `$location` ($rdid):: oldver = $masterVersion, newver = $version, value = ${v.printToString()}${rejected.condstr { " >> REJECTED" }}" }
logReceived.trace { "dispatched:: ${getMessage(version, v)}${rejected.condstr { " >> REJECTED" }}" }

if (rejected) {
definition?.terminate()
Expand All @@ -124,6 +126,9 @@ abstract class RdPropertyBase<T>(val valueSerializer: ISerializer<T>) : RdReacti
}
}

private fun getMessage(version: Int, v: T) =
"property `$location` ($rdid):: oldver = $masterVersion, newver = $version, value = ${v.printToString()}"

override fun advise(lifetime: Lifetime, handler: (T) -> Unit) {
if (isBound) assertThreading()
property.advise(lifetime, handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ open class RdSet<T : Any> constructor(val valueSerializer: ISerializer<T>, priva
val kind = buffer.readEnum<AddRemove>()
val v = valueSerializer.read(ctx, buffer)

logReceived.trace { "onWireReceived:: $this :: $kind :: ${v.printToString()}" }
dispatchHelper.dispatch {
logReceived.trace { "dispatched:: $this :: $kind :: ${v.printToString()}" }
when (kind) {
AddRemove.Add -> set.add(v)
AddRemove.Remove -> set.remove(v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ class RdSignal<T>(val valueSerializer: ISerializer<T> = Polymorphic<T>()) : RdRe

override fun onWireReceived(proto: IProtocol, buffer: AbstractBuffer, ctx: SerializationCtx, dispatchHelper: IRdWireableDispatchHelper) {
val value = valueSerializer.read(ctx, buffer)
logReceived.trace {"signal `$location` ($rdid):: value = ${value.printToString()}"}
logReceived.trace {"onWireReceived:: signal `$location` ($rdid):: value = ${value.printToString()}"}
dispatchHelper.dispatch(scheduler) {
logReceived.trace {"dispatched:: signal `$location` ($rdid):: value = ${value.printToString()}"}
signal.fire(value)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,10 @@ class RdCall<TReq, TRes>(internal val requestSzr: ISerializer<TReq> = Polymorphi
private fun onWireReceived(proto: IProtocol, ctx: SerializationCtx, buffer: AbstractBuffer, wiredRdTask: EndpointWiredRdTask<TReq, TRes>, dispatchHelper: IRdWireableDispatchHelper) {
val externalCancellation = wiredRdTask.lifetime
val value = requestSzr.read(ctx, buffer)
logReceived.trace { "endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }
logReceived.trace { "onWireReceived:: endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }
dispatchHelper.dispatch(handlerScheduler) {
logReceived.trace { "dispatch:: endpoint `$location`::($rdid) taskId=(${wiredRdTask.rdid}) request = ${value.printToString()}" }

val rdTask = try {
val handlerLocal = handler
if (handlerLocal == null) {
Expand Down
15 changes: 11 additions & 4 deletions rd-net/RdFramework/Impl/RdMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ protected override void Unbind()
myBindDefinitions = null;
}

private static string getMessage(bool msgVersioned, long version, bool isPut, V? value)
{
return "{this} :: {kind} :: key = {key.PrintToString()}" +
(msgVersioned ? " :: version = " + version : "") +
(isPut ? $" :: value = {value.PrintToString()}" : "");
}

public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader stream, IRdWireableDispatchHelper dispatchHelper)
{
var header = stream.ReadInt();
Expand Down Expand Up @@ -237,14 +244,14 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
var isPut = kind is AddUpdateRemove.Add or AddUpdateRemove.Update;
var value = isPut ? ReadValueDelegate(ctx, stream) : default;
var definition = TryPreBindValue(lifetime, key, value, true);

ReceiveTrace?.Log($"OnWireReceived:: {getMessage(msgVersioned, version, isPut, value)}");

dispatchHelper.Dispatch(() =>
{
if (msgVersioned || !IsMaster || !IsPendingForAck(key))
{
ReceiveTrace?.Log($"{this} :: {kind} :: key = {key.PrintToString()}" +
(msgVersioned ? " :: version = " + version : "") +
(isPut ? $" :: value = {value.PrintToString()}" : ""));
ReceiveTrace?.Log($"Dispatched:: {getMessage(msgVersioned, version, isPut, value)}");

if (isPut)
{
Expand All @@ -271,7 +278,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
}
else
{
ReceiveTrace?.Log(">> CHANGE IGNORED");
ReceiveTrace?.Log($">> CHANGE IGNORED {getMessage(msgVersioned, version, isPut, value)}");
}

if (msgVersioned)
Expand Down
9 changes: 8 additions & 1 deletion rd-net/RdFramework/Impl/RdProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,13 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
var lifetime = dispatchHelper.Lifetime;
var definition = TryPreBindValue(lifetime, value, true);

ReceiveTrace?.Log($"OnWireReceived:: {GetMessage(version, value)}");

dispatchHelper.Dispatch(() =>
{
var rejected = IsMaster && version < myMasterVersion;

ReceiveTrace?.Log($"{this} :: oldver = {myMasterVersion}, newver = {version}, value = {value.PrintToString()}{(rejected ? " REJECTED" : "")}");
ReceiveTrace?.Log($"Dispatch:: {GetMessage(version, value)}{(rejected ? " REJECTED" : "")}");

if (rejected)
{
Expand All @@ -203,6 +205,11 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
});
}

private string GetMessage(int version, T value)
{
return $"{this} :: oldver = {myMasterVersion}, newver = {version}, value = {value.PrintToString()}";
}

private LifetimeDefinition? TryPreBindValue(Lifetime lifetime, T value, bool bindAlso)
{
if (OptimizeNested || !value.IsBindable())
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Impl/RdSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
{
var kind = (AddRemove) stream.ReadInt();
var value = ReadValueDelegate(ctx, stream);
ReceiveTrace?.Log($"{this} :: {kind} :: {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {this} :: {kind} :: {value.PrintToString()}");

dispatchHelper.Dispatch(() =>
{
ReceiveTrace?.Log($"Dispatched:: {this} :: {kind} :: {value.PrintToString()}");
switch (kind)
{
case AddRemove.Add:
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Impl/RdSignal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ protected override void PreInit(Lifetime lifetime, IProtocol proto)
public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, UnsafeReader reader, IRdWireableDispatchHelper dispatchHelper)
{
var value = ReadValueDelegate(ctx, reader);
ReceiveTrace?.Log($"{this} :: value = {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {this} :: value = {value.PrintToString()}");
dispatchHelper.Dispatch(Scheduler, () =>
{
ReceiveTrace?.Log($"Dispatched:: {this} :: value = {value.PrintToString()}");
mySignal.Fire(value);
});
}
Expand Down
3 changes: 2 additions & 1 deletion rd-net/RdFramework/Tasks/RdCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ private void OnWireReceived(SerializationCtx ctx, UnsafeReader reader, WiredRdTa
{
var externalCancellation = wiredTask.Lifetime;
var value = ReadRequestDelegate(ctx, reader);
ReceiveTrace?.Log($"{wiredTask} :: received request: {value.PrintToString()}");
ReceiveTrace?.Log($"OnWireReceived:: {wiredTask} :: received request: {value.PrintToString()}");

dispatchHelper.Dispatch(myHandlerScheduler, () =>
{
ReceiveTrace?.Log($"Dispatched:: {wiredTask} :: received request: {value.PrintToString()}");
var rdTask = RunHandler(value, externalCancellation, wiredTask);

rdTask.Result.Advise(Lifetime.Eternal, result =>
Expand Down

0 comments on commit e4c1568

Please sign in to comment.