"""
Author: <Chuanyu> (skewcy@gmail.com)
ls_tb.py (c) 2023
Desc: description
Created: 2023-10-31T02:34:04.531Z
"""
import traceback
from typing import Dict, List, Optional, Set, Tuple
from .. import core as utils
import numpy as np
Task = Tuple[utils.Stream, utils.Link]
[docs]
def benchmark(
name, task_path, net_path, output_path="./", workers=1
) -> utils.Statistics:
stat = utils.Statistics(name) ## Init empty stat
try:
## Change _Method to your method class
test = ls_tb(workers) # type: ignore
test.init(task_path, net_path)
test.prepare()
stat = test.solve() ## Update stat
if stat.result == utils.Result.schedulable:
test.output().to_csv(name, output_path)
pass
stat.content(name=name)
return stat
except KeyboardInterrupt:
stat.content(name=name)
return stat
except Exception as e:
print("[!]", e, flush=True)
traceback.print_exc()
stat.result = utils.Result.error
stat.content(name=name)
return stat
[docs]
class ls_tb:
def __init__(self, workers=1):
self.workers = workers
[docs]
def init(self, task_path, net_path) -> None:
self.task = utils.load_stream(task_path)
self.net = utils.load_network(net_path)
self.task.set_routings(
{s: self.net.get_shortest_path(s.src, s.dst) for s in self.task}
)
self.est: Dict[utils.Stream, Dict[utils.Link, int]] = {}
self.lst: Dict[utils.Stream, Dict[utils.Link, int]] = {}
for s in self.task:
self.est.setdefault(s, {})
self.lst.setdefault(s, {})
for l in s.links:
if l == s.first_link:
self.est[s][l] = 0
else:
prev_link = s.get_prev_link(l)
if prev_link is None:
raise Exception("prev_link is None")
self.est[s][l] = (
self.est[s][prev_link] + s.get_t_trans(prev_link) + l.t_proc
)
for l in s.links[::-1]:
if l == s.last_link:
self.lst[s][l] = (
self.est[s][s.first_link]
+ s.deadline
- s.get_t_trans(s.first_link)
- s.first_link.t_proc
)
else:
next_link = s.get_next_link(l)
if next_link is None:
raise Exception("next_link is None")
self.lst[s][l] = (
self.lst[s][next_link] - s.get_t_trans(l) - next_link.t_proc
)
self.conflicts: Dict[Task, int] = {
(s, link): 1 for s, link in [(s, l) for s in self.task for l in s.links]
}
self.af: List[Task] = []
self.uf: List[Task] = [
(s, link) for s, link in [(s, l) for s in self.task for l in s.links]
]
self.all_frame = self.af + self.uf
self.assign: List[Optional[Tuple[int, int]]] = [None] * len(self.all_frame)
self.new_values: List[Tuple[int, int]] = [(0, 0)] * len(self.all_frame)
self.cs: List[Set[int]] = [set() for _ in range(len(self.all_frame))]
self.gs: Set[int] = set()
[docs]
def prepare(self) -> None:
pass
@utils.check_time_limit
def solve(self) -> utils.Statistics:
start_time = utils.time_log()
k = 0
while k < len(self.all_frame):
if utils.time_log() - start_time > utils.T_LIMIT:
return utils.Statistics(
"-", utils.Result.unknown, utils.time_log() - start_time
)
if k == len(self.af):
self.af.append(self.get_var())
self.uf.pop(self.uf.index(self.af[k]))
val = self.get_bounds(k)
else:
val = self.new_values[k]
success = False
while not success and val[0] <= self.lst[self.af[k][0]][self.af[k][1]]:
self.assign[k] = val
success, val = self.check(k, val)
if success:
self.new_values[k] = val
k += 1
else:
if len(self.cs[k]) == 0 and len(self.gs) == 0:
return utils.Statistics(
"-", utils.Result.unschedulable, utils.time_log() - start_time
)
self.conflicts[self.af[k]] += len(self.cs[k])
if self.gs and max(self.gs) > max(self.cs[k]):
m = max(self.gs)
self.gs = self.gs | self.cs[k] - set([m])
else:
m = max(self.cs[k])
self.cs[m] = self.cs[m] | self.cs[k] - set([m])
while k > m:
self.assign[k] = None
revert = self.af.pop(k)
self.uf.append(revert)
self.new_values[k] = (0, 0)
self.cs[k] = set()
k -= 1
return utils.Statistics(
"-", utils.Result.schedulable, utils.time_log() - start_time
)
[docs]
def output(self) -> utils.Config:
config = utils.Config()
config.gcl = self.get_gcl()
config.release = self.get_offset()
config.route = self.get_route()
config.queue = self.get_queue()
config._delay = self.get_delay()
return config
[docs]
def crit(self, f: Tuple[utils.Stream, utils.Link]) -> float:
s, l = f
return (
s.deadline * (self.lst[s][l] - self.est[s][l]) + s.links.index(l)
) / self.conflicts[f]
[docs]
def get_var(self) -> Tuple[utils.Stream, utils.Link]:
return sorted(self.uf, key=self.crit, reverse=False)[0]
[docs]
def get_bounds(self, k: int):
"""_summary_
Args:
k (int): Task index
Returns:
(int, int): (earliest time , queue)
"""
s, l = self.af[k]
if s.links.index(l) > 0:
pre_link = s.get_prev_link(l)
if pre_link is None:
raise Exception("prev_link is None")
_prev_ins = (s, pre_link)
if _prev_ins in self.af:
_prev_task = self.assign[self.af.index(_prev_ins)]
if _prev_task is None:
raise Exception("prev_ins is not assigned")
self.est[s][l] = _prev_task[0] + s.get_t_trans(l) + l.t_proc
self.cs[k].add(self.af.index(_prev_ins))
return (self.est[s][l], 0)
[docs]
def check(self, k: int, val: Tuple[int, int]):
_val = list(val) # Make it mutable
s, l = self.af[k]
success = True
for k2, (s2, l2) in [
(k2, (s2, l2)) for k2, (s2, l2) in enumerate(self.af) if k2 != k and l2 == l
]:
if self.assign[k2] is None:
raise Exception("prev_ins is not assigned")
if _val[1] == self.assign[k2][1] and l != s.first_link: # type: ignore
prec = s.get_prev_link(l)
prec2 = s2.get_prev_link(l2)
if (s, prec) not in self.af or (s2, prec2) not in self.af:
continue
k_prec, k2_prec = self.af.index((s, prec)), self.af.index((s2, prec2)) # type: ignore
if self.assign[k2] is None or self.assign[k_prec] is None:
raise Exception("prev_ins is not assigned")
for a, b in self.task.get_frame_index_pairs(s, s2):
frame_iso = (
_val[0] + a * s.period
<= self.assign[k2_prec][0] + b * s2.period + l.t_proc # type: ignore
or self.assign[k2][0] + b * s2.period # type: ignore
<= self.assign[k_prec][0] + a * s.period + l2.t_proc # type: ignore
) # type: ignore
if frame_iso == False:
self.cs[k].add(k2)
self.cs[k].add(k2_prec)
if _val[1] < l.q_num - 1:
_val[1] += 1
else:
_val[0] = np.inf
success = False
break
if not success:
break
if success or _val[0] <= self.lst[s][l]:
g = np.gcd(s.period, s2.period)
d1 = (self.assign[k2][0] - _val[0]) % g # type: ignore
d2 = (_val[0] - self.assign[k2][0]) % g # type: ignore
if d1 < s.get_t_trans(l):
self.cs[k].add(k2)
_val = [_val[0] + s2.get_t_trans(l2) + d1, 0]
success = False
elif d2 < s2.get_t_trans(l2):
self.cs[k].add(k2)
_val = [_val[0] + s2.get_t_trans(l2) - d2, 0]
success = False
if not success:
return False, tuple(_val)
return True, (_val[0], _val[1] + 1) if _val[1] < l.q_num - 1 else (
_val[0] + 1,
val[1],
)
[docs]
def get_gcl(self) -> utils.GCL:
gcl = []
for i in range(len(self.af)):
s, l = self.af[i]
start, queue = self.assign[i] # type: ignore
end = start + s.get_t_trans(l)
for k in s.get_frame_indexes(self.task.lcm):
gcl.append(
[l, queue, start + k * s.period, end + k * s.period, self.task.lcm]
)
return utils.GCL(gcl)
[docs]
def get_offset(self) -> utils.Release:
release = []
for i in range(len(self.af)):
s, l = self.af[i]
start, queue = self.assign[i] # type: ignore
if l == s.first_link:
release.append([s, 0, start])
return utils.Release(release)
[docs]
def get_route(self) -> utils.Route:
route = []
for s in self.task:
for l in s.links:
route.append([s, l])
return utils.Route(route)
[docs]
def get_queue(self) -> utils.Queue:
queue = []
for i in range(len(self.af)):
s, l = self.af[i]
start, q = self.assign[i] # type: ignore
queue.append([s, 0, l, q])
return utils.Queue(queue)
[docs]
def get_delay(self) -> utils.Delay:
delay = []
for s in self.task:
start_hop = s.first_link
end_hop = s.last_link
for index, (start, _) in enumerate(self.assign): # type: ignore
if self.af[index][0] == s and self.af[index][1] == start_hop:
start_time = start
if self.af[index][0] == s and self.af[index][1] == end_hop:
end_time = start
delay.append([s, 0, end_time - start_time])
return utils.Delay(delay)
if __name__ == "__main__":
args = utils.parse_command_line_args()
utils.Statistics().header()
benchmark(args.name, args.task, args.net, args.output, args.workers)