Commit b6ff28e3 authored by vicotor's avatar vicotor

add registry

parent eb1b249e
...@@ -11,7 +11,8 @@ WORKDIR /build ...@@ -11,7 +11,8 @@ WORKDIR /build
RUN git clone https://code.wuban.net.cn/odysseus/scheduler && \ RUN git clone https://code.wuban.net.cn/odysseus/scheduler && \
git clone https://code.wuban.net.cn/odysseus/odysseus-protocol && \ git clone https://code.wuban.net.cn/odysseus/odysseus-protocol && \
git clone https://code.wuban.net.cn/odysseus/cache git clone https://code.wuban.net.cn/odysseus/cache && \
git clone https://code.wuban.net.cn/odysseus/service-registry
RUN cd /build/scheduler && make && cp build/bin/scheduler /scheduler RUN cd /build/scheduler && make && cp build/bin/scheduler /scheduler
......
...@@ -6,14 +6,15 @@ require ( ...@@ -6,14 +6,15 @@ require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1 github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3 github.com/astaxie/beego v1.12.3
github.com/ethereum/go-ethereum v1.13.10 github.com/ethereum/go-ethereum v1.13.13
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.5.0 github.com/google/uuid v1.5.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/odysseus/cache v0.0.0-00010101000000-000000000000 github.com/odysseus/cache v0.0.0-00010101000000-000000000000
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/odysseus/service-registry v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.4.0 github.com/redis/go-redis/v9 v9.5.1
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
...@@ -73,7 +74,7 @@ require ( ...@@ -73,7 +74,7 @@ require (
golang.org/x/crypto v0.17.0 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.19.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/protobuf v1.32.0 // indirect google.golang.org/protobuf v1.32.0 // indirect
...@@ -85,3 +86,5 @@ require ( ...@@ -85,3 +86,5 @@ require (
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
replace github.com/odysseus/cache => ../cache replace github.com/odysseus/cache => ../cache
replace github.com/odysseus/service-registry => ../service-registry
...@@ -53,8 +53,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP ...@@ -53,8 +53,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/go-elasticsearch/v6 v6.8.5/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/elastic/go-elasticsearch/v6 v6.8.5/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/ethereum/go-ethereum v1.13.10 h1:Ppdil79nN+Vc+mXfge0AuUgmKWuVv4eMqzoIVSdqZek= github.com/ethereum/go-ethereum v1.13.13 h1:KYn9w7pEWRI9oyZOzO94OVbctSusPByHdFDPj634jII=
github.com/ethereum/go-ethereum v1.13.10/go.mod h1:sc48XYQxCzH3fG9BcrXCOOgQk2JfZzNAmIKnceogzsA= github.com/ethereum/go-ethereum v1.13.13/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
...@@ -206,8 +206,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k ...@@ -206,8 +206,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
...@@ -315,8 +315,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc ...@@ -315,8 +315,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
......
...@@ -2,7 +2,9 @@ package server ...@@ -2,7 +2,9 @@ package server
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/odysseus/cache/cachedata" "github.com/odysseus/cache/cachedata"
...@@ -10,8 +12,12 @@ import ( ...@@ -10,8 +12,12 @@ import (
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config" "github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/utils" "github.com/odysseus/scheduler/utils"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
...@@ -25,6 +31,23 @@ type Node struct { ...@@ -25,6 +31,23 @@ type Node struct {
kafkaProducer sarama.AsyncProducer kafkaProducer sarama.AsyncProducer
cache *cachedata.CacheData cache *cachedata.CacheData
wg sync.WaitGroup wg sync.WaitGroup
status string
}
func (n *Node) ServiceType() common.ServiceType {
return common.SERVICE_SCHEDULER
}
func (n *Node) Endpoint() string {
hname, _ := os.Hostname()
return fmt.Sprintf("%s:%d", hname, n.conf.Endpoint)
}
func (n *Node) DetailInfo() (json.RawMessage, error) {
info := query.SchedulerInfo{}
info.Status = n.status
return json.Marshal(info)
} }
func NewNode() *Node { func NewNode() *Node {
...@@ -54,17 +77,20 @@ func NewNode() *Node { ...@@ -54,17 +77,20 @@ func NewNode() *Node {
} }
brokers := strings.Split(node.conf.Kafka.Brokers, ";") brokers := strings.Split(node.conf.Kafka.Brokers, ";")
node.kafkaProducer, _ = utils.NewKafkaProducer(brokers) node.kafkaProducer, _ = utils.NewKafkaProducer(brokers)
node.status = "before running"
return node return node
} }
func (n *Node) Start() error { func (n *Node) Start() error {
n.status = "running"
return n.startAllTask() return n.startAllTask()
} }
func (n *Node) Stop() { func (n *Node) Stop() {
close(n.quit) close(n.quit)
n.wg.Wait() n.wg.Wait()
n.status = "stopped"
} }
func (n *Node) startAllTask() error { func (n *Node) startAllTask() error {
...@@ -75,6 +101,15 @@ func (n *Node) startAllTask() error { ...@@ -75,6 +101,15 @@ func (n *Node) startAllTask() error {
return nil return nil
} }
func (n *Node) register() {
registry := registry.NewRegistry(registry.RedisConnParam{
Addr: n.conf.Redis.Addr,
Password: n.conf.Redis.Password,
DbIndex: n.conf.Redis.DbIndex,
}, n)
registry.Start()
}
func (n *Node) Loop(idx int) { func (n *Node) Loop(idx int) {
defer n.wg.Done() defer n.wg.Done()
...@@ -206,6 +241,7 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta ...@@ -206,6 +241,7 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta
for { for {
if err := client.Consume(ctx, topics, consumer); err != nil { if err := client.Consume(ctx, topics, consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) { if errors.Is(err, sarama.ErrClosedConsumerGroup) {
n.status = "kafka consumer closed"
return return
} }
log.WithError(err).Error("error from consumer") log.WithError(err).Error("error from consumer")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment