diff options
Diffstat (limited to 'src/exchangedb/shard-0001.sql')
-rw-r--r-- | src/exchangedb/shard-0001.sql | 1876 |
1 files changed, 1876 insertions, 0 deletions
diff --git a/src/exchangedb/shard-0001.sql b/src/exchangedb/shard-0001.sql new file mode 100644 index 000000000..6a91cfbaa --- /dev/null +++ b/src/exchangedb/shard-0001.sql @@ -0,0 +1,1876 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2014--2022 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +-- + +-- Everything in one big transaction +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('exchange-0001', NULL, NULL); + +-------------------- Tables ---------------------------- + +CREATE OR REPLACE FUNCTION create_partitioned_table( + IN table_definition VARCHAR + ,IN table_name VARCHAR + ,IN main_table_partition_str VARCHAR -- Used only when it is the main table - we do not partition shard tables + ,IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + IF shard_suffix IS NOT NULL THEN + table_name=table_name || '_' || shard_suffix; + main_table_partition_str = ''; + END IF; + + EXECUTE FORMAT( + table_definition, + table_name, + main_table_partition_str + ); + +END +$$; + +----------------------- wire_targets --------------------------- + +CREATE OR REPLACE FUNCTION create_table_wire_targets( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wire_target_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',wire_target_h_payto BYTEA PRIMARY KEY CHECK (LENGTH(wire_target_h_payto)=32)' + ',payto_uri VARCHAR NOT NULL' + ',kyc_ok BOOLEAN NOT NULL DEFAULT (FALSE)' + ',external_id VARCHAR' + ') %s ;' + ,'wire_targets' + ,'PARTITION BY HASH (wire_target_h_payto)' + ,shard_suffix + ); + +END +$$; + +-- We need a seperate function for this, as we call create_table only once but need to add +-- those constraints to each partition which gets created +CREATE OR REPLACE FUNCTION add_constraints_to_wire_targets_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + + EXECUTE FORMAT ( + 'ALTER TABLE wire_targets_' || partition_suffix || ' ' + 'ADD CONSTRAINT wire_targets_' || partition_suffix || '_wire_target_serial_id_key ' + 'UNIQUE (wire_target_serial_id)' + ); +END +$$; + +------------------------ reserves ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'reserves'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' + ',reserve_pub BYTEA PRIMARY KEY CHECK(LENGTH(reserve_pub)=32)' + ',current_balance_val INT8 NOT NULL' + ',current_balance_frac INT4 NOT NULL' + ',expiration_date INT8 NOT NULL' + ',gc_date INT8 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_expiration_index ' + 'ON ' || table_name || ' ' + '(expiration_date' + ',current_balance_val' + ',current_balance_frac' + ');' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_expiration_index ' + 'IS ' || quote_literal('used in get_expired_reserves') || ';' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_uuid_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_gc_date_index ' + 'ON ' || table_name || ' ' + '(gc_date);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_gc_date_index ' + 'IS ' || quote_literal('for reserve garbage collection') || ';' + ); + +END +$$; + +----------------------- reserves_in ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_reserves_in( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_in'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_in_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',reserve_pub BYTEA PRIMARY KEY' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE' + ',wire_reference INT8 NOT NULL' + ',credit_val INT8 NOT NULL' + ',credit_frac INT4 NOT NULL' + ',wire_source_h_payto BYTEA CHECK (LENGTH(wire_source_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',execution_date INT8 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_in_serial_id_index ' + 'ON ' || table_name || ' ' + '(reserve_in_serial_id);' + ); + -- FIXME: where do we need this index? Can we do better? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_exch_accnt_section_execution_date_idx ' + 'ON ' || table_name || ' ' + '(exchange_account_section ' + ',execution_date' + ');' + ); + -- FIXME: where do we need this index? Can we do better? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_exch_accnt_reserve_in_serial_id_idx ' + 'ON ' || table_name || ' ' + '(exchange_account_section,' + 'reserve_in_serial_id DESC' + ');' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_in_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_in_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_in_' || partition_suffix || '_reserve_in_serial_id_key ' + 'UNIQUE (reserve_in_serial_id)' + ); +END +$$; + +--------------------------- reserves_close ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves_close( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_close'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(close_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE / PRIMARY KEY' + ',reserve_pub BYTEA NOT NULL' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE' + ',execution_date INT8 NOT NULL' + ',wtid BYTEA NOT NULL CHECK (LENGTH(wtid)=32)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',closing_fee_val INT8 NOT NULL' + ',closing_fee_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_close_uuid_index ' + 'ON ' || table_name || ' ' + '(close_uuid);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_pub_index ' + 'ON ' || table_name || ' ' + '(reserve_pub);' + ); +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_close_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_close_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_close_' || partition_suffix || '_close_uuid_pkey ' + 'PRIMARY KEY (close_uuid)' + ); +END +$$; + +---------------------------- reserves_out ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves_out( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_out'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_out_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64) UNIQUE' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial)' + ',denom_sig BYTEA NOT NULL' + ',reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE' + ',reserve_sig BYTEA NOT NULL CHECK (LENGTH(reserve_sig)=64)' + ',execution_date INT8 NOT NULL' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ') %s ;' + ,'reserves_out' + ,'PARTITION BY HASH (h_blind_ev)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_out_serial_id_index ' + 'ON ' || table_name || ' ' + '(reserve_out_serial_id);' + ); + -- FIXME: change query to use reserves_out_by_reserve instead and materialize execution_date there as well??? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_uuid_and_execution_date_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid, execution_date);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_reserve_uuid_and_execution_date_index ' + 'IS ' || quote_literal('for get_reserves_out and exchange_do_withdraw_limit_check') || ';' + ); + +END +$$; + + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_out_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_out_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_out_' || partition_suffix || '_reserve_out_serial_id_key ' + 'UNIQUE (reserve_out_serial_id)' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_reserves_out_by_reserve( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'reserves_out_by_reserve'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE + ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64)' + ') %s ' + ,table_name + ,'PARTITION BY HASH (reserve_uuid)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid);' + ); + +END +$$; + +---------------------------- known_coins ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_known_coins( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'known_coins'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(known_coin_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE' + ',coin_pub BYTEA NOT NULL PRIMARY KEY CHECK (LENGTH(coin_pub)=32)' + ',age_commitment_hash BYTEA CHECK (LENGTH(age_commitment_hash)=32)' + ',denom_sig BYTEA NOT NULL' + ',remaining_val INT8 NOT NULL' + ',remaining_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' -- FIXME: or include denominations_serial? or multi-level partitioning?; + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_known_coins_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE known_coins_' || partition_suffix || ' ' + 'ADD CONSTRAINT known_coins_' || partition_suffix || '_known_coin_id_key ' + 'UNIQUE (known_coin_id)' + ); +END +$$; + +---------------------------- refresh_commitments ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_refresh_commitments( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_commitments'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(melt_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',rc BYTEA PRIMARY KEY CHECK (LENGTH(rc)=64)' + ',old_coin_pub BYTEA NOT NULL' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE' + ',old_coin_sig BYTEA NOT NULL CHECK(LENGTH(old_coin_sig)=64)' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ',noreveal_index INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (rc)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + -- Note: index spans partitions, may need to be materialized. + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_old_coin_pub_index ' + 'ON ' || table_name || ' ' + '(old_coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_commitments_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_commitments_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_commitments_' || partition_suffix || '_melt_serial_id_key ' + 'UNIQUE (melt_serial_id)' + ); +END +$$; + +------------------------------ refresh_revealed_coins -------------------------------- + +CREATE OR REPLACE FUNCTION create_table_refresh_revealed_coins( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_revealed_coins'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(rrc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',melt_serial_id INT8 NOT NULL' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE' + ',freshcoin_index INT4 NOT NULL' + ',link_sig BYTEA NOT NULL CHECK(LENGTH(link_sig)=64)' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE' + ',coin_ev BYTEA NOT NULL' -- UNIQUE' + ',h_coin_ev BYTEA NOT NULL CHECK(LENGTH(h_coin_ev)=64)' -- UNIQUE' + ',ev_sig BYTEA NOT NULL' + ',ewv BYTEA NOT NULL' + -- ,PRIMARY KEY (melt_serial_id, freshcoin_index) -- done per shard + ') %s ;' + ,table_name + ,'PARTITION BY HASH (melt_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_coins_by_melt_serial_id_index ' + 'ON ' || table_name || ' ' + '(melt_serial_id);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_revealed_coins_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_revealed_coins_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_rrc_serial_key ' + 'UNIQUE (rrc_serial) ' + ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_coin_ev_key ' + 'UNIQUE (coin_ev) ' + ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_h_coin_ev_key ' + 'UNIQUE (h_coin_ev) ' + ',ADD PRIMARY KEY (melt_serial_id, freshcoin_index) ' + ); +END +$$; + +----------------------------- refresh_transfer_keys ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_refresh_transfer_keys( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_transfer_keys'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(rtc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',melt_serial_id INT8 PRIMARY KEY' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE' + ',transfer_pub BYTEA NOT NULL CHECK(LENGTH(transfer_pub)=32)' + ',transfer_privs BYTEA NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (melt_serial_id)' + ,shard_suffix + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_transfer_keys_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_transfer_keys_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_transfer_keys_' || partition_suffix || '_rtc_serial_key ' + 'UNIQUE (rtc_serial)' + ); +END +$$; + +---------------------------- deposits ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_deposits( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY' + ',shard INT8 NOT NULL' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',known_coin_id INT8 NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE' --- FIXME: column needed??? + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ',wallet_timestamp INT8 NOT NULL' + ',exchange_timestamp INT8 NOT NULL' + ',refund_deadline INT8 NOT NULL' + ',wire_deadline INT8 NOT NULL' + ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)' + ',h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64)' + ',coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)' + ',wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',done BOOLEAN NOT NULL DEFAULT FALSE' + ',extension_blocked BOOLEAN NOT NULL DEFAULT FALSE' + ',extension_details_serial_id INT8' -- REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE' + ',UNIQUE (coin_pub, merchant_pub, h_contract_terms)' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_deposits_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE deposits_' || partition_suffix || ' ' + 'ADD CONSTRAINT deposits_' || partition_suffix || '_deposit_serial_id_pkey ' + 'PRIMARY KEY (deposit_serial_id)' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_deposits_by_ready( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits_by_ready'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wire_deadline INT8 NOT NULL' + ',shard INT8 NOT NULL' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' + ',deposit_serial_id INT8' + ') %s ;' + ,table_name + ,'PARTITION BY RANGE (wire_deadline)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(wire_deadline ASC, shard ASC, coin_pub);' + ); + +END +$$; + + +CREATE OR REPLACE FUNCTION create_table_deposits_for_matching( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits_for_matching'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(refund_deadline INT8 NOT NULL' + ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',deposit_serial_id INT8' + ') %s ;' + ,table_name + ,'PARTITION BY RANGE (refund_deadline)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(refund_deadline ASC, merchant_pub, coin_pub);' + ); + +END +$$; + +----------------------------- refunds ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_refunds( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refunds'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(refund_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',deposit_serial_id INT8 NOT NULL' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE' + ',merchant_sig BYTEA NOT NULL CHECK(LENGTH(merchant_sig)=64)' + ',rtransaction_id INT8 NOT NULL' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + -- ,PRIMARY KEY (deposit_serial_id, rtransaction_id) -- done per shard! + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refunds_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refunds_' || partition_suffix || ' ' + 'ADD CONSTRAINT refunds_' || partition_suffix || '_refund_serial_id_key ' + 'UNIQUE (refund_serial_id) ' + ',ADD PRIMARY KEY (deposit_serial_id, rtransaction_id) ' + ); +END +$$; + +---------------------------- wire_out ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_wire_out( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'wire_out'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wireout_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY' + ',execution_date INT8 NOT NULL' + ',wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=32)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (wtid_raw)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_wire_target_h_payto_index ' + 'ON ' || table_name || ' ' + '(wire_target_h_payto);' + ); + + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_wire_out_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE wire_out_' || partition_suffix || ' ' + 'ADD CONSTRAINT wire_out_' || partition_suffix || '_wireout_uuid_pkey ' + 'PRIMARY KEY (wireout_uuid)' + ); +END +$$; + +---------------------------- aggregation_transient ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_aggregation_transient( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'aggregation_transient'; +BEGIN + + EXECUTE FORMAT ( + 'CREATE TABLE IF NOT EXISTS %I ' + '(amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (wire_target_h_payto)' + ,shard_suffix + ); + +END +$$; + +---------------------------- aggregation_tracking ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_aggregation_tracking( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'aggregation_tracking'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',deposit_serial_id INT8 PRIMARY KEY' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE' -- FIXME chnage to coint_pub + deposit_serial_id for more efficient depost -- or something else ??? + ',wtid_raw BYTEA NOT NULL' -- CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (deposit_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_wtid_raw_index ' + 'ON ' || table_name || ' ' + '(wtid_raw);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_wtid_raw_index ' + 'IS ' || quote_literal('for lookup_transactions') || ';' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_aggregation_tracking_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE aggregation_tracking_' || partition_suffix || ' ' + 'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key ' + 'UNIQUE (aggregation_serial_id) ' + ); +END +$$; + +----------------------------- recoup ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_recoup( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(recoup_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)' + ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',recoup_timestamp INT8 NOT NULL' + ',reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves_out (reserve_out_serial_id) ON DELETE CASCADE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub);' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_recoup_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE recoup_' || partition_suffix || ' ' + 'ADD CONSTRAINT recoup_' || partition_suffix || '_recoup_uuid_key ' + 'UNIQUE (recoup_uuid) ' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_recoup_by_reserve( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup_by_reserve'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves (reserve_out_serial_id) ON DELETE CASCADE + ',coin_pub BYTEA CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_out_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(reserve_out_serial_id);' + ); + +END +$$; + +---------------------------- recoup_refresh ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_recoup_refresh( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup_refresh'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(recoup_refresh_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ',known_coin_id BIGINT NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE + ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)' + ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',recoup_timestamp INT8 NOT NULL' + ',rrc_serial INT8 NOT NULL' -- REFERENCES refresh_revealed_coins (rrc_serial) ON DELETE CASCADE -- UNIQUE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + -- FIXME: any query using this index will be slow. Materialize index or change query? + -- Also: which query uses this index? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_rrc_serial_index ' + 'ON ' || table_name || ' ' + '(rrc_serial);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_recoup_refresh_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE recoup_refresh_' || partition_suffix || ' ' + 'ADD CONSTRAINT recoup_refresh_' || partition_suffix || '_recoup_refresh_uuid_key ' + 'UNIQUE (recoup_refresh_uuid) ' + ); +END +$$; + +----------------------------- prewire ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_prewire( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'prewire'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(prewire_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY' + ',wire_method TEXT NOT NULL' + ',finished BOOLEAN NOT NULL DEFAULT false' + ',failed BOOLEAN NOT NULL DEFAULT false' + ',buf BYTEA NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (prewire_uuid)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_finished_index ' + 'ON ' || table_name || ' ' + '(finished);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_finished_index ' + 'IS ' || quote_literal('for gc_prewire') || ';' + ); + -- FIXME: find a way to combine these two indices? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_failed_finished_index ' + 'ON ' || table_name || ' ' + '(failed,finished);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_failed_finished_index ' + 'IS ' || quote_literal('for wire_prepare_data_get') || ';' + ); + +END +$$; + +----------------------------- cs_nonce_locks ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_cs_nonce_locks( + shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(cs_nonce_lock_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',nonce BYTEA PRIMARY KEY CHECK (LENGTH(nonce)=32)' + ',op_hash BYTEA NOT NULL CHECK (LENGTH(op_hash)=64)' + ',max_denomination_serial INT8 NOT NULL' + ') %s ;' + ,'cs_nonce_locks' + ,'PARTITION BY HASH (nonce)' + ,shard_suffix + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_cs_nonce_locks_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE cs_nonce_locks_' || partition_suffix || ' ' + 'ADD CONSTRAINT cs_nonce_locks_' || partition_suffix || '_cs_nonce_lock_serial_id_key ' + 'UNIQUE (cs_nonce_lock_serial_id)' + ); +END +$$; + +------------------------- Partitions ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_partition( + source_table_name VARCHAR + ,modulus INTEGER + ,partition_num INTEGER + ) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating partition %_%', source_table_name, partition_num; + + EXECUTE FORMAT( + 'CREATE TABLE IF NOT EXISTS %I ' + 'PARTITION OF %I ' + 'FOR VALUES WITH (MODULUS %s, REMAINDER %s)' + ,source_table_name || '_' || partition_num + ,source_table_name + ,modulus + ,partition_num-1 + ); + +END +$$; + +CREATE OR REPLACE FUNCTION detach_default_partitions() + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Detaching all default table partitions'; + + ALTER TABLE IF EXISTS wire_targets + DETACH PARTITION wire_targets_default; + + ALTER TABLE IF EXISTS reserves + DETACH PARTITION reserves_default; + + ALTER TABLE IF EXISTS reserves_in + DETACH PARTITION reserves_in_default; + + ALTER TABLE IF EXISTS reserves_close + DETACH PARTITION reserves_close_default; + + ALTER TABLE IF EXISTS reserves_out + DETACH PARTITION reserves_out_default; + + ALTER TABLE IF EXISTS reserves_out_by_reserve + DETACH PARTITION reserves_out_by_reserve_default; + + ALTER TABLE IF EXISTS known_coins + DETACH PARTITION known_coins_default; + + ALTER TABLE IF EXISTS refresh_commitments + DETACH PARTITION refresh_commitments_default; + + ALTER TABLE IF EXISTS refresh_revealed_coins + DETACH PARTITION refresh_revealed_coins_default; + + ALTER TABLE IF EXISTS refresh_transfer_keys + DETACH PARTITION refresh_transfer_keys_default; + + ALTER TABLE IF EXISTS deposits + DETACH PARTITION deposits_default; + +--- TODO range partitioning +-- ALTER TABLE IF EXISTS deposits_by_ready +-- DETACH PARTITION deposits_by_ready_default; +-- +-- ALTER TABLE IF EXISTS deposits_for_matching +-- DETACH PARTITION deposits_default_for_matching_default; + + ALTER TABLE IF EXISTS refunds + DETACH PARTITION refunds_default; + + ALTER TABLE IF EXISTS wire_out + DETACH PARTITION wire_out_default; + + ALTER TABLE IF EXISTS aggregation_transient + DETACH PARTITION aggregation_transient_default; + + ALTER TABLE IF EXISTS aggregation_tracking + DETACH PARTITION aggregation_tracking_default; + + ALTER TABLE IF EXISTS recoup + DETACH PARTITION recoup_default; + + ALTER TABLE IF EXISTS recoup_by_reserve + DETACH PARTITION recoup_by_reserve_default; + + ALTER TABLE IF EXISTS recoup_refresh + DETACH PARTITION recoup_refresh_default; + + ALTER TABLE IF EXISTS prewire + DETACH PARTITION prewire_default; + + ALTER TABLE IF EXISTS cs_nonce_locks + DETACH partition cs_nonce_locks_default; + +END +$$; + +COMMENT ON FUNCTION detach_default_partitions + IS 'We need to drop default and create new one before deleting the default partitions + otherwise constraints get lost too. Might be needed in shardig too'; + + +CREATE OR REPLACE FUNCTION drop_default_partitions() + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Dropping default table partitions'; + + DROP TABLE IF EXISTS wire_targets_default; + DROP TABLE IF EXISTS reserves_default; + DROP TABLE IF EXISTS reserves_in_default; + DROP TABLE IF EXISTS reserves_close_default; + DROP TABLE IF EXISTS reserves_out_default; + DROP TABLE IF EXISTS reserves_out_by_reserve_default; + DROP TABLE IF EXISTS known_coins_default; + DROP TABLE IF EXISTS refresh_commitments_default; + DROP TABLE IF EXISTS refresh_revealed_coins_default; + DROP TABLE IF EXISTS refresh_transfer_keys_default; + DROP TABLE IF EXISTS deposits_default; +--DROP TABLE IF EXISTS deposits_by_ready_default; +--DROP TABLE IF EXISTS deposits_for_matching_default; + DROP TABLE IF EXISTS refunds_default; + DROP TABLE IF EXISTS wire_out_default; + DROP TABLE IF EXISTS aggregation_transient_default; + DROP TABLE IF EXISTS aggregation_tracking_default; + DROP TABLE IF EXISTS recoup_default; + DROP TABLE IF EXISTS recoup_by_reserve_default; + DROP TABLE IF EXISTS recoup_refresh_default; + DROP TABLE IF EXISTS prewire_default; + DROP TABLE IF EXISTS cs_nonce_locks_default; + +END +$$; + +COMMENT ON FUNCTION drop_default_partitions + IS 'Drop all default partitions once other partitions are attached. + Might be needed in sharding too.'; + +CREATE OR REPLACE FUNCTION create_partitions( + num_partitions INTEGER +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + modulus INTEGER; +BEGIN + + modulus := num_partitions; + + PERFORM detach_default_partitions(); + + LOOP + + PERFORM create_table_partition( + 'wire_targets' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_wire_targets_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'reserves_in' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_in_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_close' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_close_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_out' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_out_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_out_by_reserve' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'known_coins' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_known_coins_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_commitments' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_commitments_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_revealed_coins' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_revealed_coins_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_transfer_keys' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_transfer_keys_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'deposits' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_deposits_partition(num_partitions::varchar); + +-- TODO: dynamically (!) creating/deleting deposits partitions: +-- create new partitions 'as needed', drop old ones once the aggregator has made +-- them empty; as 'new' deposits will always have deadlines in the future, this +-- would basically guarantee no conflict between aggregator and exchange service! +-- SEE also: https://www.cybertec-postgresql.com/en/automatic-partition-creation-in-postgresql/ +-- (article is slightly wrong, as this works:) +--CREATE TABLE tab ( +-- id bigint GENERATED ALWAYS AS IDENTITY, +-- ts timestamp NOT NULL, +-- data text +-- PARTITION BY LIST ((ts::date)); +-- CREATE TABLE tab_def PARTITION OF tab DEFAULT; +-- BEGIN +-- CREATE TABLE tab_part2 (LIKE tab); +-- insert into tab_part2 (id,ts, data) values (5,'2022-03-21', 'foo'); +-- alter table tab attach partition tab_part2 for values in ('2022-03-21'); +-- commit; +-- Naturally, to ensure this is actually 100% conflict-free, we'd +-- need to create tables at the granularity of the wire/refund deadlines; +-- that is right now configurable via AGGREGATOR_SHIFT option. + +-- FIXME: range partitioning +-- PERFORM create_table_partition( +-- 'deposits_by_ready' +-- ,modulus +-- ,num_partitions +-- ); +-- +-- PERFORM create_table_partition( +-- 'deposits_for_matching' +-- ,modulus +-- ,num_partitions +-- ); + + PERFORM create_table_partition( + 'refunds' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refunds_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'wire_out' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_wire_out_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'aggregation_transient' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'aggregation_tracking' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_aggregation_tracking_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'recoup' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_recoup_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'recoup_by_reserve' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'recoup_refresh' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_recoup_refresh_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'prewire' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'cs_nonce_locks' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_cs_nonce_locks_partition(num_partitions::varchar); + + num_partitions=num_partitions-1; + EXIT WHEN num_partitions=0; + + END LOOP; + + PERFORM drop_default_partitions(); + +END +$$; + +--------------------- Sharding --------------------------- + +---------------------- Shards ---------------------------- +CREATE OR REPLACE FUNCTION setup_shard( + shard_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_table_wire_targets(shard_suffix); + PERFORM add_constraints_to_wire_targets_partition(shard_suffix); + + PERFORM create_table_reserves(shard_suffix); + + PERFORM create_table_reserves_in(shard_suffix); + PERFORM add_constraints_to_reserves_in_partition(shard_suffix); + + PERFORM create_table_reserves_close(shard_suffix); + + PERFORM create_table_reserves_out(shard_suffix); + + PERFORM create_table_reserves_out_by_reserve(shard_suffix); + + PERFORM create_table_known_coins(shard_suffix); + PERFORM add_constraints_to_known_coins_partition(shard_suffix); + + PERFORM create_table_refresh_commitments(shard_suffix); + PERFORM add_constraints_to_refresh_commitments_partition(shard_suffix); + + PERFORM create_table_refresh_revealed_coins(shard_suffix); + PERFORM add_constraints_to_refresh_revealed_coins_partition(shard_suffix); + + PERFORM create_table_refresh_transfer_keys(shard_suffix); + PERFORM add_constraints_to_refresh_transfer_keys_partition(shard_suffix); + + PERFORM create_table_deposits(shard_suffix); + PERFORM add_constraints_to_deposits_partition(shard_suffix); + + PERFORM create_table_deposits_by_ready(shard_suffix); + + PERFORM create_table_deposits_for_matching(shard_suffix); + + PERFORM create_table_refunds(shard_suffix); + PERFORM add_constraints_to_refunds_partition(shard_suffix); + + PERFORM create_table_wire_out(shard_suffix); + PERFORM add_constraints_to_wire_out_partition(shard_suffix); + + PERFORM create_table_aggregation_transient(shard_suffix); + + PERFORM create_table_aggregation_tracking(shard_suffix); + PERFORM add_constraints_to_aggregation_tracking_partition(shard_suffix); + + PERFORM create_table_recoup(shard_suffix); + PERFORM add_constraints_to_recoup_partition(shard_suffix); + + PERFORM create_table_recoup_by_reserve(shard_suffix); + + PERFORM create_table_recoup_refresh(shard_suffix); + PERFORM add_constraints_to_recoup_refresh_partition(shard_suffix); + + PERFORM create_table_prewire(shard_suffix); + + PERFORM create_table_cs_nonce_locks(shard_suffix); + PERFORM add_constraints_to_cs_nonce_locks_partition(shard_suffix); + +END +$$; + +------------------------------ Master ---------------------------------- +CREATE OR REPLACE FUNCTION create_foreign_table( + source_table_name VARCHAR + ,modulus INTEGER + ,shard_suffix VARCHAR + ,current_shard_num INTEGER + ) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating %_% on %', source_table_name, shard_suffix, shard_suffix; + + EXECUTE FORMAT( + 'CREATE FOREIGN TABLE IF NOT EXISTS %I ' + 'PARTITION OF %I ' + 'FOR VALUES WITH (MODULUS %s, REMAINDER %s) ' + 'SERVER %I' + ,source_table_name || '_' || shard_suffix + ,source_table_name + ,modulus + ,current_shard_num-1 + ,shard_suffix + ); + + EXECUTE FORMAT( + 'ALTER FOREIGN TABLE %I OWNER TO "taler-exchange-httpd"', + source_table_name || '_' || shard_suffix + ); + +END +$$; + +CREATE OR REPLACE FUNCTION master_prepare_sharding() +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + CREATE EXTENSION IF NOT EXISTS postgres_fdw; + + PERFORM detach_default_partitions(); + + ALTER TABLE IF EXISTS wire_targets + DROP CONSTRAINT IF EXISTS wire_targets_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves + DROP CONSTRAINT IF EXISTS reserves_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_in + DROP CONSTRAINT IF EXISTS reserves_in_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_close + DROP CONSTRAINT IF EXISTS reserves_close_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_out + DROP CONSTRAINT IF EXISTS reserves_out_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS reserves_out_denominations_serial_fkey + ,DROP CONSTRAINT IF EXISTS reserves_out_h_blind_ev_key + ; + + ALTER TABLE IF EXISTS known_coins + DROP CONSTRAINT IF EXISTS known_coins_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS known_coins_denominations_serial_fkey + ; + + ALTER TABLE IF EXISTS refresh_commitments + DROP CONSTRAINT IF EXISTS refresh_commitments_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS refresh_old_coin_pub_fkey + ; + + ALTER TABLE IF EXISTS refresh_revealed_coins + DROP CONSTRAINT IF EXISTS refresh_revealed_coins_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS refresh_revealed_coins_denominations_serial_fkey + ; + + ALTER TABLE IF EXISTS refresh_transfer_keys + DROP CONSTRAINT IF EXISTS refresh_transfer_keys_pkey CASCADE + ; + + ALTER TABLE IF EXISTS deposits + DROP CONSTRAINT IF EXISTS deposits_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS deposits_extension_details_serial_id_fkey + ,DROP CONSTRAINT IF EXISTS deposits_shard_known_coin_id_merchant_pub_h_contract_terms_key CASCADE + ; + + ALTER TABLE IF EXISTS refunds + DROP CONSTRAINT IF EXISTS refunds_pkey CASCADE + ; + + ALTER TABLE IF EXISTS wire_out + DROP CONSTRAINT IF EXISTS wire_out_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS wire_out_wtid_raw_key CASCADE + ; + + ALTER TABLE IF EXISTS aggregation_tracking + DROP CONSTRAINT IF EXISTS aggregation_tracking_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS aggregation_tracking_wtid_raw_fkey + ; + + ALTER TABLE IF EXISTS recoup + DROP CONSTRAINT IF EXISTS recoup_pkey CASCADE + ; + + ALTER TABLE IF EXISTS recoup_refresh + DROP CONSTRAINT IF EXISTS recoup_refresh_pkey CASCADE + ; + + ALTER TABLE IF EXISTS prewire + DROP CONSTRAINT IF EXISTS prewire_pkey CASCADE + ; + + ALTER TABLE IF EXISTS cs_nonce_locks + DROP CONSTRAINT IF EXISTS cs_nonce_locks_pkey CASCADE + ; + +END +$$; + + +CREATE OR REPLACE FUNCTION create_shard_server( + shard_suffix VARCHAR + ,total_num_shards INTEGER + ,current_shard_num INTEGER + ,remote_host VARCHAR + ,remote_user VARCHAR + ,remote_user_password VARCHAR + ,remote_db_name VARCHAR DEFAULT 'taler-exchange' + ,remote_port INTEGER DEFAULT '5432' + ,local_user VARCHAR DEFAULT 'taler-exchange-httpd' +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating server %s', remote_host; + + EXECUTE FORMAT( + 'CREATE SERVER IF NOT EXISTS %I ' + 'FOREIGN DATA WRAPPER postgres_fdw ' + 'OPTIONS (dbname %L, host %L, port %L)' + ,shard_suffix + ,remote_db_name + ,remote_host + ,remote_port + ); + + EXECUTE FORMAT( + 'CREATE USER MAPPING IF NOT EXISTS ' + 'FOR %s SERVER %I ' + 'OPTIONS (user %L, password %L)' + ,local_user + ,shard_suffix + ,remote_user + ,remote_user_password + ); + + PERFORM create_foreign_table( + 'wire_targets' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'reserves' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'reserves_in' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'reserves_out' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'reserves_close' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'known_coins' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'refresh_commitments' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'refresh_revealed_coins' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'refresh_transfer_keys' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'deposits' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); +-- PERFORM create_foreign_table( +-- 'deposits_by_ready' +-- ,total_num_shards +-- ,shard_suffix +-- ,current_shard_num +-- ); +-- PERFORM create_foreign_table( +-- 'deposits_for_matching' +-- ,total_num_shards +-- ,shard_suffix +-- ,current_shard_num +-- ); + PERFORM create_foreign_table( + 'refunds' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'wire_out' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'aggregation_tracking' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'recoup' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'recoup_by_reserve' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'reserves_out_by_reserve' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'recoup_refresh' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'prewire' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + PERFORM create_foreign_table( + 'cs_nonce_locks' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ); + +END +$$; + +COMMENT ON FUNCTION create_shard_server + IS 'Create a shard server on the master + node with all foreign tables and user mappings'; + +CREATE OR REPLACE FUNCTION create_shards( + num_shards INTEGER + ,domain VARCHAR DEFAULT 'perf.taler' +) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 1..num_shards LOOP + PERFORM create_shard_server( + i + ,num_shards + ,i + ,'shard-' || i::varchar || '.' || domain + ,'taler' + ,'taler' + ,'taler-exchange' + ,'5432' + ,'taler-exchange-httpd' + ); + END LOOP; +END +$$; + +COMMIT;
\ No newline at end of file |