
Add a batch write flow control example for Bigtable #9314

Open · wants to merge 1 commit into main from flow-control-example
Conversation

kongweihan (Contributor)

Description

Add a batch write flow control example for Bigtable

Checklist

  • I have followed Sample Format Guide
  • pom.xml parent set to latest shared-configuration
  • Appropriate changes to README are included in PR
  • [ ] These samples need a new API enabled in testing projects to pass (let us know which ones)
  • [ ] These samples need new/updated env vars set in testing projects to pass (let us know which ones)
  • Tests pass: mvn clean verify required
  • Lint passes: mvn -P lint checkstyle:check required
  • Static Analysis: mvn -P lint clean compile pmd:cpd-check spotbugs:check advisory only
  • [ ] This sample adds a new sample directory, and I updated the CODEOWNERS file with the codeowners for this sample
  • [ ] This sample adds a new Product API, and I updated the Blunderbuss issue/PR auto-assigner with the codeowners for this sample
  • Please merge this PR for me once it is approved

@kongweihan kongweihan requested review from yoshi-approver and a team as code owners May 8, 2024 16:20
@product-auto-label product-auto-label bot added samples Issues that are directly related to samples. api: bigtable Issues related to the Bigtable API. labels May 8, 2024
@kongweihan kongweihan force-pushed the flow-control-example branch 5 times, most recently from 411e662 to 6e4f7db on May 8, 2024 17:35
@minherz (Contributor) left a comment:

Hello,
Please address the following questions:

  1. We ask for a single code sample per file, but this code appears to contain two samples. What does this code sample demonstrate?
  2. This code sample has no region tags. Which documentation will use it?
  3. We do not host code samples without tests. Why are there no tests?

Comment on lines 86 to 126
Pipeline p = Pipeline.create(options);

PCollection<Long> numbers = p.apply(generateLabel, GenerateSequence.from(0).to(numRows));

if (options.getUseCloudBigtableIo()) {
  System.out.println("Using CloudBigtableIO");
  PCollection<org.apache.hadoop.hbase.client.Mutation> mutations =
      numbers.apply(
          mutationLabel,
          ParDo.of(
              new CreateHbaseMutationFn(
                  options.getBigtableColsPerRow(), options.getBigtableBytesPerCol())));

  mutations.apply(
      String.format("Write data to table %s via CloudBigtableIO", options.getBigtableTableId()),
      CloudBigtableIO.writeToTable(
          new CloudBigtableTableConfiguration.Builder()
              .withProjectId(options.getProject())
              .withInstanceId(options.getBigtableInstanceId())
              .withTableId(options.getBigtableTableId())
              .withConfiguration(
                  BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, "true")
              .withConfiguration(
                  BigtableOptionsFactory.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES, "1048576")
              .build()));
} else {
  System.out.println("Using BigtableIO");
  PCollection<KV<ByteString, Iterable<Mutation>>> mutations =
      numbers.apply(
          mutationLabel,
          ParDo.of(
              new CreateMutationFn(
                  options.getBigtableColsPerRow(), options.getBigtableBytesPerCol())));

  BigtableIO.Write write =
      BigtableIO.write()
          .withProjectId(options.getProject())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId())
          .withFlowControl(true); // This enables batch write flow control

  mutations.apply(
      String.format("Write data to table %s via BigtableIO", options.getBigtableTableId()),
      write);
}

p.run();
Contributor:

This block of code is hard to read. Since it is a code sample, it should be easy to understand. Please reformat it so that it reads as a sequence of steps, each calling the pipeline's apply method. See the dataflow-bigquery-read-tablerows sample as a reference.
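The step-per-apply style the reviewer asks for might look like the following sketch for the BigtableIO branch. This is illustrative only, not the PR's final code: it assumes the Apache Beam and Bigtable connector dependencies plus the helper class (CreateMutationFn) and option getters from the diff above, and it is not runnable standalone.

```java
// Illustrative sketch only: assumes the Beam SDK, the Bigtable connector,
// and the CreateMutationFn helper from the diff above are on the classpath.
// Each pipeline stage is one apply(...) step in a single readable chain.
Pipeline p = Pipeline.create(options);

p.apply("Generate sequence", GenerateSequence.from(0).to(numRows))
    .apply(
        "Create mutations",
        ParDo.of(
            new CreateMutationFn(
                options.getBigtableColsPerRow(), options.getBigtableBytesPerCol())))
    .apply(
        "Write to Bigtable with flow control",
        BigtableIO.write()
            .withProjectId(options.getProject())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withFlowControl(true)); // batch write flow control

p.run();
```

The CloudBigtableIO branch could be chained the same way, which is one argument for splitting the two connectors into separate sample files.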

kongweihan (Contributor, Author):

This code sample shows how to enable the flow control feature. It has two parts because we have two different connectors, and they are configured differently. I was unsure whether it is worth duplicating the rest of the code to keep each sample simple. Please advise, and I can split it.

We're going to write the doc that will use this code sample.

Yes I'll add tests.

Comment on lines 86 to 126 (the same code block quoted above)
Contributor:

Is the pipeline intended to run asynchronously? Please append a waitUntilFinish() call to the result of run().

kongweihan (Contributor, Author):

So far our example jobs all run asynchronously. Is it better practice to run synchronously? I'm happy to learn the reasoning behind that practice.
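For context on the distinction being discussed: in Beam, Pipeline.run() submits the job and returns a PipelineResult immediately, while calling waitUntilFinish() on that result blocks until the job completes, so failures surface in the launcher process. The same submit-async vs. block-until-done pattern can be sketched with a plain Java future (an analogy only, not Beam code):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncVsSync {
    public static void main(String[] args) {
        // Async submission: returns immediately, like Pipeline.run().
        CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> "DONE");

        // Blocking until completion: the role waitUntilFinish() plays.
        // Without this, a short-lived launcher (or its test) could exit
        // before the job actually finishes or fails.
        String state = job.join();
        System.out.println(state);
    }
}
```

This is why reviewers often ask samples to block: a sample that exits before the job completes can hide failures, which matters especially for tested samples.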

Includes using BigtableIO and CloudBigtableIO
4 participants