-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathedf.py
More file actions
116 lines (107 loc) · 3.86 KB
/
edf.py
File metadata and controls
116 lines (107 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import numpy as np
from math import gcd
# Input is task set
# Run utilization test
# If needed run exact analysis based on threshold
# If pass/fail notify user
# If pass then move on to scheduling
# EDF does scheduling based on earliest deadline first
# Ties will be given to task that is currently executing
# Otherwise task will be selected at random amongst tied
class EDF:
def __init__(self, task):
self.task = task
# Utilization test will take ci/pi of each task
# It will then add them all together
# Then check if it is <= 1
# Where n is # of tasks
# Necessary for EDF and sufficient
@staticmethod
def utilization_test(task):
u_task = 0
task_copy = np.copy(task)
task_row = task_copy.shape[0]
for x in range(task_row):
u_task = u_task + (float(task_copy[x][0]) / float(task_copy[x][1]))
if u_task <= 1:
return True
else:
return False
# order on deadlines
@staticmethod
def priority_order(task):
ordered_array = []
ordered_index = []
copy_array = np.copy(task)
copy_index = []
for x in range(len(copy_array)):
copy_index.append(x)
for x in range(len(task)):
temp = np.copy(copy_array[0])
index_temp = copy_index[0]
for y in range(len(copy_array)):
if temp[2] >= copy_array[y][2]:
temp = np.copy(copy_array[y])
index_temp = copy_index[y]
ordered_array.append(temp)
ordered_index.append(index_temp)
test_r = np.where((copy_array == temp).all(axis=1))
copy_array = np.delete(copy_array, test_r[0][0], 0)
copy_index.remove(index_temp)
order_array = np.array(ordered_array)
return order_array, ordered_index
# find least common multiple from deadlines of tasks
@staticmethod
def lcm_edf(task):
list_of_deadlines = []
for x in range(len(task)):
list_of_deadlines.append(int(task[x][2]))
lcm = list_of_deadlines[0]
for i in list_of_deadlines[1:]:
lcm = lcm * i // gcd(lcm, i)
return lcm
def edf_schedule(self, p_order, lcm, original_order):
queue_order = np.copy(p_order)
row = queue_order.shape[0]
organized_array = []
temp = 0
queue_flag = 0
swap_flag = 0
# arrays made at this point
for x in range(lcm + 1):
if x != 0:
if temp == "":
organized_array.append(0)
queue_flag = 0
else:
organized_array.append(p_order[temp])
queue_flag = 0
swap_flag = 0
for y in range(row):
if x != 0 and x % int(p_order[y][2]) == 0:
queue_order[y][2] += p_order[y][2]
queue_order[y][0] += p_order[y][0]
swap_flag = 1
if swap_flag == 1:
swap = self.priority_order(queue_order)
queue_order = swap[0]
p_order = p_order[swap[1]]
for z in range(row):
if queue_flag == 0:
if queue_order[z][0] > 0:
temp = z
queue_order[z][0] -= 1
queue_flag = 1
else:
temp = ""
org_array = np.array(organized_array, dtype="object")
t_array = []
for x in range(len(org_array)):
for y in range(len(p_order)):
if np.array_equal(org_array[x], 0):
t_array.append(" ")
break
elif np.array_equal(org_array[x], original_order[y]):
t_array.append("T" + str(y + 1))
print(t_array)
return t_array