Commit f876add9 authored by inphi's avatar inphi

synchronize Publish with ResumePublish

parent 935782f5
...@@ -2,6 +2,7 @@ package pub ...@@ -2,6 +2,7 @@ package pub
import ( import (
"context" "context"
"sync"
"time" "time"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
...@@ -20,7 +21,8 @@ type GooglePublisher struct { ...@@ -20,7 +21,8 @@ type GooglePublisher struct {
client *pubsub.Client client *pubsub.Client
topic *pubsub.Topic topic *pubsub.Topic
publishSettings pubsub.PublishSettings publishSettings pubsub.PublishSettings
Timeout time.Duration timeout time.Duration
mutex sync.Mutex
} }
func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, error) { func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, error) {
...@@ -41,19 +43,27 @@ func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, e ...@@ -41,19 +43,27 @@ func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, e
log.Info("Sanitizing publisher timeout to 2 seconds") log.Info("Sanitizing publisher timeout to 2 seconds")
timeout = time.Second * 2 timeout = time.Second * 2
} }
return &GooglePublisher{client, topic, publishSettings, timeout}, nil return &GooglePublisher{
client: client,
topic: topic, publishSettings: publishSettings,
timeout: timeout,
}, nil
} }
func (p *GooglePublisher) Publish(ctx context.Context, msg []byte) error { func (p *GooglePublisher) Publish(ctx context.Context, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, p.Timeout) ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel() defer cancel()
pmsg := pubsub.Message{ pmsg := pubsub.Message{
Data: msg, Data: msg,
OrderingKey: messageOrderingKey, OrderingKey: messageOrderingKey,
} }
// If there was an error previously, clear it out to allow publishing to work again
p.mutex.Lock()
// If there was an error previously, clear it out to allow publishing to proceed again
p.topic.ResumePublish(messageOrderingKey) p.topic.ResumePublish(messageOrderingKey)
result := p.topic.Publish(ctx, &pmsg) result := p.topic.Publish(ctx, &pmsg)
_, err := result.Get(ctx) _, err := result.Get(ctx)
p.mutex.Unlock()
return err return err
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment