Skip to content

Add direct-query module for prometheus integration #3441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
/** Language type accepted in async query apis. */
public enum LangType {
SQL("sql"),
PPL("ppl");
PPL("ppl"),
PROMQL("promql");
private final String text;

LangType(String text) {
Expand Down
105 changes: 105 additions & 0 deletions direct-query-core/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}

dependencies {
api project(':core')
implementation project(':datasources')
implementation project(':async-query-core')

// Common dependencies
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20231013'
implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"

// Test dependencies
testImplementation(platform("org.junit:junit-bom:5.9.3"))
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '4.12.0'

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.required = true
xml.required = true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.prometheus.model.*',
'org.opensearch.sql.directquery.rest.model.*'
]
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client;

/**
* Base interface for all data source clients. This interface serves as a marker interface for all
* client implementations.
*/
public interface DataSourceClient {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.client.exceptions.DataSourceClientException;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.prometheus.utils.PrometheusClientUtils;

/** Factory for creating data source clients based on the data source type. */
public class DataSourceClientFactory {

private static final Logger LOG = LogManager.getLogger();

private final Settings settings;
private final DataSourceService dataSourceService;

@Inject
public DataSourceClientFactory(DataSourceService dataSourceService, Settings settings) {
this.settings = settings;
this.dataSourceService = dataSourceService;
}

/**
* Creates a client for the specified data source with appropriate type.
*
* @param <T> The type of client to create, must implement DataSourceClient
* @param dataSourceName The name of the data source
* @return The appropriate client for the data source type
* @throws DataSourceClientException If client creation fails
*/
@SuppressWarnings("unchecked")
public <T extends DataSourceClient> T createClient(String dataSourceName)
throws DataSourceClientException {
try {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

DataSourceMetadata metadata =
dataSourceService.verifyDataSourceAccessAndGetRawMetadata(dataSourceName, null);
DataSourceType dataSourceType = metadata.getConnector();

return (T) createClientForType(dataSourceType.name(), metadata);
} catch (Exception e) {
if (e instanceof DataSourceClientException) {
throw e;
}
LOG.error("Failed to create client for data source: " + dataSourceName, e);
throw new DataSourceClientException(
"Failed to create client for data source: " + dataSourceName, e);
}
}

/**
* Gets the data source type for a given data source name.
*
* @param dataSourceName The name of the data source
* @return The type of the data source
* @throws DataSourceClientException If the data source doesn't exist
*/
public DataSourceType getDataSourceType(String dataSourceName) throws DataSourceClientException {
if (!dataSourceService.dataSourceExists(dataSourceName)) {
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
}

return dataSourceService.getDataSourceMetadata(dataSourceName).getConnector();
}

private DataSourceClient createClientForType(String dataSourceType, DataSourceMetadata metadata)
throws DataSourceClientException {
switch (dataSourceType) {
case "PROMETHEUS":
return PrometheusClientUtils.createPrometheusClient(metadata, settings);
default:
throw new DataSourceClientException("Unsupported data source type: " + dataSourceType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.client.exceptions;

/** Exception thrown when there are issues with data source client operations. */
public class DataSourceClientException extends RuntimeException {

public DataSourceClientException(String message) {
super(message);
}

public DataSourceClientException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.io.IOException;
import org.opensearch.sql.datasource.client.DataSourceClient;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;

/**
* Interface for handling queries for specific data source types.
*
* @param <T> The client type this handler works with, extending DataSourceClient
*/
public interface QueryHandler<T extends DataSourceClient> {

/**
* Returns the data source type this handler supports.
*
* @return The supported data source type
*/
DataSourceType getSupportedDataSourceType();

/**
* Executes a query for the supported data source type.
*
* @param client The client instance to use
* @param request The query request
* @return JSON string result of the query
* @throws IOException If query execution fails
*/
String executeQuery(T client, ExecuteDirectQueryRequest request) throws IOException;

/**
* Gets resources from the data source.
*
* @param client The client instance to use
* @param request The resources request
* @return Response containing the requested resources
* @throws IOException If resource retrieval fails
*/
GetDirectQueryResourcesResponse<?> getResources(T client, GetDirectQueryResourcesRequest request)
throws IOException;

/**
* Checks if this handler can handle the given client type.
*
* @param client The client to check
* @return true if this handler can handle the client
*/
boolean canHandle(DataSourceClient client);

/**
* Gets the client class this handler supports.
*
* @return The class of client this handler supports
*/
Class<T> getClientClass();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource.query;

import java.util.List;
import java.util.Optional;
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.datasource.client.DataSourceClient;

/** Registry for all query handlers. */
public class QueryHandlerRegistry {

private final List<QueryHandler<?>> handlers;

@Inject
public QueryHandlerRegistry(List<QueryHandler<?>> handlers) {
this.handlers = handlers;
}

/**
* Finds a handler that can process the given client.
*
* @param client The client to find a handler for
* @param <T> The type of client, extending DataSourceClient
* @return An optional containing the handler if found
*/
@SuppressWarnings("unchecked")
public <T extends DataSourceClient> Optional<QueryHandler<T>> getQueryHandler(T client) {
return handlers.stream()
.filter(
handler -> {
try {
// Get the handler's client class and check if it's compatible with our client
Class<?> handlerClientClass = handler.getClientClass();
return handlerClientClass.isInstance(client)
&& ((QueryHandler<T>) handler).canHandle(client);
} catch (ClassCastException e) {
return false;
}
})
.map(handler -> (QueryHandler<T>) handler)
.findFirst();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.directquery;

import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryResponse;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;

public interface DirectQueryExecutorService {

/**
* Execute a direct query request.
*
* @param request The direct query request
* @return A response containing the result
*/
ExecuteDirectQueryResponse executeDirectQuery(ExecuteDirectQueryRequest request);

/**
* Get resources from a data source.
*
* @param request The resources request
* @return A response containing the requested resources
*/
GetDirectQueryResourcesResponse<?> getDirectQueryResources(
GetDirectQueryResourcesRequest request);
}
Loading
Loading