[Feature] Hive Source/Sink support multiple table (#5929)
ruanwenjun committed Apr 30, 2024
1 parent 7b4e072 commit 4d9287f
Showing 30 changed files with 1,612 additions and 468 deletions.
52 changes: 51 additions & 1 deletion docs/en/connector-v2/sink/Hive.md
@@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`

### table_name [string]

Target Hive table name eg: db1.table1
Target Hive table name, e.g. `db1.table1`. If the source is in multiple-table mode, you can use `${database_name}.${table_name}` as the target table name; these placeholders will be replaced with the database and table name of the CatalogTable generated by the source.
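
For example (a minimal sketch; the database and table names are illustrative):

```bash
# fixed target table
table_name = "db1.table1"

# resolved per upstream table in multiple-table mode
table_name = "${database_name}.${table_name}"
```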

### metastore_uri [string]

@@ -343,6 +343,56 @@ sink {
}
```

### Example 2: Multiple tables

We have multiple source tables like this:

```bash
create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...
```

We need to read data from these source tables and write it to other tables.

The job config file can look like this:

```bash
env {
# You can set flink configuration here
parallelism = 3
job.name="test_hive_source_to_hive"
}
source {
Hive {
tables_configs = [
{
table_name = "test_hive.test_1"
metastore_uri = "thrift://ctyun6:9083"
},
{
table_name = "test_hive.test_2"
metastore_uri = "thrift://ctyun7:9083"
}
]
}
}
sink {
  # write data from the upstream tables to the target Hive tables
Hive {
table_name = "${database_name}.${table_name}"
metastore_uri = "thrift://ctyun7:9083"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
25 changes: 21 additions & 4 deletions docs/en/connector-v2/source/Hive.md
@@ -59,10 +59,6 @@ Hive metastore uri

The path of `hdfs-site.xml`, used to load the HA configuration of the NameNodes

### hive_site_path [string]

The path of `hive-site.xml`, used to authenticate with the hive metastore

### read_partitions [list]

The target partitions that the user wants to read from the hive table. If this parameter is not set, all the data in the hive table will be read.
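
For example, to read only some partitions (a sketch; the partition names and values are illustrative, and every entry in the list should have the same directory depth):

```bash
read_partitions = ["date=20240101", "date=20240102"]
```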
@@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source Common Options](common-

## Example

### Example 1: Single table

```bash

Hive {
    table_name = "default.seatunnel_hive_source"  # illustrative value
    metastore_uri = "thrift://namenode001:9083"
  }

```

### Example 2: Multiple tables

```bash

Hive {
tables_configs = [
{
table_name = "default.seatunnel_orc_1"
metastore_uri = "thrift://namenode001:9083"
},
{
table_name = "default.seatunnel_orc_2"
metastore_uri = "thrift://namenode001:9083"
}
]
}

```

## Changelog

### 2.2.0-beta 2022-09-26
@@ -42,6 +42,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Utils contains some common methods for constructing CatalogTable. */
@Slf4j
@@ -234,4 +236,41 @@ public static SeaTunnelRowType buildSimpleTextSchema() {
public static CatalogTable buildSimpleTextTable() {
return getCatalogTable("default", buildSimpleTextSchema());
}

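    /**
     * Builds a new {@link CatalogTable} whose columns are aligned with the given
     * {@link SeaTunnelRowType}: columns already present in the table schema are reused
     * by name, and fields missing from the schema are added as simple physical columns.
     */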
public static CatalogTable newCatalogTable(
CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
TableSchema tableSchema = catalogTable.getTableSchema();

Map<String, Column> columnMap =
tableSchema.getColumns().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));
String[] fieldNames = seaTunnelRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();

List<Column> finalColumns = new ArrayList<>();
for (int i = 0; i < fieldNames.length; i++) {
Column column = columnMap.get(fieldNames[i]);
if (column != null) {
finalColumns.add(column);
} else {
finalColumns.add(
PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null));
}
}

TableSchema finalSchema =
TableSchema.builder()
.columns(finalColumns)
.primaryKey(tableSchema.getPrimaryKey())
.constraintKey(tableSchema.getConstraintKeys())
.build();

return CatalogTable.of(
catalogTable.getTableId(),
finalSchema,
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
}
}
@@ -17,11 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.hive.commit;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import org.apache.thrift.TException;
Expand All @@ -33,33 +33,31 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;

@Slf4j
public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final Config pluginConfig;
private final String dbName;
private final String tableName;
private final boolean abortDropPartitionMetadata;

private final ReadonlyConfig readonlyConfig;

public HiveSinkAggregatedCommitter(
Config pluginConfig, String dbName, String tableName, HadoopConf hadoopConf) {
ReadonlyConfig readonlyConfig, String dbName, String tableName, HadoopConf hadoopConf) {
super(hadoopConf);
this.pluginConfig = pluginConfig;
this.readonlyConfig = readonlyConfig;
this.dbName = dbName;
this.tableName = tableName;
this.abortDropPartitionMetadata =
pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
? pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
: ABORT_DROP_PARTITION_METADATA.defaultValue();
readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
}

@Override
public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {

List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
if (errorCommitInfos.isEmpty()) {
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
try {
for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
@@ -87,7 +85,7 @@ public List<FileAggregatedCommitInfo> commit(
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
super.abort(aggregatedCommitInfos);
if (abortDropPartitionMetadata) {
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
aggregatedCommitInfo.getPartitionDirAndValuesMap();
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hive.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

public class BaseHiveOptions extends BaseSourceConfigOptions {

public static final Option<String> TABLE_NAME =
Options.key("table_name")
.stringType()
.noDefaultValue()
.withDescription("Hive table name");

public static final Option<String> METASTORE_URI =
Options.key("metastore_uri")
.stringType()
.noDefaultValue()
.withDescription("Hive metastore uri");

public static final Option<String> HIVE_SITE_PATH =
Options.key("hive_site_path")
.stringType()
.noDefaultValue()
.withDescription("The path of hive-site.xml");
}
@@ -17,14 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.hive.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;
@@ -66,29 +60,4 @@ public class HiveConfig {
.noDefaultValue()
.withDescription(
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files");

public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.mapred.TextInputFormat";
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
public static final String ORC_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";

public static Pair<String[], Table> getTableInfo(Config config) {
String table = config.getString(TABLE_NAME.key());
String[] splits = table.split("\\.");
if (splits.length != 2) {
throw new RuntimeException("Please config " + TABLE_NAME + " as db.table format");
}
HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
hiveMetaStoreProxy.close();
return Pair.of(splits, tableInformation);
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hive.config;

public class HiveConstants {

public static final String CONNECTOR_NAME = "Hive";

public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.mapred.TextInputFormat";
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
public static final String ORC_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
}
@@ -23,7 +23,9 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode {
GET_HDFS_NAMENODE_HOST_FAILED("HIVE-01", "Get name node host from table location failed"),
INITIALIZE_HIVE_METASTORE_CLIENT_FAILED("HIVE-02", "Initialize hive metastore client failed"),
GET_HIVE_TABLE_INFORMATION_FAILED(
"HIVE-03", "Get hive table information from hive metastore service failed");
"HIVE-03", "Get hive table information from hive metastore service failed"),
HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
;

private final String code;
private final String description;
