forked from dginev/deprecated-CorTeX
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cortex-client
executable file
·128 lines (118 loc) · 5.16 KB
/
cortex-client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/perl
# /=====================================================================\ #
# | CorTeX Framework | #
# | cortex-client -- TaskDB and Gearman broker | #
# |=====================================================================| #
# | Part of the LaMaPUn project: https://trac.kwarc.info/lamapun/ | #
# | Research software, produced as part of work done by: | #
# | the KWARC group at Jacobs University | #
# | Copyright (c) 2012 | #
# | Released under the GNU Public License | #
# |---------------------------------------------------------------------| #
# | Deyan Ginev <[email protected]> #_# | #
# | http://kwarc.info/people/dginev (o o) | #
# \=========================================================ooo==U==ooo=/ #
use strict;
use warnings;
use Encode;
use Data::Dumper;
use JSON::XS qw(decode_json encode_json);
use FindBin;
# Locate the directory this script lives in and, when it has a lib/
# subdirectory, put that directory on @INC before the CorTeX modules load.
my ($RealBin_safe,$libdir);
BEGIN {
  # Accept any non-empty path without NUL bytes; copying it out of the capture
  # group is what untaints it. Only read $1 when the match really succeeded.
  if ($FindBin::RealBin =~ /^([^\0]+)\z/) { # Valid Unix path TODO: Windows, revisit regexp
    $RealBin_safe = $1; }
  # Sanity check: the resolved directory must contain this very script.
  die 'Fatal:IO:tainted RealBin was tainted! Failing...'
    unless ($RealBin_safe && (-e $RealBin_safe.'/cortex-client'));
  $libdir = $RealBin_safe."/lib";
  # "use lib" is executed at compile time and therefore ignores a surrounding
  # runtime "if"; do the equivalent import here so the -d test really guards it.
  if (-d $libdir) {
    require lib;
    lib->import($libdir); } }
use CorTeX::Backend;
use CorTeX::Util::DB_File_Utils qw(db_file_connect db_file_disconnect);
use CorTeX::Util::Data qw(parse_log);
use AnyEvent::Gearman;
# Set up the database handles: db_file_connect() yields a hash reference that
# is expanded into the CorTeX::Backend constructor arguments; the backend then
# hands out the task, document and meta stores used by the rest of the script.
my $db_handle = db_file_connect();
my $backend   = CorTeX::Backend->new(%{$db_handle});
my $taskdb    = $backend->taskdb;
my $docdb     = $backend->docdb;
my $metadb    = $backend->metadb;
# All three stores are in hand, release the DB file again
db_file_disconnect();
### MAIN ###
# Event loop between DB Polling and Gearman dispatching.
# Every pass fetches a batch of at most $max_queue_size tasks from the TaskDB,
# dispatches each of them to Gearman and then blocks until all of them have
# reported back, through either on_complete or on_fail. The collected
# responses are handed to complete_tasks() before the next batch is fetched.
# The command-line arguments are passed to gearman_client() unchanged.
my (@servers) = @ARGV;
my $max_queue_size = 200;
my $queue_size;
my $client = gearman_client(@servers);
my $results = [];
while (1) {
  # Only the second value returned by fetch_tasks is used here
  my (undef,$tasks) = $taskdb->fetch_tasks(size=>$max_queue_size);
  $queue_size = scalar(@$tasks);
  if (! ($queue_size)) {
    # Try again in a minute if queue is empty
    sleep 60; next; }
  my $next_iteration = AnyEvent->condvar;
  # Store one finished task; once the whole batch has reported back,
  # write it to the databases and release the main loop.
  my $collect = sub {
    my ($response) = @_;
    push @$results, $response;
    return unless (scalar(@$results) == $queue_size);
    # Done with this queue, report to DB
    # TODO: Push new annotations, formats, aggregate resources into docdb
    # TODO: Important - try to do so asynchronously, the Doc DB might become a bottleneck otherwise
    complete_tasks($results);
    $results=[];
    $next_iteration->send();
    return; };
  foreach my $task(@$tasks) {
    my $entry = $task->{entry};
    my $taskid = $task->{taskid};
    my $service = $task->{iid};
    $task->{formats} = $taskdb->serviceiid_to_formats($service);
    # Obtain the actual workload:
    my $inputconverter = $task->{formats}->[2];
    $inputconverter = $taskdb->serviceid_to_iid(
      $taskdb->service_to_id($inputconverter))
        if $inputconverter;
    $task->{workload} = $docdb->fetch_entry(
      entry=>$task->{entry},
      inputformat=>$task->{formats}->[0],
      inputconverter=>$inputconverter);
    # TODO: Obtain prerequisite annotations/resources
    # Record stored for a task that yielded no usable response
    my $failure = sub {
      my ($reason) = @_;
      return {service=>$service,entry=>$entry,taskid=>$taskid,
        status=>-4,messages=>parse_log("Fatal:Gearman:client $reason")}; };
    # Dispatch to Gearman
    $client->add_task(
      $service => encode_json($task),
      on_complete => sub {
        my $payload = $_[1];
        # decode_json throws on malformed input. An exception escaping this
        # callback would leave the batch short of $queue_size results, so a bad
        # payload is recorded as a failed task instead.
        my $response = eval { decode_json($payload) };
        my $problem = $@;
        if (ref($response) ne 'HASH') {
          $problem ||= "top-level JSON value is not an object\n";
          warn "cortex-client: unusable response from service $service: $problem";
          $collect->($failure->('Malformed response from worker'));
          return; }
        # Parse log, to be ready for TaskDB insertion
        my $messages = parse_log(delete $response->{log});
        @$messages = grep {$_->{severity} ne 'status'} @$messages; # We don't want status messages for now. TODO: What to do with those?
        $response->{messages} = $messages;
        # Record the service and entry names (TODO: also corpus name or? ensure all entries are unique instead?)
        $response->{entry} = $entry;
        $response->{taskid} = $taskid;
        $response->{service} = $service;
        $response->{formats} = $task->{formats};
        $collect->($response);
        return; },
      on_fail => sub {
        $collect->($failure->('Job failed (generic)'));
        return; }
    );
  }
  # Block until $collect has seen every task of this batch
  $next_iteration->recv; }
# Write one finished batch back to the databases.
# Takes a single argument: the array reference of response hashes collected
# by the Gearman callbacks. The three stores are updated in a fixed order and
# the value of the final TaskDB call is what the sub returns.
sub complete_tasks {
  my $batch = shift;
  # TODO: A single transaction with each database here, speed everything along nicely
  # TODO: Validate the service is returning the correct type of data
  # 1. New annotations produced by the conversion go to the meta store
  $metadb->complete_annotations($batch);
  # 2. New documents or resources produced by the conversion go to the document store
  $docdb->complete_documents($batch);
  # 3. Finally mark every task as done and insert its log messages
  return $taskdb->complete_tasks($batch);
}