-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrms.py
More file actions
122 lines (113 loc) · 4.22 KB
/
rms.py
File metadata and controls
122 lines (113 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import numpy as np
from math import gcd
class RMS:
    """Rate-monotonic scheduling (RMS) helpers for a periodic task set.

    Every task is a row ``(ci, pi, dl)``: execution time, period and
    deadline, in that column order.
    """

    def __init__(self, task):
        """Store the task set; rows are expected as ``(ci, pi, dl)``."""
        self.task = task

    def utilization_test(self):
        """Run the sufficient utilization-bound schedulability test.

        Sums ``ci / pi`` over all tasks and compares the total with
        ``n * (2 ** (1 / n) - 1)``, where ``n`` is the number of tasks.

        Returns:
            True when the total utilization is less than or equal to the
            bound. An empty task set is reported as schedulable.
        """
        count = len(self.task)
        if count == 0:
            # Nothing to schedule; also avoids dividing by zero below.
            return True
        utilization = sum(float(row[0]) / float(row[1]) for row in self.task)
        bound = count * (pow(2, 1 / count) - 1)
        return utilization <= bound

    def exact_analysis(self):
        """Run the exact (response-time) schedulability test.

        Tasks are visited from highest to lowest priority. For each task
        the worst-case response time is found by fixed-point iteration:
        start from the summed execution time of the task and every
        higher-priority task, then repeatedly recompute the demand
        ``ci + sum(ceil(r / pj) * cj)`` over the higher-priority tasks
        until it stops growing or exceeds the task's deadline.

        Values are read with ``int()``; the stored task set is not
        modified.

        Returns:
            True when every task's response time is within its deadline,
            False as soon as one task exceeds it.
        """
        ranked = self.priority_order(self.task)
        costs = [int(row[0]) for row in ranked]
        periods = [int(row[1]) for row in ranked]
        deadlines = [int(row[2]) for row in ranked]

        for level, (cost, deadline) in enumerate(zip(costs, deadlines)):
            response = sum(costs[: level + 1])
            while True:
                if response > deadline:
                    return False
                # -(-a // b) is integer ceil(a / b): the number of releases
                # of a higher-priority task inside the window.
                demand = cost + sum(
                    -(-response // periods[j]) * costs[j] for j in range(level)
                )
                if demand == response:
                    break
                response = demand
        return True

    @staticmethod
    def priority_order(task):
        """Return the task rows ordered from highest to lowest priority.

        The task with the shortest period (column 1) comes first. Periods
        are compared numerically. Among tasks with the same period, the
        one listed last is picked first.

        Args:
            task: 2-D array-like of ``(ci, pi, dl)`` rows.

        Returns:
            A new NumPy array holding copies of the rows in priority
            order; an empty array for an empty task set.
        """
        pool = np.copy(task)
        ranked = []
        for _ in range(len(pool)):
            periods = [float(row[1]) for row in pool]
            shortest = min(periods)
            last = max(i for i, p in enumerate(periods) if p == shortest)
            pick = np.copy(pool[last])
            ranked.append(pick)
            # Drop the first row that is entirely equal to the pick, so a
            # set with duplicated rows loses exactly one of them per pass.
            first_equal = np.where((pool == pick).all(axis=1))[0][0]
            pool = np.delete(pool, first_equal, 0)
        return np.array(ranked)

    @staticmethod
    def lcm_rms(task):
        """Return the least common multiple of the tasks' deadlines.

        Column 2 of every row is read with ``int()``.

        Raises:
            IndexError: if ``task`` is empty.
        """
        deadlines = [int(row[2]) for row in task]
        lcm = deadlines[0]
        for value in deadlines[1:]:
            lcm = lcm * value // gcd(lcm, value)
        return lcm

    @staticmethod
    def _task_labels(p_order, original_order):
        """Map each row of ``p_order`` to a ``"T<k>"`` label.

        ``k`` is the 1-based position of the equal row in
        ``original_order``. Each original row is used at most once, so
        identical tasks receive distinct labels. A row with no unused
        match gets ``None``.
        """
        unclaimed = list(range(len(original_order)))
        labels = []
        for row in p_order:
            match = next(
                (i for i in unclaimed
                 if np.array_equal(row, original_order[i])),
                None,
            )
            if match is None:
                labels.append(None)
            else:
                unclaimed.remove(match)
                labels.append("T" + str(match + 1))
        return labels

    @staticmethod
    def rms_schedule(p_order, lcm, original_order):
        """Simulate the schedule tick by tick and return the timeline.

        Every task starts with ``ci`` units of pending work. At each tick
        other than 0 that is a multiple of a task's column 2 value,
        another ``ci`` units are added for that task. In each tick the
        first task in ``p_order`` with pending work runs for one unit.

        Args:
            p_order: task rows in priority order, highest first
                (NumPy array with a numeric dtype).
            lcm: number of ticks to simulate.
            original_order: the task rows in their original order; used
                to name tasks ``"T1"``, ``"T2"``, ...

        Returns:
            A list with one entry per tick: the label of the task that
            ran, or ``" "`` for an idle tick. A tick whose task cannot be
            matched to a row of ``original_order`` is omitted. The list
            is also printed.
        """
        remaining = np.copy(p_order)
        task_count = remaining.shape[0]

        timeline = []  # priority index of the running task, None when idle
        for tick in range(lcm):
            running = None
            for idx in range(task_count):
                if tick != 0 and tick % int(p_order[idx][2]) == 0:
                    remaining[idx][0] += p_order[idx][0]
                if running is None and remaining[idx][0] > 0:
                    remaining[idx][0] -= 1
                    running = idx
            timeline.append(running)

        labels = RMS._task_labels(p_order, original_order)
        t_array = []
        for running in timeline:
            if running is None:
                t_array.append(" ")
            elif labels[running] is not None:
                t_array.append(labels[running])
        print(t_array)
        return t_array