-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamicprocesses.go
170 lines (145 loc) · 4.67 KB
/
dynamicprocesses.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Actress Copyright (C) 2024 Bjørn Tore Svinningen
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package actress
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
)
// Holds information about what process functions who belongs to what
// event, and also a map of the started processes.
type dynProcesses struct {
procMap map[EventType]*Process
mu sync.Mutex
}
// Add a new Event and it's process to the processes map.
func (p *dynProcesses) Add(et EventType, proc *Process) {
// Check if a process for the same event is defined, and if so we
// cancel the current process before we replace it with a new one.
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.procMap[et]; ok {
p.procMap[et].Cancel()
}
p.procMap[et] = proc
}
// Delete an Event and it's process from the processes map.
func (p *dynProcesses) Delete(et EventType) {
p.mu.Lock()
defer p.mu.Unlock()
p.procMap[et].Cancel()
delete(p.procMap, et)
log.Printf("deleted process %v\n", et)
}
// Checks if the event is defined in the processes map, and returns true if it is.
func (p *dynProcesses) IsEventDefined(ev EventType) bool {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.procMap[ev]; !ok {
return false
}
return true
}
// Prepare and return a new *dynProcesses structure.
func newDynProcesses() *dynProcesses {
p := dynProcesses{
procMap: make(map[EventType]*Process),
}
return &p
}
// Will create and return a new UUID.
func NewUUID() string {
u := uuid.New()
return u.String()
}
// NewDynProcess will prepare and return a *Process. It will copy
// channels and map structures from the root process.
// The purpose of dynamic processes is to have short lived processes
// that can be quickly started, and removed again when it's job is done.
// The only difference between a process and a dynamic process are that
// the dynamic processes have a mutex in processes map DynProcesses so
// we also can delete the processes when they are no longer needed.
func NewDynProcess(ctx context.Context, parentP Process, event EventType, fn ETFunc) *Process {
ctx, cancel := context.WithCancel(ctx)
p := Process{
fn: nil,
InCh: make(chan Event, 1),
EventCh: parentP.EventCh,
ErrorCh: parentP.ErrorCh,
TestCh: parentP.TestCh,
DynCh: parentP.DynCh,
Event: event,
Processes: parentP.Processes,
DynProcesses: parentP.DynProcesses,
ErrProcesses: parentP.ErrProcesses,
isRoot: false,
Config: parentP.Config,
pids: parentP.pids,
PID: parentP.pids.next(),
Cancel: cancel,
}
p.DynProcesses.Add(event, &p)
if fn != nil {
p.fn = fn(ctx, &p)
}
return &p
}
// ------------------------------------------------------
// Router for normal events.
const EDRouter EventType = "EDRouter"
// Process function for routing and handling events. Will check
// and route the event to the correct process.
func edRouterFn(ctx context.Context, p *Process) func() {
fn := func() {
for {
select {
case e := <-p.DynCh:
// Dynamic processes can take a little longer to start up and be
// registered in the map. We check here if process is registred,
// and it it is not we retry.
if _, ok := p.DynProcesses.procMap[e.EventType]; !ok {
go func(ev Event) {
// Try to 3 times to deliver the message.
for i := 0; i < 3; i++ {
_, ok := p.DynProcesses.procMap[e.EventType]
if !ok {
p.AddError(Event{EventType: ERLog, Err: fmt.Errorf("found no process registered for the event type : %v", ev.EventType)})
time.Sleep(time.Millisecond * 1000)
continue
}
// Process is now registred, so we can safely put
//the event on the InCh of the process.
p.DynProcesses.procMap[e.EventType].InCh <- e
return
}
}(e)
continue
}
// Process was registered. Deliver the event to the process InCh.
p.DynProcesses.procMap[e.EventType].InCh <- e
case <-ctx.Done():
p.AddError(Event{
EventType: ERLog,
Err: fmt.Errorf("info: got ctx.Done"),
})
return
}
}
}
return fn
}