Skip to content

Commit

Permalink
Handle verifyLatestProtocolPresent when http is used; Sink has to pas…
Browse files Browse the repository at this point in the history
…s extra parameter for the router URL
  • Loading branch information
dlg99 committed May 4, 2023
1 parent 55d9d88 commit 3d2649a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_CONTROLLER_DISCOVERY_URL;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PUSH_TYPE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_ROUTER_URL;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_STORE;

import com.linkedin.venice.meta.Version;
Expand Down Expand Up @@ -251,6 +252,7 @@ private Map<String, String> getConfig(String storeName, String systemName) {
config.put(configPrefix + VENICE_AGGREGATE, "false");
config.put("venice.discover.urls", this.config.getVeniceDiscoveryUrl());
config.put(VENICE_CONTROLLER_DISCOVERY_URL, this.config.getVeniceDiscoveryUrl());
config.put(VENICE_ROUTER_URL, this.config.getVeniceRouterUrl());
config.put(DEPLOYMENT_ID, Utils.getUniqueString("venice-push-id-pulsar-sink"));
config.put(SSL_ENABLED, "false");
if (this.config.getKafkaSaslConfig() != null && !this.config.getKafkaSaslConfig().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public class VeniceSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;

@FieldDoc(defaultValue = "", help = "The url of the Venice controller")
private String veniceDiscoveryUrl = "http://venice-controller:7777";
private String veniceDiscoveryUrl = "http://venice-controller:5555";

@FieldDoc(defaultValue = "", help = "The url of the Venice router")
private String veniceRouterUrl = "http://venice-router:7777";

@FieldDoc(defaultValue = "", help = "SASL configuration for Kafka. See Kafka client documentation for details.")
private String kafkaSaslConfig = "";
Expand Down Expand Up @@ -72,6 +75,11 @@ public String getVeniceDiscoveryUrl() {
return this.veniceDiscoveryUrl;
}

@java.lang.SuppressWarnings("all")
public String getVeniceRouterUrl() {
return this.veniceRouterUrl;
}

@java.lang.SuppressWarnings("all")
public String getKafkaSaslConfig() {
return this.kafkaSaslConfig;
Expand Down Expand Up @@ -111,6 +119,15 @@ public VeniceSinkConfig setVeniceDiscoveryUrl(final String veniceDiscoveryUrl) {
return this;
}

/**
* @return {@code this}.
*/
@java.lang.SuppressWarnings("all")
public VeniceSinkConfig setVeniceRouterUrl(final String veniceRouterUrl) {
this.veniceRouterUrl = veniceRouterUrl;
return this;
}

/**
* @return {@code this}.
*/
Expand Down Expand Up @@ -185,6 +202,12 @@ public boolean equals(final java.lang.Object o) {
? other$veniceDiscoveryUrl != null
: !this$veniceDiscoveryUrl.equals(other$veniceDiscoveryUrl))
return false;
final java.lang.Object this$veniceRouterUrl = this.getVeniceRouterUrl();
final java.lang.Object other$veniceRouterUrl = other.getVeniceRouterUrl();
if (this$veniceRouterUrl == null
? other$veniceRouterUrl != null
: !this$veniceRouterUrl.equals(other$veniceRouterUrl))
return false;
final java.lang.Object this$kafkaSaslConfig = this.getKafkaSaslConfig();
final java.lang.Object other$kafkaSaslConfig = other.getKafkaSaslConfig();
if (this$kafkaSaslConfig == null
Expand Down Expand Up @@ -225,6 +248,8 @@ public int hashCode() {
result = result * PRIME + this.getMaxNumberUnflushedRecords();
final java.lang.Object $veniceDiscoveryUrl = this.getVeniceDiscoveryUrl();
result = result * PRIME + ($veniceDiscoveryUrl == null ? 43 : $veniceDiscoveryUrl.hashCode());
final java.lang.Object $veniceRouterUrl = this.getVeniceRouterUrl();
result = result * PRIME + ($veniceRouterUrl == null ? 43 : $veniceRouterUrl.hashCode());
final java.lang.Object $kafkaSaslConfig = this.getKafkaSaslConfig();
result = result * PRIME + ($kafkaSaslConfig == null ? 43 : $kafkaSaslConfig.hashCode());
final java.lang.Object $kafkaSaslMechanism = this.getKafkaSaslMechanism();
Expand All @@ -239,10 +264,10 @@ public int hashCode() {
@java.lang.Override
@java.lang.SuppressWarnings("all")
public java.lang.String toString() {
return "VeniceSinkConfig(veniceDiscoveryUrl=" + this.getVeniceDiscoveryUrl() + ", kafkaSaslConfig="
+ this.getKafkaSaslConfig() + ", kafkaSaslMechanism=" + this.getKafkaSaslMechanism()
+ ", kafkaSecurityProtocol=" + this.getKafkaSecurityProtocol() + ", storeName=" + this.getStoreName()
+ ", flushIntervalMs=" + this.getFlushIntervalMs() + ", maxNumberUnflushedRecords="
return "VeniceSinkConfig(veniceDiscoveryUrl=" + this.getVeniceDiscoveryUrl() + ", veniceRouterUrl="
+ this.getVeniceRouterUrl() + ", kafkaSaslConfig=" + this.getKafkaSaslConfig() + ", kafkaSaslMechanism="
+ this.getKafkaSaslMechanism() + ", kafkaSecurityProtocol=" + this.getKafkaSecurityProtocol() + ", storeName="
+ this.getStoreName() + ", flushIntervalMs=" + this.getFlushIntervalMs() + ", maxNumberUnflushedRecords="
+ this.getMaxNumberUnflushedRecords() + ")";
}

Expand Down
2 changes: 2 additions & 0 deletions clients/venice-samza/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ dependencies {
implementation project(':internal:venice-client-common')
implementation project(':clients:venice-thin-client')

implementation libraries.httpAsyncClient

implementation libraries.kafka
implementation libraries.log4j2api
implementation libraries.samzaApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class VeniceSystemFactory implements SystemFactory, Serializable {
public static final String VENICE_CHILD_D2_ZK_HOSTS = "venice.child.d2.zk.hosts";

public static final String VENICE_CONTROLLER_DISCOVERY_URL = "venice.controller.discovery.url";
public static final String VENICE_ROUTER_URL = "venice.router.url";

/**
* D2 ZK hosts for Venice Parent Cluster.
Expand Down Expand Up @@ -351,13 +352,16 @@ public SystemProducer getProducer(

if (discoveryUrl.isPresent()) {

String routerUrl = config.get(VENICE_ROUTER_URL);

LOGGER.info("Configs for {} producer: ", systemName);
LOGGER.info("{}{}: {}", prefix, VENICE_STORE, storeName);
LOGGER.info("{}{}: {}", prefix, VENICE_AGGREGATE, veniceAggregate);
LOGGER.info("{}{}: {}", prefix, VENICE_PUSH_TYPE, venicePushType);
LOGGER.info("{}: {}", VENICE_CONTROLLER_DISCOVERY_URL, discoveryUrl.get());
LOGGER.info("{}: {}", VENICE_ROUTER_URL, routerUrl);

return new VeniceSystemProducer(
VeniceSystemProducer p = new VeniceSystemProducer(
discoveryUrl.get(),
storeName,
venicePushType,
Expand All @@ -368,6 +372,8 @@ public SystemProducer getProducer(
sslFactory,
partitioners,
SystemTime.INSTANCE);
p.setRouterUrl(routerUrl);
return p;
}

String veniceParentZKHosts = config.get(VENICE_PARENT_D2_ZK_HOSTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.HttpTransportClient;
import com.linkedin.venice.client.store.transport.HttpsTransportClient;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
Expand Down Expand Up @@ -154,6 +155,7 @@ public class VeniceSystemProducer implements SystemProducer, Closeable {
private boolean isStarted = false;

private Optional<String> discoveryUrl = Optional.empty();
private Optional<String> routerUrl = Optional.empty();

private VeniceWriter<byte[], byte[], byte[]> veniceWriter = null;
private Optional<RouterBasedPushMonitor> pushMonitor = Optional.empty();
Expand Down Expand Up @@ -318,6 +320,10 @@ public VeniceSystemProducer(
this.time = time;
}

public void setRouterUrl(String routerUrl) {
this.routerUrl = Optional.of(routerUrl);
}

public String getRunningFabric() {
return this.runningFabric;
}
Expand Down Expand Up @@ -418,31 +424,27 @@ public synchronized void start() {
* version in the code base is registered in Venice backend; if not, fail fast in start phase before start writing
* Kafka messages that Venice backend couldn't deserialize.
*/
/* TODO: find a way to discover router url
if (verifyLatestProtocolPresent) {
if (verifyLatestProtocolPresent && routerUrl.isPresent()) {
LOGGER.info("Start verifying the latest protocols at runtime are valid in Venice backend.");
// Discover the D2 service name for the system store
// does not return anything useful in case of discoveryUrl
// e.g. cannot get venice router url
//D2ServiceDiscoveryResponse sysStoreDiscoveryResponse = (D2ServiceDiscoveryResponse) controllerRequestWithRetry(
// () -> ControllerClient.discoverCluster(discoveryUrl.get(), storeName, sslFactory, 1),
// 2);
String kafkaMessageEnvelopSchemaSysStore = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName();
ClientConfig clientConfigForKafkaMessageEnvelopeSchemaReader =
ClientConfig.defaultGenericClientConfig(kafkaMessageEnvelopSchemaSysStore);
clientConfigForKafkaMessageEnvelopeSchemaReader.setVeniceURL("http://venice-router:7777");
ClientConfig.defaultGenericClientConfig(kafkaMessageEnvelopSchemaSysStore);
clientConfigForKafkaMessageEnvelopeSchemaReader.setVeniceURL(routerUrl.get());

SchemaReader kafkaMessageEnvelopeSchemaReader =
ClientFactory.getSchemaReader(clientConfigForKafkaMessageEnvelopeSchemaReader, null);
ClientFactory.getSchemaReader(clientConfigForKafkaMessageEnvelopeSchemaReader, null);
new SchemaPresenceChecker(kafkaMessageEnvelopeSchemaReader, AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE)
.verifySchemaVersionPresentOrExit();
.verifySchemaVersionPresentOrExit();
LOGGER.info("Successfully verified the latest protocols at runtime are valid in Venice backend.");
} else {
LOGGER.info("Skip verifying the latest protocols at runtime are valid in Venice backend.");
}
*/

transportClient = new HttpTransportClient(discoveryUrl.get());
if (sslFactory.isPresent()) {
transportClient = new HttpsTransportClient(discoveryUrl.get(), sslFactory.get());
} else {
transportClient = new HttpTransportClient(discoveryUrl.get());
}
} else {
this.primaryControllerColoD2Client = getStartedD2Client(primaryControllerColoD2ZKHost);
this.childColoD2Client = getStartedD2Client(veniceChildD2ZkHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public void testPulsarVeniceSink() throws Exception {
String token =
"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJzdXBlcnVzZXIifQ.JQpDWJ9oHD743ZyuIw55Qp0bb8xzP6gK0KIWRniF2WnJB1m3v5MsrpfMlmRIlFc3-htWRAFHCc4E0ipj7JU8HjBqLIvVErRseRG-UTM1EprVkj0mk37jXV3ef7gER0KHn9CUKEQPfmTACeKlQ2oV4_qPAZ6HiEt51vzANfZH24vLCIjiOG77Z4s_w2sfgpiodRmhBLFOg_qnQTfGs7TBDWgu4DRoJ6CYZSEcp8q7j8xp_zNVIFGTRjWskocUvedHS9ZsCGZjzuPvRPp19B0VvAjEjtwpa6j7Khvjf4imjp2QHDnZwpCIEp4DSicwM48F5q4k722IdiyTTsVBWy8Cyg";

String sinkConfig = "{\"veniceDiscoveryUrl\":\"" + veniceControllerUrl + "\"," + "\"storeName\":\"t1_n1_s1\","
String sinkConfig = "{\"veniceDiscoveryUrl\":\"" + veniceControllerUrl + "\"," + "\"veniceRouterUrl\":\""
+ veniceRouterUrl + "\"," + "\"storeName\":\"t1_n1_s1\","
+ "\"kafkaSaslMechanism\":\"PLAIN\",\"kafkaSecurityProtocol\":\"SASL_PLAINTEXT\","
+ "\"kafkaSaslConfig\":\"org.apache.kafka.common.security.plain.PlainLoginModule "
+ "required username=\\\"public\\\" password=\\\"token: " + token + "\\\"\\\";\\\");\"}";
Expand Down

0 comments on commit 3d2649a

Please sign in to comment.