task.go 5.12 KB
Newer Older
Ubuntu's avatar
Ubuntu committed
1 2 3
package main

import (
4
	"code.wuban.net.cn/odysseus/twitter_syncer/swarm"
Ubuntu's avatar
Ubuntu committed
5 6 7
	"fmt"
	"log/slog"
	"sync"
Ubuntu's avatar
Ubuntu committed
8
	"time"
Ubuntu's avatar
Ubuntu committed
9 10 11 12
)

// Work is the registry of running background jobs. Each entry in Task maps a
// job key to the stop channel of the goroutine that serves that job.
type Work struct {
	// Lock guards Task. StopJob and AddJob hold it for their whole body.
	Lock sync.Mutex

	// Task maps a "<user>-<task id>-<task type>" key to the job's stop
	// channel. Nothing is ever sent on the channel: StopJob closes it to
	// tell the job goroutine to return.
	Task map[string]chan<- interface{}
	// userClient map[string]*Client
}

var Worker Work

func init() {
	Worker = Work{
Ubuntu's avatar
Ubuntu committed
22 23
		Task: make(map[string]chan<- interface{}),
		//userClient: make(map[string]*Client),
Ubuntu's avatar
Ubuntu committed
24 25 26
	}
}

vicotor's avatar
vicotor committed
27
func (w *Work) StopJob(req StopTaskReq) error {
Ubuntu's avatar
Ubuntu committed
28
	w.Lock.Lock()
vicotor's avatar
vicotor committed
29
	defer w.Lock.Unlock()
Ubuntu's avatar
Ubuntu committed
30

vicotor's avatar
vicotor committed
31 32 33
	key := req.User + "-" + req.TaskId + "-" + req.TaskType

	if v, ok := w.Task[key]; ok {
Ubuntu's avatar
Ubuntu committed
34
		close(v)
vicotor's avatar
vicotor committed
35
		delete(w.Task, key)
Ubuntu's avatar
Ubuntu committed
36

Ubuntu's avatar
Ubuntu committed
37
	} else {
vicotor's avatar
vicotor committed
38
		return fmt.Errorf("%s do not run", key)
Ubuntu's avatar
Ubuntu committed
39 40 41 42 43
	}

	return nil
}

Ubuntu's avatar
Ubuntu committed
44 45
func (w *Work) AddJob(t TaskJob) error {
	w.Lock.Lock()
vicotor's avatar
vicotor committed
46
	defer w.Lock.Unlock()
Ubuntu's avatar
Ubuntu committed
47

vicotor's avatar
vicotor committed
48 49 50 51
	key := t.UserId + "-" + t.TaskId + "-" + t.TaskType

	if _, ok := w.Task[key]; ok {
		return fmt.Errorf("%s has run", key)
Ubuntu's avatar
Ubuntu committed
52 53 54 55
	}

	done := w.RunJob(t)

vicotor's avatar
vicotor committed
56
	w.Task[key] = done
Ubuntu's avatar
Ubuntu committed
57 58 59 60

	return nil
}

Ubuntu's avatar
Ubuntu committed
61 62 63 64 65
// TimeAndFollowCount is one follower-count sample: the count that was read
// and the moment it was read. RunJob collects these for follow tasks.
type TimeAndFollowCount struct {
	Date        time.Time // when the sample was taken
	FollowCount int       // follower count reported at that moment
}

Ubuntu's avatar
Ubuntu committed
66 67
func (w *Work) RunJob(t TaskJob) chan<- interface{} {

vicotor's avatar
vicotor committed
68
	slog.Info("exec job", "userid", t.UserId, "task type", t.TaskType, "task id", t.TaskId, "t.FollowerCount", t.FollowerCount)
Ubuntu's avatar
Ubuntu committed
69

Ubuntu's avatar
Ubuntu committed
70 71
	done := make(chan interface{})
	go func() {
Ubuntu's avatar
Ubuntu committed
72
		var cli *Client
Ubuntu's avatar
Ubuntu committed
73

Ubuntu's avatar
Ubuntu committed
74 75
		if t.TaskType == RetweetType {
			cli = NewRetweeterClient(t.Config)
Ubuntu's avatar
Ubuntu committed
76
		} else if t.TaskType == TweetLikingUsersType {
Ubuntu's avatar
Ubuntu committed
77
			cli = NewLikeClient(t.Config)
Ubuntu's avatar
Ubuntu committed
78
		}
Ubuntu's avatar
Ubuntu committed
79 80 81

		page := NewPageUsers(NewIdx(t.Idx))

Ubuntu's avatar
Ubuntu committed
82 83
		if t.TaskType == FollowType {

84
			cli := swarm.GetSwarm()
Ubuntu's avatar
Ubuntu committed
85

Ubuntu's avatar
Ubuntu committed
86
			secondTicker := time.NewTicker(time.Second * 3)
vicotor's avatar
vicotor committed
87
			fiveMinutesTicker := time.NewTicker(time.Minute * 1)
Ubuntu's avatar
Ubuntu committed
88 89 90 91 92 93
			halfHourTicker := time.NewTicker(time.Minute * 30)

			recordFc := make([]TimeAndFollowCount, 0, 100)

			for {
				select {
vicotor's avatar
vicotor committed
94
				case _, ok := <-done: // task is stopped by api.
Ubuntu's avatar
Ubuntu committed
95 96 97 98 99 100
					if !ok {
						return
					}
				case <-fiveMinutesTicker.C:
					slog.Info("case <-fiveMinutesTicker.C:")

vicotor's avatar
vicotor committed
101
					maybeFound := false // trigger followers request when maybeFound is true.
Ubuntu's avatar
Ubuntu committed
102 103 104 105 106 107 108 109 110 111 112 113 114 115

					for k, v := range recordFc {
						fmt.Println(k, v)
						if v.FollowCount != t.FollowerCount {
							maybeFound = true
						}
						if k == len(recordFc)-1 {
							t.FollowerCount = v.FollowCount
						}
					}

					fmt.Println(" t.FollowerCount", t.FollowerCount, "maybeFound", maybeFound)

					if maybeFound {
vicotor's avatar
vicotor committed
116
						fiveMinutesTicker.Reset(time.Minute * 3)
Ubuntu's avatar
Ubuntu committed
117
						halfHourTicker.Reset(time.Minute * 30)
118
						if err := Request(cli.GetFollowerList, page, t); err != nil {
Ubuntu's avatar
Ubuntu committed
119 120 121 122 123 124 125 126 127 128 129 130 131 132
							slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
							continue
						}

						if err := UpdateFollowerTaskCount(t.TaskId, t.TaskType, t.UserId, t.FollowerCount); err != nil {
							slog.Error("UpdateFollowerTaskCount", "err", err.Error(), "t.TaskId,", t.TaskId, "t.TaskType", t.TaskType, "t.UserId", t.UserId)
						}
					}

					recordFc = make([]TimeAndFollowCount, 0, 100)

					//recordFc = make(map[string]int)

				case <-secondTicker.C:
133
					fc, err := cli.GetFollowerCount(t.TaskId)
Ubuntu's avatar
Ubuntu committed
134 135 136 137 138 139 140 141 142 143

					if err != nil {
						slog.Error("TryProfileFollowerCount", "err", err.Error())
						continue
					}

					recordFc = append(recordFc, TimeAndFollowCount{
						Date:        time.Now(),
						FollowCount: fc,
					})
144
					secondTicker.Reset(time.Minute * 1)
Ubuntu's avatar
Ubuntu committed
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159

					// 先用和like retweet一样的周期模式;

				case <-halfHourTicker.C:
					slog.Info("case <-halfHourTicker.C:")
					for k, v := range recordFc {

						fmt.Println(k, v)
						if k == len(recordFc)-1 {
							t.FollowerCount = v.FollowCount
						}
					}

					fmt.Println(" t.FollowerCount", t.FollowerCount)

160
					if err := Request(cli.GetFollowerList, page, t); err != nil {
Ubuntu's avatar
Ubuntu committed
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
						slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
						continue
					}

					if err := UpdateFollowerTaskCount(t.TaskId, t.TaskType, t.UserId, t.FollowerCount); err != nil {
						slog.Error("UpdateFollowerTaskCount", "err", err.Error(), "t.TaskId,", t.TaskId, "t.TaskType", t.TaskType, "t.UserId", t.UserId)
					}
					//}

					recordFc = make([]TimeAndFollowCount, 0, 100)

					// if err := Request(Follower, page, t); err != nil {
					// 	slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
					// 	continue
					// }
				}
			}
		}

Ubuntu's avatar
Ubuntu committed
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
		for {

			select {
			case _, ok := <-done:
				if !ok {
					return
				}
			default:

				var f req

				if t.TaskType == RetweetType {
					f = cli.Retweeters
				} else {
					f = cli.TweetLikingUsers
				}

Ubuntu's avatar
Ubuntu committed
197
				if err := Request(f, page, t); err != nil {
Ubuntu's avatar
Ubuntu committed
198 199 200 201 202 203 204 205 206 207 208 209
					slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
					continue
				}

			}
		}

	}()

	return done

}
Ubuntu's avatar
Ubuntu committed
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

// Request fetches one batch of users for task t through page.Request, using
// f as the fetch function, and stores the batch with InsertTaskRes.
//
// A fetch error is returned to the caller. A storage error is only logged,
// together with a dump of the batch on stdout, and Request still returns nil.
// The Info line is written only when the batch was stored, so the log does
// not report a failed insert as a success.
func Request(f req, page *PageUsers, t TaskJob) error {
	users, err := page.Request(t.TaskId, "", f)
	if err != nil {
		return err
	}

	if err := InsertTaskRes(users, t.TaskType, t.TaskId); err != nil {
		for k, v := range users {
			fmt.Println(k, v.UserId, v.UserName)
		}

		slog.Error("InsertTaskRes", "task id", t.TaskId, "t.TaskType", t.TaskType, "len(users)", len(users), "err", err.Error())

		// Best effort: a failed insert does not fail the request.
		return nil
	}

	slog.Info("InsertTaskRes", "task id", t.TaskId, "t.TaskType", t.TaskType, "len(users)", len(users))

	return nil
}