diff --git a/src/main/java/com/uber/cadence/largeblob/Configuration.java b/src/main/java/com/uber/cadence/largeblob/Configuration.java new file mode 100644 index 000000000..69d3e1fbe --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/Configuration.java @@ -0,0 +1,78 @@ +package com.uber.cadence.largeblob; + +import com.uber.cadence.converter.DataConverter; +import com.uber.cadence.converter.JsonDataConverter; + +import java.time.Duration; + +public class Configuration { + + private Storage storage; + private DataConverter dataConverter; + private Long maxBytes; + private Duration ttl; + + private Configuration() { + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private Storage storage; + private DataConverter dataConverter = JsonDataConverter.getInstance(); + private Duration ttl; + private Long maxBytes = 4096L; + + public Configuration build() { + if (storage == null) { + throw new IllegalArgumentException("storage must be provided"); + } + + Configuration configuration = new Configuration(); + configuration.storage = this.storage; + configuration.dataConverter = this.dataConverter; + configuration.ttl = this.ttl; + configuration.maxBytes = this.maxBytes; + return configuration; + } + + public Builder setDataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + public Builder setStorage(Storage storage) { + this.storage = storage; + return this; + } + + public Builder setTtl(Duration ttl) { + this.ttl = ttl; + return this; + } + + public Builder setMaxBytes(Long maxBytes) { + this.maxBytes = maxBytes; + return this; + } + } + + public Storage getStorage() { + return storage; + } + + public DataConverter getDataConverter() { + return dataConverter; + } + + public Duration getTtl() { + return ttl; + } + + public Long getMaxBytes() { + return maxBytes; + } +} diff --git a/src/main/java/com/uber/cadence/largeblob/Future.java 
b/src/main/java/com/uber/cadence/largeblob/Future.java new file mode 100644 index 000000000..c32391196 --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/Future.java @@ -0,0 +1,89 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.largeblob; + +import com.uber.cadence.converter.DataConverterException; + +import java.io.IOException; + +/** + * Future is used for passing around potentially large parameters between activities. Future can never be used in Workflow code because it uses an external storage and that will make + * workflow code non deterministic. Small amounts of data are stored in the future instance itself, while larger amounts of data are stored in an external storage. 
+ */ +public class Future { + + private byte[] encoded; + private String url; + private final Configuration config; + private final Class clazz; + + public Future(Configuration config, Class clazz, byte[] encoded) { + this.config = config; + this.encoded = encoded; + this.clazz = clazz; + } + + public Future(Configuration config, Class clazz, String url) { + this.url = url; + this.config = config; + this.clazz = clazz; + } + + public Future(T obj, Configuration configuration) throws IOException { + byte[] bytes; + try { + bytes = configuration.getDataConverter().toData(obj); + } catch (DataConverterException e) { + throw new IOException(e); + } + + this.config = configuration; + this.clazz = (Class) obj.getClass(); + if (bytes.length <= configuration.getMaxBytes()) { + this.encoded = bytes; + } else { + this.url = configuration.getStorage().put(bytes); + } + } + + public T get() throws IOException { + if (encoded != null) { + return config.getDataConverter().fromData(encoded, clazz, clazz); + } + + if (url != null) { + return config .getDataConverter().fromData(config.getStorage().get(url), clazz, clazz); + } + + return null; + } + + public void delete() throws IOException { + if (this.encoded == null) { + config.getStorage().delete(url); + } else { + this.encoded = null; + } + } + + public byte[] getEncoded() { + return encoded; + } + + public String getUrl() { + return url; + } +} diff --git a/src/main/java/com/uber/cadence/largeblob/README.md b/src/main/java/com/uber/cadence/largeblob/README.md new file mode 100644 index 000000000..94a40c44f --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/README.md @@ -0,0 +1,19 @@ +This package contains an Activity-oriented Future, which can semi-transparently upload large amounts of data to +external stores, effectively avoiding Cadence's per-event / per-workflow size limits. + +Multiple Futures can be used in a single Activity's arguments or response values, e.g. 
in separate object fields or +collections, but each Future will be unaware of the others. Any max-size limit you choose will not be shared between all +Futures (they each have their own limit), so the cumulative size of your response may be much larger. +Take care to keep the limit configured with setMaxBytes low enough for all values, and be aware that the URLs that replace the data +also take space - dozens are fine, thousands are probably not. + +# Caveats + +This tool comes with some semi-severe caveats, but if they are acceptable for your use, it may allow you to easily +reduce your Workflow's history's data use: + +# Workflows will not have access to data + +By design, this tool does not allow you to access wrapped data in your workflows. You can use the type to forward data +between activities, but not inspect the contents - they may not exist, and you cannot safely perform the download in +your workflow. diff --git a/src/main/java/com/uber/cadence/largeblob/Storage.java b/src/main/java/com/uber/cadence/largeblob/Storage.java new file mode 100644 index 000000000..13f2f89f4 --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/Storage.java @@ -0,0 +1,63 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.uber.cadence.largeblob; + +import java.io.IOException; +import java.time.Duration; + +/** + * Storage is an abstraction over an external data store that keeps large parameters so they can be + * accessed inside of activities. Every put variant returns a String; Future keeps the value returned by + * put(byte[]) as its url and later hands it back to get and delete. + */ +public interface Storage { + + /** + * Gets the data stored under the provided uri. + * @param uri uri identifying the stored data. + * @return the data as a byte array. + * @throws IOException should be thrown in any implementation class in case of problems accessing the datastore. + */ + byte[] get(String uri) throws IOException; + + /** + * Stores data without a caller-supplied key. + * @param bytes bytes to store. + * @return the uri under which the data was stored; it can be passed to get and delete. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + String put(byte[] bytes) throws IOException; + + /** + * Stores data without a caller-supplied key, with a time-to-live. + * @param bytes bytes to store. + * @param ttl ttl is used for storages like s3 to define the total time to store the object. + * @return the uri under which the data was stored; it can be passed to get and delete. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + String put(byte[] bytes, Duration ttl) throws IOException; + + /** + * Stores data under the provided key. + * @param key of the data. + * @param bytes bytes to store. + * @return the uri of the stored data; InMemoryStorage returns the key unchanged, other implementations may differ. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + String put(String key, byte[] bytes) throws IOException; + + /** + * Deletes the data stored under the provided uri. + * @param uri uri identifying the data to delete. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + void delete(String uri) throws IOException; +} diff --git a/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java new file mode 100644 index 000000000..9894b8e7c --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java @@ -0,0 +1,56 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.largeblob.impl; + +import com.uber.cadence.largeblob.Storage; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class InMemoryStorage implements Storage { + + private final Map storage = new HashMap<>(); + + @Override + public byte[] get(String uri) throws IOException { + return storage.get(uri); + } + + @Override + public String put(byte[] bytes) throws IOException { + String uuid = UUID.randomUUID().toString(); + storage.put(uuid, bytes); + return uuid; + } + + @Override + public String put(byte[] bytes, Duration ttl) throws IOException { + return put(bytes); + } + + @Override + public String put(String key, byte[] bytes) throws IOException { + storage.put(key, bytes); + return key; + } + + @Override + public void delete(String uri) throws IOException { + storage.remove(uri); + } +} diff --git a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java new file mode 100644 index 000000000..91d51ef48 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java @@ -0,0 +1,104 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. 
A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.largeblob; + +import com.uber.cadence.largeblob.Configuration; +import com.uber.cadence.largeblob.Future; +import com.uber.cadence.largeblob.Storage; +import com.uber.cadence.largeblob.impl.InMemoryStorage; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +@RunWith(MockitoJUnitRunner.class) +public class FutureTest { + + private final Storage storage = new InMemoryStorage(); + + @Test + public void testPutSmallElement() throws Exception { + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + Future future = new Future("testValue", config); + + assertEquals("testValue", future.get()); + assertNull(future.getUrl()); + } + + @Test + public void testLargerValuesGetPutInStorage() throws Exception { + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + String testValue = "testValuetestValuetestValue"; + Future future = new Future(testValue, config); + + assertEquals("testValuetestValuetestValue", future.get()); + assertNotNull(storage.get(future.getUrl())); + } + + @Test + public void testDeleteValueFromStorage() throws Exception { + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + String testValue = "testValuetestValuetestValue"; + Future future = new Future<>(testValue, config); + + 
assertEquals("testValuetestValuetestValue", future.get()); + assertEquals( + "testValuetestValuetestValue", config.getDataConverter().fromData(storage.get(future.getUrl()), String.class, String.class)); + + future.delete(); + + assertNull(storage.get("test")); + + assertNull(future.get()); + } + + @Test + public void testSmallValueIsDelete() throws Exception { + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + Future future = new Future<>("testValue", config); + + assertEquals("testValue", future.get()); + assertNull(storage.get(future.getUrl())); + + future.delete(); + assertNull(future.get()); + } + + @Test + public void getWorksWhenInitialisingWithEncodedData() throws Exception { + Configuration configuration = Configuration.newBuilder().setStorage(storage).build(); + Future future = new Future<>(configuration, String.class, "testValue".getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", future.get()); + } + + @Test + public void getWorksWhenInitialisingWithUrl() throws Exception { + Configuration configuration = Configuration.newBuilder().setStorage(storage).build(); + Future future = new Future<>(configuration, String.class, "test"); + + assertNull(future.get()); + + storage.put("test", "testValue".getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", future.get()); + } +}