Add pinot query options to query #21902

Open
wants to merge 2 commits into
base: master
from

naman-patel
@naman-patel naman-patel commented May 9, 2024

Description

Currently, no Pinot query options can be passed along with a query. This means Pinot null handling can't be leveraged. This PR adds a way to provide these query options.

Additional context and related issues

The same PR was also implemented for Presto. Here is the reference to that.

Fixes #21897

Release notes

(x) Release notes are required, with the following suggested text:

# Pinot
* Allow users to configure Pinot query options. ({issue}`21897`)

@@ -44,4 +44,18 @@ public void testConnectionTimeoutParsedProperly()
.build();
assertThat(PinotSessionProperties.getConnectionTimeout(session)).isEqualTo(new Duration(0.25, TimeUnit.MINUTES));
}

Member

Please add a new assertion to BasePinotConnectorSmokeTest#testQueryOptions that uses the query_options session property.

Member

@ebyhr ebyhr May 10, 2024

We can define a variable like the one below and use it in the assertion method.

        Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("pinot", "query_options", "...")
                .build();

Please refer to BasePinotConnectorSmokeTest.testAggregationPushdown.

Author

This is a connector config. I tried a catalog session property, but it complains that it's not a recognized session property.

Member

The above code works for me. How did you try it?

Author

Exactly what you have there. It compiles, but since that is not a catalog session property, I see a runtime exception.

        Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("pinot", "query_options", "skipUpsert=true")
                .build();
        assertThat(query(queryOptions, "SELECT city, \"sum(long_number)\" FROM" +
                " \"SET skipUpsert = 'true';" +
                " SET numReplicaGroupsToQuery = '1';" +
                " SELECT city, SUM(long_number)" +
                "  FROM my_table" +
                "  GROUP BY city" +
                "  HAVING SUM(long_number) > 10000\""))
                .matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
                .isFullyPushedDown();

Member

The session value skipUpsert=true should be skipUpsert:true.

Member

Gentle reminder.

@naman-patel
Author

@ebyhr I addressed your feedback except for the last item, as I don't see a way to add those session properties in the test framework.

Comment on lines 243 to 265
public static Optional<String> getQueryOptionsString(String options)
{
    if (isNullOrEmpty(options)) {
        return Optional.empty();
    }

    Map<String, String> queryOptionsMap = ImmutableMap.copyOf(MAP_SPLITTER.split(options));
    return getQueryOptions(queryOptionsMap);
}

public static Optional<String> getQueryOptions(Map<String, String> queryOptionsMap)
{
    if (queryOptionsMap.isEmpty()) {
        return Optional.empty();
    }
    StringBuilder result = new StringBuilder();
    for (Map.Entry<String, String> entry : queryOptionsMap.entrySet()) {
        result.append("SET ")
                .append(entry.getKey())
                .append(" = ")
                .append(entry.getValue())
                .append(";\n");
    }
    return Optional.of(result.toString());
}

Contributor

Use

public Map<String, String> getAdditionalHeaders()
{
    return additionalHeaders;
}

@Config("hive.metastore.http.client.additional-headers")
@ConfigDescription("Comma separated key:value pairs to be send to metastore as additional headers")
public ThriftHttpMetastoreConfig setAdditionalHeaders(String httpHeaders)
{
    try {
        // we allow escaping the delimiters like , and : using back-slash.
        // To support that we create a negative lookbehind of , and : which
        // are not preceded by a back-slash.
        String headersDelim = "(?<!\\\\),";
        String kvDelim = "(?<!\\\\):";
        Map<String, String> temp = new HashMap<>();
        if (httpHeaders != null) {
            for (String kv : httpHeaders.split(headersDelim)) {
                String key = kv.split(kvDelim, 2)[0].trim();
                String val = kv.split(kvDelim, 2)[1].trim();
                temp.put(key, val);
            }
            this.additionalHeaders = ImmutableMap.copyOf(temp);
        }
    }
    catch (IndexOutOfBoundsException e) {
        throw new IllegalArgumentException(String.format("Invalid format for 'hive.metastore.http.client.additional-headers'. " +
                "Value provided is %s", httpHeaders), e);
    }
    return this;
}

as a PoC for how to handle a config value that contains a map of values.

Author

@findinpath Done.

Contributor

@naman-patel I was suggesting returning a Map<String, String>

Author

But then everywhere there needs to be code to translate the settings to SET statements. That feels like redundant code everywhere.

Member

This config getter is called from PinotSessionProperties, which expects a String type, so returning Optional<String> seems acceptable. We should still validate that the value can be converted to a map, though.
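
One way to do that validation, sketched with the JDK only. It reuses the negative-lookbehind delimiters from the snippet quoted earlier in this thread; the class name `QueryOptionsParser` and the method name `parse` are hypothetical, and the error message wording is an assumption rather than what the PR emits.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class QueryOptionsParser
{
    // Split on ',' or ':' only when the delimiter is not preceded by a backslash.
    private static final String ENTRY_DELIMITER = "(?<!\\\\),";
    private static final String KEY_VALUE_DELIMITER = "(?<!\\\\):";

    private QueryOptionsParser() {}

    // Hypothetical helper: parses "key:value,key:value" and fails fast on malformed input.
    // Escaped delimiters are kept as written; this sketch does not unescape them.
    static Map<String, String> parse(String options)
    {
        Map<String, String> parsed = new LinkedHashMap<>();
        if (options == null || options.isBlank()) {
            return Collections.unmodifiableMap(parsed);
        }
        for (String entry : options.split(ENTRY_DELIMITER)) {
            String[] keyValue = entry.split(KEY_VALUE_DELIMITER, 2);
            if (keyValue.length != 2 || keyValue[0].isBlank() || keyValue[1].isBlank()) {
                throw new IllegalArgumentException(
                        "Invalid format for 'pinot.query-options'. Value provided is " + options);
            }
            parsed.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return Collections.unmodifiableMap(parsed);
    }
}
```

Calling this from the config setter would reject a value such as `skipUpsert=true` at startup, while the getter could still hand the original string to PinotSessionProperties.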

@ebyhr
Member

ebyhr commented May 23, 2024

Could you rebase on master to resolve conflicts?

@naman-patel
Author

naman-patel commented May 24, 2024

Could you rebase on master to resolve conflicts?

@ebyhr Done

@findinpath findinpath requested a review from ebyhr May 26, 2024 04:58
@github-actions github-actions bot added the docs label May 28, 2024
@ebyhr
Member

ebyhr commented May 28, 2024

Please fix the CI failure and add a query-based test to BasePinotConnectorSmokeTest.

Error:    TestDynamicTable.testQueryOptions:469 
expected: 
  "SET useMultistageEngine = true;
  SET skipUpsert = true;
  SELECT "FlightNum" FROM realtimeOnly_REALTIME LIMIT 50"
 but was: 
  "SET useMultistageEngine = true;
  SET skipUpsert = true;SELECT "FlightNum" FROM realtimeOnly_REALTIME LIMIT 50"
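
The mismatch above is a missing newline between the last SET statement and the SELECT. A minimal sketch of one way to build the prefix so that every statement, including the last, ends with ";\n". The class `QueryOptionsPrefix` and its methods are hypothetical names for illustration and are not taken from the PR.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class QueryOptionsPrefix
{
    private QueryOptionsPrefix() {}

    // Renders each option as "SET key = value;" followed by a newline,
    // so whatever follows the prefix starts on its own line.
    static String render(Map<String, String> options)
    {
        return options.entrySet().stream()
                .map(entry -> "SET " + entry.getKey() + " = " + entry.getValue() + ";\n")
                .collect(Collectors.joining());
    }

    // Prepends the rendered options to the query; an empty map leaves the query unchanged.
    static String prepend(Map<String, String> options, String query)
    {
        return render(options) + query;
    }
}
```

Using the newline as a terminator rather than a separator keeps the concatenation site trivial, at the cost of a trailing newline when the prefix is compared on its own.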

@@ -57,7 +57,6 @@ public class PinotConfig

private int estimatedSizeInBytesForNonNumericColumn = 20;
private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES);

Member

Revert unrelated change.

Comment on lines +319 to +331
@Config("pinot.query-options")
@ConfigDescription("Comma separated list of query options. Each option should be in the format key:value. " +
        "For example, enableNullHandling:true,skipUpsert:true,varcharOption:'value'")
public PinotConfig setQueryOptions(String options)
{
    if (options == null) {
        queryOptions = Optional.empty();
    }
    else {
        queryOptions = Optional.of(options);
    }
    return this;
}

Member

Move this before the validate method.

false));
}

public static String getQueryOptions(ConnectorSession session)

Member

Move this to the bottom so it follows the same order as the fields. Also, please return Optional<String>. We should avoid returning null as much as possible.
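
A rough sketch of the Optional-returning shape, using only the JDK. The `Map` parameter is a hypothetical stand-in for the session lookup and is not how ConnectorSession is actually queried; the point is only that absent or blank values become `Optional.empty()` instead of null.

```java
import java.util.Map;
import java.util.Optional;

final class OptionalQueryOptions
{
    private OptionalQueryOptions() {}

    // Hypothetical lookup: the map plays the role of the session properties.
    // Missing and blank values both collapse to Optional.empty().
    static Optional<String> getQueryOptions(Map<String, String> sessionProperties)
    {
        return Optional.ofNullable(sessionProperties.get("query_options"))
                .filter(value -> !value.isBlank());
    }
}
```

Callers can then use `ifPresent` or `map` instead of a null check.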

Comment on lines +62 to +66
Set<String> expected = Arrays.stream("SET enableNullHandling = true;\nSET skipUpsert = true;\nSET varcharOption = 'value';\n"
        .split("\n")).collect(Collectors.toSet());
Set<String> options = Arrays.stream(PinotQueryBuilder.getQueryOptionsString(queryOptions).orElse("").split("\n"))
        .collect(Collectors.toSet());
assertThat(options).isEqualTo(expected);

Member

Remove redundant Arrays.stream method:

        assertThat(PinotQueryBuilder.getQueryOptionsString(queryOptions))
                .isEqualTo(Optional.of("""
                        SET enableNullHandling = true;
                        SET skipUpsert = true;
                        SET varcharOption = 'value';"""));
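
One detail worth keeping in mind with a text block like this: where the closing delimiter sits decides whether the string ends with a newline. The sketch below only illustrates that Java language rule (text blocks need Java 15 or later); the class and constant names are hypothetical.

```java
final class TextBlockNewlines
{
    private TextBlockNewlines() {}

    // Closing delimiter on the last content line: the string has no trailing newline.
    static final String WITHOUT_TRAILING_NEWLINE = """
            SET enableNullHandling = true;
            SET skipUpsert = true;""";

    // Closing delimiter on its own line: the string ends with "\n".
    static final String WITH_TRAILING_NEWLINE = """
            SET enableNullHandling = true;
            SET skipUpsert = true;
            """;
}
```

If the builder terminates every SET statement with ";\n", the expected value would need the second form, or the builder would need to drop the final newline.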


import static org.assertj.core.api.Assertions.assertThat;

public class TestQueryOptionsParsing

Member

Remove public, add final and rename to TestPinotQueryBuilder.

Test classes should be defined as package-private and final.
https://trino.io/docs/current/develop/tests.html#conventions-and-recommendations

Comment on lines +29 to +31
assertThat(parssedOptions.get("limitForSegmentQueries")).isEqualTo("1000");
assertThat(parssedOptions.get("limitForBrokerQueries")).isEqualTo("1000");
assertThat(parssedOptions.get("targetSegmentPageSizeBytes")).isEqualTo("1000");

Member

Use containsExactly(entry(...), ... entry(...)) instead. Same for others.

{
String options = "";
Map<String, String> parssedOptions = PinotQueryBuilder.parseQueryOptions(options);
assertThat(parssedOptions.isEmpty()).isEqualTo(true);

Member

Use assertThat(parssedOptions).isEmpty(); instead.

// are not preceded by a back-slash.
String headersDelim = "(?<!\\\\),";
String kvDelim = "(?<!\\\\):";
Map<String, String> temp = new HashMap<>();

Member

Use ImmutableMap.Builder and rename to queryOptions.

String val = kv.split(kvDelim, 2)[1].trim();
temp.put(key, val);
}
return ImmutableMap.copyOf(temp);

Member

ImmutableMap.copyOf is redundant once we change to ImmutableMap.Builder.

Comment on lines +286 to +287
throw new IllegalArgumentException(String.format("Invalid format for 'pinot.query-options'. " +
"Value provided is %s", options), e);

Member

Development

Successfully merging this pull request may close these issues.

Pinot connector has no way to pass query option along with query
3 participants