Source code for tsnkit.algorithms.cg

"""
Author: <Chuanyu> (skewcy@gmail.com)
cg.py (c) 2023
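Desc: Conflict-graph (CG) scheduler. Candidate (offset, path) configurations of each stream are graph vertices, conflicting configurations are joined by an edge, and an independent set covering every stream is searched with a Luby-style heuristic and an ILP.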
Created:  2023-11-25T22:06:52.279Z
"""

import time
import traceback
from typing import Optional, Set, Tuple, List
import numpy as np
import gurobipy as gp
from gurobipy import GRB
from .. import core as utils
import networkx as nx


def benchmark(
    name, task_path, net_path, output_path="./", workers=1
) -> utils.Statistics:
    """Run the CG scheduler on one instance and return its statistics.

    Args:
        name: Label passed to ``utils.Statistics``, ``stat.content`` and
            ``to_csv``.
        task_path: Stream description forwarded to ``cg.init``.
        net_path: Network description forwarded to ``cg.init``.
        output_path: Destination handed to ``to_csv`` when the instance is
            schedulable.
        workers: Forwarded to the ``cg`` constructor.

    Returns:
        The statistics produced by ``cg.solve``.  If the run is interrupted
        or raises, the statistics object available at that point is returned
        instead (with ``result`` set to ``utils.Result.error`` on failure).
    """
    ## Placeholder that is reported if the solver never returns
    stat = utils.Statistics(name)
    try:
        solver = cg(workers)  # type: ignore
        solver.init(task_path, net_path)
        solver.prepare()
        stat = solver.solve()
        is_schedulable = stat.result == utils.Result.schedulable
        if is_schedulable:
            solver.output().to_csv(name, output_path)
        stat.content(name=name)
        return stat
    except KeyboardInterrupt:
        ## Report whatever statistics exist and hand them back
        stat.content(name=name)
        return stat
    except Exception as exc:
        print("[!]", exc, flush=True)
        traceback.print_exc()
        stat.result = utils.Result.error
        stat.content(name=name)
        return stat
class NodeCG:
    """One candidate configuration of a stream: a path plus a release offset.

    Attributes set here:
        i: vertex identifier (the ``id`` argument).
        f: the stream this configuration belongs to.
        pi, ci, di: ``f.period``, ``f.t_trans_1g`` and ``f.deadline``.
        path, offset: the route and the start time on its first link.
        r: per-link array, 1 on every link of ``path`` and 0 elsewhere.
        t: per-link array holding the start time on every link of ``path``.
    """

    def __init__(
        self, id: int, f: utils.Stream, path: utils.Path, offset: int
    ) -> None:
        self.i = id
        self.f = f
        self.path = path
        self.offset = offset
        self.pi = f.period
        self.ci = f.t_trans_1g
        self.di = f.deadline
        self.init()

    def init(self) -> None:
        """Fill ``r`` and ``t`` by walking the path from its first link.

        Raises:
            Exception: if a link other than the first one has no predecessor
                according to ``path.get_prev_link``.
        """
        ## Arrays are indexed by link, so they span up to the largest link
        size = max(self.path._network.links) + 1
        self.r = np.zeros(size)
        self.t = np.zeros(size)

        for link in self.path.links:
            self.r[link] = 1
            if link == self.path.links[0]:
                self.t[link] = self.offset
                continue
            previous = self.path.get_prev_link(link)
            if previous is None:
                raise Exception("Path is not sorted")
            ## Start time = predecessor's start + processing + transmission
            self.t[link] = (
                self.t[previous] + link.t_proc + self.f.get_t_trans(link)
            )
def conflict(i: "NodeCG", j: "NodeCG") -> bool:
    """Return True when two configurations overlap in time on a shared link.

    Two configurations never conflict with themselves (same ``.i``) or when
    their link-usage vectors ``.r`` have no link in common.  Otherwise every
    pair of frame instances within the least common multiple of the two
    periods is compared on each shared link; the transmission windows
    ``[t, t + ci)`` must not intersect.

    Args:
        i: First configuration (reads ``i``, ``r``, ``t``, ``pi``, ``ci``).
        j: Second configuration (same attributes).

    Returns:
        True if at least one pair of instances overlaps, False otherwise.
    """
    if i.i == j.i:
        return False

    shared_links = set(np.nonzero(i.r)[0]) & set(np.nonzero(j.r)[0])
    if not shared_links:
        return False

    ## The periods do not depend on the link: compute the repetition counts
    ## once, with exact integer arithmetic.
    period_i = int(i.pi)
    period_j = int(j.pi)
    hyper_period = int(np.lcm(period_i, period_j))
    reps_i = hyper_period // period_i
    reps_j = hyper_period // period_j

    for l in shared_links:
        for a in range(reps_i):
            start_i = i.t[l] + a * i.pi
            for b in range(reps_j):
                start_j = j.t[l] + b * j.pi
                ## Overlap <=> neither window starts after the other ends
                if start_j < start_i + i.ci and start_i < start_j + j.ci:
                    return True
    return False
def get_paths(g: nx.DiGraph, i: int, j: int) -> List[List[Tuple[int, int]]]:
    """List every simple path from ``i`` to ``j`` as a sequence of edges.

    Args:
        g: Directed graph to search.
        i: Source node.
        j: Destination node.

    Returns:
        One list per simple path; each list holds the consecutive
        ``(node, next_node)`` pairs along that path.
    """
    edge_paths = []
    for hops in nx.all_simple_paths(g, i, j):
        ## Pair each node with its successor to turn nodes into edges
        edge_paths.append(list(zip(hops, hops[1:])))
    return edge_paths
class cg:
    """Conflict-graph (CG) based joint routing and scheduling.

    Each candidate configuration of a stream (a release offset plus a path,
    wrapped in a ``NodeCG``) is a vertex of ``self.CG``; an edge joins two
    configurations for which ``conflict`` is true.  A schedule is a set of
    pairwise non-adjacent vertices that contains a vertex of every stream.
    """

    def __init__(self, workers=1, opt_w=5, graph_w=5, max_w=20):
        """Store the tuning parameters.

        Args:
            workers: Thread count given to the Gurobi model in ``ILP``.
            opt_w: Window (number of recent searches) used by
                ``trigger_sure``.
            graph_w: Window used by ``trigger_completion``.
            max_w: Upper bound applied to both windows.
        """
        self.workers = workers
        self.opt_w = opt_w
        self.graph_w = graph_w
        self.max_w = max_w

    def init(self, task_path, net_path) -> None:
        """Load the instance and build the per-stream candidate spaces.

        For every stream all simple paths are enumerated, paths whose
        network delay exceeds the deadline are dropped, and the list of
        candidate offsets ``0 .. period - max_delay`` is created.  Both the
        offsets and the paths are shuffled with ``np.random``.
        """
        self.task = utils.load_stream(task_path)
        self.net = utils.load_network(net_path)
        self.paths = {
            s: get_paths(self.net.net_nx, s.src, s.dst) for s in self.task
        }

        ## Filter out paths that are not feasible
        max_delay = {s: -1 for s in self.task}
        for s in self.task:
            feasible_paths = []
            for path in self.paths[s]:
                _path = utils.Path(path, self.net)  # type: ignore
                ## Processing delay of the first link is not counted
                _nw_delay = (
                    sum(l.t_proc + s.get_t_trans(l) for l in _path.links)
                    - _path.links[0].t_proc
                )
                if _nw_delay <= s.deadline:
                    feasible_paths.append(path)
                    max_delay[s] = max(max_delay[s], _nw_delay)
            self.paths[s] = feasible_paths

        self.phases = [
            list(range(0, int(s.period - max_delay[s] + 1))) for s in self.task
        ]
        for i in self.task:
            np.random.shuffle(self.phases[i])
            np.random.shuffle(self.paths[i])

        ## Per stream: [phase index, path index] of the NEXT configuration
        ## that ``stateful_generator`` will emit.
        self.current_state = {s: [0, 0] for s in self.task}
        self.CG = nx.Graph()
        self.num_node = 0
        ## sc[s]: how many accepted searches covered stream s
        self.sc = np.zeros(len(self.task))
        ## History of the number of streams covered by each accepted search
        self.num_covered: List[int] = []
        self.p_thres = len(self.task) * 0.5

    def prepare(self) -> None:
        """Nothing to prepare; kept so ``benchmark`` can call it."""
        pass

    @utils.check_time_limit
    def solve(self) -> utils.Statistics:
        """Grow the conflict graph until an independent set covers all streams.

        Returns:
            ``unschedulable`` immediately if some stream has no feasible
            path, ``unknown`` once ``utils.T_LIMIT`` is exceeded,
            ``schedulable`` as soon as a search result covers every stream
            (``result_nodes`` / ``result_streams`` are then set), and
            ``unschedulable`` when the generator produced no new vertex in
            the last round and no full cover was found.
        """
        if any(not self.paths[s] for s in self.task):
            return utils.Statistics("-", utils.Result.unschedulable, 0.0)

        all_streams = list(range(len(self.task)))
        start = utils.time_log()
        flag = True
        while flag:
            if utils.time_log() - start > utils.T_LIMIT:
                return utils.Statistics(
                    "-", utils.Result.unknown, utils.time_log() - start
                )
            flag = self.generator(all_streams)

            ## Reset per round so a failed search can neither leave these
            ## names unbound nor pair a stale cover with a ``None`` node set.
            covered_streams: Set[utils.Stream] = set()
            chosen_nodes: Optional[Set[int]] = None

            found, nodes = self.luby()
            if found:
                covered_streams = self._register_cover(nodes)
                chosen_nodes = nodes

            if self.trigger_sure():
                found, nodes = self.ILP()
                if found:
                    covered_streams = self._register_cover(nodes)
                    chosen_nodes = nodes

            if chosen_nodes is not None and len(covered_streams) == len(
                self.task
            ):
                ## Record the nodes for output: one vertex per stream
                self.result_nodes = []
                self.result_streams: Set[utils.Stream] = set()
                for i in chosen_nodes:
                    stream = self.CG.nodes[i]["config"].f
                    if stream not in self.result_streams:
                        self.result_streams.add(stream)
                        self.result_nodes.append(i)
                return utils.Statistics(
                    "-", utils.Result.schedulable, utils.time_log() - start
                )
        return utils.Statistics(
            "-", utils.Result.unschedulable, utils.time_log() - start
        )

    def _register_cover(self, nodes: Set[int]) -> Set[utils.Stream]:
        """Book-keep one accepted search result and return its covered streams.

        Appends the cover size to ``num_covered``, bumps ``sc`` for every
        covered stream and, when ``trigger_completion`` fires, generates
        extra configurations for the streams that were missed.
        """
        covered_streams = self.nods_to_streams(nodes)
        missed_streams = set(range(len(self.task))) - covered_streams
        self.num_covered.append(len(covered_streams))
        self.sc[list(covered_streams)] += 1
        if missed_streams and self.trigger_completion(len(missed_streams)):
            self.generator(missed_streams)
        return covered_streams

    def output(self) -> utils.Config:
        """Assemble the configuration from the recorded result vertices."""
        config = utils.Config()
        config.gcl = self.get_gcl()
        config.release = self.get_offset()
        config.queue = self.get_queue()
        config.route = self.get_route()
        config._delay = self.get_delay()
        return config

    def stateful_generator(
        self, i: utils.Stream
    ) -> Optional[Tuple[int, List[Tuple[int, int]]]]:
        """Return the next unseen ``(phase, path)`` pair of stream ``i``.

        Pairs are enumerated path-first: all paths for the current phase,
        then the next phase.  The cursor in ``current_state`` is advanced
        AFTER the pair is read, so the very first pair
        ``(phases[0], paths[0])`` is emitted as well.

        Returns:
            The pair, or ``None`` once every combination has been emitted
            (or when the stream has no phase or no path at all).
        """
        state = self.current_state[i]
        phases = self.phases[i]
        paths = self.paths[i]
        if not paths or state[0] >= len(phases):
            return None

        result = (phases[state[0]], paths[state[1]])
        if state[1] < len(paths) - 1:
            state[1] += 1
        else:
            ## Paths exhausted for this phase: move on to the next phase
            state[0] += 1
            state[1] = 0
        return result

    def add_vertex(self, v: NodeCG):
        """Insert ``v`` and connect it to every existing vertex it conflicts with."""
        self.CG.add_node(v.i, config=v)
        for node in self.CG.nodes:
            if node == v.i:
                continue
            if conflict(v, self.CG.nodes[node]["config"]):
                self.CG.add_edge(v.i, node)

    def generator(self, task_set) -> bool:
        """Add one new configuration per stream in ``task_set``.

        Stops at the first stream whose configurations are exhausted.

        Returns:
            True if at least one vertex was added before stopping.
        """
        flag = False
        for s in task_set:
            result = self.stateful_generator(s)
            if result is None:
                return flag
            phase, path = result
            config = NodeCG(self.num_node, self.task.get_stream(s), utils.Path(path, self.net), phase)  # type: ignore
            self.add_vertex(config)
            self.num_node += 1
            flag = True
        return flag

    def luby(self) -> Tuple[bool, Optional[Set[int]]]:
        """Randomised independent-set heuristic on a copy of the graph.

        A vertex is sampled with probability ``a * 1/(2*deg) + (1-a) * p_sc``
        where ``p_sc`` favours streams that were covered less often so far.
        Adjacent sampled vertices are thinned out, the survivors join the
        result and are removed together with their neighbours.

        Returns:
            ``(True, nodes)`` on completion, ``(False, None)`` when the time
            budget (at most five minutes) runs out.
        """
        a = 0.7
        CG_copy = self.CG.copy()
        I = set()
        _t_limit = min(5 * 60, utils.T_LIMIT - utils.time_log())
        start_time = time.time()
        ## ``sc`` is not modified inside this method
        sc_max = max(self.sc)
        while CG_copy.nodes():
            X = set()
            if time.time() - start_time > _t_limit:
                return False, None
            for node in list(CG_copy.nodes):
                degree = nx.degree(CG_copy, node)
                if degree == 0:
                    ## Isolated vertices can always be taken
                    I.add(node)
                    CG_copy.remove_node(node)
                    continue
                p_deg = 1 / (2 * degree)
                p_sc = 1 - (
                    (self.sc[self.CG.nodes[node]["config"].f] + 1) / (sc_max + 1)
                )
                if np.random.random() < a * p_deg + (1 - a) * p_sc:
                    X.add(node)
            I_p = X
            edges = list(CG_copy.subgraph(I_p).edges)
            while edges:
                link = edges.pop()
                ## Drop the endpoint with the smaller (or equal) degree
                if nx.degree(CG_copy, link[0]) <= nx.degree(CG_copy, link[1]):
                    I_p.remove(link[0])
                else:
                    I_p.remove(link[1])
                edges = list(CG_copy.subgraph(I_p).edges)
            I = I | I_p
            Y = I_p | set().union(*(CG_copy.neighbors(n) for n in I_p))
            CG_copy.remove_nodes_from(Y)
        return True, I

    def ILP(self) -> Tuple[bool, Optional[Set[int]]]:
        """Maximise the number of covered streams with an exact ILP.

        Binary ``x[v]`` selects a vertex, binary ``s[i]`` marks stream ``i``
        as covered; adjacent vertices exclude each other and ``s[i]`` needs
        at least one selected vertex of stream ``i``.

        Returns:
            ``(True, selected_nodes)`` or ``(False, None)`` for the Gurobi
            status codes listed below.
        """
        CG_copy = self.CG.copy()
        m = gp.Model("ILP")
        m.setParam("OutputFlag", False)
        m.Params.Threads = self.workers
        _t_limit = min(5 * 60, utils.T_LIMIT - utils.time_log())
        m.setParam(
            "timeLimit", _t_limit
        )  ## Hard code 5 minutes time limit as described in the paper
        xv = m.addVars(CG_copy.nodes, vtype=GRB.BINARY, name="x")
        xs = m.addVars(len(self.task), vtype=GRB.BINARY, name="s")
        m.setObjective(xs.sum(), GRB.MAXIMIZE)
        for edge in CG_copy.edges:
            m.addConstr(xv[edge[0]] + xv[edge[1]] <= 1)
        for i in range(len(self.task)):
            m.addConstr(
                xs[i]
                <= sum(
                    xv[k]
                    for k in CG_copy.nodes
                    if self.CG.nodes[k]["config"].f == i
                )
            )
        m.optimize()
        ## Infeasible (3) or Other reasons (9, 10, 11, 12, 16, 17)
        if m.status in {3, 9, 10, 11, 12, 16, 17}:
            return False, None
        ## Solver values of binaries are floats within a tolerance of 0/1,
        ## so threshold instead of testing for exact equality with 1.
        return True, {k for k in CG_copy.nodes if xv[k].X > 0.5}

    def nods_to_streams(self, nodes: Set[int]) -> Set[utils.Stream]:
        """Return the set of streams owning the given vertices."""
        return {self.CG.nodes[node]["config"].f for node in nodes}

    def trigger_sure(self) -> bool:
        """Decide whether the exact ILP search should run this round.

        Returns:
            False while the cover size grew over the last
            ``min(opt_w, max_w)`` recorded searches, True otherwise
            (including when fewer than two results are recorded).
        """
        ## NOTE(review): the original adjusted a local window size here
        ## (+1 on progress, reset to 10 otherwise) but never stored it, so
        ## the window is effectively constant.  Confirm the intended
        ## adaptation before persisting it on ``self``.
        window_size = min(self.opt_w, self.max_w)
        window = self.num_covered[-window_size:]
        d_past = np.sum(np.diff(window))
        return not d_past > 0

    def trigger_completion(self, miss_num: int) -> bool:
        """Decide whether extra vertices should be generated for missed streams.

        Compares the rounded mean cover size of the latest ``graph_w``
        results with the window shifted back by one result and moves
        ``p_thres`` up when coverage improved, down otherwise.

        Args:
            miss_num: Number of streams not covered by the last result.

        Returns:
            True when ``miss_num`` exceeds the updated threshold.
        """
        ## Uses ``graph_w`` (the original read ``opt_w`` here, leaving the
        ## ``graph_w`` constructor argument without effect).
        ## NOTE(review): the original also changed a local copy of the
        ## window size without storing it; that dead store was dropped.
        window_size = min(self.graph_w, self.max_w)
        window_old = self.num_covered[-(window_size + 1) : -1]
        window_new = self.num_covered[-window_size:]
        n_old = round(np.mean(window_old)) if window_old else 0
        n_new = round(np.mean(window_new))
        if n_new > n_old:
            self.p_thres += 1
        else:
            ## Both "worse" and "unchanged" lower the threshold
            self.p_thres -= 1
        return miss_num > self.p_thres

    def get_gcl(self) -> utils.GCL:
        """Build one gate entry per link and frame instance of each result vertex."""
        GCL = []
        for i in self.result_nodes:
            node: NodeCG = self.CG.nodes[i]["config"]
            period = node.pi
            for link in node.path.links:
                start = node.t[link]
                end = start + node.ci
                for ins in node.f.get_frame_indexes(self.task.lcm):
                    GCL.append(
                        [
                            link,
                            0,
                            (start + ins * period),
                            (end + ins * period),
                            self.task.lcm,
                        ]
                    )
        return utils.GCL(GCL)

    def get_route(self) -> utils.Route:
        """List ``[stream, link]`` for every link of each result vertex."""
        route = []
        for i in self.result_nodes:
            node: NodeCG = self.CG.nodes[i]["config"]
            for link in node.path.links:
                route.append([node.f, link])
        return utils.Route(route)

    def get_queue(self) -> utils.Queue:
        """List ``[stream, 0, link, 0]`` for every link of each result vertex."""
        queue = []
        for i in self.result_nodes:
            node: NodeCG = self.CG.nodes[i]["config"]
            for link in node.path.links:
                queue.append([node.f, 0, link, 0])
        return utils.Queue(queue)

    def get_offset(self) -> utils.Release:
        """List ``[stream, 0, offset]`` for each result vertex."""
        offset = []
        for i in self.result_nodes:
            node: NodeCG = self.CG.nodes[i]["config"]
            offset.append([node.f, 0, node.offset])
        return utils.Release(offset)

    def get_delay(self) -> utils.Delay:
        """List ``[stream, 0, delay]``: start time on the last link minus the offset."""
        delay = []
        for i in self.result_nodes:
            node: NodeCG = self.CG.nodes[i]["config"]
            start = node.offset
            end = node.t[node.path.links[-1]]
            delay.append([node.f, 0, end - start])
        return utils.Delay(delay)
if __name__ == "__main__":
    ## Command-line entry point: print the statistics header, then run one
    ## benchmark with the parsed arguments.
    cli = utils.parse_command_line_args()
    utils.Statistics().header()
    benchmark(
        name=cli.name,
        task_path=cli.task,
        net_path=cli.net,
        output_path=cli.output,
        workers=cli.workers,
    )