Source code for tsnkit.algorithms.i_omt

"""
Author: <Chuanyu> (skewcy@gmail.com)
i_omt.py (c) 2023
Desc: description
Created:  2023-10-29T04:56:09.748Z
"""


from typing import Dict, List
import traceback
import numpy as np
from .. import core as utils
import z3  # type: ignore


[docs] def benchmark( name, task_path, net_path, output_path="./", workers=1 ) -> utils.Statistics: stat = utils.Statistics(name) ## Init empty stat try: ## Change _Method to your method class test = i_omt(workers) # type: ignore test.init(task_path, net_path) test.prepare() stat = test.solve() ## Update stat if stat.result == utils.Result.schedulable: test.output().to_csv(name, output_path) stat.content(name=name) return stat except KeyboardInterrupt: stat.content(name=name) return stat except Exception as e: print("[!]", e, flush=True) traceback.print_exc() stat.result = utils.Result.error stat.content(name=name) return stat
[docs] class i_omt: def __init__(self, workers=1, delta=14) -> None: self.workers = workers self.num_window = int(delta)
[docs] def init(self, task_path, net_path) -> None: self.task = utils.load_stream(task_path) self.net = utils.load_network(net_path) self.task.set_routings( {s: self.net.get_shortest_path(s.src, s.dst) for s in self.task} ) self.num_group = int(np.ceil(self.task.num_frames / self.num_window)) z3.set_param("parallel.enable", True) z3.set_param("parallel.threads.max", self.workers) self.solver = z3.Solver() self.init_var()
[docs] def prepare(self) -> None: self.queue_assignment()
@utils.check_time_limit def solve(self) -> utils.Statistics: total_time = 0 algo_mem = 0 results = [] for iter in range(self.num_group): if utils.time_log() > utils.T_LIMIT: return utils.Statistics("-", utils.Result.unknown) self.solver = z3.Optimize() self.solver.set("timeout", int(utils.T_LIMIT - utils.time_log()) * 1000) self.reset_link_var() self.add_range_constraint(iter) self.add_flow_trans_const(iter) self.add_delay_const(iter) self.add_link_const(iter) self.add_flow_isolation_const(iter) self.add_time_validity_const(iter) self.add_obj() self.solver.set("timeout", int(utils.T_LIMIT - utils.time_log()) * 1000) res = self.solver.check() info = self.solver.statistics() total_time = total_time + info.time algo_mem = max(algo_mem, info.max_memory) if res == z3.unsat: return utils.Statistics( "-", utils.Result.unschedulable, algo_time=total_time, algo_mem=algo_mem, ) elif res == z3.unknown: return utils.Statistics( "-", utils.Result.unknown, algo_time=total_time, algo_mem=algo_mem ) else: results.append(self.solver.model()) self.increase_c_q(results[-1]) self.results: List[z3.ModelRef] = results return utils.Statistics( "-", utils.Result.schedulable, algo_time=total_time, algo_mem=algo_mem )
[docs] def output(self) -> utils.Config: config = utils.Config() config.gcl = self.get_gcl_list() config.release = self.get_offset() config.queue = self.get_queue_assignment() config.route = self.get_route() config._delay = self.get_delay() return config
[docs] @staticmethod def get_weight(s: utils.Stream): return (s.t_trans_1g + utils.E_SYNC) * (len(s.links) - 1) / s.deadline
[docs] def queue_assignment(self) -> None: self.q = {s: {l: 0 for l in s.links} for s in self.task} _frame_weight = {} for s in self.task: for k in s.get_frame_indexes(self.task.lcm): _frame_weight[s, k] = s.deadline + k * s.period phat: Dict[utils.Link, List[float]] = {} min_queue = utils.MAX_NUM_QUEUE for l in self.net.links: phat.setdefault(l, [0] * l.q_num) min_queue = min(min_queue, l.q_num) for s in sorted(self.task, key=self.get_weight, reverse=True): min_h = -1 min_value = np.inf for g in range(min_queue): result = max([phat[l][g] for l in s.links]) if result < min_value: min_value = result min_h = g for l in s.links: phat[l][min_h] += self.get_weight(s) self.q[s][l] = min_h ## Taskset decomposition frame_weight = [x[0] for x in sorted(_frame_weight.items(), key=lambda x: x[1])] group_size = int(np.ceil(len(frame_weight) / self.num_group)) frame_group = [ frame_weight[i * group_size : (i + 1) * group_size] for i in range((len(frame_weight) + group_size - 1) // group_size) ] for inte, group in enumerate(frame_group): for s, ins in group: for l in s.links: self.group[s][l][ins] = inte
[docs] def init_var(self): self.alpha: Dict[utils.Stream, Dict[utils.Link, List[z3.ArithRef]]] = {} self.w: Dict[utils.Stream, Dict[utils.Link, List[z3.ArithRef]]] = {} self.group: Dict[utils.Stream, Dict[utils.Link, List]] = {} for s in self.task.streams: self.alpha.setdefault(s, {}) self.w.setdefault(s, {}) self.group.setdefault(s, {}) for l in s.links: self.alpha[s].setdefault(l, []) self.w[s].setdefault(l, []) self.group[s].setdefault(l, []) for k in s.get_frame_indexes(self.task.lcm): self.alpha[s][l].append(z3.Int("alpha_%s_%s_%s" % (s, l, k))) self.w[s][l].append(z3.Int("w_%s_%s_%s" % (s, l, k))) self.group[s][l].append(None) self.max_c = {l: 0 for l in self.net.links} self.max_q = {l: 0 for l in self.net.links}
[docs] def add_range_constraint(self, iter: int) -> None: for l in self.net.links: for v in range(self.num_window): self.solver.add( self.t[l][v] < self.task.lcm, self.t[l][v] >= self.max_c[l], self.t[l][v] >= self.t[l][v - 1] if v > 0 else True, self.c[l][v] >= 0, self.c[l][v] < l.q_num, self.v[l][v] >= 0, self.v[l][v] <= 1, ) for s in self.task: for l in s.links: for k in s.get_frame_indexes(self.task.lcm): if self.group[s][l][k] != iter: continue self.solver.add( k * s.period <= self.alpha[s][l][k], self.alpha[s][l][k] <= (k + 1) * s.period, 0 <= self.w[s][l][k], self.w[s][l][k] < self.num_window, )
[docs] def add_flow_trans_const(self, iter: int): for s in self.task: for l in s.links: _next_link = s.get_next_link(l) if _next_link is None: continue for k in s.get_frame_indexes(self.task.lcm): if self.group[s][l][k] != iter: continue self.solver.add( self.alpha[s][l][k] + s.get_t_trans(l) + l.t_proc <= self.alpha[s][_next_link][k] )
[docs] def add_delay_const(self, iter: int) -> None: for s in self.task: _s_hop = s.first_link _e_hop = s.last_link for k in s.get_frame_indexes(self.task.lcm): if self.group[s][_s_hop][k] != iter: continue self.solver.add( self.alpha[s][_e_hop][k] - self.alpha[s][_s_hop][k] + s.get_t_trans(_e_hop) <= s.deadline )
[docs] def add_flow_isolation_const(self, iter: int) -> None: for s1, s2 in self.task.get_pairs(): for x_a, y_a, a_b in self.task.get_merged_links(s1, s2): if self.q[s1][a_b] != self.q[s2][a_b]: continue for k1, k2 in self.task.get_frame_index_pairs(s1, s2): if ( self.group[s1][x_a][k1] != iter or self.group[s2][y_a][k2] != iter ): continue self.solver.add( z3.Or( self.alpha[s1][x_a][k1] + s1.get_t_trans(x_a) + a_b.t_proc > self.alpha[s2][a_b][k2], self.alpha[s2][y_a][k2] + s2.get_t_trans(y_a) + a_b.t_proc > self.alpha[s1][a_b][k1], ) )
[docs] def add_time_validity_const(self, iter: int) -> None: for s in self.task: for l in s.links: _pre_link = s.get_prev_link(l) for k in s.get_frame_indexes(self.task.lcm): if self.group[s][l][k] != iter: continue self.solver.add( z3.Or( [ z3.And( self.t[l][x] <= self.alpha[s][l][k], self.t[l][x + 1] >= self.alpha[s][l][k] + s.get_t_trans(l), self.c[l][x] == self.q[s][l], self.c[l][x + 1] != self.c[l][x], self.v[l][x] == 1, self.w[s][l][k] == x, z3.Or( self.t[l][x] == self.alpha[s][l][k], self.alpha[s][_pre_link][k] # type: ignore + s.get_t_trans(l) + l.t_proc == self.alpha[s][l][k] # type: ignore if l != s.first_link and _pre_link != None else True, z3.Or( [ self.alpha[s2][l][k2] + s2.get_t_trans(l) == self.alpha[s][l][k] for s2 in self.task if l in s2.links and self.q[s2][l] == self.q[s][l] for k2 in s2.get_frame_indexes( self.task.lcm ) ] ), ), ) for x in range(self.num_window - 1) ] ) )
[docs] def add_obj(self) -> None: upper_bound = z3.Int("UP") for l in self.net.links: self.solver.add(upper_bound >= self.t[l][-1]) self.solver.minimize(upper_bound)
[docs] def increase_c_q(self, res: z3.ModelRef) -> None: for l in self.net.links: self.max_c[l] = res[self.t[l][-1]].as_long() ## self.max_c[l] = max([model[self.t[l][x]].as_long() for x in range(self.num_window)]) for v in range(self.num_window - 1): if res[self.t[l][v + 1]].as_long() > res[self.t[l][v]].as_long(): self.max_q[l] = res[self.c[l][v]].as_long()
[docs] def get_gcl_list( self, ) -> utils.GCL: gcl = [] for result in self.results: for l in self.net.links: for v in range(self.num_window - 1): if ( result[self.t[l][v + 1]].as_long() > result[self.t[l][v]].as_long() ): start = result[self.t[l][v]].as_long() end = result[self.t[l][v + 1]].as_long() q = result[self.c[l][v]].as_long() gcl.append([l, q, start, end, self.task.lcm]) return utils.GCL(gcl)
[docs] def get_offset(self) -> utils.Release: offset = [] for i in range(len(self.results)): for s in self.task: l = s.first_link for k in s.get_frame_indexes(self.task.lcm): if self.group[s][l][k] == i: offset.append([s, k, self.results[i][self.alpha[s][l][k]].as_long()]) return utils.Release(offset)
[docs] def get_route(self) -> utils.Route: route = [] for s in self.task: for l in s.links: route.append([s, l]) return utils.Route(route)
[docs] def get_queue_assignment(self) -> utils.Queue: queue = [] for s in self.task: for l in s.links: for k in s.get_frame_indexes(self.task.lcm): queue.append([s, k, l, self.q[s][l]]) return utils.Queue(queue)
[docs] def get_delay(self) -> utils.Delay: delay = [] for s in self.task: for k in s.get_frame_indexes(self.task.lcm): start_link = s.first_link for iter, res in enumerate(self.results): if self.group[s][start_link][k] == iter: start = res[self.alpha[s][start_link][k]].as_long() end_link = s.last_link for iter, res in enumerate(self.results): if self.group[s][end_link][k] == iter: end = res[self.alpha[s][end_link][k]].as_long() delay.append([s, k, end - start]) return utils.Delay(delay)
if __name__ == "__main__": # Test for route space args = utils.parse_command_line_args() utils.Statistics().header() benchmark(args.name, args.task, args.net, args.output, args.workers)