Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support W3C Baggage specification #184

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
@Setter
public class TransmitConfig {

public static final String DEFAULT_PROPAGATION = "defaultPropagation";

/**
* A collection of keys that need to be transmitted.
*/
Expand All @@ -41,6 +43,11 @@ public class TransmitConfig {
*/
private Set<String> suffixes;

/**
* Transmit type, W3cBaggage as the default selection
*/
private String type = "W3cBaggage";

/**
* Thread transmit config
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;

import java.util.*;

@Injectable
@Extension("LivePropagation")
public class LivePropagation implements Propagation {
private final CargoRequires require;
@Inject
private List<CargoRequire> requires;

{
this.require = new CargoRequires(requires);
}

@Override
public void write(Carrier carrier, HeaderWriter writer) {
Collection<Cargo> cargos = carrier.getCargos();
if (cargos != null) {
List<String> values;
for (Cargo cargo : cargos) {
values = cargo.getValues();
int size = values == null ? 0 : values.size();
switch (size) {
case 0:
writer.setHeader(cargo.getKey(), null);
break;
case 1:
writer.setHeader(cargo.getKey(), values.get(0));
break;
default:
for (String value : values) {
writer.setHeader(cargo.getKey(), value);
}
}
}
}
}

@Override
public void read(Carrier carrier, HeaderReader reader) {
Iterator<String> headerNames = reader.getHeaderNames();
while (headerNames.hasNext()) {
String headerName = headerNames.next();
List<String> headerValues = reader.getHeaders(headerName);
if (require.match(headerName) && headerValues != null && !headerValues.isEmpty()) {
carrier.addCargo(new Cargo(headerName, new ArrayList<>(headerValues)));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;

public interface Propagation {
void write(Carrier carrier, HeaderWriter writer);

void read(Carrier carrier, HeaderReader reader);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.*;

@Injectable
@Extension("W3cBaggagePropagation")
public class W3cBaggagePropagation implements Propagation {
private static final Logger logger = LoggerFactory.getLogger(W3cBaggagePropagation.class);

private final CargoRequires require;
@Inject
private List<CargoRequire> requires;

{
this.require = new CargoRequires(requires);
}

@Override
public void write(Carrier carrier, HeaderWriter writer) {
StringBuilder baggage = new StringBuilder();
Collection<Cargo> cargos = carrier.getCargos();

if (cargos != null) {
for (Cargo cargo : cargos) {
if (cargo.getValues() != null) {
List<String> values = new ArrayList<>();
for (String value : cargo.getValues()) {
try {
String encodedValue = URLEncoder.encode(value, "UTF-8");
values.add(encodedValue);
} catch (UnsupportedEncodingException e) {
logger.error("URL encoding failed for value: " + value, e);
}
}

if (!values.isEmpty()) {
baggage.append(cargo.getKey()).append("=")
.append(String.join(";", values)).append(",");
}
}
}
}

if (baggage.length() > 0) {
baggage.setLength(baggage.length() - 1);
writer.setHeader("baggage", baggage.toString());
}
}

@Override
public void read(Carrier carrier, HeaderReader reader) {
String baggage = reader.getHeader("baggage");
if (baggage == null || baggage.isEmpty())
return;

String[] keyValuePairs = baggage.split(",");
for (String pair : keyValuePairs) {
String[] keyValue = pair.split("=", 2);
if (keyValue.length == 2) {
String headerName = keyValue[0];
if (require.match(headerName)) {
List<String> headerValues = new ArrayList<>();
String[] values = keyValue[1].split(";");
for (String value : values) {
try {
String decodedValue = URLDecoder.decode(value, "UTF-8");
headerValues.add(decodedValue);
} catch (UnsupportedEncodingException e) {
System.err.println("URL decoding failed for value: " + value);
}
}

if (!headerValues.isEmpty()) {
carrier.addCargo(new Cargo(headerName, headerValues));
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import com.jd.live.agent.core.service.ConfigService;
import com.jd.live.agent.core.util.Futures;
import com.jd.live.agent.core.util.time.Timer;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.config.MonitorConfig;
import com.jd.live.agent.governance.config.RegistryConfig;
import com.jd.live.agent.governance.config.ServiceConfig;
import com.jd.live.agent.governance.config.*;
import com.jd.live.agent.governance.context.Propagation;
import com.jd.live.agent.governance.event.TrafficEvent;
import com.jd.live.agent.governance.event.TrafficEvent.ActionType;
import com.jd.live.agent.governance.invoke.InvocationContext;
Expand Down Expand Up @@ -168,6 +166,10 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex

private List<String> serviceSyncers;

@Getter
@Inject
private Map<String, Propagation> propagations;

private final AtomicBoolean warmup = new AtomicBoolean(false);

@Override
Expand Down Expand Up @@ -223,6 +225,16 @@ public void apply(InjectSource source) {
source.add(PolicySupervisor.COMPONENT_POLICY_SUPPLIER, this);
source.add(InvocationContext.COMPONENT_INVOCATION_CONTEXT, this);
if (governanceConfig != null) {
switch (governanceConfig.getTransmitConfig().getType()) {
case "W3cBaggage":
source.add(TransmitConfig.DEFAULT_PROPAGATION, propagations.get("W3cBaggagePropagation"));
break;
case "Live":
source.add(TransmitConfig.DEFAULT_PROPAGATION, propagations.get("LivePropagation"));
break;
default:
break;
}
source.add(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG, governanceConfig);
source.add(ServiceConfig.COMPONENT_SERVICE_CONFIG, governanceConfig.getServiceConfig());
source.add(RegistryConfig.COMPONENT_REGISTRY_CONFIG, governanceConfig.getRegistryConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.jd.live.agent.governance.request;

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public interface HeaderReader {
Iterator<String> getHeaderNames();

List<String> getHeaders(String key);

default String getHeader(String key) {
List<String> values = getHeaders(key);
return values.isEmpty() ? null : values.get(0);
}

default List<String> getHeader(String key, Function<String, List<String>> func) {
String value = getHeader(key);
return value == null ? null : func.apply(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.jd.live.agent.governance.request;

public interface HeaderWriter {
void setHeader(String key, String value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jd.live.agent.governance.request;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class MultiMapHeader implements HeaderReader, HeaderWriter {
private Map<String, List<String>> map;

private BiConsumer<String, String> setHeader;

public MultiMapHeader(Supplier<Map<String, List<String>>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
return map.get(key) == null ? null : map.get(key);
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.jd.live.agent.governance.request;

import com.jd.live.agent.core.util.tag.Label;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ObjectMapHeader implements HeaderReader, HeaderWriter {
private Map<String, Object> map;

private BiConsumer<String, String> setHeader;

public ObjectMapHeader(Supplier<Map<String, Object>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
Object value = map.get(key);
return value == null ? null : Label.parseValue(value.toString());
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jd.live.agent.governance.request;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class StringMapHeader implements HeaderReader, HeaderWriter {
private Map<String, String> map;

private BiConsumer<String, String> setHeader;

public StringMapHeader(Supplier<Map<String, String>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
return map.get(key) == null ? null : Collections.singletonList(map.get(key));
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}
Loading