Skip to content

Commit 42ee5e0

Browse files
committed
Added timeout for multi execs to make it work properly
1 parent 2b14bc6 commit 42ee5e0

File tree

2 files changed

+70
-30
lines changed

2 files changed

+70
-30
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
setup(
44
name='shuffle_sdk',
5-
version='0.0.21',
5+
version='0.0.22',
66
description='The SDK used for Shuffle',
77
py_modules=["shuffle_sdk"],
88
license='MIT',

shuffle_sdk/shuffle_sdk.py

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -425,9 +425,12 @@ def init_singul(self):
425425
execution_id=singul_executionid,
426426
)
427427

428-
self.logger.info("[DEBUG] Created Singul API object with auth %s, url %s and execution_id %s" % (singul_apikey, self.base_url, singul_executionid))
428+
if os.getenv("DEBUG").lower() == "true":
429+
self.logger.info("[DEBUG] Created Singul API object with auth %s, url %s and execution_id %s" % (singul_apikey, self.base_url, singul_executionid))
429430
except ValueError as e:
430-
self.logger.error(f"[ERROR] Failed to create Singul API object: {e}")
431+
if os.getenv("DEBUG").lower() == "true":
432+
self.logger.error(f"[ERROR] Failed to create Singul API object: {e}")
433+
431434
self.singul = None
432435

433436
# Checks output for whether it should be automatically parsed or not
@@ -1152,10 +1155,15 @@ def get_param_multipliers(self, baseparams):
11521155
#self.logger.info(f"{value} is not a list")
11531156
pass
11541157

1155-
self.logger.info("[DEBUG] Listlengths: %s - listitems: %d" % (listlengths, len(listitems)))
1158+
1159+
if os.getenv("DEBUG").lower() == "true":
1160+
self.logger.info("[DEBUG] Listlengths: %s - listitems: %d" % (listlengths, len(listitems)))
1161+
11561162
#if len(listitems) == 0:
11571163
if len(listlengths) == 0:
1158-
self.logger.info("[DEBUG] NO multiplier. Running a single iteration.")
1164+
if os.getenv("DEBUG").lower() == "true":
1165+
self.logger.info("[DEBUG] NO multiplier. Running a single iteration.")
1166+
11591167
paramlist.append(baseparams)
11601168

11611169
#elif len(listitems) == 1:
@@ -1407,9 +1415,9 @@ async def parse_value(tmp):
14071415
ret.append(new_value)
14081416

14091417
#self.logger.info("[INFO] Function return length: %d" % len(ret))
1410-
if len(ret) == 1:
1411-
#ret = ret[0]
1412-
self.logger.info("[DEBUG] DONT make list of 1 into 0!!")
1418+
#if len(ret) == 1:
1419+
#ret = ret[0]
1420+
#self.logger.info("[DEBUG] DONT make list of 1 into 0!!")
14131421

14141422
self.logger.info(f"[DEBUG][%s] Done with execution recursion %d times" % (self.current_execution_id, len(param_multiplier)))
14151423

@@ -2145,7 +2153,7 @@ def execute_action(self, action):
21452153
if param["name"] == "body":
21462154
contains_body = True
21472155

2148-
self.logger.info("[DEBUG][%s] Action name: %s, Params: %d, Has Body: %s" % (self.current_execution_id, self.action["name"], parameter_count, str(contains_body)))
2156+
self.logger.info("[INFO][%s] Action name: %s, Params: %d, Has Body: %s" % (self.current_execution_id, self.action["name"], parameter_count, str(contains_body)))
21492157
except Exception as e:
21502158
print("[ERROR] Failed in init print handler: %s" % e)
21512159

@@ -3867,11 +3875,22 @@ def check_branch_conditions(action, fullexecution, self):
38673875
#self.logger.info(f"Error with subparam deletion of {subparam} in {multi_parameters} (2)")
38683876
pass
38693877

3870-
#self.logger.info()
3871-
#self.logger.info(f"Param: {params}")
3872-
#self.logger.info(f"Multiparams: {multi_parameters}")
3873-
#self.logger.info()
3874-
3878+
if os.getenv("DEBUG").lower() == "true":
3879+
self.logger.info(f"[DEBUG] Param: {params}")
3880+
self.logger.info(f"[DEBUG] Multiparams: {multi_parameters}")
3881+
3882+
timeout = 30
3883+
if "app_name" in self.action and (self.action["app_name"].lower() == "shuffle tools" or self.action["app_name"].lower() == "shuffle subflow"):
3884+
timeout = 55
3885+
3886+
timeout_env = os.getenv("SHUFFLE_APP_SDK_TIMEOUT", timeout)
3887+
try:
3888+
timeout = int(timeout_env)
3889+
#self.logger.info(f"[DEBUG] Timeout set to {timeout} seconds")
3890+
except Exception as e:
3891+
self.logger.info(f"[ERROR] Failed parsing timeout to int: {e}")
3892+
3893+
# Single exec
38753894
if not multiexecution:
38763895
#self.logger.info("NOT MULTI EXEC")
38773896
# Runs a single iteration here
@@ -3935,22 +3954,12 @@ def check_branch_conditions(action, fullexecution, self):
39353954
# Individual functions shouldn't take longer than this
39363955
# This is an attempt to make timeouts occur less, incentivizing users to make use efficient API's
39373956
# PS: Not implemented for lists - only single actions as of May 2023
3938-
timeout = 30
39393957

39403958
# Check if current app is Shuffle Tools, then set to 55 due to certain actions being slow (ioc parser..)
39413959
# In general, this should be disabled for onprem
3942-
if self.action["app_name"].lower() == "shuffle tools":
3943-
timeout = 55
3944-
3945-
timeout_env = os.getenv("SHUFFLE_APP_SDK_TIMEOUT", timeout)
3946-
try:
3947-
timeout = int(timeout_env)
3948-
#self.logger.info(f"[DEBUG] Timeout set to {timeout} seconds")
3949-
except Exception as e:
3950-
self.logger.info(f"[ERROR] Failed parsing timeout to int: {e}")
39513960

39523961
#timeout = 30
3953-
self.logger.info("[DEBUG][%s] Running function '%s' with timeout %d" % (self.current_execution_id, action["name"], timeout))
3962+
self.logger.info("[INFO][%s] Running function '%s' with timeout %d" % (self.current_execution_id, action["name"], timeout))
39543963

39553964
try:
39563965
executor = concurrent.futures.ThreadPoolExecutor()
@@ -4099,14 +4108,44 @@ async def parse_value(newres):
40994108
self.logger.info("Can't handle type %s value from function" % (type(newres)))
41004109

41014110
else:
4102-
#self.logger.info("[INFO] APP_SDK DONE: Starting MULTI execution (length: %d) with values %s" % (minlength, multi_parameters))
4111+
if os.getenv("DEBUG").lower() == "true":
4112+
self.logger.info("[DEBUG] APP_SDK DONE: Starting MULTI execution (length: %d) with values %s" % (minlength, multi_parameters))
4113+
41034114
# 1. Use number of executions based on the arrays being similar
41044115
# 2. Find the right value from the parsed multi_params
41054116

41064117
#self.logger.info("[INFO] Running WITH loop. MULTI: %s", multi_parameters)
41074118
json_object = False
4108-
#results = await self.run_recursed_items(func, multi_parameters, {})
4109-
results = self.run_recursed_items(func, multi_parameters, {})
4119+
4120+
if os.getenv("DEBUG").lower() != "true":
4121+
results = self.run_recursed_items(func, multi_parameters, {})
4122+
else:
4123+
try:
4124+
executor = concurrent.futures.ThreadPoolExecutor()
4125+
future = executor.submit(self.run_recursed_items, func, multi_parameters, {})
4126+
results = future.result(timeout)
4127+
4128+
if not future.done():
4129+
# The future is still running, so we need to cancel it
4130+
future.cancel()
4131+
results = json.dumps({
4132+
"success": False,
4133+
"exception": str(e),
4134+
"reason": "Timeout error within %d seconds (1). This happens if we can't reach or use the API you're trying to use within the time limit. Configure SHUFFLE_APP_SDK_TIMEOUT=100 in Orborus to increase it to 100 seconds. Not changeable for cloud." % timeout,
4135+
})
4136+
4137+
else:
4138+
# The future is done, so we can just get the result from newres :)
4139+
pass
4140+
4141+
except concurrent.futures.TimeoutError as e:
4142+
results = json.dumps({
4143+
"success": False,
4144+
"reason": "Timeout error (2) within %d seconds (2). This happens if we can't reach or use the API you're trying to use within the time limit. Configure SHUFFLE_APP_SDK_TIMEOUT=100 in Orborus to increase it to 100 seconds. Not changeable for cloud." % timeout,
4145+
})
4146+
4147+
4148+
41104149
if isinstance(results, dict) or isinstance(results, list):
41114150
json_object = True
41124151

@@ -4143,7 +4182,7 @@ async def parse_value(newres):
41434182
result = result[:-2]
41444183
result += "]"
41454184
else:
4146-
self.logger.info("Normal result - no list?")
4185+
#self.logger.info("Normal result - no list?")
41474186
result = results
41484187

41494188
if self.authorization == "standalone":
@@ -4201,7 +4240,8 @@ async def parse_value(newres):
42014240

42024241
except Exception as e:
42034242
self.logger.info(f"[ERROR] Failed to execute: {e}")
4204-
self.logger.exception(f"[ERROR] Failed to execute {e}-{action['id']}")
4243+
if "id" in action:
4244+
self.logger.exception(f"[ERROR] Failed to execute {e}-{action['id']}")
42054245
self.action_result["status"] = "FAILURE"
42064246
try:
42074247
e = json.loads(f"{e}")

0 commit comments

Comments
 (0)