"""
Author: <Chuanyu> (skewcy@gmail.com)
ls.py (c) 2023
Desc: description
Created: 2023-11-06T00:26:19.423Z
"""
import traceback
from typing import Dict, List, Optional
from .. import core as utils
[docs]
def benchmark(
name, task_path, net_path, output_path="./", workers=1
) -> utils.Statistics:
stat = utils.Statistics(name) ## Init empty stat
try:
## Change _Method to your method class
test = ls(workers) # type: ignore
test.init(task_path, net_path)
test.prepare()
stat = test.solve() ## Update stat
if stat.result == utils.Result.schedulable:
test.output().to_csv(name, output_path)
pass
stat.content(name=name)
return stat
except KeyboardInterrupt:
stat.content(name=name)
return stat
except Exception as e:
print("[!]", e, flush=True)
traceback.print_exc()
stat.result = utils.Result.error
stat.content(name=name)
return stat
[docs]
class ls:
def __init__(self, workers=1) -> None:
self.workers = workers
[docs]
def init(self, task_path: str, net_path: str) -> None:
self.task = utils.load_stream(task_path)
self.net = utils.load_network(net_path)
self.task_routes = {s: self.net.get_all_path(s.src, s.dst) for s in self.task}
self.task_order = sorted(
self.task.streams,
key=lambda x: max(
[
len(self.task_routes[x][i].links)
for i in range(len(self.task_routes[x]))
]
),
reverse=True,
)
self._delay: Dict[utils.Stream, Optional[int]] = {}
self._result: Dict[utils.Link, List] = {
l: [] for l in self.net.links
} ## GCL in legacy code
self._paths: Dict[utils.Stream, utils.Path] = (
{}
) ## Only used in recording result
self._offset: Dict[utils.Stream, int] = {} ## Only used in recording result
[docs]
def prepare(self) -> None:
pass
@utils.check_time_limit
def solve(self) -> utils.Statistics:
start_time = utils.time_log()
for s in self.task_order:
succ = self.schedule(s)
end_time = utils.time_log()
if not succ:
return utils.Statistics(
"-", utils.Result.unschedulable, end_time - start_time
)
if end_time - start_time > utils.T_LIMIT:
return utils.Statistics(
"-", utils.Result.unknown, end_time - start_time
)
end_time = utils.time_log()
return utils.Statistics("-", utils.Result.schedulable, end_time - start_time)
[docs]
def output(self) -> utils.Config:
config = utils.Config()
config.gcl = self.get_gcl()
config.release = self.get_offset()
config.route = self.get_route()
config.queue = self.get_queue()
config._delay = self.get_delay()
return config
[docs]
@staticmethod
def match_time(t, sche) -> int:
"""Find the index of entry that starts just before t
Args:
t (_type_): _description_
sche (_type_): [start, end, queue]
Returns:
int: _description_
"""
if not sche:
return -1
gate_time = [x[0] for x in sche]
left = 0
right = len(gate_time) - 1
if gate_time[right] <= t < sche[-1][1]:
return right
elif sche[-1][1] <= t:
return -2
elif t < gate_time[0]:
return -1
while True:
median = (left + right) // 2
if right - left <= 1:
return left
elif gate_time[left] <= t < gate_time[median]:
right = median
else:
left = median
[docs]
@staticmethod
def get_nw_delay(task: utils.Stream, path: utils.Path):
return sum([l.t_proc + task.get_t_trans(l) for l in path.links])
[docs]
def find_inject_time(self, s: utils.Stream, path: utils.Path):
delay = self.get_nw_delay(s, path)
period = s.period
for it in range(0, period - delay + 1):
_prev_end = it
for l in path.links:
_flag = True
_start = _prev_end
_end = _start + s.get_t_trans(l)
_prev_end = _start + l.t_proc + s.get_t_trans(l)
for k in s.get_frame_indexes(self.task.lcm):
match_start = self.match_time(_start + k * period, self._result[l])
match_end = self.match_time(_end + k * period, self._result[l])
if (
match_start == -1 ## _start is before the first entry
and self._result[l]
and _end + k * period
> self._result[l][0][0] ## _end overlaps with the first entry
):
_flag = False
break
if (
match_start == -2 ## _start is after the last entry
and self._result[l]
and _start + k * period
< self._result[l][-1][1] ## _start overlaps with the last entry
):
_flag = False
break
if match_start >= 0 and (
match_start != match_end ## A entry is between _start and _end
or (
self._result[l]
and self._result[l][match_start][1] > _start + k * period
) ## _start overlaps with the matched entry
):
_flag = False
break
if _flag:
## Current hop is available
continue
else:
break
else:
## All hops on the path are available
return it
return -1
[docs]
def schedule(self, task: utils.Stream) -> bool:
self._delay[task] = None
for path in self.task_routes[task]:
## Check if the path is schedulable
_delay = self.get_nw_delay(task, path)
if _delay > task.deadline:
continue
## Try to find the inject time
inject_time = self.find_inject_time(task, path)
if inject_time == -1:
continue
if self._delay[task] == None or _delay < self._delay[task]:
self._delay[task] = _delay
self._paths[task] = path
self._offset[task] = inject_time
if self._delay[task] == None:
return False
## Update GCL
_prev_end = self._offset[task]
for l in self._paths[task].links:
_start = _prev_end
_end = _start + task.get_t_trans(l)
_prev_end = _start + l.t_proc + task.get_t_trans(l)
for k in task.get_frame_indexes(self.task.lcm):
self._result[l].append(
(
_start + k * task.period,
_end + k * task.period,
0,
)
)
## Sort GCL
self._result[l].sort(key=lambda x: x[0], reverse=False)
return True
[docs]
def get_gcl(self) -> utils.GCL:
gcl = []
for l in self._result:
for _entry in self._result[l]:
gcl.append([l, 0, _entry[0], _entry[1], self.task.lcm])
return utils.GCL(gcl)
[docs]
def get_offset(self) -> utils.Release:
offset = []
for s in self._offset:
offset.append([s, 0, self._offset[s]])
return utils.Release(offset)
[docs]
def get_queue(self) -> utils.Queue:
queue = []
for s in self._paths:
for l in self._paths[s].links:
queue.append([s, 0, l, 0])
return utils.Queue(queue)
[docs]
def get_delay(self) -> utils.Delay:
delay = []
for s in self._delay:
start_link = self._paths[s].links[0]
delay.append(
[s, 0, self._delay[s] - start_link.t_proc - s.get_t_trans(start_link)]
)
return utils.Delay(delay)
[docs]
def get_route(self) -> utils.Route:
route = []
for s in self._paths:
for l in self._paths[s].links:
route.append([s, l])
return utils.Route(route)
if __name__ == "__main__":
args = utils.parse_command_line_args()
utils.Statistics().header()
benchmark(args.name, args.task, args.net, args.output, args.workers)