[Feature][S3 File] Make S3 File Connector support multiple table write #6698

Merged
docs/en/connector-v2/sink/S3File.md
@@ -474,7 +474,7 @@ transform {
 sink {
   S3File {
     bucket = "s3a://seatunnel-test"
-    tmp_path = "/tmp/seatunnel"
+    tmp_path = "/tmp/seatunnel/${table_name}"
     path="/test/${table_name}"
     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
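
The one-line doc change above is load-bearing for multi-table writes: tmp_path now carries the same ${table_name} placeholder as path, so each table stages its files in its own temp directory and concurrent per-table writers cannot collide at commit time. A minimal sketch of the placeholder substitution, in plain Java (resolveTablePath is a name invented here for illustration, not the connector's actual API):

    // Hypothetical helper: expand the ${table_name} placeholder for one table.
    public static String resolveTablePath(String template, String tableName) {
        // "/tmp/seatunnel/${table_name}" + "orders" -> "/tmp/seatunnel/orders"
        return template.replace("${table_name}", tableName);
    }
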
connector-file-s3/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : File : S3</name>

<properties>
-<hadoop-aws.version>2.6.5</hadoop-aws.version>
+<hadoop-aws.version>3.1.4</hadoop-aws.version>
<guava.version>27.0-jre</guava.version>
</properties>

S3FileCatalogFactory.java
@@ -24,15 +24,15 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class S3FileCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
-HadoopConf hadoopConf = S3Conf.buildWithReadOnlyConfig(options);
+HadoopConf hadoopConf = S3HadoopConf.buildWithReadOnlyConfig(options);
HadoopFileSystemProxy fileSystemUtils = new HadoopFileSystemProxy(hadoopConf);
return new S3FileCatalog(fileSystemUtils, options);
}
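
With the builder renamed to S3HadoopConf, the catalog factory stays a thin wrapper: build the Hadoop conf, wrap it in a file-system proxy, hand both to the catalog. A hedged usage sketch (ReadonlyConfig.fromMap and the "bucket" key are assumptions about the SeaTunnel API, not shown in this diff):

    // Hypothetical caller: obtain a Catalog for an S3 bucket via the factory.
    static Catalog buildS3Catalog() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");
        return new S3FileCatalogFactory().createCatalog("s3", ReadonlyConfig.fromMap(raw));
    }
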
S3Conf.java → S3HadoopConf.java
@@ -17,16 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.file.s3.config;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import java.util.HashMap;
import java.util.Map;

-public class S3Conf extends HadoopConf {
+public class S3HadoopConf extends HadoopConf {
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
protected static final String S3A_SCHEMA = "s3a";
@@ -47,39 +44,33 @@ public void setSchema(String schema) {
this.schema = schema;
}

-protected S3Conf(String hdfsNameKey) {
+public S3HadoopConf(String hdfsNameKey) {
super(hdfsNameKey);
}

-public static HadoopConf buildWithConfig(Config config) {
+public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig config) {

-String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
-S3Conf hadoopConf = new S3Conf(bucketName);
+String bucketName = config.get(S3ConfigOptions.S3_BUCKET);
+S3HadoopConf hadoopConf = new S3HadoopConf(bucketName);
if (bucketName.startsWith(S3A_SCHEMA)) {
hadoopConf.setSchema(S3A_SCHEMA);
}
HashMap<String, String> s3Options = new HashMap<>();
hadoopConf.putS3SK(s3Options, config);
-if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
-    config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
-            .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
+if (config.getOptional(S3ConfigOptions.S3_PROPERTIES).isPresent()) {
+    config.get(S3ConfigOptions.S3_PROPERTIES)
+            .forEach((key, value) -> s3Options.put(key, String.valueOf(value)));
}

s3Options.put(
S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
-        config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
+        config.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
s3Options.put(
-        S3ConfigOptions.FS_S3A_ENDPOINT.key(),
-        config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
+        S3ConfigOptions.FS_S3A_ENDPOINT.key(), config.get(S3ConfigOptions.FS_S3A_ENDPOINT));
hadoopConf.setExtraOptions(s3Options);
return hadoopConf;
}

-public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
-    Config config = readonlyConfig.toConfig();
-    return buildWithConfig(config);
-}
-
protected String switchHdfsImpl() {
switch (this.schema) {
case S3A_SCHEMA:
@@ -89,13 +80,13 @@ protected String switchHdfsImpl() {
}
}

-private void putS3SK(Map<String, String> s3Options, Config config) {
-    if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
-            && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
+private void putS3SK(Map<String, String> s3Options, ReadonlyConfig config) {
+    if (!config.getOptional(S3ConfigOptions.S3_ACCESS_KEY).isPresent()
+            && !config.getOptional(S3ConfigOptions.S3_SECRET_KEY).isPresent()) {
return;
}
-    String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
-    String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
+    String accessKey = config.get(S3ConfigOptions.S3_ACCESS_KEY);
+    String secretKey = config.get(S3ConfigOptions.S3_SECRET_KEY);
if (S3A_SCHEMA.equals(this.schema)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
S3FileSink.java
@@ -34,8 +34,8 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

import java.util.Optional;
@@ -55,7 +55,7 @@ public String getPluginName() {
}

public S3FileSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
-super(S3Conf.buildWithConfig(readonlyConfig.toConfig()), readonlyConfig, catalogTable);
+super(S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig), readonlyConfig, catalogTable);
this.catalogTable = catalogTable;
this.readonlyConfig = readonlyConfig;
Config pluginConfig = readonlyConfig.toConfig();
S3FileSource.java
@@ -17,111 +17,19 @@

package org.apache.seatunnel.connectors.seatunnel.file.s3.source;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.source.config.MultipleTableS3FileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;

-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-
-@AutoService(SeaTunnelSource.class)
-public class S3FileSource extends BaseFileSource {
+public class S3FileSource extends BaseMultipleTableFileSource {
+
+    public S3FileSource(ReadonlyConfig readonlyConfig) {
+        super(new MultipleTableS3FileSourceConfig(readonlyConfig));
+    }
+
    @Override
    public String getPluginName() {
        return FileSystemType.S3.getFileSystemPluginName();
    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        S3ConfigOptions.FILE_PATH.key(),
-                        S3ConfigOptions.FILE_FORMAT_TYPE.key(),
-                        S3ConfigOptions.S3_BUCKET.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-        String path = pluginConfig.getString(S3ConfigOptions.FILE_PATH.key());
-        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
-        readStrategy =
-                ReadStrategyFactory.of(
-                        pluginConfig.getString(S3ConfigOptions.FILE_FORMAT_TYPE.key()));
-        readStrategy.setPluginConfig(pluginConfig);
-        readStrategy.init(hadoopConf);
-        try {
-            filePaths = readStrategy.getFileNamesByPath(path);
-        } catch (IOException e) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", path);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
-        }
-        // support user-defined schema
-        FileFormat fileFormat =
-                FileFormat.valueOf(
-                        pluginConfig
-                                .getString(S3ConfigOptions.FILE_FORMAT_TYPE.key())
-                                .toUpperCase());
-        // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            switch (fileFormat) {
-                case CSV:
-                case TEXT:
-                case JSON:
-                case EXCEL:
-                case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
-                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
-                    break;
-                case ORC:
-                case PARQUET:
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
-                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
-                default:
-                    // never got in there
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "SeaTunnel does not supported this file format");
-            }
-        } else {
-            if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
-                return;
-            }
-            try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
-            } catch (FileConnectorException e) {
-                String errorMsg =
-                        String.format("Get table schema from file [%s] failed", filePaths.get(0));
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
-            }
-        }
-    }
}
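
The entire prepare() lifecycle vanishes from the connector: config validation, read-strategy selection, file listing, and schema detection now live behind BaseMultipleTableFileSource and its per-table configs. Construction reduces to one call; a hedged sketch (ReadonlyConfig.fromMap and the raw keys "path" and "file_format_type" are assumptions):

    // Hypothetical caller: the factory path now reduces to a constructor call.
    static S3FileSource newS3Source() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");
        raw.put("path", "/test/data");
        raw.put("file_format_type", "json");
        return new S3FileSource(ReadonlyConfig.fromMap(raw));
    }
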
S3FileSourceFactory.java
@@ -19,16 +19,20 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;

import com.google.auto.service.AutoService;

+import java.io.Serializable;
import java.util.Arrays;

@AutoService(Factory.class)
@@ -38,6 +42,12 @@ public String factoryIdentifier() {
return FileSystemType.S3.getFileSystemPluginName();
}

+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new S3FileSource(context.getOptions());
+    }

@Override
public OptionRule optionRule() {
return OptionRule.builder()
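
The createSource override hands the engine a TableSource as a lambda, deferring construction of the actual source until the runtime asks for it. The lambda works because TableSource is a single-abstract-method interface; spelled out as an anonymous class (a sketch; the method name createSource() on TableSource is inferred from the lambda, not shown in this diff):

    // Equivalent anonymous-class form of the lambda above (sketch).
    TableSource<T, SplitT, StateT> tableSource = new TableSource<T, SplitT, StateT>() {
        @Override
        public SeaTunnelSource<T, SplitT, StateT> createSource() {
            return (SeaTunnelSource<T, SplitT, StateT>) new S3FileSource(context.getOptions());
        }
    };
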
MultipleTableS3FileSourceConfig.java (new file)
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;

public class MultipleTableS3FileSourceConfig extends BaseMultipleTableFileSourceConfig {

public MultipleTableS3FileSourceConfig(ReadonlyConfig s3FileSourceRootConfig) {
super(s3FileSourceRootConfig);
}

@Override
public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
return new S3FileSourceConfig(readonlyConfig);
}
}
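
This class only supplies the per-table factory method; the fan-out over tables happens in BaseMultipleTableFileSourceConfig, which this PR does not show. A loose sketch of the presumed shape of that fan-out (the splitting of the root config into per-table configs and the list type are assumptions about code outside this diff):

    // Sketch of the assumed base-class behavior: one single-table config per
    // table entry derived from the root config.
    static List<BaseFileSourceConfig> expand(
            MultipleTableS3FileSourceConfig factory, List<ReadonlyConfig> perTableConfigs) {
        List<BaseFileSourceConfig> result = new ArrayList<>();
        for (ReadonlyConfig tableConf : perTableConfigs) {
            result.add(factory.getBaseSourceConfig(tableConf));
        }
        return result;
    }
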
S3FileSourceConfig.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

import lombok.Getter;

@Getter
public class S3FileSourceConfig extends BaseFileSourceConfig {

private static final long serialVersionUID = 1L;

@Override
public HadoopConf getHadoopConfig() {
return S3HadoopConf.buildWithReadOnlyConfig(getBaseFileSourceConfig());
}

@Override
public String getPluginName() {
return FileSystemType.S3.getFileSystemPluginName();
}

public S3FileSourceConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
}
}
HiveOnS3Conf.java
@@ -19,10 +19,10 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

-public class HiveOnS3Conf extends S3Conf {
+public class HiveOnS3Conf extends S3HadoopConf {
protected static final String S3_SCHEMA = "s3";
// The emr of amazon on s3 use this EmrFileSystem as the file system
protected static final String HDFS_S3_IMPL = "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
@@ -43,7 +43,7 @@ protected String switchHdfsImpl() {
}

public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
-S3Conf s3Conf = (S3Conf) S3Conf.buildWithReadOnlyConfig(readonlyConfig);
+S3HadoopConf s3Conf = (S3HadoopConf) S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig);
String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
if (bucketName.startsWith(DEFAULT_SCHEMA)) {
s3Conf.setSchema(DEFAULT_SCHEMA);
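
HiveOnS3Conf's remaining job is scheme mapping: a plain s3 bucket resolves to EMR's EmrFileSystem, while s3a buckets keep S3AFileSystem from the parent S3HadoopConf. A hedged illustration (ReadonlyConfig.fromMap and the "bucket" key are assumptions):

    // Hypothetical caller: the bucket scheme selects the Hadoop FS implementation.
    // "s3://bucket"  -> com.amazon.ws.emr.hadoop.fs.EmrFileSystem (EMR)
    // "s3a://bucket" -> org.apache.hadoop.fs.s3a.S3AFileSystem
    static HadoopConf buildEmrConf() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3://my-emr-bucket");
        return HiveOnS3Conf.buildWithReadOnlyConfig(ReadonlyConfig.fromMap(raw));
    }
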