"""
Author: <Chuanyu> (skewcy@gmail.com)
at.py (c) 2023
Desc: description
Created: 2023-10-27T23:29:21.848Z
"""
import traceback
from typing import Dict, List
import numpy as np
from .. import core as utils
import z3 # type: ignore
[docs]
def benchmark(
name, task_path, net_path, output_path="./", workers=1
) -> utils.Statistics:
stat = utils.Statistics(name) ## Init empty stat
try:
## Change _Method to your method class
test = at(workers) # type: ignore
test.init(task_path, net_path)
test.prepare()
stat = test.solve() ## Update stat
if stat.result == utils.Result.schedulable:
test.output().to_csv(name, output_path)
stat.content(name=name)
return stat
except KeyboardInterrupt:
stat.content(name=name)
return stat
except Exception as e:
print("[!]", e, flush=True)
traceback.print_exc()
stat.result = utils.Result.error
stat.content(name=name)
return stat
[docs]
class at:
def __init__(self, workers=1, num_window=5) -> None:
self.workers = workers
self.num_window = num_window
[docs]
def init(self, task_path: str, net_path: str) -> None:
self.task = utils.load_stream(task_path)
self.net = utils.load_network(net_path)
self.task.set_routings(
{s: self.net.get_shortest_path(s.src, s.dst) for s in self.task.streams}
)
z3.set_param("parallel.enable", True)
z3.set_param("parallel.threads.max", self.workers)
self.solver = z3.Solver()
self.phi = {
link: z3.Array(str(link) + "_phi", z3.IntSort(), z3.IntSort())
for link in self.net.links
}
self.tau = {
link: z3.Array(str(link) + "_tau", z3.IntSort(), z3.IntSort())
for link in self.net.links
}
self.k = {
link: z3.Array(str(link) + "_k", z3.IntSort(), z3.IntSort())
for link in self.net.links
}
self.w: Dict[utils.Stream, Dict[utils.Link, List[z3.ArithRef]]] = {}
for s in self.task:
self.w.setdefault(s, {})
for l in s.links:
self.w[s].setdefault(l, [])
for k in s.get_frame_indexes(self.task.lcm):
self.w[s][l].append(
z3.Int("w_" + str(s) + "_" + str(l) + "_" + str(k))
)
[docs]
def prepare(self) -> None:
self.add_window_order_const()
self.add_window_const()
self.add_frame_const()
self.add_window_overlap_const()
self.add_frame_to_window_range_const()
self.add_flow_tran_const()
self.add_frame_isolation_const()
self.add_delay_const()
[docs]
def output(self) -> utils.Config:
config = utils.Config()
config.gcl = self.get_gcl()
config.release = self.get_offset()
config.route = self.get_route()
config.queue = self.get_queue()
config._delay = self.get_delay()
return config
@utils.check_time_limit
def solve(self) -> utils.Statistics:
self.solver.set("timeout", int(utils.T_LIMIT - utils.time_log()) * 1000)
result = self.solver.check() ## Z3 solving
info = self.solver.statistics()
algo_time = info.time
algo_mem = info.max_memory
algo_result = (
utils.Result.schedulable if result == z3.sat else utils.Result.unschedulable
)
if result == z3.sat:
self.model_output = self.solver.model()
return utils.Statistics("-", algo_result, algo_time, algo_mem)
[docs]
def add_window_order_const(self) -> None:
for l in self.net.links:
for w in range(self.num_window):
self.tau[l] = z3.Store(self.tau[l], w, self.phi[l][w])
for s in self.task:
for l in s.links:
for k in self.w[s][l]:
self.tau[l] = z3.Store(
self.tau[l], k, self.tau[l][k] + s.get_t_trans(l)
)
[docs]
def add_window_const(self) -> None:
for l in self.net.links:
self.solver.add(
self.phi[l][0] >= 0, self.tau[l][-1] < z3.IntVal(str(self.task.lcm))
)
for w in range(self.num_window):
self.solver.add(self.k[l][w] >= 0, self.k[l][w] < l.q_num)
[docs]
def add_frame_const(self) -> None:
for s in self.task:
for l in s.links:
for k in s.get_frame_indexes(self.task.lcm):
self.solver.add(
self.phi[l][self.w[s][l][k]] >= k * s.period,
self.tau[l][self.w[s][l][k]] < (k + 1) * s.period,
)
[docs]
def add_window_overlap_const(self) -> None:
for l in self.net.links:
for w in range(self.num_window - 1):
self.solver.add(self.tau[l][w] <= self.phi[l][w + 1])
[docs]
def add_frame_to_window_range_const(self) -> None:
for s in self.task:
for l in s.links:
for k in s.get_frame_indexes(self.task.lcm):
self.solver.add(
self.w[s][l][k] >= 0, self.w[s][l][k] < self.num_window
)
[docs]
def add_flow_tran_const(self) -> None:
for s in self.task:
for l in s.links[:-1]:
_next = s.get_next_link(l)
if _next is None:
raise ValueError("No next link")
for k in s.get_frame_indexes(self.task.lcm):
self.solver.add(
self.tau[l][self.w[s][l][k]] + l.t_proc + l.t_sync
<= self.phi[_next][self.w[s][_next][k]]
)
[docs]
def add_frame_isolation_const(self) -> None:
for s1, s2 in self.task.get_pairs():
for pl_1, pl_2, l in self.task.get_merged_links(s1, s2):
for f1, f2 in self.task.get_frame_index_pairs(s1, s2):
self.solver.add(
z3.Or(
self.tau[l][self.w[s1][l][f1]] + pl_2.t_proc + l.t_sync
< self.phi[pl_2][self.w[s2][pl_2][f2]],
self.tau[l][self.w[s2][l][f2]] + pl_1.t_proc + l.t_sync
< self.phi[pl_1][self.w[s1][pl_1][f1]],
self.k[l][self.w[s1][l][f1]]
!= self.k[l][self.w[s2][l][f2]],
self.w[s1][l][f1] == self.w[s2][l][f2],
)
)
[docs]
def add_delay_const(self) -> None:
for s in self.task:
for k in s.get_frame_indexes(self.task.lcm):
self.solver.add(
self.tau[s.last_link][self.w[s][s.last_link][k]]
- self.phi[s.first_link][self.w[s][s.first_link][k]]
<= s.deadline - utils.E_SYNC
)
[docs]
def get_gcl(self) -> utils.GCL:
gcl = []
for l in self.net.links:
for w in range(self.num_window):
start = self.model_output.eval(self.phi[l][w]).as_long()
end = self.model_output.eval(self.tau[l][w]).as_long()
queue = self.model_output.eval(self.k[l][w]).as_long()
if end - start > 0:
gcl.append([l, queue, start, end, self.task.lcm])
return utils.GCL(gcl)
[docs]
def get_offset(self) -> utils.Release:
offset = []
for s in self.task:
for k in s.get_frame_indexes(self.task.lcm):
offset.append(
[
s,
k,
self.model_output.eval(
self.phi[s.first_link][self.w[s][s.first_link][k]]
).as_long(),
]
)
return utils.Release(offset)
[docs]
def get_route(self) -> utils.Route:
route = []
for s in self.task:
for l in s.links:
route.append([s, l])
return utils.Route(route)
[docs]
def get_queue(self) -> utils.Queue:
queue = []
for s in self.task:
for l in s.links:
for k in s.get_frame_indexes(self.task.lcm):
queue.append(
[
s,
k,
l,
self.model_output.eval(
self.k[l][self.w[s][l][k]]
).as_long(),
]
)
return utils.Queue(queue)
[docs]
def get_delay(self) -> utils.Delay:
delay = []
for s in self.task:
for k in s.get_frame_indexes(self.task.lcm):
start = self.model_output.eval(
self.phi[s.first_link][self.w[s][s.first_link][k]]
).as_long()
end = self.model_output.eval(
self.tau[s.last_link][self.w[s][s.last_link][k]]
).as_long()
delay.append([s, k, end - start - s.get_t_trans(s.last_link)])
return utils.Delay(delay)
if __name__ == "__main__":
# Test for route space
args = utils.parse_command_line_args()
utils.Statistics().header()
benchmark(args.name, args.task, args.net, args.output, args.workers)