[
				'header' => "Content-Type: application/json; charset=UTF-8",
				'method' => $method,
				'ignore_errors' => true // To get error messages from Elasticsearch.
			]
		];

		if ($request) {
			$request = json_encode($request);
			$options['http']['content'] = $request;
		}

		try {
			$result = file_get_contents($endpoint, false, stream_context_create($options));
		}
		catch (Exception $e) {
			error($e->getMessage());
		}

		CProfiler::getInstance()->profileElasticsearch(microtime(true) - $time_start, $method, $endpoint, $request);

		return $result;
	}

	/**
	 * Derive the URL used for scroll API requests from the endpoint of the initial request.
	 *
	 * The last two '/'-separated parts of the endpoint are cut off and '/_search/scroll' is appended to what
	 * remains. If the endpoint does not contain two '/' characters, an error is reported and null is returned.
	 *
	 * @param string $endpoint    endpoint of the initial request
	 *
	 * @return string|null    scroll API url, or null if it cannot be derived from the endpoint
	 */
	private static function getScrollApiEndpoint($endpoint) {
		$base = $endpoint;
		$parts_to_strip = 2;

		while ($parts_to_strip-- > 0) {
			$slash = strrpos($base, '/');

			if ($slash === false) {
				// Endpoint is in different format, no way to get scroll API url.
				error(_s('Elasticsearch error: %1$s.',
					_('cannot perform Scroll API request, data could be truncated'))
				);

				return null;
			}

			$base = substr($base, 0, $slash);
		}

		return $base.'/_search/scroll';
	}

	/**
	 * Send request(s) to Elasticsearch and return the parsed response.
	 *
	 * How the response is parsed depends on the endpoint and on the request:
	 *   - endpoint not ending with '/_search': the decoded response is returned as is;
	 *   - '/_search' endpoint without 'aggs' in the request: only the documents are returned;
	 *   - '/_search' endpoint with 'aggs': aggregations only if 'size' is given and equals 0, otherwise the
	 *     decoded response as is.
	 *
	 * If the request is an array that has no 'size' or a 'size' above MAX_RESULT_WINDOW, data is fetched in
	 * slices of MAX_RESULT_WINDOW through the scroll API and the slices are merged. Scrolls registered while
	 * parsing the responses are deleted afterwards.
	 *
	 * @param string $method      HTTP method to be used to perform request
	 * @param string $endpoint    requested url
	 * @param mixed  $request     data to be sent
	 *
	 * @return array    parsed result
	 */
	public static function query($method, $endpoint, $request = null) {
		$has_request = is_array($request);
		$has_size = $has_request && array_key_exists('size', $request);

		// Pick the parsing mode; for non-search requests no additional parsing is done.
		$search_suffix = '/_search';

		if (substr($endpoint, -strlen($search_suffix)) !== $search_suffix) {
			$parse_as = ELASTICSEARCH_RESPONSE_PLAIN;
		}
		elseif ($has_request && array_key_exists('aggs', $request)) {
			$parse_as = ($has_size && $request['size'] == 0)
				? ELASTICSEARCH_RESPONSE_AGGREGATION
				: ELASTICSEARCH_RESPONSE_PLAIN;
		}
		else {
			$parse_as = ELASTICSEARCH_RESPONSE_DOCUMENTS;
		}

		$use_scroll = $has_request && (!$has_size || $request['size'] > self::MAX_RESULT_WINDOW);

		if (!$use_scroll) {
			return self::parseResult(self::request($method, $endpoint, $request), $parse_as);
		}

		// Scroll API should be used to retrieve all data.
		$limit = $has_size ? $request['size'] : null;
		$request['size'] = self::MAX_RESULT_WINDOW;
		self::$scroll_id = null;
		self::$scrolls = [];

		$scroll_endpoint = self::getScrollApiEndpoint($endpoint);

		if ($scroll_endpoint !== null) {
			$endpoint .= '?scroll='.self::KEEP_CONTEXT_PERIOD;
		}

		$slice = self::parseResult(self::request($method, $endpoint, $request), $parse_as);
		$results = array_merge([], $slice);

		if (self::$scroll_id === null) {
			// Without a scroll_id there is nothing to continue from.
			$slice = null;
		}

		// All follow-up requests (including scroll deletion) go to the scroll endpoint.
		$endpoint = $scroll_endpoint;

		// A slice shorter than the window means there are no more data to fetch.
		while ($slice && count($slice) >= self::MAX_RESULT_WINDOW) {
			$scroll_request = [
				'scroll' => self::KEEP_CONTEXT_PERIOD,
				'scroll_id' => self::$scroll_id
			];

			$slice = self::parseResult(self::request($method, $endpoint, $scroll_request), $parse_as);
			$results = array_merge($results, $slice);

			if ($limit !== null && count($results) >= $limit) {
				// No need to perform additional queries as limit is reached.
				$results = array_slice($results, 0, $limit);
				break;
			}
		}

		// Scrolls should be deleted when they are not required anymore.
		if (self::$scrolls) {
			self::request('DELETE', $endpoint, ['scroll_id' => array_keys(self::$scrolls)]);
		}

		return $results;
	}

	/**
	 * Parse result based on request data.
*
	 * The response is JSON-decoded into an associative array. A response that does not decode to an array, or
	 * one that carries an 'error' key, is reported through error() and yields an empty array. A '_scroll_id'
	 * found in the response is remembered in self::$scroll_id and registered in self::$scrolls.
	 *
	 * @param string $data        result as a string
	 * @param int    $parse_as    result type (one of the ELASTICSEARCH_RESPONSE_* constants)
	 *
	 * @return array    parsed result
	 */
	private static function parseResult($data, $parse_as) {
		$result = json_decode($data, true);

		if (!is_array($result)) {
			error(_s('Elasticsearch error: %1$s.', _('failed to parse JSON')));

			return [];
		}

		if (array_key_exists('error', $result)) {
			$error = (is_array($result['error']) && array_key_exists('reason', $result['error']))
				? $result['error']['reason']
				: _('Unknown error');

			error(_s('Elasticsearch error: %1$s.', $error));

			return [];
		}

		if (array_key_exists('_scroll_id', $result)) {
			self::$scroll_id = $result['_scroll_id'];
			self::$scrolls[self::$scroll_id] = true;
		}

		switch ($parse_as) {
			// Return aggregations only.
			case ELASTICSEARCH_RESPONSE_AGGREGATION:
				if (array_key_exists('aggregations', $result) && is_array($result['aggregations'])) {
					return $result['aggregations'];
				}
				break;

			// Return documents only.
			case ELASTICSEARCH_RESPONSE_DOCUMENTS:
				// Verify the shape before descending: array_key_exists() on a non-array raises a TypeError.
				if (array_key_exists('hits', $result) && is_array($result['hits'])
						&& array_key_exists('hits', $result['hits']) && is_array($result['hits']['hits'])) {
					$values = [];

					foreach ($result['hits']['hits'] as $row) {
						if (is_array($row) && array_key_exists('_source', $row)) {
							$values[] = $row['_source'];
						}
					}

					return $values;
				}
				break;

			// Return result "as is".
			case ELASTICSEARCH_RESPONSE_PLAIN:
				return $result;
		}

		return [];
	}

	/**
	 * Add filters to Elasticsearch query.
	 *
	 * For every usable entry of $options['filter'] a "terms" clause is appended to $query['query']['bool']:
	 * under 'should' when $options['searchByAny'] is set, under 'must' otherwise. Fields missing from the
	 * schema, textual fields and null values are skipped.
	 *
	 * @param array $schema     DB schema
	 * @param array $query      Elasticsearch query.
	 * @param array $options    Filtering options ('filter' and 'searchByAny' are read).
	 *
	 * @return array    Elasticsearch query with added filtering
	 */
	public static function addFilter($schema, $query, $options) {
		$type = $options['searchByAny'] ? 'should' : 'must';

		foreach ($options['filter'] as $field => $value) {
			// Skip missing fields, textual fields (different mapping is needed for exact matching) and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];

			if (in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR])) {
				continue;
			}

			if ($type === 'should') {
				// "minimum_should_match" is a parameter of the bool query, so it has to be placed next to the
				// "should" clauses and not at the top level of the request body.
				$query['query']['bool']['minimum_should_match'] = 1;
			}

			$query['query']['bool'][$type][] = [
				'terms' => [
					// A "terms" clause takes a list of values; a single scalar is wrapped into a list.
					$field => array_values((array) $value)
				]
			];
		}

		return $query;
	}

	/**
	 * Add search criteria to Elasticsearch query.
	 *
	 * For every phrase of every usable entry of $options['search'] a "wildcard" clause is added to
	 * $query['query']['bool']:
	 *   - 'must'      by default;
	 *   - 'should'    when 'searchByAny' is set;
	 *   - 'must_not'  when 'excludeSearch' is set;
	 *   - 'must_not' => 'bool' => 'should' when both 'excludeSearch' and 'searchByAny' are set.
	 * Fields missing from the schema, non textual fields and null values are skipped.
	 *
	 * @param array $schema     DB schema
	 * @param array $query      Elasticsearch query
	 * @param array $options    search options ('search', 'startSearch', 'excludeSearch', 'searchByAny' and
	 *                          'searchWildcardsEnabled' are read)
	 *
	 * @return array    Elasticsearch query with added search criteria
	 */
	public static function addSearch($schema, $query, $options) {
		$start = $options['startSearch'] ? '' : '*';
		$exclude_any = $options['excludeSearch'] && $options['searchByAny'];

		if ($options['excludeSearch']) {
			$type = 'must_not';
		}
		else {
			$type = $options['searchByAny'] ? 'should' : 'must';
		}

		// Characters that are special in wildcard patterns. The backslash is the escape character itself and
		// therefore has to be escaped as well, otherwise it would alter the meaning of the character after it.
		$escape = ['\\' => '\\\\', '?' => '\\?'];

		if (!$options['searchWildcardsEnabled']) {
			$escape['*'] = '\\*';
		}

		foreach ($options['search'] as $field => $value) {
			// Skip missing fields, non textual fields and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];

			if (!in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR])) {
				continue;
			}

			// A single phrase given as a scalar is handled the same way as a list with one phrase.
			foreach ((array) $value as $phrase) {
				$phrase = strtr((string) $phrase, $escape);

				$criteria = [
					'wildcard' => [
						$field => $options['searchWildcardsEnabled'] ? $phrase : $start.$phrase.'*'
					]
				];

				if ($exclude_any) {
					$query['query']['bool']['must_not']['bool']['should'][] = $criteria;

					continue;
				}

				if ($type === 'should') {
					// "minimum_should_match" is a parameter of the bool query, so it has to be placed next to
					// the "should" clauses and not at the top level of the request body.
					$query['query']['bool']['minimum_should_match'] = 1;
				}

				$query['query']['bool'][$type][] = $criteria;
			}
		}

		return $query;
	}

	/**
	 * Add sorting criteria to Elasticsearch query.
	 *
	 * @param array $query      Elasticsearch query.
	 * @param array $options    Sorting options.
*
	 * Every field of $options['sortfield'] is appended to $query['sort'] as a list entry, so that the order of
	 * the fields is kept: an ascending field is added by its name, a descending one as [field => order].
	 * The order of a field is taken from $options['sortorder'], which is either an array indexed like
	 * 'sortfield' or a single value applied to all fields; a missing or empty order means ZBX_SORT_UP.
	 *
	 * @return array    Elasticsearch query with added sorting options
	 */
	public static function addSort($query, $options) {
		foreach ($options['sortfield'] as $i => $sortfield) {
			// Add sort field to order.
			if (is_array($options['sortorder'])) {
				$sortorder = array_key_exists($i, $options['sortorder']) ? $options['sortorder'][$i] : ZBX_SORT_UP;
			}
			else {
				$sortorder = ($options['sortorder'] !== '') ? $options['sortorder'] : ZBX_SORT_UP;
			}

			// Always append as a list entry. Mixing string keys (descending fields) with numeric keys
			// (ascending fields) in one array would be JSON-encoded as an object with keys like "0", which is
			// not a valid sort specification.
			$query['sort'][] = ($sortorder === ZBX_SORT_DOWN) ? [$sortfield => $sortorder] : $sortfield;
		}

		return $query;
	}
}