Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mvn/.gradle-enterprise/gradle-enterprise-workspace-id
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kl7k3ragjregzk2gtxfrzupvie
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
import org.apache.iotdb.collector.service.RuntimeService;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
Expand All @@ -35,30 +34,30 @@ public class PluginApiServiceImpl extends PluginApiService {
@Override
public Response createPlugin(
final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) {
return Response.ok("create plugin").entity(RuntimeService.plugin().createPlugin()).build();
return Response.ok("create plugin").build();
}

@Override
public Response alterPlugin(
final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) {
return Response.ok("alter plugin").entity(RuntimeService.plugin().alterPlugin()).build();
return Response.ok("alter plugin").build();
}

@Override
public Response startPlugin(
final StartPluginRequest startPluginRequest, final SecurityContext securityContext) {
return Response.ok("start plugin").entity(RuntimeService.plugin().startPlugin()).build();
return Response.ok("start plugin").build();
}

@Override
public Response stopPlugin(
final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) {
return Response.ok("stop plugin").entity(RuntimeService.plugin().stopPlugin()).build();
return Response.ok("stop plugin").build();
}

@Override
public Response dropPlugin(
final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
return Response.ok("drop plugin").entity(RuntimeService.plugin().dropPlugin()).build();
return Response.ok("drop plugin").build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ public Response createTask(
final CreateTaskRequest createTaskRequest, final SecurityContext securityContext) {
TaskApiServiceRequestValidationHandler.validateCreateRequest(createTaskRequest);

return RuntimeService.task()
.createTask(
createTaskRequest.getTaskId(),
createTaskRequest.getSourceAttribute(),
createTaskRequest.getProcessorAttribute(),
createTaskRequest.getSinkAttribute());
return RuntimeService.task().isPresent()
? RuntimeService.task()
.get()
.createTask(
createTaskRequest.getTaskId(),
createTaskRequest.getSourceAttribute(),
createTaskRequest.getProcessorAttribute(),
createTaskRequest.getSinkAttribute())
: Response.serverError().entity("Task runtime is down").build();
}

@Override
Expand All @@ -56,22 +59,28 @@ public Response startTask(
final StartTaskRequest startTaskRequest, final SecurityContext securityContext) {
TaskApiServiceRequestValidationHandler.validateStartRequest(startTaskRequest);

return RuntimeService.task().startTask(startTaskRequest.getTaskId());
return RuntimeService.task().isPresent()
? RuntimeService.task().get().startTask(startTaskRequest.getTaskId())
: Response.serverError().entity("Task runtime is down").build();
}

@Override
public Response stopTask(
final StopTaskRequest stopTaskRequest, final SecurityContext securityContext) {
TaskApiServiceRequestValidationHandler.validateStopRequest(stopTaskRequest);

return RuntimeService.task().stopTask(stopTaskRequest.getTaskId());
return RuntimeService.task().isPresent()
? RuntimeService.task().get().stopTask(stopTaskRequest.getTaskId())
: Response.serverError().entity("Task runtime is down").build();
}

@Override
public Response dropTask(
final DropTaskRequest dropTaskRequest, final SecurityContext securityContext) {
TaskApiServiceRequestValidationHandler.validateDropRequest(dropTaskRequest);

return RuntimeService.task().dropTask(dropTaskRequest.getTaskId());
return RuntimeService.task().isPresent()
? RuntimeService.task().get().dropTask(dropTaskRequest.getTaskId())
: Response.serverError().entity("Task runtime is down").build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@

import org.apache.iotdb.pipe.api.PipeSource;

public interface CollectorSource extends StoppablePlugin, PipeSource {}
public abstract class PullSource implements PipeSource {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;

public abstract class CollectorPushSource implements CollectorSource {
public abstract class PushSource implements PipeSource {

protected final EventCollector collector;
protected EventCollector collector;

public CollectorPushSource(final EventCollector collector) {
public PushSource() {
this.collector = null;
}

public final void setCollector(final EventCollector collector) {
this.collector = collector;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,26 @@
* under the License.
*/

package org.apache.iotdb.collector.plugin.api;
package org.apache.iotdb.collector.plugin.api.customizer;

import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;

public class RuntimeConfig implements PipeExtractorRuntimeConfiguration {
public class CollectorProcessorRuntimeConfiguration implements PipeProcessorRuntimeConfiguration {

public static class RuntimeEnvironment implements PipeRuntimeEnvironment {
private final CollectorRuntimeEnvironment runtimeEnvironment;

public int getParallelism() {
return 0;
}

public int getParallelismIndex() {
return 0;
}

@Override
public String getPipeName() {
return "";
}

@Override
public long getCreationTime() {
return 0;
}
public CollectorProcessorRuntimeConfiguration(
final String pipeName,
final long creationTime,
final int parallelism,
final int instanceIndex) {
runtimeEnvironment =
new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex);
}

@Override
public PipeRuntimeEnvironment getRuntimeEnvironment() {
return new PipeRuntimeEnvironment() {

@Override
public String getPipeName() {
return "";
}

@Override
public long getCreationTime() {
return 0;
}
};
return runtimeEnvironment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.collector.plugin.api.customizer;

import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;

public class CollectorRuntimeEnvironment implements PipeRuntimeEnvironment {

private final String pipeName;
private final long creationTime;
private final int parallelism;
private final int instanceIndex;

public CollectorRuntimeEnvironment(
final String pipeName,
final long creationTime,
final int parallelism,
final int instanceIndex) {
this.pipeName = pipeName;
this.creationTime = creationTime;
this.parallelism = parallelism;
this.instanceIndex = instanceIndex;
}

@Override
public String getPipeName() {
return pipeName;
}

@Override
public long getCreationTime() {
return creationTime;
}

public int getParallelism() {
return parallelism;
}

public int getInstanceIndex() {
return instanceIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.collector.plugin.api.customizer;

import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;

public class CollectorSinkRuntimeConfiguration implements PipeSinkRuntimeConfiguration {

private final CollectorRuntimeEnvironment runtimeEnvironment;

public CollectorSinkRuntimeConfiguration(
final String pipeName,
final long creationTime,
final int parallelism,
final int instanceIndex) {
runtimeEnvironment =
new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex);
}

@Override
public PipeRuntimeEnvironment getRuntimeEnvironment() {
return runtimeEnvironment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.collector.plugin.api.customizer;

import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;

public class CollectorSourceRuntimeConfiguration implements PipeSourceRuntimeConfiguration {

private final CollectorRuntimeEnvironment runtimeEnvironment;

public CollectorSourceRuntimeConfiguration(
final String pipeName,
final long creationTime,
final int parallelism,
final int instanceIndex) {
runtimeEnvironment =
new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex);
}

@Override
public PipeRuntimeEnvironment getRuntimeEnvironment() {
return runtimeEnvironment;
}
}
Loading