Commit 3e7b5ab8 authored by inphi's avatar inphi

add BackendQueue tests

parent fe474257
......@@ -1235,6 +1235,7 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e
// syncTransactionFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionFromQueue() error {
// we don't drop messages unless they're already applied
cb := func(ctx context.Context, msg QueueSubscriberMessage) {
var (
txMeta types.TransactionMeta
......
......@@ -970,6 +970,87 @@ func TestSyncServiceTransactionLogFailed(t *testing.T) {
}
}
func TestSyncServiceBackendQueue(t *testing.T) {
txLogger := newMockPublisher()
txLogger.unblockPublish()
activeService, _, _, err := newTestSyncService(false, nil, txLogger)
if err != nil {
t.Fatal(err)
}
queueSub := newMockQueueSubscriber()
followerService, followerTxCh, _, err := newTestSyncServiceWithQueueSubscriber(true, nil, &pub.NoopPublisher{}, queueSub)
if err != nil {
t.Fatal(err)
}
tx := mockTx()
go func() {
activeService.applyTransactionToTip(tx)
}()
go func() {
followerService.syncTransactionFromQueue()
}()
// forward the logged transaction from the "active sequencer"
msg := <-txLogger.msgs
queueSub.ProduceMessage(msg)
event := <-followerTxCh
if len(event.Txs) != 1 {
t.Fatal("Unexpected number of transactions")
}
expected, _ := tx.MarshalJSON()
eventTx, _ := event.Txs[0].MarshalJSON()
if !bytes.Equal(expected, eventTx) {
t.Fatalf("invalid tx applied")
}
followerService.chainHeadCh <- core.ChainHeadEvent{}
subEvent := <-queueSub.events
if subEvent != mockAckEvent {
t.Fatalf("subscriber failed to acknowledge transaction")
}
}
func TestSyncServiceBackendQueueNack(t *testing.T) {
tx := mockTx()
tx.GetMeta().RawTransaction = nil
tests := map[string][]byte{
//"good txmeta": []byte("{\"l1BlockNumber\":0,\"l1Timestamp\":1647549225,\"l1MessageSender\":\"0x1487ef4dd5b0ca7610b85964371c1d8ab7c468eb\",\"queueOrigin\":\"sequencer\",\"index\":0,\"queueIndex\":0,\"rawTransaction\":\"34CAgJQrz3UmBr9M0373farCJhgNfaGiVICCAACAgIA=\"}"),
"invalid json": nil,
"invalid RawTransaction": []byte("{\"l1BlockNumber\":0,\"l1Timestamp\":1647549225,\"l1MessageSender\":\"0x1487ef4dd5b0ca7610b85964371c1d8ab7c468eb\",\"queueOrigin\":\"sequencer\",\"index\":0,\"queueIndex\":0,\"rawTransaction\":\"\"}"),
"missing L1BlockNumber": []byte("{\"l1Timestamp\":1647549225,\"l1MessageSender\":\"0x1487ef4dd5b0ca7610b85964371c1d8ab7c468eb\",\"queueOrigin\":\"sequencer\",\"index\":0,\"queueIndex\":0,\"rawTransaction\":\"34CAgJQrz3UmBr9M0373farCJhgNfaGiVICCAACAgIA=\"}"),
"missing L1Timestamp": []byte("{\"l1BlockNumber\":0,\"l1MessageSender\":\"0x1487ef4dd5b0ca7610b85964371c1d8ab7c468eb\",\"queueOrigin\":\"sequencer\",\"index\":0,\"queueIndex\":0,\"rawTransaction\":\"34CAgJQrz3UmBr9M0373farCJhgNfaGiVICCAACAgIA=\"}"),
}
for name, msg := range tests {
t.Run(name, func(t *testing.T) {
queueSub := newMockQueueSubscriber()
service, _, _, err := newTestSyncServiceWithQueueSubscriber(true, nil, &pub.NoopPublisher{}, queueSub)
if err != nil {
t.Fatal(err)
}
go func() {
service.syncTransactionFromQueue()
}()
queueSub.ProduceMessage(msg)
service.chainHeadCh <- core.ChainHeadEvent{}
event := <-queueSub.events
if event != mockNackEvent {
t.Fatalf("subscriber failed to acknowledge transaction")
}
})
}
}
func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *core.TxPool, *core.BlockChain, ethdb.Database, error) {
chainCfg := params.AllEthashProtocolChanges
chainID := big.NewInt(420)
......@@ -1003,7 +1084,7 @@ func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *co
return cfg, txPool, chain, db, nil
}
func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Publisher) (*SyncService, chan core.NewTxsEvent, event.Subscription, error) {
func newTestSyncServiceWithQueueSubscriber(isVerifier bool, alloc *common.Address, txLogger pub.Publisher, queueSub QueueSubscriber) (*SyncService, chan core.NewTxsEvent, event.Subscription, error) {
cfg, txPool, chain, db, err := newTestSyncServiceDeps(isVerifier, alloc)
if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
......@@ -1011,7 +1092,7 @@ func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Pub
if txLogger == nil {
txLogger = &pub.NoopPublisher{}
}
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db, txLogger, &noopQueueSubscriber{})
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db, txLogger, queueSub)
if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
}
......@@ -1023,6 +1104,10 @@ func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Pub
return service, txCh, sub, nil
}
func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Publisher) (*SyncService, chan core.NewTxsEvent, event.Subscription, error) {
return newTestSyncServiceWithQueueSubscriber(isVerifier, alloc, txLogger, &noopQueueSubscriber{})
}
type mockPublisher struct {
wg1 sync.WaitGroup
wg2 sync.WaitGroup
......@@ -1061,6 +1146,48 @@ func (p *failingMockPublisher) Publish(ctx context.Context, msg []byte) error {
return fmt.Errorf("publish failed")
}
type mockQueueEvent int
const mockAckEvent mockQueueEvent = 0
const mockNackEvent mockQueueEvent = 1
type mockQueueSubscriber struct {
msgs chan []byte
events chan mockQueueEvent
}
func newMockQueueSubscriber() *mockQueueSubscriber {
return &mockQueueSubscriber{make(chan []byte, 1024), make(chan mockQueueEvent, 1024)}
}
func (p *mockQueueSubscriber) ProduceMessage(msg []byte) {
p.msgs <- msg
}
func (p *mockQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
msg := <-p.msgs
cb(ctx, &mockQueueSubscriberMessage{msg, p.events})
}
func (p *mockQueueSubscriber) Close() error { return nil }
type mockQueueSubscriberMessage struct {
data []byte
events chan<- mockQueueEvent
}
func (m *mockQueueSubscriberMessage) Data() []byte {
return m.data
}
func (m *mockQueueSubscriberMessage) Ack() {
m.events <- mockAckEvent
}
func (m *mockQueueSubscriberMessage) Nack() {
m.events <- mockNackEvent
}
type mockClient struct {
getEnqueueCallCount int
getEnqueue []*types.Transaction
......@@ -1215,6 +1342,7 @@ func mockTx() *types.Transaction {
gasLimit := uint64(0)
data := []byte{0x00, 0x00}
l1BlockNumber := big.NewInt(0)
queueIndex := uint64(0)
tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data)
meta := types.NewTransactionMeta(
......@@ -1223,7 +1351,7 @@ func mockTx() *types.Transaction {
&l1TxOrigin,
types.QueueOriginSequencer,
nil,
nil,
&queueIndex,
nil,
)
tx.SetTransactionMeta(meta)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment