
[Feature] [InfluxDB Source] add read by chunk #6808

Open · wants to merge 5 commits into base: dev

Conversation

15767714253

Purpose of this pull request

This pull request enhances the InfluxDB source connector by introducing chunked data retrieval. The feature allows the connector to efficiently process large datasets by querying and fetching data in chunks.

Does this PR introduce any user-facing change?

No. The existing functionality of the InfluxDB source connector remains unchanged. Users who upgrade to a version containing this patch gain the additional option to enable chunked data retrieval for performance improvements, but this is not a breaking change, and the feature is opt-in.

How was this patch tested?

I have run multiple synchronizations in our company's production environment, against a single table containing more than 300 million rows. Note, however, that this feature puts a significant load on InfluxDB itself, proportional to the amount of data read.

Check list

@davidzollo davidzollo added the First-time contributor First-time contributor label May 7, 2024
@Hisoka-X Hisoka-X added feature New feature influxdb labels May 8, 2024
@Hisoka-X
Member

Hisoka-X commented May 8, 2024

cc @hailin0 @EricJoy2048

@zhilinli123
Contributor

zhilinli123 commented May 8, 2024

### For SeaTunnel Zeta Engine

> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in directory `${SEATUNNEL_HOME}/lib/`.

## Key features
Contributor

Key Features


> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in the directory `${SEATUNNEL_HOME}/lib/`.

## Key features
Contributor

Key Features


## Options

| name | type | required | default value |
Contributor

[screenshot] Refer to the MySQL connector documentation for the options table format.

```
    value = INT
    rt = STRING
    time = BIGINT
}
```
Contributor

identical?

Author

Using split_column is for shard querying, using chunk_size is for chunk querying, and using neither is for regular querying.
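For illustration, the three modes described above might be selected in a source config along these lines (a hedged sketch: the connection options follow the existing InfluxDB source docs, `chunk_size` is the new option this PR adds, and `split_column` stands in for the shard-query option; exact names should be checked against the final docs):

```
source {
  InfluxDB {
    url = "http://localhost:8086"
    database = "test"
    sql = "select value, rt, time from test_table"
    # shard querying: set split_column (and omit chunk_size)
    # split_column = "value"
    # chunk querying (this PR): set chunk_size (and omit split_column)
    chunk_size = 10000
    # regular querying: set neither
  }
}
```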

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support multiple table reading](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
Member

[screenshot]

Repeated

```java
                },
                () -> {
                    log.error("this chunk reader influxDB complete");
```
Member

log.error?

Author

log.info

```java
        read(split, output);
    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
        // reader influxDB By chunk
        if (StringUtils.isEmpty(config.getSplitKey()) && config.getChunkSize() > 0) {
```
Member

  1. Even if users set `split_key`, they can still use chunk read mode. Is that the intended behavior?
  2. I think chunk mode is the better way to read. A better approach would be to give `chunk_size` a default value when the user has not set one, and then read the data uniformly through the `readByChunkSize` method.

Author

I think this is not advisable, because I believe that chunked reading puts a lot of performance pressure on InfluxDB, and some users would still prefer to shift the pressure to SeaTunnel.

Member

> I think this is not advisable, because I believe that chunked reading puts a lot of performance pressure on InfluxDB, and some users would still prefer to shift the pressure to SeaTunnel.

Can the pressure on InfluxDB be relieved by reducing parallelism (e.g. setting parallelism to 1) or by configuring a speed limit in the env config? See https://seatunnel.apache.org/docs/2.3.5/concept/speed-limit.
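Per the linked speed-limit doc, such a cap is configured in the env block roughly like this (a sketch; the `read_limit` keys follow the SeaTunnel speed-limit documentation and the values are illustrative):

```
env {
  parallelism = 1
  # throttle reads so the source does not overwhelm InfluxDB
  read_limit.rows_per_second = 400
  read_limit.bytes_per_second = 1048576
}
```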

Author

The parallelism was already 1; the load has nothing to do with read speed, but with the cost of InfluxDB's own chunk operation.


```java
    private void readByChunkSize(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
        influxdb.query(
                new Query(split.getQuery(), config.getDatabase()),
```
Member

Using CountDownLatch is not a good idea. You can update the code like this (then `readByChunkSize` becomes a synchronous method and will not return until the reading is completed):

```java
final CompletableFuture<Void> queryCompleteFuture = new CompletableFuture<>();

influxdb.query(
        new Query(split.getQuery(), config.getDatabase()),
        config.getChunkSize(),
        (cancellable, queryResult) -> {
            if (cancellable.isCanceled()) {
                log.info("this chunk reader influxDB is canceled");
                queryCompleteFuture.complete(null);
                return;
            }
            if (queryResult.hasError()) {
                log.error(
                        "this chunk reader influxDB result has error [{}]",
                        queryResult.getError());
                queryCompleteFuture.completeExceptionally(
                        new InfluxdbConnectorException(/* error code & message */));
                return;
            }
            for (QueryResult.Result result : queryResult.getResults()) {
                List<QueryResult.Series> seriesList = result.getSeries();
                if (CollectionUtils.isNotEmpty(seriesList)) {
                    for (QueryResult.Series series : seriesList) {
                        for (List<Object> values : series.getValues()) {
                            SeaTunnelRow row =
                                    InfluxDBRowConverter.convert(
                                            values, seaTunnelRowType, columnsIndexList);
                            output.collect(row);
                        }
                    }
                } else {
                    log.info("this chunk reader influxDB series is empty");
                }
            }
        },
        () -> {
            log.info("this chunk reader influxDB complete");
            queryCompleteFuture.complete(null);
        },
        throwable -> {
            log.error(
                    "this chunk reader influxDB result has error [{}]",
                    throwable.getMessage());
            queryCompleteFuture.completeExceptionally(
                    new InfluxdbConnectorException(throwable));
        });

queryCompleteFuture.get();
```
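As a self-contained illustration of that suggestion (toy stand-in classes, not SeaTunnel's or influxdb-java's actual API), the pattern of wrapping a callback-style async query so the caller blocks until completion looks like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch of the CountDownLatch-to-CompletableFuture pattern the review
// suggests: the async query delivers chunks via callbacks, and the wrapper
// returns only after the completion callback fires.
public class ChunkedQueryDemo {

    // Stand-in for influxdb.query(query, chunkSize, onChunk, onComplete, onError).
    static void asyncQuery(Consumer<String> onChunk, Runnable onComplete) {
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                onChunk.accept("chunk-" + i); // one callback per result chunk
            }
            onComplete.run(); // signals that all chunks were delivered
        }).start();
    }

    // Synchronous wrapper: returns only after the async callbacks finish.
    static int readAllChunksSync() throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        int[] count = {0};
        asyncQuery(
                chunk -> count[0]++,        // analogue of output.collect(row)
                () -> done.complete(null)); // CompletableFuture<Void> completes with null
        done.get(); // blocks the caller, like the suggested queryCompleteFuture.get()
        return count[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAllChunksSync()); // prints 3
    }
}
```

Note that `CompletableFuture<Void>.complete` requires an argument (`null`), which the original suggestion omitted.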

Author

Yes, I already tried switching to CompletableFuture yesterday, but I haven't got it working yet due to limited time. I know CountDownLatch is not a good choice.

Author

But merely changing this method to a synchronous one is not enough, as it occupies the lock in the pollNext method.

Member

> But merely changing this method to a synchronous one is not enough, as it occupies the lock in the pollNext method.

It is normal to occupy a lock; we must ensure that the lock (`output.getCheckpointLock()`) is not released until a split read is completed.

Labels: feature (New feature), First-time contributor, influxdb
Projects: None yet
Development: Successfully merging this pull request may close these issues: None yet
5 participants