Skip to content

Commit

Permalink
Convert es-graphite-dynamic.pl to use App::ElasticSearch::Utilities::…
Browse files Browse the repository at this point in the history
…Metrics
  • Loading branch information
reyjrar committed Jul 20, 2023
1 parent c4cf0fc commit b1dc300
Showing 1 changed file with 16 additions and 205 deletions.
221 changes: 16 additions & 205 deletions scripts/es-graphite-dynamic.pl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use strict;
use warnings;

use App::ElasticSearch::Utilities qw(es_request es_node_stats es_index_stats es_index_strip_date es_flatten_hash);
use App::ElasticSearch::Utilities qw(es_connect);
use App::ElasticSearch::Utilities::Metrics;
use CLI::Helpers qw(:all);
use Getopt::Long qw(:config no_ignore_case no_ignore_case_always);
use IO::Socket::INET;
Expand All @@ -20,8 +21,6 @@
'carbon-proto=s',
'carbon-server=s',
'carbon-port=i',
'with-indices',
'with-cluster',
'prefix=s',
'no-prefix',
'help|h',
Expand All @@ -36,23 +35,27 @@
#------------------------------------------------------------------------#
# Argument Sanitization


# Ignore uninteresting metrics
my @_IGNORE = qw(id attributes timestamp upms _all _shards);
my @_IGNORE = ();
push @_IGNORE, split(/,/, $opt{ignore}) if exists $opt{ignore};
my %_IGNORE = map { $_ => 1 } @_IGNORE;
# Merge options into config
my %cfg = (
'carbon-proto' => 'tcp',
'carbon-base' => 'general.es',
%opt,
host => App::ElasticSearch::Utilities::def('HOST'),
);

#------------------------------------------------------------------------#
# Globals
my $TIME = time;
my $HOSTNAME = undef;
my $CLUSTER = undef;
my $TIME = time;
my $Fetcher = App::ElasticSearch::Utilities::Metrics->new(
connection => es_connect(),
%_IGNORE ? (
ignore => [ sort keys %_IGNORE ]
) : (),
);

#------------------------------------------------------------------------#
# Carbon Socket Creation
Expand All @@ -70,32 +73,18 @@

#------------------------------------------------------------------------#
# Collect and Decode the Cluster Statistics
my @metrics = exists $opt{'with-cluster'} ? cluster_health() : ();
my $stats = es_node_stats('_local');
if( !$stats ) {
output({color=>'red'}, "Error retrieving nodes_stats()");

my @metrics = sort map { "$_->{key} $_->{value}" } @{ $Fetcher->get_metrics };
if( !@metrics ) {
output({color=>'red'}, "Error retrieving metrics");
exit 1;
}
push @metrics, @{ parse_nodes_stats($stats) };
# Fetch Local Shard Data
push @metrics, local_shard_data();

# Collect individual indexes names and their own statistics
if( exists $cfg{'with-indices'} ) {
my $data = es_index_stats('_all');
if( defined $data ) {
push @metrics, dynamic_stat_collector($data->{indices},'cluster',$CLUSTER);
}
else {
output({color=>'red'}, "Index stats requested, but response was empty.");
}
}

#------------------------------------------------------------------------#
# Send output to appropriate channels
for ( @metrics ) {
# Format
my $prefix = exists $cfg{prefix} ? $cfg{prefix} : join('.', $cfg{'carbon-base'}, $HOSTNAME);
my $prefix = exists $cfg{prefix} ? $cfg{prefix} : join('.', $cfg{'carbon-base'}, $Fetcher->node_details->{name});
s/^/$prefix./ unless $cfg{'no-prefix'};
s/$/ $TIME\n/;

Expand All @@ -108,179 +97,6 @@
output({data=>1},$_);
}
}

#------------------------------------------------------------------------#
# Basic Cluster Statistics
sub cluster_health {
    # Collect cluster-level gauges from the _cluster/health API and append
    # the per-block index counts.  Returns a list of "name value" strings;
    # returns only the index-block data if the health call fails.
    my @collected;

    my $health = es_request('_cluster/health');
    if( defined $health ) {
        # Remember the cluster name for metric prefixes used elsewhere
        $CLUSTER ||= $health->{cluster_name};

        # Metric name => response field, in the order we want them emitted
        my @gauges = (
            [ 'cluster.nodes.total'         => 'number_of_nodes'       ],
            [ 'cluster.nodes.data'          => 'number_of_data_nodes'  ],
            [ 'cluster.shards.primary'      => 'active_primary_shards' ],
            [ 'cluster.shards.active'       => 'active_shards'         ],
            [ 'cluster.shards.initializing' => 'initializing_shards'   ],
            [ 'cluster.shards.relocating'   => 'relocating_shards'     ],
            [ 'cluster.shards.unassigned'   => 'unassigned_shards'     ],
        );
        push @collected, map { "$_->[0] $health->{$_->[1]}" } @gauges;
    }

    push @collected, index_blocks();
    return @collected;
}
#------------------------------------------------------------------------#
# Index Blocks
sub index_blocks {
    # Count, per block setting (e.g. index.blocks.read_only), how many
    # indices currently have that block enabled.  Returns a sorted list of
    # "cluster.<setting> <count>" strings.
    my $settings_by_index = es_request('_settings/index.blocks.*', { index => '_all' });

    my %count_of;
    foreach my $index ( keys %{ $settings_by_index } ) {
        my $raw = $settings_by_index->{$index}{settings} or next;
        # Flatten nested settings into dotted keys before inspecting values
        my $flat = es_flatten_hash( $raw );
        foreach my $setting ( grep { lc $flat->{$_} eq 'true' } keys %{ $flat } ) {
            $count_of{$setting}++;
        }
    }

    return map { "cluster.$_ $count_of{$_}" } sort keys %count_of;
}
#------------------------------------------------------------------------#
# Local Shard Data
sub local_shard_data {
    # Summarize the shards allocated to the local node into per-index
    # document counts, byte totals, primary/replica counts, and shard
    # states.  Returns a list of "node.indices.<index>.<stat> <value>"
    # strings, or the empty list if the local node can't be identified.

    # Resolve the local node's ID so we can filter the shard listing.
    my $nodes_res = es_request('_nodes/_local');
    return unless $nodes_res->{nodes};

    my ($local_id) = keys %{ $nodes_res->{nodes} };
    return unless $local_id;

    my $shards = es_request('_cat/shards',
        {
            uri_param => {
                local  => 'true',
                format => 'json',
                bytes  => 'b',
                h      => join(',', qw( index prirep docs store id state )),
            }
        }
    );

    my %agg;
    foreach my $shard ( @{ $shards } ) {
        # Unallocated shards carry no node id; also skip shards that are
        # allocated to some node other than this one.
        next unless $shard->{id} and $shard->{id} eq $local_id;

        # Skip "special" dot-prefixed indexes (e.g. .kibana)
        next if $shard->{index} =~ /^\./;

        # Collapse dated index names (foo-2023.07.20 -> foo); skip if
        # stripping fails.
        my $base = es_index_strip_date( $shard->{index} );
        next unless $base;
        $base =~ s/\./_/g;    # dots would split the graphite path

        my $role = $shard->{prirep} eq 'p' ? 'primary' : 'replica';

        # First shard for this index seeds the numeric counters at zero
        my $entry = $agg{$base} ||= { map { $_ => 0 } qw( docs bytes primary replica ) };
        $entry->{state}{ $shard->{state} }++;
        $entry->{docs}  += $shard->{docs};
        $entry->{bytes} += $shard->{store};
        $entry->{$role}++;
    }

    my @lines;
    foreach my $base (sort keys %agg) {
        my $entry = $agg{$base};
        # Simple numeric stats first...
        foreach my $stat ( grep { !ref $entry->{$_} } sort keys %{ $entry } ) {
            push @lines, sprintf 'node.indices.%s.%s %d', $base, $stat, $entry->{$stat};
        }
        # ...then the per-state shard counts
        my $states = $entry->{state} || {};
        foreach my $state ( sort keys %{ $states } ) {
            push @lines, sprintf 'node.indices.%s.state.%s %d', $base, $state, $states->{$state};
        }
    }
    return @lines;
}
#------------------------------------------------------------------------#
# Parse Statistics Dynamically
sub dynamic_stat_collector {
    # Recursively walk a stats hash, emitting "dotted.path value" strings
    # for every numeric leaf.  Keys listed in %_IGNORE and peak_* keys are
    # skipped.  Returns the collected strings (empty list for non-hashes).
    my ($node, @prefix) = @_;

    # Base case: only hashes are traversable
    return unless is_hashref($node);

    my @collected;
    KEY: foreach my $raw_key (sort keys %{ $node }) {
        # Skip uninteresting keys
        next KEY if exists $_IGNORE{$raw_key};

        # Skip peak values, we'll see those in the graphs.
        next KEY if $raw_key =~ /^peak/;

        # Normalize the key into a graphite-safe path segment
        ( my $segment = $raw_key ) =~ s/(?:time_)?in_millis/ms/;
        $segment =~ s/(?:size_)?in_bytes/bytes/;
        $segment =~ s/\./_/g;

        my $value = $node->{$raw_key};
        if( is_hashref($value) ) {
            # Recurse into nested structures
            push @collected, dynamic_stat_collector($value, @prefix, $segment);
        }
        elsif( $value =~ /^\d+(?:\.\d+)?$/ ) {
            # Numeric leaf: emit it
            push @collected, join('.', @prefix, $segment) . " $value";
        }
    }

    return @collected;
}

#------------------------------------------------------------------------#
# Generate Nodes Statistics
sub parse_nodes_stats {
    # Flatten the _nodes/_local stats response for the single local node
    # into an arrayref of metric strings.  Also records the node name in
    # $HOSTNAME and the cluster name in $CLUSTER as side effects.
    my ($data) = @_;

    # We queried _local, so the response holds exactly one node, keyed by
    # its hashed node id.
    my ($node_id) = keys %{ $data->{nodes} };
    $HOSTNAME = $data->{nodes}{$node_id}{name};
    $CLUSTER ||= $data->{cluster_name};
    verbose("[$CLUSTER] Parsing node_stats for ID:$node_id => $HOSTNAME");

    return [ dynamic_stat_collector( $data->{nodes}{$node_id} ) ];
}

__END__
=head1 SYNOPSIS
Expand Down Expand Up @@ -330,11 +146,6 @@ =head1 OPTIONS
Use this port for the carbon server, useless without --carbon-server
=item B<with-indices>
Also grab data at the individual index level, will not append hostnames as this is useless. It will
map the data into "$CARBON_BASE.cluster.$CLUSTERNAME.$INDEX..."
=item B<ignore>
A comma separated list of keys to ignore when parsing the tree. This is in addition to the
Expand Down

0 comments on commit b1dc300

Please sign in to comment.