Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1559] Added min/max/avg degree operators (static + evolution) #1565

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;

/**
* Max degree operator calculates the maximum degree of all vertices of a graph and writes it to the graph
* head as a new property named {@link MaxDegree#PROPERTY_MAX_DEGREE}.
*/
public class MaxDegree implements UnaryBaseGraphToBaseGraphOperator<LogicalGraph> {

/**
* The name of the property that holds the max degree after the calculation.
*/
public static final String PROPERTY_MAX_DEGREE = "maxDegree";

@Override
public LogicalGraph execute(LogicalGraph graph) {

DataSet<EPGMGraphHead> newGraphHead = new VertexDegrees().execute(graph)
.max(1)
.crossWithTiny(graph.getGraphHead().first(1))
.with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MAX_DEGREE));

return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;

/**
* Min degree operator calculates the minimum degree of all vertices of a graph and writes it to the graph
* head as a new property named {@link MinDegree#PROPERTY_MIN_DEGREE}.
*/
public class MinDegree implements UnaryBaseGraphToBaseGraphOperator<LogicalGraph> {

/**
* The name of the property that holds the min degree after the calculation.
*/
public static final String PROPERTY_MIN_DEGREE = "minDegree";

@Override
public LogicalGraph execute(LogicalGraph graph) {
DataSet<EPGMGraphHead> newGraphHead = new VertexDegrees().execute(graph)
.min(1)
.crossWithTiny(graph.getGraphHead().first(1))
.with(new AddSumDegreesToGraphHeadCrossFunction(PROPERTY_MIN_DEGREE));

return graph.getFactory().fromDataSets(newGraphHead, graph.getVertices(), graph.getEdges());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageClusteringCoefficient;
import org.gradoop.flink.util.FlinkAsciiGraphLoader;
import org.junit.Before;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.AverageOutgoingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import com.google.common.collect.Lists;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistribution;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.DegreeCentrality;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.GraphDensity;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
Expand All @@ -46,6 +46,6 @@ public void testGraphDensity() throws Exception {
// density should not be 0
assertTrue("Graph density is 0", density > 0.);
// density for social network graph should be (24 / 11 * 10) = 0.21818...
assertTrue("Computed graph density is incorrect", density == (24d / 110d));
assertEquals("Computed graph density is incorrect", 24d / 110d, density, 0.0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Integration test class of {@link MaxDegree}.
*/
public class MaxDegreeTest extends GradoopFlinkTestBase {

/**
* Tests the computation of the maximum degree of a logical graph.
*
* @throws Exception If loading of the example-graph fails
*/
@Test
public void testMaxDegree() throws Exception {
LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph();

long maxDegree = graph.callForGraph(new MaxDegree())
.getGraphHead()
.collect()
.get(0)
.getPropertyValue(MaxDegree.PROPERTY_MAX_DEGREE).getLong();

assertEquals(6L, maxDegree);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.statistics;

import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Integration test class of {@link MinDegree}.
*/
public class MinDegreeTest extends GradoopFlinkTestBase {

/**
* Tests the computation of the minimum degree of a logical graph.
*
* @throws Exception If loading of the example-graph fails
*/
@Test
public void testMinDegree() throws Exception {
LogicalGraph graph = getSocialNetworkLoader().getLogicalGraph();

long minDegree = graph.callForGraph(new MinDegree())
.getGraphHead()
.collect()
.get(0)
.getPropertyValue(MinDegree.PROPERTY_MIN_DEGREE).getLong();

assertEquals(2L, minDegree);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
package org.gradoop.flink.model.impl.operators.statistics;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.GradoopFlinkTestBase;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistributionAsValues;
import org.gradoop.flink.util.FlinkAsciiGraphLoader;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;

import java.util.Objects;

/**
* Operator that calculates the average degree evolution of all vertices of a temporal graph for the
* whole lifetime of the graph. The average value is rounded up to the next integer.
*/
public class AvgDegreeEvolution
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Integer>>> {
/**
* The time dimension that will be considered.
*/
private final TimeDimension dimension;

/**
* The degree type (IN, OUT, BOTH);
*/
private final VertexDegree degreeType;

/**
* Creates an instance of this average degree evolution operator.
*
* @param degreeType the degree type to use (IN, OUT, BOTH).
* @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME).
*/
public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
this.degreeType = Objects.requireNonNull(degreeType);
this.dimension = Objects.requireNonNull(dimension);
}

@Override
public DataSet<Tuple2<Long, Integer>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
// 2) Group them by the vertex id
.groupBy(0)
// 3) For each vertex id, build a degree tree data structure
.reduceGroup(new BuildTemporalDegreeTree())
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 5) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG));
}
}
Loading