aboutsummaryrefslogtreecommitdiff
path: root/internal/sqlutil/partition_offset_table.go
blob: be079442a548db3657d13535c9057d9ce1bf56f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlutil

import (
	"context"
	"database/sql"
	"strings"

	"github.com/matrix-org/util"
)

// A PartitionOffset is the offset into a partition of the input log.
type PartitionOffset struct {
	// The ID of the partition.
	Partition int32
	// The offset into the partition.
	Offset int64
}

const partitionOffsetsSchema = `
-- The offsets that the server has processed up to.
CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets (
    -- The name of the topic.
    topic TEXT NOT NULL,
    -- The 32-bit partition ID
    partition INTEGER NOT NULL,
    -- The 64-bit offset.
    partition_offset BIGINT NOT NULL,
    UNIQUE (topic, partition)
);
`

const selectPartitionOffsetsSQL = "" +
	"SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1"

const upsertPartitionOffsetsSQL = "" +
	"INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
	" ON CONFLICT (topic, partition)" +
	" DO UPDATE SET partition_offset = $3"

// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
type PartitionOffsetStatements struct {
	db                         *sql.DB
	writer                     Writer
	selectPartitionOffsetsStmt *sql.Stmt
	upsertPartitionOffsetStmt  *sql.Stmt
}

// Prepare converts the raw SQL statements into prepared statements.
// Takes a prefix to prepend to the table name used to store the partition offsets.
// This allows multiple components to share the same database schema.
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) {
	s.db = db
	s.writer = writer
	_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
	if err != nil {
		return
	}
	if s.selectPartitionOffsetsStmt, err = db.Prepare(
		strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1),
	); err != nil {
		return
	}
	if s.upsertPartitionOffsetStmt, err = db.Prepare(
		strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1),
	); err != nil {
		return
	}
	return
}

// PartitionOffsets implements PartitionStorer
func (s *PartitionOffsetStatements) PartitionOffsets(
	ctx context.Context, topic string,
) ([]PartitionOffset, error) {
	return s.selectPartitionOffsets(ctx, topic)
}

// SetPartitionOffset implements PartitionStorer
func (s *PartitionOffsetStatements) SetPartitionOffset(
	ctx context.Context, topic string, partition int32, offset int64,
) error {
	return s.upsertPartitionOffset(ctx, topic, partition, offset)
}

// selectPartitionOffsets returns all the partition offsets for the given topic.
func (s *PartitionOffsetStatements) selectPartitionOffsets(
	ctx context.Context, topic string,
) ([]PartitionOffset, error) {
	rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic)
	if err != nil {
		return nil, err
	}
	defer func() {
		err2 := rows.Close()
		if err2 != nil {
			util.GetLogger(ctx).WithError(err2).Error("selectPartitionOffsets: rows.close() failed")
		}
	}()
	var results []PartitionOffset
	for rows.Next() {
		var offset PartitionOffset
		if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
			return nil, err
		}
		results = append(results, offset)
	}
	return results, rows.Err()
}

// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
func (s *PartitionOffsetStatements) upsertPartitionOffset(
	ctx context.Context, topic string, partition int32, offset int64,
) error {
	return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
		stmt := TxStmt(txn, s.upsertPartitionOffsetStmt)
		_, err := stmt.ExecContext(ctx, topic, partition, offset)
		return err
	})
}