upstream #2
Merged
Conversation
…ct for schema with type... (#15598)
…cursor compression (#15210)
… auto-update partition (#15589)

### Motivation
Currently, `TableViewImpl` maintains a reader per partition to support auto-updating partitions. Since the reader already supports multi-partition topics and auto-updated partitions, we should reuse `MultiTopicsReaderImpl` to implement the TableView.

### Modifications
Reuse `MultiTopicsReaderImpl` to implement the TableView's partition auto-update.
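To make the reuse concrete, here is a toy sketch (not the actual `TableViewImpl`; `SimpleTableView` and its threading are illustrative only): creating a reader on a partitioned topic yields the client's multi-topics reader underneath, which tracks partition-count changes, so the table view no longer needs to manage a reader per partition itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

// Minimal sketch: a key/value view of a (possibly partitioned) topic built on a
// single reader instead of one reader per partition.
public class SimpleTableView<T> {
    private final Map<String, T> data = new ConcurrentHashMap<>();

    SimpleTableView(PulsarClient client, String topic, Schema<T> schema) throws Exception {
        // On a partitioned topic this returns the multi-topics reader under the
        // hood, which handles partition discovery and auto-update for us.
        Reader<T> reader = client.newReader(schema)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .create();
        new Thread(() -> {
            try {
                while (true) {
                    Message<T> msg = reader.readNext();
                    if (msg.hasKey()) {
                        data.put(msg.getKey(), msg.getValue());
                    }
                }
            } catch (Exception e) {
                // reader closed
            }
        }).start();
    }

    public T get(String key) {
        return data.get(key);
    }
}
```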
…) because it has been migrated to the repo pulsar-site (#15636)
…view script `start.sh` (#15642)
### Motivation
This addresses issue #13787. Diving into the code, I found that if a client tries to subscribe multiple times over a short period, more than one consumer can end up on the same dispatcher, as in:

```
for (long requestId = 1; requestId < 5; requestId++) {
    ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId,
            getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData,
            metadata, readCompacted, conf.isReplicateSubscriptionState(),
            InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
            startMessageRollbackDuration, si, createTopicIfDoesNotExist,
            conf.getKeySharedPolicy(),
            // Use the current epoch to subscribe.
            conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
    cnx.sendRequestWithId(request, requestId).thenRun(() -> {});
}
```

The root cause is this snippet: https://github.com/apache/pulsar/blob/c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L994-L1021

If consumer1 arrives and is not yet done when an identical consumer2 arrives, the broker may remove the pending consumer1 (line 1015) even though consumer1 eventually succeeds in adding itself to the subscription. When an identical consumer3 then arrives and succeeds, the same consumer has been added twice. The right time to remove the consumer (line 1015) is only when `existingConsumerFuture` has completed exceptionally. Even though the Java client cannot trigger this behavior, other clients might, so it's better to handle `subscribe` correctly on the broker side.

### Modifications
Adjust the execution order of the subscribe handling to improve stability.
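As a self-contained illustration of the corrected ordering (heavily simplified; the real change lives in `ServerCnx#handleSubscribe`, and the class and method names here are hypothetical): a pending future must never be removed, and only a future that completed exceptionally may be replaced by a retry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the fix: deduplicate concurrent subscribes on one
// consumerId without ever dropping an in-flight attempt.
class SubscribeDeduplicator {
    private final ConcurrentMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();

    CompletableFuture<String> subscribe(long consumerId) {
        CompletableFuture<String> consumerFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, consumerFuture);
        if (existing == null) {
            return consumerFuture; // first attempt wins; completed elsewhere
        }
        if (existing.isCompletedExceptionally()) {
            // Safe to retry: drop only the failed attempt, never an in-flight one.
            consumers.remove(consumerId, existing);
            return subscribe(consumerId);
        }
        return existing; // in-flight or already successful: reuse it
    }
}
```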
)
- release notes: https://netty.io/news/2022/05/06/2-1-77-Final.html
- improves Alpine / musl compatibility; could help issues such as #14534 #11415 #11224 #10798
- improves shading compatibility
- fixes a bug related to the native epoll transport and epoll_pwait2
… decoded (#15622)

### Motivation
When I tried to consume a topic via a consumer with an Avro schema while the topic was produced by a producer without schema, the consumption failed. That's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check whether `schemaVersion` is an empty byte array. If it is, a `BytesSchemaVersion` of an empty array is passed to `cache.get` and then to `loadSchema`: https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98

However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` fails. The root cause is that the schema version was set unexpectedly when messages were sent by a producer without schema. On the broker side, the returned schema version is never null; if the schema version is an empty array, the message doesn't have a schema. On the Java client side, however, the empty byte array is treated as an existing schema, and the schema version field is set. When the consumer receives the message, it tries to load a schema whose version is an empty array.

### Modifications
- When a producer receives a response whose schema version is an empty byte array, just ignore it.
- Make `MessageImpl#getSchemaVersion` return null if the schema version is an empty byte array, so that the consumer can consume messages produced by older-version producers without schema. And return the internal schema for `getReaderSchema` when `getSchemaVersion` returns null.
- Fix the existing tests. Since a producer without schema won't set the `schema_version` field after this patch, some tests that rely on precise stats had to be modified.
- Add `testConsumeAvroMessagesWithoutSchema` to cover the case where messages without schema are compatible with the schema.

This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema.
1. If the message is incompatible with the schema:
   - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`:
     > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
   - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`.
2. Otherwise (the message is compatible with the schema):
   - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`.
   - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object.
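A simplified sketch of the consumer-side guard (field and method names follow the Pulsar client's `MessageImpl`, but this is not the verbatim patch): an empty `schema_version` means "produced without schema", so the message reports no version at all.

```java
// Inside MessageImpl (sketch): treat an empty schema_version as absent so the
// consumer never tries to look up a schema for it.
@Override
public byte[] getSchemaVersion() {
    if (msgMetadata.hasSchemaVersion()) {
        byte[] schemaVersion = msgMetadata.getSchemaVersion();
        // An empty byte array means the message was produced without schema.
        return schemaVersion.length == 0 ? null : schemaVersion;
    }
    return null;
}
```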
Co-authored-by: nicklixinyang <[email protected]>
* Fix error setting JVM GC log dir
* Fix unit test

Co-authored-by: gavingaozhangmin <[email protected]>
… (ExclusiveWithFencing mode) (#15488)
…ageRouter not set (#15677)
…tly in RocksdbMetadataStore (#16005)
* Fix LockTimeout when storePut or storeDelete is called on the same key concurrently
* Add cleanup
…tore completely. (#13957) [Transaction] Set TC state to Ready after opening MLTransactionMetadataStore completely.

### Motivation
The MLTransactionMetadataStore constructor and the openTransactionMetadataStore method are asynchronous, so a store still in the Initializing state could be put into `stores`.

### Modifications
Pass in a future so callers wait for MLTransactionMetadataStore initialization to complete.
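A minimal sketch of the idea, with hypothetical names (`TransactionMetadataStoreRegistry`, `open`): the store only becomes visible once its initialization future completes, so an Initializing store can never be observed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative registry: publish the store only after its asynchronous
// initialization has finished, never while it is still Initializing.
class TransactionMetadataStoreRegistry<S> {
    private final Map<Long, S> stores = new ConcurrentHashMap<>();

    CompletableFuture<Void> open(long tcId, CompletableFuture<S> initFuture) {
        // Only a fully initialized (Ready) store becomes visible to callers.
        return initFuture.thenAccept(readyStore -> stores.put(tcId, readyStore));
    }
}
```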
### Motivation
The `authenticationData` field in `ServerCnx` is mutated to add the `subscription` field that is passed on to the authorization plugin. The problem is that `authenticationData` is scoped to the whole connection, yet it gets mutated for each consumer created on the connection. The current code leads to a race condition where the subscription name used by the authz plugin may already have been modified while we're looking at it. Instead, we should create a new object per consumer and enforce the `final` modifier.
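A self-contained sketch of the shape of the fix (the class name and fields here are hypothetical, not the broker's actual types): each consumer gets an immutable view that carries its own subscription name, so nothing connection-scoped is mutated concurrently.

```java
// Illustrative per-consumer wrapper: the connection-scoped auth data stays
// untouched, and the subscription name is fixed at consumer creation time.
final class AuthDataWithSubscription {
    private final Object connectionAuthData; // the connection-scoped source, never mutated
    private final String subscription;       // set once per consumer

    AuthDataWithSubscription(Object connectionAuthData, String subscription) {
        this.connectionAuthData = connectionAuthData;
        this.subscription = subscription;
    }

    Object getConnectionAuthData() { return connectionAuthData; }
    String getSubscription()       { return subscription; }
}
```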
### Motivation
In PulsarService, a client is created with an incorrect config: when `tlsEnabled` is `true` but `brokerClientTlsEnabled` is `false`, users hit `Failed reason: General OpenSslEngine problem` because `tlsTrustCertsFilePath` is incorrect.

### Modifications
- Fix the TLS-enabled check
- Set up ciphers and protocols
- Remove the duplicate setTlsTrustCertsFilePath
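For illustration only, a hedged sketch of the intended check (getter names are assumed from the config names and the wiring is simplified, not the verbatim patch): the internal client keys off `brokerClientTlsEnabled` rather than the server-side `tlsEnabled` flag.

```java
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.ClientBuilder;

// Illustrative: configure the broker's internal client TLS from the
// broker-client settings, not the server-side listener settings.
final class InternalClientTls {
    static void configure(ClientBuilder builder, ServiceConfiguration conf,
                          String urlTls, String urlPlain) {
        if (conf.isBrokerClientTlsEnabled()) {
            builder.serviceUrl(urlTls)
                   .tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath())
                   .tlsCiphers(conf.getBrokerClientTlsCiphers())
                   .tlsProtocols(conf.getBrokerClientTlsProtocols());
        } else {
            builder.serviceUrl(urlPlain);
        }
    }
}
```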
### Motivation
When the token expires, the broker asks the client to refresh it. When the broker receives the auth response it runs `org.apache.pulsar.broker.service.ServerCnx#doAuthentication`, which uses `org.apache.pulsar.broker.authentication.AuthenticationState#authenticate` to authenticate. But `org.apache.pulsar.broker.authentication.AuthenticationProviderToken.TokenAuthenticationState#authenticate` doesn't do anything, which causes a refresh loop. Right now the token is only validated in the TokenAuthenticationState constructor; we need to add a check to the authenticate method.
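A sketch of the fix inside `TokenAuthenticationState` (simplified; `parseToken` is a hypothetical helper standing in for the same validation the constructor performs):

```java
// Sketch: re-validate the refreshed token instead of being a no-op, so the
// broker stops asking the client to refresh forever.
@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
    String token = new String(authData.getBytes(), StandardCharsets.UTF_8);
    this.expiration = parseToken(token); // hypothetical: throws AuthenticationException if invalid/expired
    this.token = token;
    return null; // nothing more to send back to the client
}
```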
… name contains ``-partition-`` (#14920) (#16066) Co-authored-by: lipenghui <[email protected]>
…16068)
* [broker] Add config to allow deliverAt time to be strictly honored
* Fix checkstyle error (this is what happens when you change names at the last minute)
* Improve documentation; add private final modifiers

### Motivation
The current implementation of `InMemoryDelayedDeliveryTracker` allows messages to deliver early when their `deliverAt` time is within `tickTimeMillis` of now. This optimization ensures messages deliver around the `deliverAt` time. However, some use cases require that messages not deliver before the `deliverAt` time. (Note that the client API includes a `deliverAfter` method that implies messages won't deliver before some duration has passed.) To support this alternative behavior, this PR adds a broker configuration named `isDelayedDeliveryDeliverAtTimeStrict`. When true, messages will only deliver once `now` is greater than or equal to the `deliverAt` time. The tradeoff is that messages will arrive later than the `deliverAt` time. Two factors determine how late messages reach consumers: the topic's `DelayedDeliveryTickTimeMillis` and the broker's `delayedDeliveryTickTimeMillis`. The first determines how frequently a timer is scheduled to deliver delayed messages. The second determines the tick time of the `HashedWheelTimer` and, as a result, can compound with the topic's delay to make a message deliver even later.

### Modifications
* Add a broker config named `isDelayedDeliveryDeliverAtTimeStrict`. This config defaults to `false` to maintain the original behavior.
* Update the `InMemoryDelayedDeliveryTracker#addMessage` method so that it returns false when `deliverAt <= getCutoffTime()` rather than hard-coding the cutoff.
* Update documentation in several places.
* Implement the `InMemoryDelayedDeliveryTracker#getCutoffTime` method, which returns the right cutoff time based on the value of `isDelayedDeliveryDeliverAtTimeStrict`. This is the core logical change (see the sketch below).
* Update `InMemoryDelayedDeliveryTracker#updateTimer` so that it will not schedule a tick to run sooner than the most recent tick run plus `tickTimeMillis`. This ensures the timer is not run too frequently. It is also backwards compatible, since the existing feature will deliver any messages that were within now plus `tickTimeMillis`.
* Add new tests to cover the new configuration.

### Verifying this change
New tests are added as part of this change.

### Does this pull request potentially affect one of the following parts:
This is a new feature that maintains backwards compatibility.
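A sketch of that core change, assuming the field names from the description above and the tracker's `java.time.Clock` (not the verbatim patch):

```java
// The cutoff below which messages become eligible for delivery depends on the
// new flag: strict mode never delivers before deliverAt; otherwise keep the
// original behavior of delivering messages up to tickTimeMillis early.
private long getCutoffTime() {
    long now = clock.millis();
    return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis;
}
```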
…16075) This option can be used as an alternative to the `--jar` option to create a built-in Function. It is the Functions equivalent of the connectors' `--sink-type`/`--source-type` options.
### Motivation
Currently, some users want to change the `narExtractionDirectory` config, but they may be confused because the `broker.conf` and `standalone.conf` config files don't include a `narExtractionDirectory` entry.

### Modifications
Add the NAR extraction directory config to the broker configuration, as shown below.
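For illustration, the new entry would look something like this in `broker.conf` (the value shown is only an example path, not necessarily the shipped default):

```properties
# Directory to use for extracting NAR archives (example value)
narExtractionDirectory=/tmp/pulsar_nar
```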
### Motivation
Currently, the oauth2 curl example doc has the wrong URL; running the command returns error code 1020:

```shell
curl --request POST \
  --url https://dev-kt-aa9ne.us.auth0.com \
  --header 'content-type: application/json' \
  --data '{
    "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
    "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
    "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
    "grant_type":"client_credentials"}'

error code: 1020
```

### Modifications
Fix the curl example's OAuth URL in the security-oauth2 docs.
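The corrected request presumably targets the issuer's token endpoint; with Auth0 that is the `/oauth/token` path (shown here as an illustration of the fix, with the same example credentials):

```shell
curl --request POST \
  --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
  --header 'content-type: application/json' \
  --data '{
    "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
    "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
    "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
    "grant_type":"client_credentials"}'
```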
* [doc] Transactions: fix code samples indentation
* Update txn-use.md
…ds in Namespaces async (#15880)
Master Issue: #15631

### Motivation
Currently, when the proxy transfers data, each `read()`/`write()` pair triggers four context switches between user mode and kernel mode (on the call and on the return), which is inefficient for the proxy. There is a more efficient way that reduces both the number of data copies and the CPU load: on Linux, we can use the [`splice`](https://man7.org/linux/man-pages/man2/splice.2.html) system call.
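For context, a hedged sketch of what this could look like on the proxy's data path: Netty's native epoll transport exposes splice through `AbstractEpollStreamChannel#spliceTo`; the helper and its wiring here are illustrative, not the PR's actual code.

```java
import io.netty.channel.ChannelFuture;
import io.netty.channel.epoll.AbstractEpollStreamChannel;

// Illustrative helper: once both sides of the proxied connection are epoll
// stream channels, bytes can be spliced between them entirely in kernel space,
// avoiding the user-space read()/write() copies.
final class ProxySplice {
    static ChannelFuture spliceForever(AbstractEpollStreamChannel inbound,
                                       AbstractEpollStreamChannel outbound) {
        // Integer.MAX_VALUE keeps splicing until one of the channels closes.
        return inbound.spliceTo(outbound, Integer.MAX_VALUE);
    }
}
```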
…ched messages non-unique sequenceIds (#16098)
…ets than requested (#16100)
* KCA: handle preCommit returning earlier offsets
* corrected parameter name in the comments