/** * collectd - src/rrdcached.c * Copyright (C) 2008-2013 Florian octo Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. * * Authors: * Florian octo Forster **/ #include "collectd.h" #include "plugin.h" #include "utils/common/common.h" #include "utils/rrdcreate/rrdcreate.h" #undef HAVE_CONFIG_H #include #include /* * Private variables */ static char *datadir; static char *daemon_address; static bool config_create_files = true; static bool config_collect_stats = true; static rrdcreate_config_t rrdcreate_config = {.stepsize = 0, .heartbeat = 0, .rrarows = 1200, .xff = 0.1, .timespans = NULL, .timespans_num = 0, .consolidation_functions = NULL, .consolidation_functions_num = 0, .async = 0}; /* * Prototypes. */ static int rc_write(const data_set_t *ds, const value_list_t *vl, __attribute__((unused)) user_data_t *ud); static int rc_flush(__attribute__((unused)) cdtime_t timeout, const char *identifier, __attribute__((unused)) user_data_t *ud); static int value_list_to_string(char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl) { assert(strcmp(ds->type, vl->type) == 0); memset(buffer, '\0', buffer_len); int status = ssnprintf(buffer, buffer_len, "%.6f", CDTIME_T_TO_DOUBLE(vl->time)); if ((status < 1) || (status >= buffer_len)) return -1; int offset = status; for (size_t i = 0; i < ds->ds_num; i++) { if ((ds->ds[i].type != DS_TYPE_COUNTER) && (ds->ds[i].type != DS_TYPE_GAUGE) && (ds->ds[i].type != DS_TYPE_DERIVE) && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) return -1; if (ds->ds[i].type == DS_TYPE_COUNTER) { status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64, (uint64_t)vl->values[i].counter); } else if (ds->ds[i].type == DS_TYPE_GAUGE) { status = ssnprintf(buffer + offset, buffer_len - offset, ":%f", vl->values[i].gauge); } else if (ds->ds[i].type == DS_TYPE_DERIVE) { status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64, vl->values[i].derive); } else /* if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ { status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64, vl->values[i].absolute); } if ((status < 1) || (status >= (buffer_len - offset))) return -1; offset += status; } /* for ds->ds_num */ return 0; } /* int value_list_to_string */ static int value_list_to_filename(char *buffer, size_t buffer_size, value_list_t const *vl) { char const suffix[] = ".rrd"; if (datadir != NULL) { size_t datadir_len = strlen(datadir) + 1; if (datadir_len >= buffer_size) return ENOMEM; sstrncpy(buffer, datadir, buffer_size); buffer[datadir_len - 1] = '/'; buffer[datadir_len] = '\0'; buffer += datadir_len; buffer_size -= datadir_len; } int status = FORMAT_VL(buffer, buffer_size, vl); if (status != 0) return status; size_t len = strlen(buffer); assert(len < buffer_size); buffer += len; buffer_size -= len; if (buffer_size <= sizeof(suffix)) return ENOMEM; memcpy(buffer, suffix, sizeof(suffix)); return 0; } /* int value_list_to_filename */ static int rc_config_get_int_positive(oconfig_item_t const *ci, int *ret) { int tmp = 0; int status = cf_util_get_int(ci, &tmp); if (status != 0) return status; if (tmp < 0) return EINVAL; *ret = tmp; return 0; } /* int rc_config_get_int_positive */ static int rc_config_get_xff(oconfig_item_t const *ci, double *ret) { if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) { ERROR("rrdcached plugin: The \"%s\" needs exactly one numeric argument " "in the range [0.0, 1.0)", ci->key); return EINVAL; } double value = ci->values[0].value.number; if ((value >= 0.0) && (value < 1.0)) { *ret = value; return 0; } ERROR("rrdcached plugin: The \"%s\" needs exactly one numeric argument " "in the range [0.0, 1.0)", ci->key); return EINVAL; } /* int rc_config_get_xff */ static int rc_config_add_timespan(int timespan) { if (timespan <= 0) return EINVAL; int *tmp = realloc(rrdcreate_config.timespans, sizeof(*rrdcreate_config.timespans) * (rrdcreate_config.timespans_num + 1)); if (tmp == NULL) return ENOMEM; rrdcreate_config.timespans = tmp; rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan; rrdcreate_config.timespans_num++; return 0; } /* int rc_config_add_timespan */ static int rc_config(oconfig_item_t *ci) { for (int i = 0; i < ci->children_num; i++) { oconfig_item_t const *child = ci->children + i; const char *key = child->key; int status = 0; if (strcasecmp("DataDir", key) == 0) { status = cf_util_get_string(child, &datadir); if (status == 0) { int len = strlen(datadir); while ((len > 0) && (datadir[len - 1] == '/')) { len--; datadir[len] = 0; } if (len <= 0) sfree(datadir); } } else if (strcasecmp("DaemonAddress", key) == 0) status = cf_util_get_string(child, &daemon_address); else if (strcasecmp("CreateFiles", key) == 0) status = cf_util_get_boolean(child, &config_create_files); else if (strcasecmp("CreateFilesAsync", key) == 0) status = cf_util_get_boolean(child, &rrdcreate_config.async); else if (strcasecmp("CollectStatistics", key) == 0) status = cf_util_get_boolean(child, &config_collect_stats); else if (strcasecmp("StepSize", key) == 0) { int tmp = -1; status = rc_config_get_int_positive(child, &tmp); if (status == 0) rrdcreate_config.stepsize = (unsigned long)tmp; } else if (strcasecmp("HeartBeat", key) == 0) status = rc_config_get_int_positive(child, &rrdcreate_config.heartbeat); else if (strcasecmp("RRARows", key) == 0) status = rc_config_get_int_positive(child, &rrdcreate_config.rrarows); else if (strcasecmp("RRATimespan", key) == 0) { int tmp = -1; status = rc_config_get_int_positive(child, &tmp); if (status == 0) status = rc_config_add_timespan(tmp); } else if (strcasecmp("XFF", key) == 0) status = rc_config_get_xff(child, &rrdcreate_config.xff); else { WARNING("rrdcached plugin: Ignoring invalid option %s.", key); continue; } if (status != 0) WARNING("rrdcached plugin: Handling the \"%s\" option failed.", key); } if (daemon_address != NULL) { plugin_register_write("rrdcached", rc_write, /* user_data = */ NULL); plugin_register_flush("rrdcached", rc_flush, /* user_data = */ NULL); } return 0; } /* int rc_config */ static int try_reconnect(void) { rrdc_disconnect(); rrd_clear_error(); int status = rrdc_connect(daemon_address); if (status != 0) { ERROR("rrdcached plugin: Failed to reconnect to RRDCacheD " "at %s: %s (status=%d)", daemon_address, rrd_get_error(), status); return -1; } INFO("rrdcached plugin: Successfully reconnected to RRDCacheD " "at %s", daemon_address); return 0; } /* int try_reconnect */ static int rc_read(void) { value_list_t vl = VALUE_LIST_INIT; vl.values = &(value_t){.gauge = NAN}; vl.values_len = 1; if (daemon_address == NULL) return -1; if (!config_collect_stats) return -1; if ((strncmp("unix:", daemon_address, strlen("unix:")) != 0) && (daemon_address[0] != '/')) sstrncpy(vl.host, daemon_address, sizeof(vl.host)); sstrncpy(vl.plugin, "rrdcached", sizeof(vl.plugin)); rrd_clear_error(); int status = rrdc_connect(daemon_address); if (status != 0) { ERROR("rrdcached plugin: Failed to connect to RRDCacheD " "at %s: %s (status=%d)", daemon_address, rrd_get_error(), status); return -1; } rrdc_stats_t *head; bool retried = false; while (42) { /* The RRD client lib does not provide any means for checking a * connection, hence we'll have to retry upon failed operations. */ head = NULL; rrd_clear_error(); status = rrdc_stats_get(&head); if (status == 0) break; if (!retried) { retried = true; if (try_reconnect() == 0) continue; /* else: report the error and fail */ } ERROR("rrdcached plugin: rrdc_stats_get failed: %s (status=%i).", rrd_get_error(), status); return -1; } for (rrdc_stats_t *ptr = head; ptr != NULL; ptr = ptr->next) { if (ptr->type == RRDC_STATS_TYPE_GAUGE) vl.values[0].gauge = (gauge_t)ptr->value.gauge; else if (ptr->type == RRDC_STATS_TYPE_COUNTER) vl.values[0].counter = (counter_t)ptr->value.counter; else continue; if (strcasecmp("QueueLength", ptr->name) == 0) { sstrncpy(vl.type, "queue_length", sizeof(vl.type)); sstrncpy(vl.type_instance, "", sizeof(vl.type_instance)); } else if (strcasecmp("UpdatesWritten", ptr->name) == 0) { sstrncpy(vl.type, "operations", sizeof(vl.type)); sstrncpy(vl.type_instance, "write-updates", sizeof(vl.type_instance)); } else if (strcasecmp("DataSetsWritten", ptr->name) == 0) { sstrncpy(vl.type, "operations", sizeof(vl.type)); sstrncpy(vl.type_instance, "write-data_sets", sizeof(vl.type_instance)); } else if (strcasecmp("TreeNodesNumber", ptr->name) == 0) { sstrncpy(vl.type, "gauge", sizeof(vl.type)); sstrncpy(vl.type_instance, "tree_nodes", sizeof(vl.type_instance)); } else if (strcasecmp("TreeDepth", ptr->name) == 0) { sstrncpy(vl.type, "gauge", sizeof(vl.type)); sstrncpy(vl.type_instance, "tree_depth", sizeof(vl.type_instance)); } else if (strcasecmp("FlushesReceived", ptr->name) == 0) { sstrncpy(vl.type, "operations", sizeof(vl.type)); sstrncpy(vl.type_instance, "receive-flush", sizeof(vl.type_instance)); } else if (strcasecmp("JournalBytes", ptr->name) == 0) { sstrncpy(vl.type, "counter", sizeof(vl.type)); sstrncpy(vl.type_instance, "journal-bytes", sizeof(vl.type_instance)); } else if (strcasecmp("JournalRotate", ptr->name) == 0) { sstrncpy(vl.type, "counter", sizeof(vl.type)); sstrncpy(vl.type_instance, "journal-rotates", sizeof(vl.type_instance)); } else if (strcasecmp("UpdatesReceived", ptr->name) == 0) { sstrncpy(vl.type, "operations", sizeof(vl.type)); sstrncpy(vl.type_instance, "receive-update", sizeof(vl.type_instance)); } else { DEBUG("rrdcached plugin: rc_read: Unknown statistic `%s'.", ptr->name); continue; } plugin_dispatch_values(&vl); } /* for (ptr = head; ptr != NULL; ptr = ptr->next) */ rrdc_stats_free(head); return 0; } /* int rc_read */ static int rc_init(void) { if (config_collect_stats) plugin_register_read("rrdcached", rc_read); return 0; } /* int rc_init */ static int rc_write(const data_set_t *ds, const value_list_t *vl, user_data_t __attribute__((unused)) * user_data) { char filename[PATH_MAX]; char values[512]; int status; bool retried = false; if (daemon_address == NULL) { ERROR("rrdcached plugin: daemon_address == NULL."); plugin_unregister_write("rrdcached"); return -1; } if (strcmp(ds->type, vl->type) != 0) { ERROR("rrdcached plugin: DS type does not match value list type"); return -1; } if (value_list_to_filename(filename, sizeof(filename), vl) != 0) { ERROR("rrdcached plugin: value_list_to_filename failed."); return -1; } if (value_list_to_string(values, sizeof(values), ds, vl) != 0) { ERROR("rrdcached plugin: value_list_to_string failed."); return -1; } if (config_create_files) { struct stat statbuf; status = stat(filename, &statbuf); if (status != 0) { if (errno != ENOENT) { ERROR("rrdcached plugin: stat (%s) failed: %s", filename, STRERRNO); return -1; } status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config); if (status != 0) { ERROR("rrdcached plugin: cu_rrd_create_file (%s) failed.", filename); return -1; } else if (rrdcreate_config.async) return 0; } } rrd_clear_error(); status = rrdc_connect(daemon_address); if (status != 0) { ERROR("rrdcached plugin: Failed to connect to RRDCacheD " "at %s: %s (status=%d)", daemon_address, rrd_get_error(), status); return -1; } char *values_array[2] = { [0] = values, [1] = NULL, }; while (42) { /* The RRD client lib does not provide any means for checking a * connection, hence we'll have to retry upon failed operations. */ rrd_clear_error(); status = rrdc_update(filename, /* values_num = */ 1, (void *)values_array); if (status == 0) break; if (!retried) { retried = true; if (try_reconnect() == 0) continue; /* else: report the error and fail */ } ERROR("rrdcached plugin: rrdc_update (%s, [%s], 1) failed: %s (status=%i)", filename, values_array[0], rrd_get_error(), status); return -1; } return 0; } /* int rc_write */ static int rc_flush(__attribute__((unused)) cdtime_t timeout, /* {{{ */ const char *identifier, __attribute__((unused)) user_data_t *ud) { if (identifier == NULL) return EINVAL; char filename[PATH_MAX + 1]; if (datadir != NULL) ssnprintf(filename, sizeof(filename), "%s/%s.rrd", datadir, identifier); else ssnprintf(filename, sizeof(filename), "%s.rrd", identifier); rrd_clear_error(); int status = rrdc_connect(daemon_address); if (status != 0) { ERROR("rrdcached plugin: Failed to connect to RRDCacheD " "at %s: %s (status=%d)", daemon_address, rrd_get_error(), status); return -1; } bool retried = false; while (42) { /* The RRD client lib does not provide any means for checking a * connection, hence we'll have to retry upon failed operations. */ rrd_clear_error(); status = rrdc_flush(filename); if (status == 0) break; if (!retried) { retried = true; if (try_reconnect() == 0) continue; /* else: report the error and fail */ } ERROR("rrdcached plugin: rrdc_flush (%s) failed: %s (status=%i).", filename, rrd_get_error(), status); return -1; } DEBUG("rrdcached plugin: rrdc_flush (%s): Success.", filename); return 0; } /* }}} int rc_flush */ static int rc_shutdown(void) { rrdc_disconnect(); return 0; } /* int rc_shutdown */ void module_register(void) { plugin_register_complex_config("rrdcached", rc_config); plugin_register_init("rrdcached", rc_init); plugin_register_shutdown("rrdcached", rc_shutdown); } /* void module_register */