import argparse
import pandas as pd
import numpy as np
from enum import Enum
from tqdm import tqdm
from .draw import *
from typing import Set, Dict, List, Tuple
import os
from ..core import find_files_with_prefix, T_SLOT, T_PROC, parse_command_line_constants
# Try to import Cython optimized functions
try:
from .cython.simulation_core import match_time_optimized
CYTHON_AVAILABLE = True
except ImportError:
CYTHON_AVAILABLE = False
# DEBUG only
DEBUG_FLAG = 0
DEBUG_FLOWSET: Set[int] = set([0])
[docs]
def match_time(t: int, gcl: List[Tuple[int, int, int]]) -> int:
"""Match the entry in GCl with the given time. Implemented in binary search.
Args:
t (int): _description_
gcl (List[Tuple[int, int, int]]): _description_
Returns:
int: _description_
"""
if not gcl:
return -1
gate_time = [x[0] for x in gcl]
left = 0
right = len(gcl) - 1
if gate_time[right] <= t < gcl[-1][1]:
return right
elif gcl[-1][1] <= t or t < gate_time[0]:
return -1
while True:
median = (left + right) // 2
if right - left <= 1:
return left
elif gate_time[left] <= t < gate_time[median]:
right = median
else:
left = median
[docs]
class ConfigTypes(Enum):
GCL = 0
ROUTE = 1
OFFSET = 2
QUEUE = 3
def __str__(self):
return self.name.lower()
[docs]
def match_config_type(
configs: List[pd.DataFrame], config_type: ConfigTypes
) -> pd.DataFrame:
return_dfs = []
for df in configs:
if (
df.shape[1] == 5
and all(df.columns == ["link", "queue", "start", "end", "cycle"])
and config_type == ConfigTypes.GCL
):
return_dfs.append(df)
if (
df.shape[1] == 2
and all(df.columns == ["stream", "link"])
and config_type == ConfigTypes.ROUTE
):
return_dfs.append(df)
if (
df.shape[1] == 3
and all(df.columns == ["stream", "frame", "offset"])
and config_type == ConfigTypes.OFFSET
):
return_dfs.append(df)
if (
df.shape[1] == 4
and all(df.columns == ["stream", "frame", "link", "queue"])
and config_type == ConfigTypes.QUEUE
):
return_dfs.append(df)
if len(return_dfs) == 0:
raise ValueError(f"No config type for {config_type} matched")
elif len(return_dfs) > 1:
raise ValueError(
f"Multiple configs matched for {config_type}, please modify the prefix to avoid ambiguity"
)
else:
return return_dfs[0]
[docs]
def simulation(
task_path: str = "./",
config_path_affix: str = "./",
it: int = 10,
verbose: bool = False,
draw_results: bool = True,
disable_pbar: bool = False
) -> List[List[List[int]]]:
if CYTHON_AVAILABLE:
match_time_func = match_time_optimized
else:
match_time_func = match_time
_path = "/".join(config_path_affix.split("/")[:-1])
_prefix = config_path_affix.split("/")[-1]
config_paths = find_files_with_prefix(_path, _prefix)
configs = [pd.read_csv(path) for path in config_paths]
task = pd.read_csv(task_path)
gcl = match_config_type(configs, ConfigTypes.GCL)
route = match_config_type(configs, ConfigTypes.ROUTE)
offset = match_config_type(configs, ConfigTypes.OFFSET)
queue = match_config_type(configs, ConfigTypes.QUEUE)
GCL: Dict[Tuple[int, int], List[Tuple[int, int, int]]] = (
{}
) ## (src, dst) -> [(start, end, queue)]
CYCLE: Dict[Tuple[int, int], int] = {}
for i, row in gcl.iterrows():
GCL.setdefault(eval(row["link"]), [])
CYCLE.setdefault(eval(row["link"]), row["cycle"])
GCL[eval(row["link"])].append((row["start"], row["end"], row["queue"]))
for link in GCL:
GCL[link] = sorted(GCL[link], key=lambda x: x[0], reverse=False)
if verbose:
for link in GCL:
temp = GCL[link]
for i, row in enumerate(temp[:-1]): # type: ignore
if row[1] > temp[i + 1][0]:
print("overlap", link, row, temp[i + 1])
ROUTE: Dict[int, Dict[int, List[int]]] = {} ## flow -> link -> [link]
SRC = {}
DST = {}
for i, row in route.iterrows():
ROUTE.setdefault(row["stream"], {})
link = eval(row["link"])
ROUTE[row["stream"]].setdefault(link[0], [])
ROUTE[row["stream"]][link[0]].append(link[1])
for i, row in task.iterrows():
SRC[i] = row["src"]
DST[i] = eval(row["dst"])
OFFSET: Dict[Tuple[int, int], int] = {}
for i, row in offset.iterrows():
OFFSET[(row["stream"], row["frame"])] = row["offset"]
OFFSET_MAX: Dict[int, int] = {}
for i, row in offset.groupby("stream", as_index=False).count().iterrows():
OFFSET_MAX[row["stream"]] = row["offset"]
QUEUE: Dict[Tuple[int, int], Dict[Tuple[int, int], int]] = {}
for i, row in queue.iterrows():
QUEUE.setdefault((row["stream"], row["frame"]), {})
QUEUE[(row["stream"], row["frame"])][eval(row["link"])] = row["queue"]
NUM_QUEUES = max(max(queue["queue"]), max(gcl["queue"])) + 1
## Global setting
period = list(task["period"])
size = list(task["size"])
hyper_period = np.lcm.reduce(period)
log: List[List[List[int]]] = [[[], []] for _ in range(len(task))]
output: List[int] = []
instance_count = [0 for _ in range(len(task))]
egress_q: Dict[Tuple[int, int], List[List]] = {
link: [[] for _ in range(NUM_QUEUES)] for link, _ in GCL.items()
}
available_t = {link: 0 for link, _ in GCL.items()}
_pool: dict[Tuple[int, int], list[Tuple]] = {link: [] for link, _ in GCL.items()}
for t in tqdm(range(0, hyper_period * it, T_SLOT), disable=disable_pbar):
## Release task
for flow in range(len(task)):
frame = (flow, instance_count[flow] % OFFSET_MAX[flow])
if (t / period[flow] >= instance_count[flow]) and t % period[
flow
] == OFFSET[frame]:
for v in ROUTE[flow][SRC[flow]]:
link = (SRC[flow], v)
egress_q[link][QUEUE[frame][link]].append(frame)
instance_count[flow] = instance_count[flow] + 1
## Timer - TODO: Use min-heap
for link, vec in _pool.items():
_new_vec = []
for ct, frame in vec:
flow = frame[0]
if t >= ct:
if link[0] == SRC[flow]:
log[flow][0].append(t)
# output.append([flow, link, 1, t])
if verbose and (DEBUG_FLAG == 0 or flow in DEBUG_FLOWSET):
print(
("[Talker %d]:" % link[0]).ljust(20)
+ "Flow %d - Send at %d" % (flow, t)
)
if link[1] in DST[flow]:
log[flow][1].append(t - T_PROC)
output.append([flow, link, 0, t])
if verbose and (DEBUG_FLAG == 0 or flow in DEBUG_FLOWSET):
print(
("[Listener %d]:" % link[-1]).ljust(20)
+ "Flow %d - Receive at %d" % (flow, t - T_PROC)
)
continue
try:
for v in ROUTE[flow][link[-1]]:
nxt_link = (link[-1], v)
output.append([flow, nxt_link, 0, t])
if verbose and (DEBUG_FLAG == 0 or flow in DEBUG_FLOWSET):
print(
("[Bridge %s]:" % str(nxt_link)).ljust(20)
+ "Flow %d - Arrive at %d" % (flow, t)
)
egress_q[nxt_link][QUEUE[frame][nxt_link]].append(frame)
except KeyError:
print(flow, link)
raise
else:
_new_vec.append((ct, frame))
_pool[link] = _new_vec # type: ignore
# Qbv
for link, sche in GCL.items():
current_t = t % CYCLE[link]
index = match_time_func(current_t, sche)
if index == -1:
continue
_, e, q = sche[index]
if t >= available_t[link] and egress_q[link][q]:
trans_delay = size[egress_q[link][q][0][0]] * 8
if e - current_t >= trans_delay:
out = egress_q[link][q].pop(0)
_pool[link].append((t + trans_delay + T_PROC, out))
available_t[link] = t + trans_delay
output.append([out[0], link, 1, t])
if verbose and (DEBUG_FLAG == 0 or out[0] in DEBUG_FLOWSET):
print(
("[Bridge %s]:" % str(link)).ljust(20)
+ "Flow %d - Trans at %d" % (out[0], t)
)
if draw_results:
draw(log)
return log, output
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Process the stream and configs paths."
)
# Add positional arguments with defaults set to None
parser.add_argument(
"task",
type=str,
nargs="?",
default=None,
help="The file path to the stream CSV file.",
)
parser.add_argument(
"config",
type=str,
nargs="?",
default=None,
help="The file path to the folder that contains configs.",
)
# Add optional arguments corresponding to the positional arguments
parser.add_argument(
"--task",
dest="task_opt",
type=str,
help="The file path to the stream CSV file (optional).",
)
parser.add_argument(
"--config",
dest="config_opt",
type=str,
help="The file path to the folder that contains configs (optional).",
)
# Add other optional arguments
parser.add_argument(
"--iter", type=int, help="Number of iterations to run.", default=1
)
parser.add_argument("--verbose", action="store_true", help="Enable verbose output.")
parser.add_argument("--no-draw", action="store_true", help="Disable matplotlib plotting (useful for benchmarking)")
parser.add_argument("--output", type=str, nargs="?", help="Save logs to output file")
# log: [
# Release time, Arrival time
# | |
# V V
# [[0,10,20,30], [5,15,25,35]] <-- Each flow
# [[2,12,22,32], [7,17,27,37]]
# ]
# Parse the arguments
parse_command_line_constants(parser)
args = parser.parse_args()
task_path = args.task_opt if args.task_opt is not None else args.task
config_path = args.config_opt if args.config_opt is not None else args.config
# Handle cases where neither positional nor optional arguments are provided for required fields
if task_path is None or config_path is None:
parser.error("the following arguments are required: task, config")
log, output = simulation(
task_path,
config_path,
it=args.iter,
verbose=args.verbose,
draw_results=not args.no_draw,
)
### Check error
print(
"[Potential Errors]:",
[
(log.index(flow), x)
for flow in log
for x in [[flow[1][i] - flow[0][i] for i in range(len(flow[1]))]]
if len(x) == 0 or np.var(x) > 0
],
)
if parser.parse_args().verbose:
print("\n\n\n")
print("[Log]:")
for flow_id, flow_log in enumerate(log):
print("Flow %d:" % flow_id)
print("Send time:", flow_log[0])
print("Receive time:", flow_log[1])
print("\n\n\n")
print("[Statistics]:")
for flow_id, flow_log in enumerate(log):
average_delay = np.mean(
[flow_log[1][i] - flow_log[0][i] for i in range(len(flow_log[1]))]
)
average_jitter = np.std(
[flow_log[1][i] - flow_log[0][i] for i in range(len(flow_log[1]))]
)
print(
f"Flow {flow_id:>4}: ",
f"Average delay: {average_delay:<10.2f}",
f"Average jitter: {average_jitter:<10.2f}",
)
if args.output:
path_name, file_name = os.path.split(args.output)
if path_name:
os.makedirs(path_name, exist_ok=True)
if not file_name:
file_name = "log.csv"
pd.DataFrame(output,
columns=["stream", "link", "di", "time"]
).to_csv(os.path.join(path_name, file_name), index=False)