Skip to content

Commit 21b6a3e

Browse files
committed
save
1 parent 6f3b147 commit 21b6a3e

File tree

8 files changed

+88
-9
lines changed

8 files changed

+88
-9
lines changed

integration/postgres_fdw/pgdog.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,20 @@ shard = 1
1313
host = "127.0.0.1"
1414
database_name = "shard_1_fdw"
1515

16+
[[databases]]
17+
name = "pgdog"
18+
shard = 0
19+
host = "127.0.0.1"
20+
database_name = "shard_0_fdw"
21+
role = "replica"
22+
23+
[[databases]]
24+
name = "pgdog"
25+
shard = 1
26+
host = "127.0.0.1"
27+
database_name = "shard_1_fdw"
28+
role = "replica"
29+
1630
[[sharded_tables]]
1731
column = "user_id"
1832
database = "pgdog"

integration/postgres_fdw/users.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22
name = "pgdog"
33
password = "pgdog"
44
database = "pgdog"
5+
6+
[[users]]
7+
name = "lev"
8+
password = "lev"
9+
database = "pgdog"

pgdog-config/src/fdw.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ use serde::{Deserialize, Serialize};
33
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Copy)]
44
#[serde(deny_unknown_fields)]
55
pub struct Fdw {
6-
#[serde(default)]
7-
pub enabled: bool,
8-
96
#[serde(default = "default_port")]
107
pub port: u16,
118

@@ -16,7 +13,6 @@ pub struct Fdw {
1613
impl Default for Fdw {
1714
fn default() -> Self {
1815
Self {
19-
enabled: bool::default(),
2016
port: default_port(),
2117
launch_timeout: default_launch_timeout(),
2218
}

pgdog-config/src/general.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::path::PathBuf;
55
use std::time::Duration;
66

77
use crate::pooling::ConnectionRecovery;
8-
use crate::{CopyFormat, QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior};
8+
use crate::{
9+
CopyFormat, CrossShardBackend, QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior,
10+
};
911

1012
use super::auth::{AuthType, PassthoughAuth};
1113
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
@@ -203,6 +205,9 @@ pub struct General {
203205
/// Copy format used for resharding.
204206
#[serde(default)]
205207
pub resharding_copy_format: CopyFormat,
208+
/// Cross-shard backend.
209+
#[serde(default = "General::cross_shard_backend")]
210+
pub cross_shard_backend: CrossShardBackend,
206211
}
207212

208213
impl Default for General {
@@ -274,6 +279,7 @@ impl Default for General {
274279
system_catalogs: Self::default_system_catalogs(),
275280
omnisharded_sticky: bool::default(),
276281
resharding_copy_format: CopyFormat::default(),
282+
cross_shard_backend: Self::cross_shard_backend(),
277283
}
278284
}
279285
}
@@ -398,6 +404,10 @@ impl General {
398404
)
399405
}
400406

407+
fn cross_shard_backend() -> CrossShardBackend {
408+
Self::env_enum_or_default("PGDOG_CROSS_SHARD_BACKEND")
409+
}
410+
401411
pub fn query_timeout(&self) -> Duration {
402412
Duration::from_millis(self.query_timeout)
403413
}

pgdog-config/src/sharding.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,41 @@ impl Display for CopyFormat {
363363
}
364364
}
365365
}
366+
367+
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
368+
#[serde(rename_all = "snake_case", deny_unknown_fields)]
369+
pub enum CrossShardBackend {
370+
#[default]
371+
PgDog,
372+
Fdw,
373+
Hybrid,
374+
}
375+
376+
impl CrossShardBackend {
377+
pub fn need_fdw(&self) -> bool {
378+
matches!(self, Self::Fdw | Self::Hybrid)
379+
}
380+
}
381+
382+
impl Display for CrossShardBackend {
383+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384+
match self {
385+
Self::PgDog => write!(f, "pgdog"),
386+
Self::Fdw => write!(f, "fdw"),
387+
Self::Hybrid => write!(f, "hybrid"),
388+
}
389+
}
390+
}
391+
392+
impl FromStr for CrossShardBackend {
393+
type Err = ();
394+
395+
fn from_str(s: &str) -> Result<Self, Self::Err> {
396+
match s {
397+
"pgdog" => Ok(Self::PgDog),
398+
"fdw" => Ok(Self::Fdw),
399+
"hybrid" => Ok(Self::Hybrid),
400+
_ => Err(()),
401+
}
402+
}
403+
}

pgdog/src/backend/databases.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub fn init() -> Result<(), Error> {
9191
let config = config();
9292

9393
// Start postgres_fdw compatibility engine.
94-
if config.config.fdw.enabled {
94+
if config.config.general.cross_shard_backend.need_fdw() {
9595
PostgresLauncher::get().launch();
9696
}
9797

pgdog/src/backend/pool/cluster.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
//! A collection of replicas and a primary.
22
33
use parking_lot::Mutex;
4-
use pgdog_config::{PreparedStatements, QueryParserEngine, QueryParserLevel, Rewrite, RewriteMode};
4+
use pgdog_config::{
5+
CrossShardBackend, PreparedStatements, QueryParserEngine, QueryParserLevel, Rewrite,
6+
RewriteMode,
7+
};
58
use std::{
69
sync::{
710
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -76,6 +79,7 @@ pub struct Cluster {
7679
lb_strategy: LoadBalancingStrategy,
7780
rw_split: ReadWriteSplit,
7881
fdw_lb: Option<FdwLoadBalancer>,
82+
cross_shard_backend: CrossShardBackend,
7983
}
8084

8185
/// Sharding configuration from the cluster.
@@ -148,6 +152,7 @@ pub struct ClusterConfig<'a> {
148152
pub query_parser_engine: QueryParserEngine,
149153
pub connection_recovery: ConnectionRecovery,
150154
pub lsn_check_interval: Duration,
155+
pub cross_shard_backend: CrossShardBackend,
151156
}
152157

153158
impl<'a> ClusterConfig<'a> {
@@ -195,6 +200,7 @@ impl<'a> ClusterConfig<'a> {
195200
query_parser_engine: general.query_parser_engine,
196201
connection_recovery: general.connection_recovery,
197202
lsn_check_interval: Duration::from_millis(general.lsn_check_interval),
203+
cross_shard_backend: general.cross_shard_backend,
198204
}
199205
}
200206
}
@@ -228,6 +234,7 @@ impl Cluster {
228234
connection_recovery,
229235
lsn_check_interval,
230236
query_parser_engine,
237+
cross_shard_backend,
231238
} = config;
232239

233240
let identifier = Arc::new(DatabaseUser {
@@ -276,9 +283,12 @@ impl Cluster {
276283
lb_strategy,
277284
rw_split,
278285
fdw_lb: None,
286+
cross_shard_backend,
279287
};
280288

281-
cluster.fdw_lb = FdwLoadBalancer::new(&cluster).ok();
289+
if cross_shard_backend.need_fdw() {
290+
cluster.fdw_lb = FdwLoadBalancer::new(&cluster).ok();
291+
}
282292

283293
cluster
284294
}
@@ -460,6 +470,10 @@ impl Cluster {
460470
true
461471
}
462472

473+
pub fn cross_shard_backend(&self) -> CrossShardBackend {
474+
self.cross_shard_backend
475+
}
476+
463477
/// This database/user pair is responsible for schema management.
464478
pub fn schema_admin(&self) -> bool {
465479
self.schema_admin

pgdog/src/frontend/client/query_engine/connect.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ impl QueryEngine {
3030
}
3131

3232
let connect_route = connect_route.unwrap_or(context.client_request.route());
33-
let connect_route = if context.params.is_postgres_fdw() {
33+
let connect_route = if (context.params.is_postgres_fdw() || connect_route.is_cross_shard())
34+
&& self.backend.cluster()?.cross_shard_backend().need_fdw()
35+
{
3436
lazy_static! {
3537
static ref FDW_ROUTE: Route = Route::fdw_fallback();
3638
}

0 commit comments

Comments
 (0)