-
Notifications
You must be signed in to change notification settings - Fork 404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 6884: Random Read Perf Tuning - Token Cache #6885
base: master
Are you sure you want to change the base?
Conversation
client/src/main/java/io/pravega/client/segment/impl/EventSegmentReaderImpl.java
Outdated
Show resolved
Hide resolved
client/src/main/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImpl.java
Outdated
Show resolved
Hide resolved
|
||
protected DelegationTokenProvider getDelegationTokenProvider(Segment segment) { | ||
log.debug("getDelegationTokenProvider, scopedName: {}, cacheInfo: {}", segment.getScopedName(), tokenProviderCache.printCache()); | ||
DelegationTokenProvider tokenProvider = tokenProviderCache.get(segment.getScopedName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than using a per-segment cache it makes more sense to have a per stream cache. It is possible to create a DelegationTokenProvider given a stream. If we take advantage of this we will be able to consolidate entries in the cache.
client/src/main/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImpl.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
protected DelegationTokenProvider createDelegationTokenProvider(Segment segment) { | ||
return DelegationTokenProviderFactory.create(controller, segment, AccessOperation.READ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the stream rather than the segment. (See above)
client/src/main/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImpl.java
Outdated
Show resolved
Hide resolved
client/src/main/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImpl.java
Outdated
Show resolved
Hide resolved
client/src/test/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImplTest.java
Outdated
Show resolved
Hide resolved
DelegationTokenProvider mockDelegationTokenProvider = mock(DelegationTokenProvider.class); | ||
tokenProviderSimpleCache.put("1", mockDelegationTokenProvider); | ||
|
||
Thread.sleep(60000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleep is not allowed in unit tests. We run over 800 tests and they complete in 13 minutes. Please insure that your tests do not take disproportionate time. They should run in a few milliseconds. To do this we need all tests operated based on time to use a Supplier<Long>
instead of just using systemTimeMillis
that was an alternate implementation can be provided, or alternatively find a way to use a mock or have test methods to make the test deterministic without relying on time.
client/src/test/java/io/pravega/client/segment/impl/SegmentInputStreamFactoryImplTest.java
Outdated
Show resolved
Hide resolved
|
||
private static final Logger log = LoggerFactory.getLogger(SegmentInputStreamFactoryImpl.class); | ||
private static final int CACHE_MAX_SIZE = 100; | ||
private static final int EXPIRATION_TIME_MINUTE = 60; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it for every 60 minutes token gets evicted from cache? and cache is updated with new token in very next call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this should be in milis. That is more standard.
Looks like client Junit test is failing SegmentInputStreamFactoryImplTest.unnecessary Mockito stubbings FAILED, Check https://github.com/pravega/pravega/actions/runs/3150227954/jobs/5122840362 for more details |
008c6e2
to
8b5caec
Compare
6411a61
to
f0faab0
Compare
private final Controller controller; | ||
private final ConnectionPool cp; | ||
private SimpleCache<String, DelegationTokenProvider> tokenProviderCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, I have some comments:
- Client tests are failing because they are terminating with code 99. If you look to our guidelines (https://github.com/pravega/pravega/wiki/Contributing#builddebugging), this may indicate that at some point of your test some resource is being left open. Please, check this when you have the time.
- Have we executed a performance regression test with this change? That would be important to do before being safe when merging this PR.
- What would be the performance implication of this change when security is enabled compared to when it is not? Maybe we need to test Pravega with this change and security enabled.
public class SegmentInputStreamFactoryImpl implements SegmentInputStreamFactory { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(SegmentInputStreamFactoryImpl.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally we use @Slf4j
annotation that creates the logger transparently.
this.controller = controller; | ||
this.cp = cp; | ||
this.tokenProviderCache = new SimpleCache<>(CACHE_MAX_SIZE, | ||
Duration.ofMillis(EXPIRATION_TIME_MILLIS), (k, v) -> log.info("key: {} is evicted from tokenProviderCache.", k)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this log to be info
level? I think it may be too verbose, so please consider making it debug
public class SegmentInputStreamFactoryImpl implements SegmentInputStreamFactory { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(SegmentInputStreamFactoryImpl.class); | ||
private static final int CACHE_MAX_SIZE = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know the heap space of the cache with 100 elements? Why 100 concretely? Do we have an idea of when elements are added or removed from this cache to ensure that this number is good enough?
@@ -88,4 +108,23 @@ public SegmentInputStream createInputStreamForSegment(Segment segment, Delegatio | |||
async.getConnection(); | |||
return new SegmentInputStreamImpl(async, startOffset); | |||
} | |||
|
|||
DelegationTokenProvider getDelegationTokenProvider(final String scope, final String streamName) { | |||
final String scopedStreamName = NameUtils.getScopedStreamName(scope, streamName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know how expensive this String operation is? This is being executed on a per-read basis? If so, we may want to think how to get rid of this string operation.
Change log description
2022-08-26 07:42:36,610 398197 [ClientSocketReaders-5] WARN io.pravega.client.connection.impl.FlowHandler: [] - No ReplyProcessor found for the provided flowId 0. Ignoring response
Purpose of the change
Fixes #6884
What the code does
(Detailed description of the code changes)
Test Result
//before perf tuning, result of query result.
//After perf tuning