Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 101 additions & 13 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store};

use crate::manager::prompt::prompt_for_confirmation;
use crate::network_setup::Networks;

pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
Expand Down Expand Up @@ -250,51 +251,138 @@ pub async fn change_block_cache_shard(
.await?
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
let old_shard = chain.shard;
let canonical_backup_name = format!("{chain_name}-old");
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;

println!("Current shard: {}", old_shard);

let chain_store = store
.chain_store(&chain_name)
.await
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
let new_name = format!("{}-old", &chain_name);
let ident = chain_store.chain_identifier().await?;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I missed is that the current chain and the backup Identities may differ. graphman provides a command to update the genesis hash graphman chain update-genesis, which is part of the ChainIdentifier, so we should fetch the backup identifier as well and compare them and abort If they are different.

let target_shard = Shard::new(shard.clone())?;

let reuse_existing_backup = match existing_backup.as_ref() {
None => false,
Some(backup) if backup.shard != target_shard => {
bail!(
"`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
canonical_backup_name,
backup.shard,
canonical_backup_name,
chain_name,
target_shard,
);
}
Some(backup) => {
let backup_ident = backup.network_identifier()?;
if backup_ident != ident {
bail!(
"`{}` has a different chain identifier ({}) than `{}` ({}). Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
canonical_backup_name,
backup_ident,
chain_name,
ident,
canonical_backup_name,
chain_name,
target_shard,
);
}
let prompt = format!(
"`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?",
canonical_backup_name, target_shard, chain_name
);
if !prompt_for_confirmation(&prompt)? {
println!(
"Aborting. Remove `{}` with `graphman chain remove {}` if you want to create a fresh cache on shard `{}`.",
canonical_backup_name, canonical_backup_name, target_shard
);
return Ok(());
}
true
}
};

conn.transaction::<(), StoreError, _>(|conn| {
async {
let shard = Shard::new(shard.to_string())?;
let existing_backup_store = if reuse_existing_backup {
store.chain_store(&canonical_backup_name).await
} else {
None
};

let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;
let allocated_chain = if reuse_existing_backup {
None
} else {
let chain =
BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?;
store.add_chain_store(&chain, true).await?;
Some(chain)
};

store.add_chain_store(&chain, true).await?;
let temp_backup_name = format!("{chain_name}-old-temp");
if reuse_existing_backup && find_chain(&mut conn, &temp_backup_name).await?.is_some() {
bail!(
"`{}` already exists. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
temp_backup_name,
temp_backup_name,
chain_name,
target_shard,
);
}

// Drop the foreign key constraint on deployment_schemas
conn.transaction::<(), StoreError, _>(|conn| {
async {
sql_query(
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
)
.execute(conn).await?;

// Update the current chain name to chain-old
update_chain_name(conn, &chain_name, &new_name).await?;
if let Some(backup) = existing_backup.as_ref() {
update_chain_name(conn, &backup.name, &temp_backup_name).await?;
}

// Create a new chain with the name in the destination shard
let _ = add_chain(conn, &chain_name, &shard, ident).await?;
update_chain_name(conn, &chain_name, &canonical_backup_name).await?;

if reuse_existing_backup {
update_chain_name(conn, &temp_backup_name, &chain_name).await?;
} else {
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
}

// Re-add the foreign key constraint
sql_query(
"alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
)
.execute(conn).await?;

Ok(())
}.scope_boxed()
}).await?;

chain_store.update_name(&new_name).await?;
chain_store.update_name(&canonical_backup_name).await?;

if reuse_existing_backup && let Some(backup_store) = existing_backup_store.as_ref() {
backup_store.update_name(&chain_name).await?;
}

println!(
"Changed block cache shard for {} from {} to {}",
chain_name, old_shard, shard
);
println!("Latest backup recorded as `{}`", canonical_backup_name);

if reuse_existing_backup {
println!(
"Reused existing backup `{}` as the active `{}` chain",
canonical_backup_name, chain_name
);
}

if allocated_chain.is_some() {
println!(
"Allocated new chain state for `{}` on shard {}",
chain_name, shard
);
}

Ok(())
}
Expand Down