primary_keys_enabled = $enabled; return $this; } /** * Returns a subset of $items having history data within the $period of time. * * @param array $items An array of items with the 'itemid' and 'value_type' properties. * @param int $period The maximum period of time to search for history values within. * * @return array An array with items IDs as keys and the original item data as values. */ public function getItemsHavingValues(array $items, $period = null) { $items = zbx_toHash($items, 'itemid'); $results = []; $grouped_items = $this->getItemsGroupedByStorage($items); if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) { $results += $this->getLastValuesFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC], 1, $period); } if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) { $results += $this->getItemsHavingValuesFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $period); } return array_intersect_key($items, $results); } /** * SQL specific implementation of getItemsHavingValues. * * @see CHistoryManager::getItemsHavingValues */ private function getItemsHavingValuesFromSql(array $items, $period = null) { $results = []; if (CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL) == 1) { $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $period = $period !== null ? min($period, $hk_history) : $hk_history; } if ($period !== null) { $period = time() - $period; } $items = zbx_toHash($items, 'itemid'); $itemids_by_type = []; foreach ($items as $itemid => $item) { $itemids_by_type[$item['value_type']][] = $itemid; } foreach ($itemids_by_type as $type => $type_itemids) { $type_results = DBfetchColumn(DBselect( 'SELECT DISTINCT itemid'. ' FROM '.self::getTableName($type). ' WHERE '.dbConditionInt('itemid', $type_itemids). ($period !== null ? ' AND clock>'.$period : '') ), 'itemid'); $results += array_intersect_key($items, array_flip($type_results)); } return $results; } /** * Returns the last $limit history objects for the given items. * * @param array $items An array of items with the 'itemid' and 'value_type' properties. * @param int $limit Max object count to be returned. * @param int $period The maximum period to retrieve data for. * * @return array An array with items IDs as keys and arrays of history objects as values. */ public function getLastValues(array $items, $limit = 1, $period = null) { $results = []; $grouped_items = $this->getItemsGroupedByStorage($items); if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) { $results += $this->getLastValuesFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC], $limit, $period ); } if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) { if ($this->primary_keys_enabled) { $results += $this->getLastValuesFromSqlWithPk($grouped_items[ZBX_HISTORY_SOURCE_SQL], $limit, $period); } else { $results += $this->getLastValuesFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $limit, $period); } } return $results; } /** * Elasticsearch specific implementation of getLastValues. * * @see CHistoryManager::getLastValues */ private function getLastValuesFromElasticsearch($items, $limit, $period) { $terms = []; $results = []; $filter = []; foreach ($items as $item) { $terms[$item['value_type']][] = $item['itemid']; } $query = [ 'aggs' => [ 'group_by_itemid' => [ 'terms' => [ 'field' => 'itemid' ], 'aggs' => [ 'group_by_docs' => [ 'top_hits' => [ 'size' => $limit, 'sort' => [ 'clock' => ZBX_SORT_DOWN ] ] ] ] ] ], 'size' => 0 ]; if ($period) { $filter[] = [ 'range' => [ 'clock' => [ 'gt' => (time() - $period) ] ] ]; } foreach (self::getElasticsearchEndpoints(array_keys($terms)) as $type => $endpoint) { $query['query']['bool']['must'] = array_merge([[ 'terms' => [ 'itemid' => $terms[$type] ] ]], $filter); // Assure that aggregations for all terms are returned. $query['aggs']['group_by_itemid']['terms']['size'] = count($terms[$type]); $data = CElasticsearchHelper::query('POST', $endpoint, $query); if (!is_array($data) || !array_key_exists('group_by_itemid', $data) || !array_key_exists('buckets', $data['group_by_itemid']) || !is_array($data['group_by_itemid']['buckets'])) { continue; } foreach ($data['group_by_itemid']['buckets'] as $item) { if (!is_array($item['group_by_docs']) || !array_key_exists('hits', $item['group_by_docs']) || !is_array($item['group_by_docs']['hits']) || !array_key_exists('hits', $item['group_by_docs']['hits']) || !is_array($item['group_by_docs']['hits']['hits'])) { continue; } foreach ($item['group_by_docs']['hits']['hits'] as $row) { if (!array_key_exists('_source', $row) || !is_array($row['_source'])) { continue; } $results[$item['key']][] = $row['_source']; } } } return $results; } /** * SQL specific implementation of getLastValues that makes use of primary key existence in history tables. * * @see CHistoryManager::getLastValues * * @return array Of itemid => [up to $limit values]. */ private function getLastValuesFromSqlWithPk(array $items, int $limit, ?int $period): array { $results = []; if (CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL) == 1) { $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $period = $period !== null ? min($period, $hk_history) : $hk_history; } if ($period !== null) { $period = time() - $period; } $items_by_type = []; foreach ($items as $key => $item) { $value_type = $item['value_type']; if (!array_key_exists($value_type, $items_by_type)) { $items_by_type[$value_type] = []; } $items_by_type[$value_type][] = $item; unset($items[$key]); } if ($limit == 1) { foreach ($items_by_type as $value_type => $items) { $history_table = self::getTableName($value_type); $max_clock_per_item = DBselect( 'SELECT h.itemid,MAX(h.clock) AS clock'. ' FROM '.$history_table.' h'. ' WHERE '.dbConditionId('h.itemid', array_column($items, 'itemid')). ($period !== null ? ' AND h.clock>'.$period : ''). ' GROUP BY h.itemid' ); while ($itemid_clock = DBfetch($max_clock_per_item, false)) { $db_value = DBfetchArray(DBselect( 'SELECT *'. ' FROM '.$history_table.' h'. ' WHERE h.itemid='.zbx_dbstr($itemid_clock['itemid']). ' AND h.clock='.zbx_dbstr($itemid_clock['clock']). ' ORDER BY h.ns DESC', $limit )); if ($db_value) { $results[$itemid_clock['itemid']] = $db_value; } } } } else { foreach ($items_by_type as $value_type => $items) { $history_table = self::getTableName($value_type); foreach ($items as $item) { $db_values = DBselect('SELECT *'. ' FROM '.$history_table.' h'. ' WHERE h.itemid='.zbx_dbstr($item['itemid']). ($period !== null ? ' AND h.clock>'.$period : ''). ' ORDER BY h.clock DESC,h.ns DESC', $limit ); $values = []; while ($db_value = DBfetch($db_values, false)) { $values[] = $db_value; } if ($values) { $results[$item['itemid']] = $values; } } } } return $results; } /** * SQL specific implementation of getLastValues. * * @see CHistoryManager::getLastValues */ private function getLastValuesFromSql($items, $limit, $period) { $results = []; if (CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL) == 1) { $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $period = $period !== null ? min($period, $hk_history) : $hk_history; } if ($period !== null) { $period = time() - $period; } if ($limit == 1) { foreach ($items as $item) { // Executing two subsequent queries individually for the sake of performance. $clock_max = DBfetch(DBselect( 'SELECT MAX(h.clock)'. ' FROM '.self::getTableName($item['value_type']).' h'. ' WHERE h.itemid='.zbx_dbstr($item['itemid']). ($period !== null ? ' AND h.clock>'.$period : '') ), false); if ($clock_max) { $clock_max = reset($clock_max); if ($clock_max !== null) { $values = DBfetchArray(DBselect( 'SELECT *'. ' FROM '.self::getTableName($item['value_type']).' h'. ' WHERE h.itemid='.zbx_dbstr($item['itemid']). ' AND h.clock='.zbx_dbstr($clock_max). ' ORDER BY h.ns DESC', $limit )); if ($values) { $results[$item['itemid']] = $values; } } } } } else { foreach ($items as $item) { // Cannot order by h.ns directly here due to performance issues. $values = DBfetchArray(DBselect( 'SELECT *'. ' FROM '.self::getTableName($item['value_type']).' h'. ' WHERE h.itemid='.zbx_dbstr($item['itemid']). ($period !== null ? ' AND h.clock>'.$period : ''). ' ORDER BY h.clock DESC', $limit + 1 )); if ($values) { $count = count($values); $clock = $values[$count - 1]['clock']; if ($count == $limit + 1 && $values[$count - 2]['clock'] == $clock) { /* * The last selected entries having the same clock means the selection (not just the order) * of the last entries is possibly wrong due to unordered by nanoseconds. */ do { unset($values[--$count]); } while ($values && $values[$count - 1]['clock'] == $clock); $db_values = DBselect( 'SELECT *'. ' FROM '.self::getTableName($item['value_type']).' h'. ' WHERE h.itemid='.zbx_dbstr($item['itemid']). ' AND h.clock='.$clock. ' ORDER BY h.ns DESC', $limit - $count ); while ($db_value = DBfetch($db_values)) { $values[] = $db_value; $count++; } } CArrayHelper::sort($values, [ ['field' => 'clock', 'order' => ZBX_SORT_DOWN], ['field' => 'ns', 'order' => ZBX_SORT_DOWN] ]); $values = array_values($values); while ($count > $limit) { unset($values[--$count]); } $results[$item['itemid']] = $values; } } } return $results; } /** * Returns the history data of the item at the given time. If no data exists at the given time, the function will * return the previous data. * * The $item parameter must have the value_type and itemid properties set. * * @param array $item * @param string $item['itemid'] * @param int $item['value_type'] * @param int $clock * @param int $ns * * @return array|null Item data at specified time of first data before specified time. null if data is not found. */ public function getValueAt(array $item, $clock, $ns) { switch (self::getDataSourceType($item['value_type'])) { case ZBX_HISTORY_SOURCE_ELASTIC: return $this->getValueAtFromElasticsearch($item, $clock, $ns); default: return $this->primary_keys_enabled ? $this->getValueAtFromSqlWithPk($item, $clock, $ns) : $this->getValueAtFromSql($item, $clock, $ns); } } /** * Elasticsearch specific implementation of getValueAt. * * @see CHistoryManager::getValueAt */ private function getValueAtFromElasticsearch(array $item, $clock, $ns) { $query = [ 'sort' => [ 'clock' => ZBX_SORT_DOWN, 'ns' => ZBX_SORT_DOWN ], 'size' => 1 ]; $filters = [ [ [ 'term' => [ 'itemid' => $item['itemid'] ] ], [ 'term' => [ 'clock' => $clock ] ], [ 'range' => [ 'ns' => [ 'lte' => $ns ] ] ] ], [ [ 'term' => [ 'itemid' => $item['itemid'] ] ], [ 'range' => [ 'clock' => [ 'lt' => $clock, 'gte' => $clock - timeUnitToSeconds(CSettingsHelper::get(CSettingsHelper::HISTORY_PERIOD)) ] ] ] ] ]; foreach ($filters as $filter) { $query['query']['bool']['must'] = $filter; $endpoints = self::getElasticsearchEndpoints($item['value_type']); if (count($endpoints) !== 1) { break; } $result = CElasticsearchHelper::query('POST', reset($endpoints), $query); if (count($result) === 1 && is_array($result[0]) && array_key_exists('value', $result[0])) { return $result[0]; } } return null; } /** * Implementation that uses existence of primary key in history tables. * @see CHistoryManager->getValueAtFromSql() * * @param string $item['itemid'] * @param int $item['value_type'] * @param int $clock * @param int $ns * * @return array|null Item data at specified time of first data before specified time. null if data is not found. */ private function getValueAtFromSqlWithPk(array $item, $clock, $ns): ?array { $hk_history_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL); $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); if ($hk_history_global == 1 && $clock <= time() - $hk_history) { return null; } $history_table = self::getTableName($item['value_type']); $sql = 'SELECT *'. ' FROM '.$history_table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock='.zbx_dbstr($clock). ' AND ns<='.zbx_dbstr($ns). ' ORDER BY ns DESC'; if (($row = DBfetch(DBselect($sql, 1))) !== false) { return $row; } $time_from = $clock - timeUnitToSeconds(CSettingsHelper::get(CSettingsHelper::HISTORY_PERIOD)); if ($hk_history_global == 1) { $time_from = max($time_from, time() - $hk_history + 1); } $sql = 'SELECT *'. ' FROM '.$history_table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock<'.zbx_dbstr($clock). ' AND clock>='.zbx_dbstr($time_from). ' ORDER BY clock DESC,ns DESC'; if (($row = DBfetch(DBselect($sql, 1))) !== false) { return $row; } return null; } /** * SQL specific implementation of getValueAt. * * @see CHistoryManager::getValueAt */ private function getValueAtFromSql(array $item, $clock, $ns) { $hk_history_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL); $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); if ($hk_history_global == 1 && $clock <= time() - $hk_history) { return null; } $result = null; $table = self::getTableName($item['value_type']); $sql = 'SELECT *'. ' FROM '.$table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock='.zbx_dbstr($clock). ' AND ns='.zbx_dbstr($ns); if (($row = DBfetch(DBselect($sql, 1))) !== false) { $result = $row; } if ($result !== null) { return $result; } $max_clock = 0; $sql = 'SELECT DISTINCT clock'. ' FROM '.$table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock='.zbx_dbstr($clock). ' AND ns<'.zbx_dbstr($ns); if (($row = DBfetch(DBselect($sql))) !== false) { $max_clock = $row['clock']; } if ($max_clock == 0) { $time_from = $clock - timeUnitToSeconds(CSettingsHelper::get(CSettingsHelper::HISTORY_PERIOD)); if ($hk_history_global == 1) { $time_from = max($time_from, time() - $hk_history + 1); } $sql = 'SELECT MAX(clock) AS clock'. ' FROM '.$table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock<'.zbx_dbstr($clock). ' AND clock>='.zbx_dbstr($time_from); if (($row = DBfetch(DBselect($sql))) !== false) { $max_clock = $row['clock']; } } if ($max_clock == 0) { return $result; } if ($clock == $max_clock) { $sql = 'SELECT *'. ' FROM '.$table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock='.zbx_dbstr($clock). ' AND ns<'.zbx_dbstr($ns); } else { $sql = 'SELECT *'. ' FROM '.$table. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock='.zbx_dbstr($max_clock). ' ORDER BY itemid,clock desc,ns desc'; } if (($row = DBfetch(DBselect($sql, 1))) !== false) { $result = $row; } return $result; } /** * Returns history value aggregation. * * The $item parameter must have the value_type, itemid and source properties set. * * @param array $items Items to get aggregated values for. * @param int $time_from Minimal timestamp (seconds) to get data from. * @param int $time_to Maximum timestamp (seconds) to get data from. * @param string $function Function for data aggregation. * @param string $interval Aggregation interval in seconds. * * @return array History value aggregation. */ public function getAggregationByInterval(array $items, $time_from, $time_to, $function, $interval) { $grouped_items = $this->getItemsGroupedByStorage($items); $results = []; if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) { $results = $this->getAggregationByIntervalFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC], $time_from, $time_to, $function, $interval ); } if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) { $results += $this->getAggregationByIntervalFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $time_from, $time_to, $function, $interval ); } return $results; } /** * Elasticsearch specific implementation of getAggregationByInterval. * * @see CHistoryManager::getAggregationByInterval */ private function getAggregationByIntervalFromElasticsearch(array $items, $time_from, $time_to, $function, $interval) { $terms = []; foreach ($items as $item) { $terms[$item['value_type']][] = $item['itemid']; } $aggs = ['clock' => ['max' => ['field' => 'clock']]]; switch ($function) { case AGGREGATE_MIN: $aggs['value'] = ['min' => ['field' => 'value']]; break; case AGGREGATE_MAX: $aggs['value'] = ['max' => ['field' => 'value']]; break; case AGGREGATE_AVG: $aggs['value'] = ['avg' => ['field' => 'value']]; break; case AGGREGATE_SUM: $aggs['value'] = ['sum' => ['field' => 'value']]; break; case AGGREGATE_FIRST: $aggs['value'] = ['top_hits' => ['size' => 1, 'sort' => ['clock' => ['order' => 'asc']]]]; $aggs['clock'] = ['min' => ['field' => 'clock']]; break; case AGGREGATE_LAST: $aggs['value'] = ['top_hits' => ['size' => 1, 'sort' => ['clock' => ['order' => 'desc']]]]; break; } $query = [ 'aggs' => [ 'group_by_itemid' => [ 'terms' => [ // Assure that aggregations for all terms are returned. 'size' => count($items), 'field' => 'itemid' ] ] ], 'size' => 0 ]; // Clock value is divided by 1000 as it is stored as milliseconds. $formula = '((doc[\'clock\'].date.getMillis()/1000) - ((doc[\'clock\'].date.getMillis()/1000)%params.interval))'; $query['aggs']['group_by_itemid']['aggs'] = [ 'group_by_script' => [ 'terms' => [ 'size' => (($time_to - $time_from) / $interval) + 1, 'script' => [ 'inline' => $formula, 'params' => [ 'interval' => $interval ] ] ], 'aggs' => $aggs ] ]; $results = []; foreach (self::getElasticsearchEndpoints(array_keys($terms)) as $type => $endpoint) { $query['query']['bool']['must'] = [ [ 'terms' => [ 'itemid' => $terms[$type] ] ], [ 'range' => [ 'clock' => [ 'gte' => $time_from, 'lte' => $time_to ] ] ] ]; $data = CElasticsearchHelper::query('POST', $endpoint, $query); foreach ($data['group_by_itemid']['buckets'] as $item) { if (!is_array($item['group_by_script']) || !array_key_exists('buckets', $item['group_by_script']) || !is_array($item['group_by_script']['buckets'])) { continue; } foreach ($item['group_by_script']['buckets'] as $point) { $row = [ 'itemid' => $item['key'], 'tick' => (int)$point['key'], 'count' => $point['doc_count'], 'clock' => (int)$point['clock']['value_as_string'] ]; if ($function == AGGREGATE_FIRST || $function == AGGREGATE_LAST) { $row['value'] = $point['value']['hits']['hits'][0]['_source']['value']; } else { $row['value'] = array_key_exists('value', $point) ? $point['value']['value'] : null; } $results[$item['key']]['data'][] = $row; } if (array_key_exists($item['key'], $results)) { $results[$item['key']]['source'] = 'history'; } } } return $results; } /** * SQL specific implementation of getAggregationByInterval. * * @see CHistoryManager::getAggregationByInterval */ private function getAggregationByIntervalFromSql(array $items, $time_from, $time_to, $function, $interval) { $items_by_table = []; foreach ($items as $item) { $items_by_table[$item['value_type']][$item['source']][] = $item['itemid']; } $result = []; $hk_history_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL); $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $hk_trends_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_TRENDS_GLOBAL); $hk_trends = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_TRENDS)); foreach ($items_by_table as $value_type => $items_by_source) { foreach ($items_by_source as $source => $itemids) { $sql_select = ['itemid']; $sql_group_by = ['itemid']; $calc_field = zbx_dbcast_2bigint('clock').'-'.zbx_sql_mod(zbx_dbcast_2bigint('clock'), $interval); $sql_select[] = $calc_field.' AS tick'; $sql_group_by[] = $calc_field; if ($source === 'history') { switch ($function) { case AGGREGATE_MIN: $sql_select[] = 'MIN(value) AS value, MAX(clock) AS clock'; break; case AGGREGATE_MAX: $sql_select[] = 'MAX(value) AS value, MAX(clock) AS clock'; break; case AGGREGATE_AVG: $sql_select[] = 'AVG(value) AS value, MAX(clock) AS clock'; break; case AGGREGATE_COUNT: $sql_select[] = 'COUNT(*) AS count, MAX(clock) AS clock'; break; case AGGREGATE_SUM: $sql_select[] = 'SUM(value) AS value, MAX(clock) AS clock'; break; case AGGREGATE_FIRST: $sql_select[] = 'MIN(clock) AS clock'; break; case AGGREGATE_LAST: $sql_select[] = 'MAX(clock) AS clock'; break; } $sql_from = ($value_type == ITEM_VALUE_TYPE_UINT64) ? 'history_uint' : 'history'; $_time_from = $hk_history_global == 1 ? max($time_from, time() - $hk_history + 1) : $time_from; } else { switch ($function) { case AGGREGATE_MIN: $sql_select[] = 'MIN(value_min) AS value, MAX(clock) AS clock'; break; case AGGREGATE_MAX: $sql_select[] = 'MAX(value_max) AS value, MAX(clock) AS clock'; break; case AGGREGATE_AVG: $sql_select[] = 'AVG(value_avg) AS value, MAX(clock) AS clock'; break; case AGGREGATE_COUNT: $sql_select[] = 'SUM(num) AS count, MAX(clock) AS clock'; break; case AGGREGATE_SUM: $sql_select[] = 'SUM(value_avg * num) AS value, MAX(clock) AS clock'; break; case AGGREGATE_FIRST: $sql_select[] = 'MIN(clock) AS clock'; break; case AGGREGATE_LAST: $sql_select[] = 'MAX(clock) AS clock'; break; } $sql_from = ($value_type == ITEM_VALUE_TYPE_UINT64) ? 'trends_uint' : 'trends'; $_time_from = $hk_trends_global == 1 ? max($time_from, time() - $hk_trends + 1) : $time_from; } $sql = 'SELECT '.implode(', ', $sql_select). ' FROM '.$sql_from. ' WHERE '.dbConditionInt('itemid', $itemids). ' AND clock>='.zbx_dbstr($_time_from). ' AND clock<='.zbx_dbstr($time_to). ' GROUP BY '.implode(', ', $sql_group_by); if ($function == AGGREGATE_FIRST || $function == AGGREGATE_LAST) { $sql = 'SELECT DISTINCT h.itemid,h.'.($source === 'history' ? 'value' : 'value_avg').' AS value,h.clock,hi.tick'. ' FROM '.$sql_from.' h'. ' JOIN('.$sql.') hi ON h.itemid = hi.itemid AND h.clock = hi.clock'; } $sql_result = DBselect($sql); while (($row = DBfetch($sql_result)) !== false) { $result[$row['itemid']]['source'] = $source; $result[$row['itemid']]['data'][] = $row; } } } return $result; } /** * Returns history value aggregation for graphs. * * The $item parameter must have the value_type, itemid and source properties set. * * @param array $items Items to get aggregated values for. * @param int $time_from Minimal timestamp (seconds) to get data from. * @param int $time_to Maximum timestamp (seconds) to get data from. * @param int $width Graph width in pixels (is not required for pie charts). * * @return array History value aggregation for graphs. */ public function getGraphAggregationByWidth(array $items, $time_from, $time_to, $width = null) { $grouped_items = $this->getItemsGroupedByStorage($items); $results = []; if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) { $results += $this->getGraphAggregationByWidthFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC], $time_from, $time_to, $width ); } if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) { $results += $this->getGraphAggregationByWidthFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $time_from, $time_to, $width ); } return $results; } /** * Elasticsearch specific implementation of getGraphAggregationByWidth. * * @see CHistoryManager::getGraphAggregationByWidth */ private function getGraphAggregationByWidthFromElasticsearch(array $items, $time_from, $time_to, $width) { $terms = []; foreach ($items as $item) { $terms[$item['value_type']][] = $item['itemid']; } $aggs = [ 'max_value' => [ 'max' => [ 'field' => 'value' ] ], 'avg_value' => [ 'avg' => [ 'field' => 'value' ] ], 'min_value' => [ 'min' => [ 'field' => 'value' ] ], 'max_clock' => [ 'max' => [ 'field' => 'clock' ] ] ]; $query = [ 'aggs' => [ 'group_by_itemid' => [ 'terms' => [ // Assure that aggregations for all terms are returned. 'size' => count($items), 'field' => 'itemid' ] ] ], 'query' => [ 'bool' => [ 'must' => [ [ 'terms' => [ 'itemid' => $terms ] ], [ 'range' => [ 'clock' => [ 'gte' => $time_from, 'lte' => $time_to ] ] ] ] ] ], 'size' => 0 ]; if ($width !== null) { $period = $time_to - $time_from; // Additional grouping for line graphs. $aggs['max_clock'] = [ 'max' => [ 'field' => 'clock' ] ]; // Clock value is divided by 1000 as it is stored as milliseconds. $formula = "Math.round(params.width*(doc['clock'].value.getMillis()/1000-params.time_from)/params.period)"; $script = [ 'inline' => $formula, 'params' => [ 'width' => (int)$width, 'time_from' => $time_from, 'period' => $period ] ]; $aggs = [ 'group_by_script' => [ 'terms' => [ 'size' => $width, 'script' => $script ], 'aggs' => $aggs ] ]; } $query['aggs']['group_by_itemid']['aggs'] = $aggs; $results = []; foreach (self::getElasticsearchEndpoints(array_keys($terms)) as $type => $endpoint) { $query['query']['bool']['must'] = [ [ 'terms' => [ 'itemid' => $terms[$type] ] ], [ 'range' => [ 'clock' => [ 'gte' => $time_from, 'lte' => $time_to ] ] ] ]; $data = CElasticsearchHelper::query('POST', $endpoint, $query); if (array_key_exists('group_by_itemid', $data)) { if ($width !== null) { foreach ($data['group_by_itemid']['buckets'] as $item) { if (!is_array($item['group_by_script']) || !array_key_exists('buckets', $item['group_by_script']) || !is_array($item['group_by_script']['buckets'])) { continue; } $results[$item['key']]['source'] = 'history'; foreach ($item['group_by_script']['buckets'] as $point) { $results[$item['key']]['data'][] = [ 'itemid' => $item['key'], 'i' => $point['key'], 'count' => $point['doc_count'], 'min' => $point['min_value']['value'], 'avg' => $point['avg_value']['value'], 'max' => $point['max_value']['value'], // Field value_as_string is used to get value as seconds instead of milliseconds. 'clock' => $point['max_clock']['value_as_string'] ]; } } } else { foreach ($data['group_by_itemid']['buckets'] as $item) { $results[$item['key']]['source'] = 'history'; $results[$item['key']]['data'][] = [ 'itemid' => $item['key'], 'min' => $item['min_value']['value'], 'avg' => $item['avg_value']['value'], 'max' => $item['max_value']['value'], // Field value_as_string is used to get value as seconds instead of milliseconds. 'clock' => $item['max_clock']['value_as_string'] ]; } } } } return $results; } /** * SQL specific implementation of getGraphAggregationByWidth. * * @see CHistoryManager::getGraphAggregationByWidth */ private function getGraphAggregationByWidthFromSql(array $items, $time_from, $time_to, $width) { $group_by = 'itemid'; $sql_select_extra = ''; if ($width !== null) { $period = $time_to - $time_from; // Required for 'group by' support of Oracle. $calc_field = 'round('.$width.'*('.zbx_dbcast_2bigint('clock').'-'.$time_from.')/'.$period.',0)'; $sql_select_extra = ','.$calc_field.' AS i'; $group_by .= ','.$calc_field; } $results = []; $hk_history_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL); $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $hk_trends_global = CHousekeepingHelper::get(CHousekeepingHelper::HK_TRENDS_GLOBAL); $hk_trends = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_TRENDS)); foreach ($items as $item) { if ($item['source'] === 'history') { $sql_select = 'COUNT(*) AS count,AVG(value) AS avg,MIN(value) AS min,MAX(value) AS max'; $sql_from = $item['value_type'] == ITEM_VALUE_TYPE_UINT64 ? 'history_uint' : 'history'; $_time_from = $hk_history_global == 1 ? max($time_from, time() - $hk_history + 1) : $time_from; } else { $sql_select = 'SUM(num) AS count,AVG(value_avg) AS avg,MIN(value_min) AS min,MAX(value_max) AS max'; $sql_from = ($item['value_type'] == ITEM_VALUE_TYPE_UINT64) ? 'trends_uint' : 'trends'; $_time_from = $hk_trends_global == 1 ? max($time_from, time() - $hk_trends + 1) : $time_from; } $data = []; if ($_time_from <= $time_to) { $result = DBselect( 'SELECT itemid,'.$sql_select.$sql_select_extra.',MAX(clock) AS clock'. ' FROM '.$sql_from. ' WHERE itemid='.zbx_dbstr($item['itemid']). ' AND clock>='.zbx_dbstr($_time_from). ' AND clock<='.zbx_dbstr($time_to). ' GROUP BY '.$group_by ); while (($row = DBfetch($result)) !== false) { $data[] = $row; } } $results[$item['itemid']]['source'] = $item['source']; $results[$item['itemid']]['data'] = $data; } return $results; } /** * Returns aggregated history value. * * The $item parameter must have the value_type and itemid properties set. * * @param array $item Item to get aggregated value for. * @param string $aggregation Aggregation to be applied (min / max / avg). * @param int $time_from Timestamp (seconds). * * @return string Aggregated history value. */ public function getAggregatedValue(array $item, $aggregation, $time_from) { switch (self::getDataSourceType($item['value_type'])) { case ZBX_HISTORY_SOURCE_ELASTIC: return $this->getAggregatedValueFromElasticsearch($item, $aggregation, $time_from); default: return $this->getAggregatedValueFromSql($item, $aggregation, $time_from); } } /** * Elasticsearch specific implementation of getAggregatedValue. * * @see CHistoryManager::getAggregatedValue */ private function getAggregatedValueFromElasticsearch(array $item, $aggregation, $time_from) { $query = [ 'aggs' => [ $aggregation.'_value' => [ $aggregation => [ 'field' => 'value' ] ] ], 'query' => [ 'bool' => [ 'must' => [ [ 'term' => [ 'itemid' => $item['itemid'] ] ], [ 'range' => [ 'clock' => [ 'gte' => $time_from ] ] ] ] ] ], 'size' => 0 ]; $endpoints = self::getElasticsearchEndpoints($item['value_type']); if ($endpoints) { $data = CElasticsearchHelper::query('POST', reset($endpoints), $query); if (array_key_exists($aggregation.'_value', $data) && array_key_exists('value', $data[$aggregation.'_value'])) { return $data[$aggregation.'_value']['value']; } } return null; } /** * SQL specific implementation of getAggregatedValue. * * @see CHistoryManager::getAggregatedValue */ private function getAggregatedValueFromSql(array $item, $aggregation, $time_from) { if (CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY_GLOBAL) == 1) { $hk_history = timeUnitToSeconds(CHousekeepingHelper::get(CHousekeepingHelper::HK_HISTORY)); $time_from = max($time_from, time() - $hk_history); } $result = DBselect( 'SELECT '.$aggregation.'(value) AS value'. ' FROM '.self::getTableName($item['value_type']). ' WHERE clock>'.$time_from. ' AND itemid='.zbx_dbstr($item['itemid']). ' HAVING COUNT(*)>0' // Necessary because DBselect() return 0 if empty data set, for graph templates. ); if (($row = DBfetch($result)) !== false) { return $row['value']; } return null; } /** * Clear item history and trends by provided item IDs. History is deleted from both SQL and Elasticsearch. * * @param array $items Key - itemid, value - value_type. * * @return bool */ public function deleteHistory(array $items) { return $this->deleteHistoryFromSql($items) && $this->deleteHistoryFromElasticsearch(array_keys($items)); } /** * Elasticsearch specific implementation of deleteHistory. * * @see CHistoryManager::deleteHistory */ private function deleteHistoryFromElasticsearch(array $itemids) { global $HISTORY; if (is_array($HISTORY) && array_key_exists('types', $HISTORY) && is_array($HISTORY['types']) && count($HISTORY['types']) > 0) { $query = [ 'query' => [ 'terms' => [ 'itemid' => array_values($itemids) ] ] ]; $types = []; foreach ($HISTORY['types'] as $type) { $types[] = self::getTypeIdByTypeName($type); } foreach (self::getElasticsearchEndpoints($types, '_delete_by_query') as $endpoint) { if (!CElasticsearchHelper::query('POST', $endpoint, $query)) { return false; } } } return true; } /** * SQL specific implementation of deleteHistory. * * @see CHistoryManager::deleteHistory */ private function deleteHistoryFromSql(array $items) { global $DB; $item_tables = array_map([self::class, 'getTableName'], array_unique($items)); $table_names = array_flip(self::getTableName()); if (in_array(ITEM_VALUE_TYPE_UINT64, $items)) { $item_tables[] = 'trends_uint'; $table_names['trends_uint'] = ITEM_VALUE_TYPE_UINT64; } if (in_array(ITEM_VALUE_TYPE_FLOAT, $items)) { $item_tables[] = 'trends'; $table_names['trends'] = ITEM_VALUE_TYPE_FLOAT; } if ($DB['TYPE'] == ZBX_DB_POSTGRESQL && PostgresqlDbBackend::isCompressed($item_tables)) { error(_('Some of the history for this item may be compressed, deletion is not available.')); return false; } foreach ($item_tables as $table_name) { $itemids = array_keys(array_intersect($items, [(string) $table_names[$table_name]])); if (!DBexecute('DELETE FROM '.$table_name.' WHERE '.dbConditionInt('itemid', $itemids))) { return false; } } return true; } /** * Get type name by value type id. * * @param int $value_type Value type id. * * @return string Value type name. */ public static function getTypeNameByTypeId($value_type) { $mapping = [ ITEM_VALUE_TYPE_FLOAT => 'dbl', ITEM_VALUE_TYPE_STR => 'str', ITEM_VALUE_TYPE_LOG => 'log', ITEM_VALUE_TYPE_UINT64 => 'uint', ITEM_VALUE_TYPE_TEXT => 'text', ITEM_VALUE_TYPE_BINARY => 'binary' ]; if (array_key_exists($value_type, $mapping)) { return $mapping[$value_type]; } // Fallback to float. return $mapping[ITEM_VALUE_TYPE_FLOAT]; } /** * Get type id by value type name. * * @param int $type_name Value type name. * * @return int Value type id. */ public static function getTypeIdByTypeName($type_name) { $mapping = [ 'dbl' => ITEM_VALUE_TYPE_FLOAT, 'str' => ITEM_VALUE_TYPE_STR, 'log' => ITEM_VALUE_TYPE_LOG, 'uint' => ITEM_VALUE_TYPE_UINT64, 'text' => ITEM_VALUE_TYPE_TEXT, 'binary' => ITEM_VALUE_TYPE_BINARY ]; if (array_key_exists($type_name, $mapping)) { return $mapping[$type_name]; } // Fallback to float. return ITEM_VALUE_TYPE_FLOAT; } /** * Get data source (SQL or Elasticsearch) type based on value type id. * * @param int $value_type Value type id. * * @return string Data source type. */ public static function getDataSourceType($value_type) { static $cache = []; if (!array_key_exists($value_type, $cache)) { global $HISTORY; if (is_array($HISTORY) && array_key_exists('types', $HISTORY) && is_array($HISTORY['types'])) { $cache[$value_type] = in_array(self::getTypeNameByTypeId($value_type), $HISTORY['types']) ? ZBX_HISTORY_SOURCE_ELASTIC : ZBX_HISTORY_SOURCE_SQL; } else { // SQL is a fallback data source. $cache[$value_type] = ZBX_HISTORY_SOURCE_SQL; } } return $cache[$value_type]; } private static function getElasticsearchUrl($value_name) { static $urls = []; static $invalid = []; // Additional check to limit error count produced by invalid configuration. if (array_key_exists($value_name, $invalid)) { return null; } if (!array_key_exists($value_name, $urls)) { global $HISTORY; if (!is_array($HISTORY) || !array_key_exists('url', $HISTORY)) { $invalid[$value_name] = true; error(_s('Elasticsearch URL is not set for type: %1$s.', $value_name)); return null; } $url = $HISTORY['url']; if (is_array($url)) { if (!array_key_exists($value_name, $url)) { $invalid[$value_name] = true; error(_s('Elasticsearch URL is not set for type: %1$s.', $value_name)); return null; } $url = $url[$value_name]; } if (substr($url, -1) !== '/') { $url .= '/'; } $urls[$value_name] = $url; } return $urls[$value_name]; } /** * Get endpoints for Elasticsearch requests. * * @param mixed $value_types Value type(s). * * @return array Elasticsearch query endpoints. */ public static function getElasticsearchEndpoints($value_types, $action = '_search') { if (!is_array($value_types)) { $value_types = [$value_types]; } $indices = []; $endpoints = []; foreach (array_unique($value_types) as $type) { if (self::getDataSourceType($type) === ZBX_HISTORY_SOURCE_ELASTIC) { $indices[$type] = self::getTypeNameByTypeId($type); } } foreach ($indices as $type => $index) { if (($url = self::getElasticsearchUrl($index)) !== null) { $endpoints[$type] = $url.$index.'*/'.$action; } } return $endpoints; } /** * Return the name of the table where the data for the given value type is stored. * * @param int $value_type Value type. * * @return string|array Table name | all tables. */ public static function getTableName($value_type = null) { $tables = [ ITEM_VALUE_TYPE_LOG => 'history_log', ITEM_VALUE_TYPE_TEXT => 'history_text', ITEM_VALUE_TYPE_STR => 'history_str', ITEM_VALUE_TYPE_FLOAT => 'history', ITEM_VALUE_TYPE_UINT64 => 'history_uint', ITEM_VALUE_TYPE_BINARY => 'history_bin' ]; return ($value_type === null) ? $tables : $tables[$value_type]; } /** * Returns the items grouped by the storage type. * * @param array $items An array of items with the 'value_type' property. * * @return array An array with storage type as a keys and item arrays as a values. */ private function getItemsGroupedByStorage(array $items) { $grouped_items = []; foreach ($items as $item) { $source = self::getDataSourceType($item['value_type']); $grouped_items[$source][] = $item; } return $grouped_items; } }