Defining the Streams

Riccardo Tommasini edited this page Jan 2, 2021 · 4 revisions

The stream package contains all the necessary abstractions to identify, represent, and process streams.

The package is inspired by VoCaLS, a vocabulary designed for cataloging streams and sharing their descriptions as linked data.

The base element is the Web Stream, which is defined as follows [1, 2]:

A Web data stream is a Web resource that identifies an unbounded ordered collection of pairs (o, i), where o is a Web resource, e.g., a named graph, and i is metadata that can be used to establish an ordering relation, e.g., a timestamp.
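As a rough illustration of this definition, the sketch below models a pair (o, i) with a numeric timestamp as i and recovers the stream order by sorting on it. TimestampedItem and OrderingSketch are hypothetical names used only here; they are not RSP4J types, and plain strings stand in for the Web resources.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of RSP4J: one (o, i) pair of the definition.
final class TimestampedItem<O> {
    final O content;      // o: the Web resource, e.g., a named graph
    final long timestamp; // i: the metadata that establishes the ordering

    TimestampedItem(O content, long timestamp) {
        this.content = content;
        this.timestamp = timestamp;
    }
}

final class OrderingSketch {
    // Returns the contents of the given pairs, ordered by ascending timestamp.
    static <O> List<O> inStreamOrder(List<TimestampedItem<O>> items) {
        List<TimestampedItem<O>> sorted = new ArrayList<>(items);
        sorted.sort(Comparator.comparingLong(item -> item.timestamp));
        List<O> contents = new ArrayList<>();
        for (TimestampedItem<O> item : sorted) {
            contents.add(item.content);
        }
        return contents;
    }
}
```

Any metadata with a total order could play the role of i; a long timestamp is simply the choice that matches the put(E e, long ts) signature used later on this page.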

In RSP4J, the WebStream interface represents the Web stream abstraction. It does not include any particular logic, but represents the stream as a Web resource, i.e., it is identified by an HTTP URI so that it can be dereferenced and then consumed.

public interface WebStream {

    String uri();

}

The WebDataStream interface, on the other hand, represents the stream as a data source, i.e., it allows both production and consumption. The interface is parametric in the type E of the stream items and exposes two methods, put and addConsumer. The former injects timestamped data items of type E; the latter connects the stream to interested consumers, e.g., window operators.

public interface WebDataStream<E> extends WebStream {

    void addConsumer(Consumer<E> windowAssigner);

    void put(E e, long ts);

}
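The interplay between put and addConsumer can be sketched without any RDF dependency. In the snippet below, WebStream and WebDataStream are copied from the listings above, while Consumer is a local stand-in whose notify(E, long) method is inferred from the way RDFStream calls it on this page. InMemoryStream, FanOutSketch, and the example.org URI are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for RSP4J's consumer type (assumed shape: notify(E, long)).
interface Consumer<E> {
    void notify(E e, long ts);
}

interface WebStream {
    String uri();
}

interface WebDataStream<E> extends WebStream {
    void addConsumer(Consumer<E> windowAssigner);
    void put(E e, long ts);
}

// Hypothetical in-memory stream, not an RSP4J class:
// every item passed to put is forwarded to all registered consumers.
final class InMemoryStream<E> implements WebDataStream<E> {
    private final String uri;
    private final List<Consumer<E>> consumers = new ArrayList<>();

    InMemoryStream(String uri) { this.uri = uri; }

    @Override public void addConsumer(Consumer<E> consumer) { consumers.add(consumer); }
    @Override public void put(E e, long ts) { consumers.forEach(c -> c.notify(e, ts)); }
    @Override public String uri() { return uri; }
}

final class FanOutSketch {
    // Registers two consumers, pushes one item, and returns what they recorded.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        InMemoryStream<String> stream = new InMemoryStream<>("http://example.org/stream");
        stream.addConsumer((e, ts) -> log.add("first@" + ts + ":" + e));
        stream.addConsumer((e, ts) -> log.add("second@" + ts + ":" + e));
        stream.put("hello", 1L);
        return log;
    }
}
```

Each consumer sees the item together with its timestamp, in registration order. Items put before a consumer is added are not replayed in this sketch.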

In Yasper

YASPER includes an implementation of WebDataStream based on Apache Commons RDF. In particular, E is bound to org.apache.commons.rdf.api.Graph.

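import java.util.ArrayList;
import java.util.List;

import org.apache.commons.rdf.api.Graph;

// Consumer and WebDataStream are the RSP4J types (their imports are omitted here).
public class RDFStream implements WebDataStream<Graph> {

    protected String stream_uri;

    // Consumers registered through addConsumer; each one is notified on every put.
    protected List<Consumer<Graph>> consumers = new ArrayList<>();

    public RDFStream(String stream_uri) {
        this.stream_uri = stream_uri;
    }
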
    @Override
    public void addConsumer(Consumer<Graph> c) {
        consumers.add(c);
    }

    @Override
    public void put(Graph e, long ts) {
        consumers.forEach(graphConsumer -> graphConsumer.notify(e, ts));
    }

    @Override
    public String uri() {
        return stream_uri;
    }

}

Putting it all together: ColorWave

Throughout the RSP4J documentation, we use the ColorWave running example, which was originally presented during a series of tutorials at ESWC, ISWC, ICWE, RW, and TheWebConf.

In the ColorWave running example, we observe a stream of colors (or colored boxes). Each element is a timestamped color observation made by a sensor.

Writing to a Stream

RDFStream stream = new RDFStream("https://linkeddata.stream/colorstream");

// RDF factory used to create the IRIs, triples, and graphs below.
RDF rdf = RDFUtils.getInstance();
IRI type = rdf.createIRI("rdf:type");

// Each observation is a one-triple graph, pushed to the stream with its timestamp.
Graph graph = rdf.createGraph();
graph.add(rdf.createTriple(rdf.createIRI("6e852dd4"), type, rdf.createIRI("color:Blue")));
stream.put(graph, 1000);

graph = rdf.createGraph();
graph.add(rdf.createTriple(rdf.createIRI("507f704d"), type, rdf.createIRI("color:Red")));
stream.put(graph, 1999);

graph = rdf.createGraph();
graph.add(rdf.createTriple(rdf.createIRI("80350a4b"), type, rdf.createIRI("color:Yellow")));
stream.put(graph, 2001);

graph = rdf.createGraph();
graph.add(rdf.createTriple(rdf.createIRI("84b0350a"), type, rdf.createIRI("color:Green")));
stream.put(graph, 3000);
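To see what a consumer might do with these four observations, the following sketch groups them into tumbling windows of 1000 time units. It is an illustration only, not RSP4J's window operator. TumblingWindowConsumer and ColorWaveSketch are hypothetical names, and Consumer is a local stand-in whose notify(E, long) shape is inferred from the RDFStream listing. Plain strings replace the RDF graphs, and the sketch calls notify directly where RDFStream.put would call it for each registered consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in for RSP4J's consumer type (assumed shape: notify(E, long)).
interface Consumer<E> {
    void notify(E e, long ts);
}

// Hypothetical consumer, not an RSP4J class:
// groups incoming items into tumbling windows of a fixed width.
final class TumblingWindowConsumer<E> implements Consumer<E> {
    private final long width;
    // Window start -> items whose timestamp falls in [start, start + width).
    private final Map<Long, List<E>> windows = new TreeMap<>();

    TumblingWindowConsumer(long width) { this.width = width; }

    @Override
    public void notify(E e, long ts) {
        long start = Math.floorDiv(ts, width) * width;
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e);
    }

    Map<Long, List<E>> windows() { return windows; }
}

final class ColorWaveSketch {
    static Map<Long, List<String>> run() {
        TumblingWindowConsumer<String> consumer = new TumblingWindowConsumer<>(1000L);
        // Same timestamps as the ColorWave snippet; strings stand in for the graphs.
        consumer.notify("Blue", 1000L);
        consumer.notify("Red", 1999L);
        consumer.notify("Yellow", 2001L);
        consumer.notify("Green", 3000L);
        return consumer.windows();
    }
}
```

With a width of 1000, Blue and Red fall into the window starting at 1000, Yellow into the one starting at 2000, and Green into the one starting at 3000.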