Skip to content

Commit

Permalink
handler: make message filtering async-capable
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Dec 13, 2024
1 parent 6a431f2 commit 693165a
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 14 deletions.
155 changes: 149 additions & 6 deletions src/handler/filter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016-2019 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -28,14 +29,24 @@

namespace {

class SkipSelfFilter : public Filter
class SkipSelfFilter : public Filter, public Filter::MessageFilter
{
public:
SkipSelfFilter() :
Filter("skip-self")
{
}

virtual void start(const Filter::Context &context, const QByteArray &content)
{
setContext(context);

Result r;
r.sendAction = sendAction();
r.content = content;
finished(r);
}

virtual SendAction sendAction() const
{
QString user = context().subscriptionMeta.value("user");
Expand All @@ -47,14 +58,24 @@ class SkipSelfFilter : public Filter
}
};

class SkipUsersFilter : public Filter
class SkipUsersFilter : public Filter, public Filter::MessageFilter
{
public:
SkipUsersFilter() :
Filter("skip-users")
{
}

virtual void start(const Filter::Context &context, const QByteArray &content)
{
setContext(context);

Result r;
r.sendAction = sendAction();
r.content = content;
finished(r);
}

virtual SendAction sendAction() const
{
QString user = context().subscriptionMeta.value("user");
Expand All @@ -74,14 +95,24 @@ class SkipUsersFilter : public Filter
}
};

class RequireSubFilter : public Filter
class RequireSubFilter : public Filter, public Filter::MessageFilter
{
public:
RequireSubFilter() :
Filter("require-sub")
{
}

virtual void start(const Filter::Context &context, const QByteArray &content)
{
setContext(context);

Result r;
r.sendAction = sendAction();
r.content = content;
finished(r);
}

virtual SendAction sendAction() const
{
QString require_sub = context().publishMeta.value("require_sub");
Expand All @@ -92,7 +123,7 @@ class RequireSubFilter : public Filter
}
};

class BuildIdFilter : public Filter
class BuildIdFilter : public Filter, public Filter::MessageFilter
{
public:
IdFormat::ContentRenderer *idContentRenderer;
Expand Down Expand Up @@ -162,6 +193,16 @@ class BuildIdFilter : public Filter
return true;
}

virtual void start(const Filter::Context &context, const QByteArray &content)
{
setContext(context);

Result r;
r.sendAction = sendAction();
r.content = process(content);
finished(r);
}

virtual QByteArray update(const QByteArray &data)
{
if(!ensureInit())
Expand Down Expand Up @@ -223,14 +264,24 @@ class VarSubstFormatHandler : public Format::Handler
}
};

class VarSubstFilter : public Filter
class VarSubstFilter : public Filter, public Filter::MessageFilter
{
public:
VarSubstFilter() :
Filter("var-subst")
{
}

virtual void start(const Filter::Context &context, const QByteArray &content)
{
setContext(context);

Result r;
r.sendAction = sendAction();
r.content = process(content);
finished(r);
}

virtual QByteArray update(const QByteArray &data)
{
VarSubstFormatHandler handler;
Expand All @@ -255,6 +306,10 @@ class VarSubstFilter : public Filter

}

Filter::MessageFilter::~MessageFilter()
{
}

Filter::Filter(const QString &name) :
name_(name)
{
Expand Down Expand Up @@ -308,6 +363,22 @@ Filter *Filter::create(const QString &name)
return 0;
}

Filter::MessageFilter *Filter::createMessageFilter(const QString &name)
{
if(name == "skip-self")
return new SkipSelfFilter;
else if(name == "skip-users")
return new SkipUsersFilter;
else if(name == "require-sub")
return new RequireSubFilter;
else if(name == "build-id")
return new BuildIdFilter;
else if(name == "var-subst")
return new VarSubstFilter;
else
return 0;
}

QStringList Filter::names()
{
return (QStringList()
Expand All @@ -327,9 +398,81 @@ Filter::Targets Filter::targets(const QString &name)
else if(name == "require-sub")
return Filter::MessageDelivery;
else if(name == "build-id")
return Filter::Targets(Filter::MessageContent | Filter::ProxyContent);
return Filter::Targets(Filter::MessageContent | Filter::ResponseContent);
else if(name == "var-subst")
return Filter::MessageContent;
else
return Filter::Targets(0);
}

Filter::MessageFilterStack::MessageFilterStack(const QStringList &filterNames)
{
foreach(const QString &name, filterNames)
{
MessageFilter *f = createMessageFilter(name);
if(f)
filters_ += f;
}
}

Filter::MessageFilterStack::~MessageFilterStack()
{
qDeleteAll(filters_);
}

void Filter::MessageFilterStack::start(const Filter::Context &context, const QByteArray &content)
{
context_ = context;
content_ = content;
lastSendAction_ = Send;

nextFilter();
}

void Filter::MessageFilterStack::nextFilter()
{
if(filters_.isEmpty())
{
Result r;
r.sendAction = lastSendAction_;
r.content = content_;
finished(r);
return;
}

finishedConnection_ = filters_[0]->finished.connect(boost::bind(&MessageFilterStack::filterFinished, this, boost::placeholders::_1)),

// may call filterFinished immediately
filters_[0]->start(context_, content_);
}

void Filter::MessageFilterStack::filterFinished(const Result &result)
{
if(!result.errorMessage.isNull())
{
qDeleteAll(filters_);
filters_.clear();

Result r;
r.errorMessage = result.errorMessage;
finished(r);
return;
}

lastSendAction_ = result.sendAction;
content_ = result.content;

switch(lastSendAction_)
{
case Send:
filters_.removeFirst();
break;
case Drop:
qDeleteAll(filters_);
filters_.clear();
break;
}

// will emit finished if there are no remaining filters
nextFilter();
}
48 changes: 46 additions & 2 deletions src/handler/filter.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016-2019 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand All @@ -26,6 +27,8 @@
#include <QString>
#include <QStringList>
#include <QHash>
#include <QMetaType>
#include <boost/signals2.hpp>

class Filter
{
Expand All @@ -40,7 +43,7 @@ class Filter
{
MessageDelivery = 0x01,
MessageContent = 0x02,
ProxyContent = 0x04,
ResponseContent = 0x04,
};

class Context
Expand All @@ -51,7 +54,45 @@ class Filter
QHash<QString, QString> publishMeta;
};

Filter(const QString &name = QString());
class MessageFilter
{
public:
class Result
{
public:
SendAction sendAction;
QByteArray content;
QString errorMessage; // non-null on error
};

virtual ~MessageFilter();

// may emit finished immediately
virtual void start(const Filter::Context &context, const QByteArray &content = QByteArray()) = 0;

boost::signals2::signal<void(const Result&)> finished;
};

class MessageFilterStack : public MessageFilter
{
public:
MessageFilterStack(const QStringList &filterNames);
~MessageFilterStack();

// reimplemented
virtual void start(const Filter::Context &context, const QByteArray &content = QByteArray());

private:
QList<MessageFilter*> filters_;
Filter::Context context_;
QByteArray content_;
SendAction lastSendAction_;
boost::signals2::scoped_connection finishedConnection_;

void nextFilter();
void filterFinished(const Result &result);
};

virtual ~Filter();

const QString & name() const { return name_; }
Expand All @@ -69,10 +110,13 @@ class Filter
QByteArray process(const QByteArray &data);

static Filter *create(const QString &name);
static MessageFilter *createMessageFilter(const QString &name);
static QStringList names();
static Targets targets(const QString &name);

protected:
Filter(const QString &name = QString());

void setError(const QString &s) { errorMessage_ = s; }

private:
Expand Down
4 changes: 2 additions & 2 deletions src/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,13 @@ class AcceptWorker : public Deferred

if(!responseSent)
{
// apply ProxyContent filters of all channels
// apply ResponseContent filters of all channels
QStringList allFilters;
foreach(const Instruct::Channel &c, instruct.channels)
{
foreach(const QString &filter, c.filters)
{
if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter))
if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter))
allFilters += filter;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,13 @@ class HttpSession::Private : public QObject

if(!instruct.response.body.isEmpty())
{
// apply ProxyContent filters of all channels
// apply ResponseContent filters of all channels
QStringList allFilters;
foreach(const Instruct::Channel &c, instruct.channels)
{
foreach(const QString &filter, c.filters)
{
if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter))
if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter))
allFilters += filter;
}
}
Expand Down Expand Up @@ -1558,13 +1558,13 @@ private slots:
// won't be used for anything else
instruct = i;

// apply ProxyContent filters of all channels
// apply ResponseContent filters of all channels
QStringList allFilters;
foreach(const Instruct::Channel &c, instruct.channels)
{
foreach(const QString &filter, c.filters)
{
if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter))
if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter))
allFilters += filter;
}
}
Expand Down

0 comments on commit 693165a

Please sign in to comment.