Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions src/api/ateliereLive/pipelines/streams/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { getSourceIdFromSourceName, getUuidFromIngestName } from '../../ingest';
import { connectIngestToPipeline } from '../../streams';
import { getAuthorizationHeader } from '../../utils/authheader';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from '../../utils/fwConfigPorts';
import {
Expand Down Expand Up @@ -90,23 +90,16 @@ export async function createStream(
await initDedicatedPorts();

for (const pipeline of production_settings.pipelines) {
const availablePorts = getAvailablePortsForIngest(
const availablePort = await getNextAvailablePortForIngest(
source.ingest_name,
usedPorts
);

if (availablePorts.size === 0) {
if (availablePort == -1) {
Log().error(`No available ports for ingest '${source.ingest_name}'`);
throw `No available ports for ingest '${source.ingest_name}'`;
}

const availablePort = availablePorts.values().next().value;
if (!availablePort)
throw `Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name} cannot be undefined`;
Log().info(
`Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}`
);

const pipelineSource = pipeline.sources?.find(
(s) =>
s.ingest_source_name === source.ingest_source_name &&
Expand Down
17 changes: 10 additions & 7 deletions src/api/ateliereLive/utils/fwConfigPorts.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getPipelines } from '../pipelines/pipelines';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from './fwConfigPorts';

Expand Down Expand Up @@ -35,17 +35,20 @@ describe.skip('fwConfigPorts tests', () => {

describe('getAvailableTypePorts', () => {
test('should return available ingest ports', async () => {
const ingestPorts = getAvailablePortsForIngest('cloud_ingest', usedPorts);
const ingestPort = getNextAvailablePortForIngest(
'cloud_ingest',
usedPorts
);

expect(ingestPorts).not.toBeUndefined();
expect(ingestPorts.size).toBeGreaterThan(0);
expect(ingestPort).not.toBeUndefined();
expect(ingestPort).toBeGreaterThan(-1);
});

test('should return default ingest ports when ingest doesnt exist', async () => {
const ingestPorts = getAvailablePortsForIngest('wrong_name', usedPorts);
const ingestPort = getNextAvailablePortForIngest('wrong_name', usedPorts);

expect(ingestPorts).not.toBeUndefined();
expect(ingestPorts.size).toBeGreaterThan(0);
expect(ingestPort).not.toBeUndefined();
expect(ingestPort).toBeGreaterThan(-1);
});
});
});
66 changes: 28 additions & 38 deletions src/api/ateliereLive/utils/fwConfigPorts.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
import { FwConfigTypeEnum } from '../../../interfaces/firewallConfig';
import { getFwConfigs } from '../../manager/firewallConfig';
import {
FwConfigTypeEnum,
FwConfigWithId
} from '../../../interfaces/firewallConfig';
import {
getFwConfigs,
putFwConfigLastUsedPortIndex
} from '../../manager/firewallConfig';
import { getPipeline } from '../pipelines/pipelines';

const dedicatedPorts = new Map<string, Set<number>>();
const dedicatedPorts = new Map<string, FwConfigWithId>();

export async function initDedicatedPorts() {
dedicatedPorts.clear();
(await getFwConfigs()).map((conf) => {
const temp = dedicatedPorts.get(`${conf.type}-${conf.name}`);
if (temp) {
conf.port_range_allow.forEach((port) => {
temp.add(port);
});
dedicatedPorts.set(`${conf.type}-${conf.name}`, temp);
} else {
dedicatedPorts.set(
`${conf.type}-${conf.name}`,
new Set<number>(conf.port_range_allow)
);
}
dedicatedPorts.set(`${conf.type}-${conf.name}`, conf);
});
const temp = dedicatedPorts;
return temp;
}

export async function getCurrentlyUsedPorts(
Expand Down Expand Up @@ -65,7 +58,7 @@ export async function getCurrentlyUsedPorts(
return usedPorts;
}

export function getAvailablePortsForIngest(
export async function getNextAvailablePortForIngest(
name: string,
usedPorts: Set<number>
) {
Expand All @@ -77,26 +70,23 @@ export function getAvailablePortsForIngest(
`${FwConfigTypeEnum.Ingest}-${'default'}`
)!;
}
const availablePorts = new Set<number>();
dedicatedPortsForName.forEach((dedPort) => {
if (usedPorts && !usedPorts.has(dedPort)) {
availablePorts.add(dedPort);
}
});
return availablePorts;
}

export function getAvailablePortsForNameAndType(
name: string,
type: string,
usedPorts: Set<number>
) {
const dedicatedTypePorts = dedicatedPorts.get(`${type}-${name}`)!;
const availablePorts = new Set<number>();
dedicatedTypePorts.forEach((dedPort) => {
if (usedPorts && !usedPorts.has(dedPort)) {
availablePorts.add(dedPort);
const port_range = dedicatedPortsForName.port_range_allow;
const numberOfPorts = port_range.length;
let availablePort = -1;
for (let i = 0; i < numberOfPorts; i++) {
const currentPort =
port_range[dedicatedPortsForName.last_used_port_index++ % numberOfPorts];
if (usedPorts && !usedPorts.has(currentPort)) {
availablePort = currentPort;
break;
}
});
return availablePorts;
}

if (availablePort != -1) {
dedicatedPortsForName.last_used_port_index %= numberOfPorts;
await putFwConfigLastUsedPortIndex(dedicatedPortsForName);
}

return availablePort;
}
19 changes: 18 additions & 1 deletion src/api/manager/firewallConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,24 @@ export async function putFwConfig(
{
name: fwConfig.name,
type: fwConfig.type,
port_range_allow: fwConfig.port_range_allow
port_range_allow: fwConfig.port_range_allow,
last_used_port_index: fwConfig.last_used_port_index
}
);
}

export async function putFwConfigLastUsedPortIndex(
fwConfig: FwConfigWithId
): Promise<void> {
const db = await getDatabase();

const result = await db
.collection('fw_config')
.findOneAndUpdate(
{ _id: fwConfig._id },
{ $set: { last_used_port_index: fwConfig.last_used_port_index } }
);
if (!result.value) {
console.log('Failed to update firewall rules with last used port:', result);
}
}
88 changes: 31 additions & 57 deletions src/api/manager/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import {
import { getSourcesByIds } from './sources';
import { SourceToPipelineStream } from '../../interfaces/Source';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from '../ateliereLive/utils/fwConfigPorts';
import { getAudioMapping } from './inventory';
Expand Down Expand Up @@ -117,21 +117,15 @@ async function connectIngestSources(
const audioMapping = newAudioMapping?.length ? newAudioMapping : [[0, 1]];

for (const pipeline of productionSettings.pipelines) {
const availablePorts = getAvailablePortsForIngest(
const nextAvailablePort = await getNextAvailablePortForIngest(
source.ingest_name,
usedPorts
);

if (availablePorts.size === 0) {
Log().error(`No available ports for ingest '${source.ingest_name}'`);
throw `No available ports for ingest '${source.ingest_name}'`;
if (nextAvailablePort == -1) {
throw `Failed to find an available port to '${source.ingest_name}'-${source.ingest_source_name}`;
}

const availablePort = availablePorts.values().next().value || 0;
Log().info(
`Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}`
);

const pipelineSource = pipeline.sources?.find(
(s) =>
s.ingest_source_name === source.ingest_source_name &&
Expand Down Expand Up @@ -167,7 +161,7 @@ async function connectIngestSources(
interfaces: [
{
...pipeline.interfaces[0],
port: availablePort
port: nextAvailablePort
}
]
};
Expand All @@ -185,7 +179,7 @@ async function connectIngestSources(
throw `Source '${source.ingest_name}/${ingestUuid}:${source.ingest_source_name}' failed to connect to '${pipeline.pipeline_name}/${pipeline.pipeline_id}': ${error.message}`;
});

usedPorts.add(availablePort);
usedPorts.add(nextAvailablePort);
sourceToPipelineStreams.push({
source_id: source._id.toString(),
stream_uuid: result.stream_uuid,
Expand Down Expand Up @@ -782,25 +776,18 @@ export async function startProduction(
} catch (error) {
Log().error('Could not setup control panels');
Log().error(error);
if (typeof error !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: false }
],
error: 'Unknown error occured'
};
let errorMessage = 'Unknown error occured';
if (typeof error === 'string') {
errorMessage = error;
}
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: false, message: error }
{ step: 'control_panels', success: false, message: errorMessage }
],
error: error
error: errorMessage
};
} // Try to connect control panels and pipeline-to-pipeline connections end

Expand All @@ -809,36 +796,30 @@ export async function startProduction(
for (const pipeline of production_settings.pipelines) {
await createPipelineOutputs(pipeline);
}
} catch (e) {
} catch (error) {
Log().error('Could not setup pipeline outputs');
Log().error(e);
Log().error(error);
Log().error('Stopping pipelines');
await stopPipelines(
production_settings.pipelines.map((pipeline) => pipeline.pipeline_id!)
).catch((error) => {
throw `Failed to stop pipelines after production start failure: ${error}`;
).catch((stropError) => {
throw `Failed to stop pipelines after production start failure: ${stropError}`;
});
if (typeof e !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false }
],
error: 'Unknown error occured'
};

let errorMessage = 'Unknown error occured';
if (typeof error === 'string') {
errorMessage = error;
}

return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false, message: e }
{ step: 'pipeline_outputs', success: false, message: errorMessage }
],
error: e
error: errorMessage
};
}

Expand Down Expand Up @@ -879,32 +860,25 @@ export async function startProduction(
Log().info(
`Production '${production.name}' with preset '${production_settings.name}' started`
);
} catch (e) {
} catch (error) {
Log().error('Could not start multiviews');
Log().error(e);
if (typeof e !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false },
{ step: 'multiviews', success: false }
],
error: 'Could not start multiviews'
};
Log().error(error);

let errorMessage = 'Could not start multiviews';
if (typeof error === 'string') {
errorMessage = error;
}

return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: true },
{ step: 'multiviews', success: false, message: e }
{ step: 'multiviews', success: false, message: errorMessage }
],
error: e
error: errorMessage
};
} // Try to setup multiviews end

Expand Down
3 changes: 2 additions & 1 deletion src/api/mongoClient/defaults/fwConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export const defaultFwConfig = [
type: 'ingest',
port_range_allow: [...Array(200).keys()].map(
(increment) => 9000 + increment
)
),
last_used_port_index: 0
}
];
2 changes: 1 addition & 1 deletion src/hooks/useGetFirstEmptySlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { CallbackHook } from './types';
export function useGetFirstEmptySlot(): CallbackHook<
(productionSetup?: Production | undefined) => number
> {
const [loading, setLoading] = useState(true);
const [loading] = useState(true);

const findFirstEmptySlot = (productionSetup: Production | undefined) => {
if (!productionSetup) throw 'no_production';
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/firewallConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface FwConfig {
name: string;
type: FwConfigType;
port_range_allow: number[];
last_used_port_index: number;
}

export type FwConfigWithId = WithId<FwConfig>;