/** * collectd - src/pinba.c (based on code from pinba_engine 0.0.5) * Copyright (c) 2007-2009 Antony Dovgal * Copyright (C) 2010 Phoenix Kayo * Copyright (C) 2010 Florian Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; only version 2 of the License is applicable. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * * Authors: * Antony Dovgal * Phoenix Kayo * Florian Forster **/ #include "collectd.h" #include "plugin.h" #include "utils/common/common.h" #include #include #include "pinba.pb-c.h" /* AIX doesn't have MSG_DONTWAIT */ #ifndef MSG_DONTWAIT #define MSG_DONTWAIT MSG_NONBLOCK #endif /* * Defines */ #ifndef PINBA_UDP_BUFFER_SIZE #define PINBA_UDP_BUFFER_SIZE 65536 #endif #ifndef PINBA_DEFAULT_NODE #define PINBA_DEFAULT_NODE "::0" #endif #ifndef PINBA_DEFAULT_SERVICE #define PINBA_DEFAULT_SERVICE "30002" #endif #ifndef PINBA_MAX_SOCKETS #define PINBA_MAX_SOCKETS 16 #endif /* * Private data structures */ /* {{{ */ struct pinba_socket_s { struct pollfd fd[PINBA_MAX_SOCKETS]; nfds_t fd_num; }; typedef struct pinba_socket_s pinba_socket_t; /* Fixed point counter value. n is the decimal part multiplied by 10^9. */ struct float_counter_s { uint64_t i; uint64_t n; /* nanos */ }; typedef struct float_counter_s float_counter_t; struct pinba_statnode_s { /* collector name, used as plugin instance */ char *name; /* query data */ char *host; char *server; char *script; derive_t req_count; float_counter_t req_time; float_counter_t ru_utime; float_counter_t ru_stime; derive_t doc_size; gauge_t mem_peak; }; typedef struct pinba_statnode_s pinba_statnode_t; /* }}} */ /* * Module global variables */ /* {{{ */ static pinba_statnode_t *stat_nodes; static unsigned int stat_nodes_num; static pthread_mutex_t stat_nodes_lock; static char *conf_node; static char *conf_service; static bool collector_thread_running; static bool collector_thread_do_shutdown; static pthread_t collector_thread_id; /* }}} */ /* * Functions */ static void float_counter_add(float_counter_t *fc, float val) /* {{{ */ { uint64_t tmp; if (val < 0.0) return; tmp = (uint64_t)val; val -= (double)tmp; fc->i += tmp; fc->n += (uint64_t)((val * 1000000000.0) + .5); if (fc->n >= 1000000000) { fc->i += 1; fc->n -= 1000000000; assert(fc->n < 1000000000); } } /* }}} void float_counter_add */ static derive_t float_counter_get(const float_counter_t *fc, /* {{{ */ uint64_t factor) { derive_t ret; ret = (derive_t)(fc->i * factor); ret += (derive_t)(fc->n / (1000000000 / factor)); return ret; } /* }}} derive_t float_counter_get */ static void strset(char **str, const char *new) /* {{{ */ { char *tmp; if (!str || !new) return; tmp = strdup(new); if (tmp == NULL) return; sfree(*str); *str = tmp; } /* }}} void strset */ static void service_statnode_add(const char *name, /* {{{ */ const char *host, const char *server, const char *script) { pinba_statnode_t *node; node = realloc(stat_nodes, sizeof(*stat_nodes) * (stat_nodes_num + 1)); if (node == NULL) { ERROR("pinba plugin: realloc failed"); return; } stat_nodes = node; node = stat_nodes + stat_nodes_num; memset(node, 0, sizeof(*node)); /* reset strings */ node->name = NULL; node->host = NULL; node->server = NULL; node->script = NULL; node->mem_peak = NAN; /* fill query data */ strset(&node->name, name); strset(&node->host, host); strset(&node->server, server); strset(&node->script, script); /* increment counter */ stat_nodes_num++; } /* }}} void service_statnode_add */ /* Copy the data from the global "stat_nodes" list into the buffer pointed to * by "res", doing the derivation in the process. Returns the next index or * zero if the end of the list has been reached. */ static unsigned int service_statnode_collect(pinba_statnode_t *res, /* {{{ */ unsigned int index) { pinba_statnode_t *node; if (stat_nodes_num == 0) return 0; /* begin collecting */ if (index == 0) pthread_mutex_lock(&stat_nodes_lock); /* end collecting */ if (index >= stat_nodes_num) { pthread_mutex_unlock(&stat_nodes_lock); return 0; } node = stat_nodes + index; memcpy(res, node, sizeof(*res)); /* reset node */ node->mem_peak = NAN; return index + 1; } /* }}} unsigned int service_statnode_collect */ static void service_statnode_process(pinba_statnode_t *node, /* {{{ */ Pinba__Request *request) { node->req_count++; float_counter_add(&node->req_time, request->request_time); float_counter_add(&node->ru_utime, request->ru_utime); float_counter_add(&node->ru_stime, request->ru_stime); node->doc_size += request->document_size; if (isnan(node->mem_peak) || (node->mem_peak < ((gauge_t)request->memory_peak))) node->mem_peak = (gauge_t)request->memory_peak; } /* }}} void service_statnode_process */ static void service_process_request(Pinba__Request *request) /* {{{ */ { pthread_mutex_lock(&stat_nodes_lock); for (unsigned int i = 0; i < stat_nodes_num; i++) { if ((stat_nodes[i].host != NULL) && (strcmp(request->hostname, stat_nodes[i].host) != 0)) continue; if ((stat_nodes[i].server != NULL) && (strcmp(request->server_name, stat_nodes[i].server) != 0)) continue; if ((stat_nodes[i].script != NULL) && (strcmp(request->script_name, stat_nodes[i].script) != 0)) continue; service_statnode_process(&stat_nodes[i], request); } pthread_mutex_unlock(&stat_nodes_lock); } /* }}} void service_process_request */ static int pb_del_socket(pinba_socket_t *s, /* {{{ */ nfds_t index) { if (index >= s->fd_num) return EINVAL; close(s->fd[index].fd); s->fd[index].fd = -1; /* When deleting the last element in the list, no memmove is necessary. */ if (index < (s->fd_num - 1)) { memmove(&s->fd[index], &s->fd[index + 1], sizeof(s->fd[0]) * (s->fd_num - (index + 1))); } s->fd_num--; return 0; } /* }}} int pb_del_socket */ static int pb_add_socket(pinba_socket_t *s, /* {{{ */ const struct addrinfo *ai) { if (s->fd_num == PINBA_MAX_SOCKETS) { WARNING("pinba plugin: Sorry, you have hit the built-in limit of " "%i sockets. Please complain to the collectd developers so we can " "raise the limit.", PINBA_MAX_SOCKETS); return -1; } int fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); if (fd < 0) { ERROR("pinba plugin: socket(2) failed: %s", STRERRNO); return 0; } int status = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)); if (status != 0) { WARNING("pinba plugin: setsockopt(SO_REUSEADDR) failed: %s", STRERRNO); } status = bind(fd, ai->ai_addr, ai->ai_addrlen); if (status != 0) { ERROR("pinba plugin: bind(2) failed: %s", STRERRNO); close(fd); return 0; } s->fd[s->fd_num].fd = fd; s->fd[s->fd_num].events = POLLIN | POLLPRI; s->fd[s->fd_num].revents = 0; s->fd_num++; return 0; } /* }}} int pb_add_socket */ static pinba_socket_t *pinba_socket_open(const char *node, /* {{{ */ const char *service) { pinba_socket_t *s; struct addrinfo *ai_list; int status; if (node == NULL) node = PINBA_DEFAULT_NODE; if (service == NULL) service = PINBA_DEFAULT_SERVICE; struct addrinfo ai_hints = {.ai_family = AF_UNSPEC, .ai_flags = AI_PASSIVE, .ai_socktype = SOCK_DGRAM}; status = getaddrinfo(node, service, &ai_hints, &ai_list); if (status != 0) { ERROR("pinba plugin: getaddrinfo(3) failed: %s", gai_strerror(status)); return NULL; } assert(ai_list != NULL); s = calloc(1, sizeof(*s)); if (s == NULL) { freeaddrinfo(ai_list); ERROR("pinba plugin: calloc failed."); return NULL; } for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { status = pb_add_socket(s, ai_ptr); if (status != 0) break; } /* for (ai_list) */ freeaddrinfo(ai_list); if (s->fd_num < 1) { WARNING("pinba plugin: Unable to open socket for address %s.", node); sfree(s); s = NULL; } return s; } /* }}} pinba_socket_open */ static void pinba_socket_free(pinba_socket_t *socket) /* {{{ */ { if (!socket) return; for (nfds_t i = 0; i < socket->fd_num; i++) { if (socket->fd[i].fd < 0) continue; close(socket->fd[i].fd); socket->fd[i].fd = -1; } sfree(socket); } /* }}} void pinba_socket_free */ static int pinba_process_stats_packet(const uint8_t *buffer, /* {{{ */ size_t buffer_size) { Pinba__Request *request; request = pinba__request__unpack(NULL, buffer_size, buffer); if (!request) return -1; service_process_request(request); pinba__request__free_unpacked(request, NULL); return 0; } /* }}} int pinba_process_stats_packet */ static int pinba_udp_read_callback_fn(int sock) /* {{{ */ { uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; size_t buffer_size; int status; while (42) { buffer_size = sizeof(buffer); status = recvfrom(sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); if (status < 0) { if ((errno == EINTR) #ifdef EWOULDBLOCK || (errno == EWOULDBLOCK) #endif || (errno == EAGAIN)) { continue; } WARNING("pinba plugin: recvfrom(2) failed: %s", STRERRNO); return -1; } else if (status == 0) { DEBUG("pinba plugin: recvfrom(2) returned unexpected status zero."); return -1; } else /* if (status > 0) */ { assert(((size_t)status) < buffer_size); buffer_size = (size_t)status; buffer[buffer_size] = 0; status = pinba_process_stats_packet(buffer, buffer_size); if (status != 0) DEBUG("pinba plugin: Parsing packet failed."); return status; } } /* while (42) */ /* not reached */ assert(23 == 42); return -1; } /* }}} void pinba_udp_read_callback_fn */ static int receive_loop(void) /* {{{ */ { pinba_socket_t *s; s = pinba_socket_open(conf_node, conf_service); if (s == NULL) { ERROR("pinba plugin: Collector thread is exiting prematurely."); return -1; } while (!collector_thread_do_shutdown) { int status; if (s->fd_num < 1) break; status = poll(s->fd, s->fd_num, /* timeout = */ 1000); if (status == 0) /* timeout */ { continue; } else if (status < 0) { if ((errno == EINTR) || (errno == EAGAIN)) continue; ERROR("pinba plugin: poll(2) failed: %s", STRERRNO); pinba_socket_free(s); return -1; } for (nfds_t i = 0; i < s->fd_num; i++) { if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { pb_del_socket(s, i); i--; } else if (s->fd[i].revents & (POLLIN | POLLPRI)) { pinba_udp_read_callback_fn(s->fd[i].fd); } } /* for (s->fd) */ } /* while (!collector_thread_do_shutdown) */ pinba_socket_free(s); s = NULL; return 0; } /* }}} int receive_loop */ static void *collector_thread(void *arg) /* {{{ */ { receive_loop(); memset(&collector_thread_id, 0, sizeof(collector_thread_id)); collector_thread_running = false; pthread_exit(NULL); return NULL; } /* }}} void *collector_thread */ /* * Plugin declaration section */ static int pinba_config_view(const oconfig_item_t *ci) /* {{{ */ { char *name = NULL; char *host = NULL; char *server = NULL; char *script = NULL; int status; status = cf_util_get_string(ci, &name); if (status != 0) return status; for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp("Host", child->key) == 0) status = cf_util_get_string(child, &host); else if (strcasecmp("Server", child->key) == 0) status = cf_util_get_string(child, &server); else if (strcasecmp("Script", child->key) == 0) status = cf_util_get_string(child, &script); else { WARNING("pinba plugin: Unknown config option: %s", child->key); status = -1; } if (status != 0) break; } if (status == 0) service_statnode_add(name, host, server, script); sfree(name); sfree(host); sfree(server); sfree(script); return status; } /* }}} int pinba_config_view */ static int plugin_config(oconfig_item_t *ci) /* {{{ */ { /* The lock should not be necessary in the config callback, but let's be * sure.. */ pthread_mutex_lock(&stat_nodes_lock); for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp("Address", child->key) == 0) cf_util_get_string(child, &conf_node); else if (strcasecmp("Port", child->key) == 0) cf_util_get_service(child, &conf_service); else if (strcasecmp("View", child->key) == 0) pinba_config_view(child); else WARNING("pinba plugin: Unknown config option: %s", child->key); } pthread_mutex_unlock(&stat_nodes_lock); return 0; } /* }}} int pinba_config */ static int plugin_init(void) /* {{{ */ { int status; if (stat_nodes == NULL) { /* Collect the "total" data by default. */ service_statnode_add("total", /* host = */ NULL, /* server = */ NULL, /* script = */ NULL); } if (collector_thread_running) return 0; status = plugin_thread_create(&collector_thread_id, collector_thread, /* args = */ NULL, "pinba collector"); if (status != 0) { ERROR("pinba plugin: pthread_create(3) failed: %s", STRERRNO); return -1; } collector_thread_running = true; return 0; } /* }}} */ static int plugin_shutdown(void) /* {{{ */ { if (collector_thread_running) { int status; DEBUG("pinba plugin: Shutting down collector thread."); collector_thread_do_shutdown = true; status = pthread_join(collector_thread_id, /* retval = */ NULL); if (status != 0) { ERROR("pinba plugin: pthread_join(3) failed: %s", STRERROR(status)); } collector_thread_running = false; collector_thread_do_shutdown = false; } /* if (collector_thread_running) */ return 0; } /* }}} int plugin_shutdown */ static int plugin_submit(const pinba_statnode_t *res) /* {{{ */ { value_list_t vl = VALUE_LIST_INIT; vl.values_len = 1; sstrncpy(vl.plugin, "pinba", sizeof(vl.plugin)); sstrncpy(vl.plugin_instance, res->name, sizeof(vl.plugin_instance)); vl.values = &(value_t){.derive = res->req_count}; sstrncpy(vl.type, "total_requests", sizeof(vl.type)); plugin_dispatch_values(&vl); vl.values = &(value_t){ .derive = float_counter_get(&res->req_time, /* factor = */ 1000)}; sstrncpy(vl.type, "total_time_in_ms", sizeof(vl.type)); plugin_dispatch_values(&vl); vl.values = &(value_t){.derive = res->doc_size}; sstrncpy(vl.type, "total_bytes", sizeof(vl.type)); plugin_dispatch_values(&vl); vl.values = &(value_t){ .derive = float_counter_get(&res->ru_utime, /* factor = */ 100)}; sstrncpy(vl.type, "cpu", sizeof(vl.type)); sstrncpy(vl.type_instance, "user", sizeof(vl.type_instance)); plugin_dispatch_values(&vl); vl.values = &(value_t){ .derive = float_counter_get(&res->ru_stime, /* factor = */ 100)}; sstrncpy(vl.type, "cpu", sizeof(vl.type)); sstrncpy(vl.type_instance, "system", sizeof(vl.type_instance)); plugin_dispatch_values(&vl); vl.values = &(value_t){.gauge = res->mem_peak}; sstrncpy(vl.type, "memory", sizeof(vl.type)); sstrncpy(vl.type_instance, "peak", sizeof(vl.type_instance)); plugin_dispatch_values(&vl); return 0; } /* }}} int plugin_submit */ static int plugin_read(void) /* {{{ */ { unsigned int i = 0; pinba_statnode_t data; while ((i = service_statnode_collect(&data, i)) != 0) { plugin_submit(&data); } return 0; } /* }}} int plugin_read */ void module_register(void) /* {{{ */ { plugin_register_complex_config("pinba", plugin_config); plugin_register_init("pinba", plugin_init); plugin_register_read("pinba", plugin_read); plugin_register_shutdown("pinba", plugin_shutdown); } /* }}} void module_register */