Skip to content

Commit

Permalink
Support common jdbc catalog lock for filesystem catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 29, 2024
1 parent ac27b66 commit dd476db
Show file tree
Hide file tree
Showing 20 changed files with 415 additions and 19 deletions.
11 changes: 11 additions & 0 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ USE CATALOG my_catalog;

You can define any default table options with the prefix `table-default.` for tables created in the catalog.


The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

> ```shell
> 'lock.uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'lock.jdbc.user' = '...',
> 'lock.jdbc.password' = '...',
> ```


{{< /tab >}}

{{< tab "Spark3" >}}
Expand Down
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>client-pool-cache-eviction-interval-ms</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>Long</td>
<td>Client pool cache eviction interval ms.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down Expand Up @@ -66,7 +72,7 @@
<td><h5>lock.type</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'.</td>
</tr>
<tr>
<td><h5>metastore</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class CatalogOptions {
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");
.withDescription(
"The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
Expand Down Expand Up @@ -110,4 +111,10 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<Long> CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
ConfigOptions.key("client-pool-cache-eviction-interval-ms")
.longType()
.defaultValue(5 * 60 * 1000L)
.withDescription("Client pool cache eviction interval ms.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -62,6 +63,7 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";
protected static final String LOCK_PROP_PREFIX = "lock.";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
Expand Down Expand Up @@ -105,6 +107,31 @@ public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public Optional<CatalogLockContextFactory> lockContextFactory() {
String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}
return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLockContextFactory.class,
lock));
}

public Options extractLockConfiguration(Map<String, String> properties) {
Map<String, String> result = new HashMap<>();
result.put(METASTORE.key(), properties.get(METASTORE.key()));
properties.forEach(
(key, value) -> {
if (key.startsWith(LOCK_PROP_PREFIX)) {
result.put(key.substring(LOCK_PROP_PREFIX.length()), value);
}
});
return Options.fromMap(result);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public interface Catalog extends AutoCloseable {
*/
Optional<CatalogLockFactory> lockFactory();

/** Get lock context factory for lock factory to create a lock. */
default Optional<CatalogLockContextFactory> lockContextFactory() {
return Optional.empty();
}

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Context for lock context factory to create lock. */
public interface CatalogLockContextFactory extends Factory, Serializable {
CatalogLockContext createLockContext(Options lockOptions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand Down Expand Up @@ -198,4 +199,16 @@ public String warehouse() {
public boolean caseSensitive() {
return catalogOptions.get(CASE_SENSITIVE);
}

@Override
public Optional<CatalogLockContext> lockContext() {
Optional<CatalogLockContextFactory> catalogLockContextFactory = lockContextFactory();
if (!catalogLockContextFactory.isPresent()) {
return super.lockContext();
}
return catalogLockContextFactory.map(
factory ->
factory.createLockContext(
extractLockConfiguration(catalogOptions.toMap())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.TableType;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;

/** Factory to create {@link FileSystemCatalog}. */
Expand All @@ -40,6 +42,9 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
throw new IllegalArgumentException(
"Only managed table is supported in File system catalog.");
}
if (context.options().get(LOCK_ENABLED) && context.options().get(LOCK_TYPE) == null) {
throw new IllegalArgumentException("Please configure the lock type correctly.");
}
return new FileSystemCatalog(fileIO, warehouse, context.options());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public Optional<CatalogLockFactory> defaultLockFactory() {

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
return Optional.of(new JdbcCatalogLockContext(connections, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,131 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Jdbc lock context. */
public class JdbcCatalogLockContext implements CatalogLockContext {

private final JdbcClientPool connections;
private static final String CONF_KEY_PREFIX = "lockConfigKey:";

// Cache access connections for non Jdbc catalogs.
protected static Cache<String, JdbcClientPool> clientPoolCache;
private long evictionInterval;
private String key;

private JdbcClientPool connections;
private final String catalogKey;
private final Options options;
private final String metastore;

public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) {
this.connections = connections;
this.catalogKey = catalogKey;
public JdbcCatalogLockContext(Options options) {
this.options = options;
this.catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
this.metastore = options.get(CatalogOptions.METASTORE);
if (!metastore.equals(JdbcCatalogFactory.IDENTIFIER)) {
this.evictionInterval =
options.get(CatalogOptions.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
this.key = extractKey();
init();
}
}

/** Use all connection parameters of jdbc to form a unique connection. */
private String extractKey() {
Map<String, String> props =
JdbcUtils.extractJdbcConfigurationToMap(
options.toMap(), JdbcCatalog.PROPERTY_PREFIX);
List<String> elements = Lists.newArrayList();
props.forEach(
(key, value) -> {
elements.add(value);
});
return CONF_KEY_PREFIX
.concat(catalogKey)
.concat(":")
.concat(StringUtils.join(elements, "."));
}

JdbcCatalogLockContext(JdbcClientPool connections, Options options) {
this(options);
this.connections = connections;
}

protected synchronized void init() {
if (clientPoolCache == null) {
clientPoolCache =
Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener(
(ignored, value, cause) -> ((JdbcClientPool) value).close())
.scheduler(
Scheduler.forScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new ThreadFactory() {
final ThreadFactory defaultFactory =
Executors.defaultThreadFactory();

@Override
public Thread newThread(Runnable r) {
Thread thread =
defaultFactory.newThread(r);
thread.setDaemon(true);
return thread;
}
})))
.build();
}
}

@Override
public Options options() {
return options;
}

public JdbcClientPool connections() {
return connections;
}

public String catalogKey() {
return catalogKey;
}

public JdbcClientPool connections() {
// Cache connection information is required for non jdbc catalogs.
if (!metastore.equals(JdbcCatalogFactory.IDENTIFIER)) {
return clientPoolCache.get(
key,
k -> {
JdbcClientPool connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, options);
} catch (SQLException e) {
throw new RuntimeException(
"Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
return connections;
});
} else {
// The use of lock in the jdbc catalog is maintained by the jdbc catalog itself
return connections;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockContextFactory;
import org.apache.paimon.options.Options;

/** Factory for jdbc catalog lock context. */
public class JdbcCatalogLockContextFactory implements CatalogLockContextFactory {

@Override
public String identifier() {
return JdbcCatalogLockFactory.IDENTIFIER;
}

@Override
public CatalogLockContext createLockContext(Options lockOptions) {
return new JdbcCatalogLockContext(lockOptions);
}
}

0 comments on commit dd476db

Please sign in to comment.