Skip to content

Commit

Permalink
Merge pull request #184 from moxygen2001/main
Browse files Browse the repository at this point in the history
Support W3C Baggage specification
  • Loading branch information
hexiaofeng authored Dec 30, 2024
2 parents 390d473 + ce23ca9 commit c8261d6
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
@Setter
public class TransmitConfig {

public static final String DEFAULT_PROPAGATION = "defaultPropagation";

/**
* A collection of keys that need to be transmitted.
*/
Expand All @@ -41,6 +43,11 @@ public class TransmitConfig {
*/
private Set<String> suffixes;

/**
* Transmit type, W3cBaggage as the default selection
*/
private String type = "W3cBaggage";

/**
* Thread transmit config
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;

import java.util.*;

@Injectable
@Extension("LivePropagation")
public class LivePropagation implements Propagation {
private final CargoRequires require;
@Inject
private List<CargoRequire> requires;

{
this.require = new CargoRequires(requires);
}

@Override
public void write(Carrier carrier, HeaderWriter writer) {
Collection<Cargo> cargos = carrier.getCargos();
if (cargos != null) {
List<String> values;
for (Cargo cargo : cargos) {
values = cargo.getValues();
int size = values == null ? 0 : values.size();
switch (size) {
case 0:
writer.setHeader(cargo.getKey(), null);
break;
case 1:
writer.setHeader(cargo.getKey(), values.get(0));
break;
default:
for (String value : values) {
writer.setHeader(cargo.getKey(), value);
}
}
}
}
}

@Override
public void read(Carrier carrier, HeaderReader reader) {
Iterator<String> headerNames = reader.getHeaderNames();
while (headerNames.hasNext()) {
String headerName = headerNames.next();
List<String> headerValues = reader.getHeaders(headerName);
if (require.match(headerName) && headerValues != null && !headerValues.isEmpty()) {
carrier.addCargo(new Cargo(headerName, new ArrayList<>(headerValues)));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;

public interface Propagation {
void write(Carrier carrier, HeaderWriter writer);

void read(Carrier carrier, HeaderReader reader);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.jd.live.agent.governance.context;

import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.request.HeaderReader;
import com.jd.live.agent.governance.request.HeaderWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.*;

@Injectable
@Extension("W3cBaggagePropagation")
public class W3cBaggagePropagation implements Propagation {
private static final Logger logger = LoggerFactory.getLogger(W3cBaggagePropagation.class);

private final CargoRequires require;
@Inject
private List<CargoRequire> requires;

{
this.require = new CargoRequires(requires);
}

@Override
public void write(Carrier carrier, HeaderWriter writer) {
StringBuilder baggage = new StringBuilder();
Collection<Cargo> cargos = carrier.getCargos();

if (cargos != null) {
for (Cargo cargo : cargos) {
if (cargo.getValues() != null) {
List<String> values = new ArrayList<>();
for (String value : cargo.getValues()) {
try {
String encodedValue = URLEncoder.encode(value, "UTF-8");
values.add(encodedValue);
} catch (UnsupportedEncodingException e) {
logger.error("URL encoding failed for value: " + value, e);
}
}

if (!values.isEmpty()) {
baggage.append(cargo.getKey()).append("=")
.append(String.join(";", values)).append(",");
}
}
}
}

if (baggage.length() > 0) {
baggage.setLength(baggage.length() - 1);
writer.setHeader("baggage", baggage.toString());
}
}

@Override
public void read(Carrier carrier, HeaderReader reader) {
String baggage = reader.getHeader("baggage");
if (baggage == null || baggage.isEmpty())
return;

String[] keyValuePairs = baggage.split(",");
for (String pair : keyValuePairs) {
String[] keyValue = pair.split("=", 2);
if (keyValue.length == 2) {
String headerName = keyValue[0];
if (require.match(headerName)) {
List<String> headerValues = new ArrayList<>();
String[] values = keyValue[1].split(";");
for (String value : values) {
try {
String decodedValue = URLDecoder.decode(value, "UTF-8");
headerValues.add(decodedValue);
} catch (UnsupportedEncodingException e) {
System.err.println("URL decoding failed for value: " + value);
}
}

if (!headerValues.isEmpty()) {
carrier.addCargo(new Cargo(headerName, headerValues));
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import com.jd.live.agent.core.service.ConfigService;
import com.jd.live.agent.core.util.Futures;
import com.jd.live.agent.core.util.time.Timer;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.config.MonitorConfig;
import com.jd.live.agent.governance.config.RegistryConfig;
import com.jd.live.agent.governance.config.ServiceConfig;
import com.jd.live.agent.governance.config.*;
import com.jd.live.agent.governance.context.Propagation;
import com.jd.live.agent.governance.event.TrafficEvent;
import com.jd.live.agent.governance.event.TrafficEvent.ActionType;
import com.jd.live.agent.governance.invoke.InvocationContext;
Expand Down Expand Up @@ -168,6 +166,10 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex

private List<String> serviceSyncers;

@Getter
@Inject
private Map<String, Propagation> propagations;

private final AtomicBoolean warmup = new AtomicBoolean(false);

@Override
Expand Down Expand Up @@ -223,6 +225,16 @@ public void apply(InjectSource source) {
source.add(PolicySupervisor.COMPONENT_POLICY_SUPPLIER, this);
source.add(InvocationContext.COMPONENT_INVOCATION_CONTEXT, this);
if (governanceConfig != null) {
switch (governanceConfig.getTransmitConfig().getType()) {
case "W3cBaggage":
source.add(TransmitConfig.DEFAULT_PROPAGATION, propagations.get("W3cBaggagePropagation"));
break;
case "Live":
source.add(TransmitConfig.DEFAULT_PROPAGATION, propagations.get("LivePropagation"));
break;
default:
break;
}
source.add(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG, governanceConfig);
source.add(ServiceConfig.COMPONENT_SERVICE_CONFIG, governanceConfig.getServiceConfig());
source.add(RegistryConfig.COMPONENT_REGISTRY_CONFIG, governanceConfig.getRegistryConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.jd.live.agent.governance.request;

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public interface HeaderReader {
Iterator<String> getHeaderNames();

List<String> getHeaders(String key);

default String getHeader(String key) {
List<String> values = getHeaders(key);
return values.isEmpty() ? null : values.get(0);
}

default List<String> getHeader(String key, Function<String, List<String>> func) {
String value = getHeader(key);
return value == null ? null : func.apply(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.jd.live.agent.governance.request;

public interface HeaderWriter {
void setHeader(String key, String value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jd.live.agent.governance.request;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class MultiMapHeader implements HeaderReader, HeaderWriter {
private Map<String, List<String>> map;

private BiConsumer<String, String> setHeader;

public MultiMapHeader(Supplier<Map<String, List<String>>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
return map.get(key) == null ? null : map.get(key);
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.jd.live.agent.governance.request;

import com.jd.live.agent.core.util.tag.Label;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ObjectMapHeader implements HeaderReader, HeaderWriter {
private Map<String, Object> map;

private BiConsumer<String, String> setHeader;

public ObjectMapHeader(Supplier<Map<String, Object>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
Object value = map.get(key);
return value == null ? null : Label.parseValue(value.toString());
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.jd.live.agent.governance.request;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class StringMapHeader implements HeaderReader, HeaderWriter {
private Map<String, String> map;

private BiConsumer<String, String> setHeader;

public StringMapHeader(Supplier<Map<String, String>> getHeader, BiConsumer<String, String> setHeader) {
this.setHeader = setHeader;
this.map = getHeader.get() == null ? new HashMap<>() : getHeader.get();
}

@Override
public Iterator<String> getHeaderNames() {
return map.keySet().iterator();
}

@Override
public List<String> getHeaders(String key) {
return map.get(key) == null ? null : Collections.singletonList(map.get(key));
}

@Override
public void setHeader(String key, String value) {
setHeader.accept(key, value);
}
}

0 comments on commit c8261d6

Please sign in to comment.