Skip to content

Commit 357c8c8

Browse files
authored
[Bug] fix job silent status can't convert to lost status bugs (#4300)
* fix job silent status can't convert to lost status bugs * fix job silent status can't convert to lost status bugs
1 parent fc448ee commit 357c8c8

File tree

5 files changed

+24
-15
lines changed

5 files changed

+24
-15
lines changed

streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ object K8sFlinkConfig {
2828
classType = classOf[java.lang.Long],
2929
description = "run timeout seconds of single flink-k8s metrics tracking task")
3030

31+
@deprecated
32+
val jobStatusTrackCacheTimeoutSec: InternalOption = InternalOption(
33+
key = "streampark.flink-k8s.tracking.cache-timeout-sec.job-status",
34+
defaultValue = 300,
35+
classType = classOf[java.lang.Integer],
36+
description = "status cache timeout seconds of single flink-k8s job status tracking task")
37+
3138
@deprecated
3239
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
3340
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
3333

3434
// cache pool for storage tracking result
3535
implicit val watchController: FlinkK8sWatchController =
36-
new FlinkK8sWatchController()
36+
new FlinkK8sWatchController(conf.jobStatusWatcherConf)
3737

3838
// eventBus for change event
3939
implicit lazy val eventBus: ChangeEventBus = {

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import java.util.Objects
2727
import java.util.concurrent.TimeUnit
2828

2929
/** Tracking info cache pool on flink kubernetes mode. */
30-
class FlinkK8sWatchController extends Logger with AutoCloseable {
30+
class FlinkK8sWatchController(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf) extends Logger with AutoCloseable {
3131

3232
// cache for tracking identifiers
3333
lazy val trackIds: TrackIdCache = TrackIdCache.build()
@@ -38,7 +38,7 @@ class FlinkK8sWatchController extends Logger with AutoCloseable {
3838
lazy val endpoints: EndpointCache = EndpointCache.build()
3939

4040
// cache for tracking flink job status
41-
lazy val jobStatuses: JobStatusCache = JobStatusCache.build()
41+
lazy val jobStatuses: JobStatusCache = JobStatusCache.build(conf.jobStatusCacheTimeOutSec)
4242

4343
// cache for tracking kubernetes events with Deployment kind
4444
lazy val k8sDeploymentEvents: K8sDeploymentEventCache =
@@ -156,10 +156,10 @@ object TrackIdCache {
156156
}
157157
}
158158

159-
class JobStatusCache {
159+
class JobStatusCache(timeout: Int) {
160160

161161
private[this] lazy val cache: Cache[CacheKey, JobStatusCV] =
162-
Caffeine.newBuilder.expireAfterWrite(20, TimeUnit.SECONDS).build()
162+
Caffeine.newBuilder.expireAfterWrite(timeout, TimeUnit.SECONDS).build()
163163

164164
def putAll(kvs: Map[TrackId, JobStatusCV]): Unit =
165165
cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
@@ -183,7 +183,7 @@ class JobStatusCache {
183183

184184
object JobStatusCache {
185185

186-
def build(): JobStatusCache = new JobStatusCache()
186+
def build(timeout: Int): JobStatusCache = new JobStatusCache(timeout)
187187

188188
}
189189

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,14 @@ case class MetricWatcherConfig(requestTimeoutSec: Long, requestIntervalSec: Long
4848
* interval seconds between two single tracking task
4949
* @param silentStateJobKeepTrackingSec
5050
* retained tracking time for SILENT state flink tasks
51+
* @param jobStatusCacheTimeOutSec
52+
* job status cache time out of single tracking task, must bigger than silentStateJobKeepTrackingSec
5153
*/
5254
case class JobStatusWatcherConfig(
5355
requestTimeoutSec: Long,
5456
requestIntervalSec: Long,
55-
silentStateJobKeepTrackingSec: Int)
57+
silentStateJobKeepTrackingSec: Int,
58+
jobStatusCacheTimeOutSec: Int)
5659

5760
object FlinkTrackConfig {
5861
def defaultConf: FlinkTrackConfig =
@@ -66,7 +69,8 @@ object FlinkTrackConfig {
6669
JobStatusWatcherConfig(
6770
InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackTaskTimeoutSec),
6871
InternalConfigHolder.get(K8sFlinkConfig.jobStatueTrackTaskIntervalSec),
69-
InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec)),
72+
InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec),
73+
InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackCacheTimeoutSec)),
7074
MetricWatcherConfig(
7175
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskTimeoutSec),
7276
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskIntervalSec)))
@@ -77,12 +81,14 @@ object JobStatusWatcherConfig {
7781
def defaultConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
7882
requestTimeoutSec = 120,
7983
requestIntervalSec = 5,
80-
silentStateJobKeepTrackingSec = 60)
84+
silentStateJobKeepTrackingSec = 60,
85+
jobStatusCacheTimeOutSec = 300)
8186

8287
def debugConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
8388
requestTimeoutSec = 120,
8489
requestIntervalSec = 2,
85-
silentStateJobKeepTrackingSec = 5)
90+
silentStateJobKeepTrackingSec = 5,
91+
jobStatusCacheTimeOutSec = 30)
8692
}
8793

8894
object MetricWatcherConfig {

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
128128
case _ =>
129129
touchSessionJob(trackId) match {
130130
case Some(state) =>
131-
if (FlinkJobState.isEndState(state.jobState)) {
132-
// can't find that job in the k8s cluster.
133-
watchController.unWatching(trackId)
134-
}
135-
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
131+
updateState(trackId, state)
136132
case _ =>
137133
}
138134
}

0 commit comments

Comments
 (0)