diff --git a/api/health_controller.go b/api/health_controller.go index fdf296be4310672b03a7a9a79053a2fba8e45cb8..66eb2d6d21e6a9c573aeaa6acc8abb44d49a6b24 100644 --- a/api/health_controller.go +++ b/api/health_controller.go @@ -19,7 +19,7 @@ func NewHealthController() *HealthController { // @Tags health // @Produce json // @Success 200 {object} HealthResponse -// @Router /api/v1/health [get] +// @Router /health [get] func (r HealthController) Show(c echo.Context) error { return c.JSON(http.StatusOK, HealthResponse{Status: "ok"}) } diff --git a/api/search_controller.go b/api/search_controller.go index 29e85d9d29a0e167b139361cbad4066b623c602e..bb64f358ebf72f4b20ead38d1900a86e9055811b 100644 --- a/api/search_controller.go +++ b/api/search_controller.go @@ -2,22 +2,38 @@ package api import ( "github.com/labstack/echo/v4" + "go.uber.org/dig" "net/http" + "strconv" "strings" + "steamshard.net/oppai-api/jobs" "steamshard.net/oppai-api/mal" "steamshard.net/oppai-api/models" ) type SearchController struct { - malClient mal.IClient - showRepo models.IShowRepo + malClient mal.IClient + showRepo models.IShowRepo + syncEpisodesJob jobs.ISyncEpisodesJob + jobManager jobs.IManager } -func NewSearchController(malClient mal.IClient, repo models.IShowRepo) *SearchController { +type NewSearchControllerParams struct { + dig.In + + MalClient mal.IClient + Repo models.IShowRepo + SyncEpisodesJob jobs.ISyncEpisodesJob + JobsManager jobs.IManager +} + +func NewSearchController(p NewSearchControllerParams) *SearchController { return &SearchController{ - malClient: malClient, - showRepo: repo, + malClient: p.MalClient, + showRepo: p.Repo, + syncEpisodesJob: p.SyncEpisodesJob, + jobManager: p.JobsManager, } } @@ -26,10 +42,10 @@ func NewSearchController(malClient mal.IClient, repo models.IShowRepo) *SearchCo // @Summary Search for anime // @Tags anime // @Produce json -// @Param q query string true "Query" +// @Param q query string true "Query" // @Param force_mal query string false "Force MAL search, expose to user when they're not happy with results" -// @Success 200 {array} models.Show -// @Failure 500 {object} Error +// @Success 200 {array} models.Show +// @Failure 500 {object} Error func (r SearchController) Show(c echo.Context) error { query := c.QueryParam("q") if query == "" { @@ -87,6 +103,21 @@ func (r SearchController) searchMal(query string) ([]models.Show, error) { } show, _ := r.showRepo.Upsert(upsertableShow) + if show.NeedsSync { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: strconv.Itoa(int(show.ID)), + } + + job, err := r.syncEpisodesJob.ToEnqueue(0, payload) + if err != nil { + return nil, err + } + + err = r.jobManager.Enqueue(job) + if err != nil { + return nil, err + } + } shows[i] = *show } diff --git a/api/search_controller_test.go b/api/search_controller_test.go index 4a1dcee07eea37e7ccc0eaa8acb507ffe5c3a47e..78cd3921bfced315d78f0fa0d31f58d6fe901b5e 100644 --- a/api/search_controller_test.go +++ b/api/search_controller_test.go @@ -7,6 +7,9 @@ import ( "github.com/stretchr/testify/suite" "net/http/httptest" "testing" + "time" + + "steamshard.net/queue" "steamshard.net/oppai-api/api" "steamshard.net/oppai-api/mal" @@ -17,8 +20,10 @@ import ( type SearchControllerTestSuite struct { suite.Suite - mockMalClient *mocks.MockMalClient - mockShowRepo *mocks.MockShowRepo + mockMalClient *mocks.MockMalClient + mockShowRepo *mocks.MockShowRepo + mockSyncEpisodesJob *mocks.MockJobsSyncEpisodesJob + mockJobManager *mocks.MockJobsManager subject *api.SearchController } @@ -26,7 +31,17 @@ type SearchControllerTestSuite struct { func (suite *SearchControllerTestSuite) SetupTest() { suite.mockMalClient = &mocks.MockMalClient{} suite.mockShowRepo = &mocks.MockShowRepo{} - suite.subject = api.NewSearchController(suite.mockMalClient, suite.mockShowRepo) + suite.mockSyncEpisodesJob = &mocks.MockJobsSyncEpisodesJob{} + suite.mockJobManager = &mocks.MockJobsManager{} + + p := api.NewSearchControllerParams{ + MalClient: suite.mockMalClient, + Repo: suite.mockShowRepo, + SyncEpisodesJob: suite.mockSyncEpisodesJob, + JobsManager: suite.mockJobManager, + } + + suite.subject = api.NewSearchController(p) } func (suite *SearchControllerTestSuite) TestSearchForceMal() { @@ -171,6 +186,7 @@ func (suite *SearchControllerTestSuite) TestDbNoMatch() { TitleJapanese: "ãã®ç€ã›æ›¿ãˆäººå½¢ã¯æ‹ã‚’ã™ã‚‹", TitleSynonyms: []string{"Sono Kisekae Ningyou wa Koi wo Suru", "KiseKoi"}, ImageUrl: "https://example.local/url.jpeg", + NeedsSync: true, } suite.mockShowRepo.On("Search", "Sono Bisque Doll wa Koi wo Suru").Return([]models.Show{}, nil) @@ -188,6 +204,15 @@ func (suite *SearchControllerTestSuite) TestDbNoMatch() { suite.mockMalClient.On("SearchAnime", "Sono Bisque Doll wa Koi wo Suru").Return([]mal.Anime{anime}, nil) suite.mockShowRepo.On("Upsert", mock.AnythingOfType("Show")).Return(&mockShow, nil) + suite.mockSyncEpisodesJob.On("ToEnqueue", mock.AnythingOfType("time.Duration"), mock.AnythingOfType("jobs.SyncEpisodesJobPayload")).Return( + &queue.Job{ + Queue: "sync_jobs", + Payload: nil, + At: time.Now(), + Retries: 0, + }, nil) + suite.mockJobManager.On("Enqueue", mock.AnythingOfType("*queue.Job")).Return(nil) + request := httptest.NewRequest("GET", "/search?q=Sono%20Bisque%20Doll%20wa%20Koi%20wo%20Suru", nil) recorder := httptest.NewRecorder() @@ -213,6 +238,8 @@ func (suite *SearchControllerTestSuite) TestDbNoMatch() { suite.mockShowRepo.AssertExpectations(suite.T()) suite.mockMalClient.AssertExpectations(suite.T()) + suite.mockSyncEpisodesJob.AssertExpectations(suite.T()) + suite.mockJobManager.AssertExpectations(suite.T()) } diff --git a/config/config.go b/config/config.go index a3845cc0e4ca07b85892f1bae31999db3d204118..a94a027dc9ec741d2111507b7afd5fcfbcce5466 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "net/url" "os" + "steamshard.net/queue" "strconv" "time" @@ -19,26 +20,33 @@ func NewConfig() *Config { host, user, password, dbname, port := convertUrlToDsn(os.Getenv("DATABASE_URL")) return &Config{ - Env: env(), - HttpPort: os.Getenv("HTTP_PORT"), - HttpHost: os.Getenv("HTTP_HOST"), - EnableStatic: env() == "development", - GoogleClientID: os.Getenv("GOOGLE_CLIENT_ID"), - GoogleClientSecret: os.Getenv("GOOGLE_CLIENT_SECRET"), - GoogleCertsURL: "https://www.googleapis.com/oauth2/v1/certs", - DatabaseHost: host, - DatabasePort: port, - DatabaseUser: user, - DatabasePassword: password, - DatabaseName: dbname, - TokenSecret: []byte(os.Getenv("TOKEN_SECRET")), - AccessTokenExpiry: time.Minute * 30, // 30 minutes - RefreshTokenExpiry: time.Hour * 24 * 30, // 30 days - BackgroundProcessingConcurrency: backgroundProcessingConcurrency(), - BackgroundProcessingInterval: backgroundProcessingInterval(), + Env: env(), + HttpPort: os.Getenv("HTTP_PORT"), + HttpHost: os.Getenv("HTTP_HOST"), + EnableStatic: env() == "development", + GoogleClientID: os.Getenv("GOOGLE_CLIENT_ID"), + GoogleClientSecret: os.Getenv("GOOGLE_CLIENT_SECRET"), + GoogleCertsURL: "https://www.googleapis.com/oauth2/v1/certs", + DatabaseHost: host, + DatabasePort: port, + DatabaseUser: user, + DatabasePassword: password, + DatabaseName: dbname, + TokenSecret: []byte(os.Getenv("TOKEN_SECRET")), + AccessTokenExpiry: time.Minute * 30, // 30 minutes + RefreshTokenExpiry: time.Hour * 24 * 30, // 30 days } } +func NewQueueConfig() *queue.Config { + loadCurrentEnvironment() + + return queue.NewConfig( + uint(backgroundProcessingConcurrency()), + backgroundProcessingInterval(), + ) +} + func env() string { env := os.Getenv("OPPAI_ENV") if "" == env { diff --git a/container.go b/container.go index 76f2d4bd58f91c2851464f8959a6453df4e06881..865a1be16a418642a70fe4387bd0882c9b1d5809 100644 --- a/container.go +++ b/container.go @@ -3,11 +3,13 @@ package oppai_api import ( "go.uber.org/dig" + "steamshard.net/queue" + "steamshard.net/oppai-api/api" "steamshard.net/oppai-api/config" + "steamshard.net/oppai-api/jobs" "steamshard.net/oppai-api/mal" "steamshard.net/oppai-api/models" - "steamshard.net/oppai-api/tasks" ) func NewContainer() (*dig.Container, error) { @@ -33,6 +35,11 @@ func NewContainer() (*dig.Container, error) { return nil, err } + err = container.Provide(models.NewEpisodeRepo) + if err != nil { + return nil, err + } + err = container.Provide(models.NewConnection) if err != nil { return nil, err @@ -43,6 +50,11 @@ func NewContainer() (*dig.Container, error) { return nil, err } + err = container.Provide(config.NewQueueConfig) + if err != nil { + return nil, err + } + err = container.Provide(api.NewSessionsController) if err != nil { return nil, err @@ -63,7 +75,17 @@ func NewContainer() (*dig.Container, error) { return nil, err } - err = container.Provide(tasks.NewQueueManager) + err = container.Provide(queue.NewManager) + if err != nil { + return nil, err + } + + err = container.Provide(jobs.NewManager) + if err != nil { + return nil, err + } + + err = container.Provide(jobs.NewSyncEpisodesJob) if err != nil { return nil, err } diff --git a/docs/docs.go b/docs/docs.go index bd0fb76d2ebc737c433ad6cb83de4ff42f54a8a7..58a4ffcae3e9aba0c4ffd9b0f7dd620687852ad2 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -20,26 +20,6 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { - "/api/v1/health": { - "get": { - "description": "Check health status, primarily used for monitoring", - "produces": [ - "application/json" - ], - "tags": [ - "health" - ], - "summary": "Check health status", - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/api.HealthResponse" - } - } - } - } - }, "/api/v1/me": { "get": { "security": [ @@ -196,6 +176,26 @@ const docTemplate = `{ } } }, + "/health": { + "get": { + "description": "Check health status, primarily used for monitoring", + "produces": [ + "application/json" + ], + "tags": [ + "health" + ], + "summary": "Check health status", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.HealthResponse" + } + } + } + } + }, "/oauth2/callback": { "post": { "description": "Create a session via accepting a Google OAuth2 redirect", @@ -306,11 +306,22 @@ const docTemplate = `{ } } }, - "mal.Anime": { + "models.Show": { "type": "object", "properties": { + "airing": { + "type": "boolean" + }, + "airing_end": { + "type": "string" + }, + "airing_start": { + "type": "string" + }, + "episode_count": { + "type": "integer" + }, "id": { - "description": "This will be populated from the DB later", "type": "integer" }, "image_url": { @@ -319,6 +330,9 @@ const docTemplate = `{ "mal_id": { "type": "integer" }, + "synopsis": { + "type": "string" + }, "title": { "type": "string" }, diff --git a/docs/swagger.json b/docs/swagger.json index c16389d79d99c2f0d947972a9b3912a9098b171d..f2175e25c66595d93c17b94c25e73c8465efd311 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -14,26 +14,6 @@ "host": "oppai-api.fly.dev", "basePath": "/", "paths": { - "/api/v1/health": { - "get": { - "description": "Check health status, primarily used for monitoring", - "produces": [ - "application/json" - ], - "tags": [ - "health" - ], - "summary": "Check health status", - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/api.HealthResponse" - } - } - } - } - }, "/api/v1/me": { "get": { "security": [ @@ -190,6 +170,26 @@ } } }, + "/health": { + "get": { + "description": "Check health status, primarily used for monitoring", + "produces": [ + "application/json" + ], + "tags": [ + "health" + ], + "summary": "Check health status", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.HealthResponse" + } + } + } + } + }, "/oauth2/callback": { "post": { "description": "Create a session via accepting a Google OAuth2 redirect", @@ -300,11 +300,22 @@ } } }, - "mal.Anime": { + "models.Show": { "type": "object", "properties": { + "airing": { + "type": "boolean" + }, + "airing_end": { + "type": "string" + }, + "airing_start": { + "type": "string" + }, + "episode_count": { + "type": "integer" + }, "id": { - "description": "This will be populated from the DB later", "type": "integer" }, "image_url": { @@ -313,6 +324,9 @@ "mal_id": { "type": "integer" }, + "synopsis": { + "type": "string" + }, "title": { "type": "string" }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 8463d57962e71b78504a8103532b8968918b4532..8c412a5938f28ef1f71290d95565b8510fda4844 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -40,15 +40,24 @@ definitions: refresh_token: type: string type: object - mal.Anime: + models.Show: properties: + airing: + type: boolean + airing_end: + type: string + airing_start: + type: string + episode_count: + type: integer id: - description: This will be populated from the DB later type: integer image_url: type: string mal_id: type: integer + synopsis: + type: string title: type: string title_english: @@ -74,19 +83,6 @@ info: title: Oppai API version: "0.1" paths: - /api/v1/health: - get: - description: Check health status, primarily used for monitoring - produces: - - application/json - responses: - "200": - description: OK - schema: - $ref: '#/definitions/api.HealthResponse' - summary: Check health status - tags: - - health /api/v1/me: get: produces: @@ -185,6 +181,19 @@ paths: summary: Refresh a session tags: - sessions + /health: + get: + description: Check health status, primarily used for monitoring + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.HealthResponse' + summary: Check health status + tags: + - health /oauth2/callback: post: consumes: diff --git a/exe/migrate/main.go b/exe/migrate/main.go index 0728189c25b9de84e05de63d340604cabb3eee38..d155a465a2c7bc9ea88c102f918a2e95da65f745 100644 --- a/exe/migrate/main.go +++ b/exe/migrate/main.go @@ -4,23 +4,26 @@ import ( "go.uber.org/zap" "gorm.io/gorm" + "steamshard.net/queue" + oppaiapi "steamshard.net/oppai-api" + "steamshard.net/oppai-api/config" "steamshard.net/oppai-api/models" - "steamshard.net/oppai-api/tasks" ) func main() { container, _ := oppaiapi.NewContainer() logger, _ := zap.NewDevelopment() - _ = container.Invoke(func(db *gorm.DB) { - logger.Info("Migrating database") + _ = container.Invoke(func(db *gorm.DB, cfg *config.Config) { + logger.Info("Migrating database in " + cfg.Env + " environment") models.Migrate(db, logger, &models.User{}) models.Migrate(db, logger, &models.Session{}) models.Migrate(db, logger, &models.Show{}) + models.Migrate(db, logger, &models.Episode{}) - models.Migrate(db, logger, &tasks.Job{}) + models.Migrate(db, logger, &queue.Job{}) logger.Info("Database migrated") }) diff --git a/exe/server/main.go b/exe/server/main.go index ea7706172b6291908160ec9e1af87d05c75b525e..7c6f7128cafc9c11db09f868d94b0d0f70338244 100644 --- a/exe/server/main.go +++ b/exe/server/main.go @@ -1,11 +1,9 @@ package main import ( - "go.uber.org/dig" - oppaiapi "steamshard.net/oppai-api" "steamshard.net/oppai-api/api" - "steamshard.net/oppai-api/tasks" + "steamshard.net/oppai-api/jobs" ) func main() { @@ -14,13 +12,13 @@ func main() { panic(err) } - queueManager, err := prepareQueueManager(c) - if err != nil { + var jobManager jobs.IManager + if err := c.Invoke(func(jm jobs.IManager) { jobManager = jm }); err != nil { panic(err) } - queueManager.Start() - defer queueManager.Stop() + jobManager.Start() + defer jobManager.Stop() var app *api.Application if err := c.Invoke(func(a *api.Application) { app = a }); err != nil { @@ -29,14 +27,3 @@ func main() { app.Start() } - -func prepareQueueManager(c *dig.Container) (*tasks.QueueManager, error) { - var queueManager *tasks.QueueManager - if err := c.Invoke(func(qm *tasks.QueueManager) { - queueManager = qm - }); err != nil { - return nil, err - } - - return queueManager, nil -} diff --git a/go.mod b/go.mod index d0b001bba787c045ddf9147443f856efab2f8419..8b66efd3f77aca4fa56fb61c904889d73084a7da 100644 --- a/go.mod +++ b/go.mod @@ -3,26 +3,27 @@ module steamshard.net/oppai-api go 1.22.3 require ( - github.com/cenkalti/backoff/v4 v4.3.0 github.com/darenliang/jikan-go v1.2.3 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/joho/godotenv v1.5.1 github.com/labstack/echo/v4 v4.12.0 github.com/lib/pq v1.10.9 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/swaggo/echo-swagger v1.4.1 github.com/swaggo/swag v1.16.3 go.uber.org/dig v1.17.1 go.uber.org/zap v1.27.0 - gorm.io/driver/postgres v1.5.7 + gorm.io/driver/postgres v1.5.9 gorm.io/gorm v1.25.10 moul.io/zapgorm2 v1.3.0 + steamshard.net/queue v0.0.1 ) require ( github.com/KyleBanks/depth v1.2.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect @@ -31,7 +32,8 @@ require ( github.com/go-openapi/swag v0.19.15 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgx/v5 v5.4.3 // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -40,14 +42,14 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/swaggo/files/v2 v2.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index b661ba66fcaa8f632592c2391d5f065a948ed6ec..a1cae09efa97f0d2bc2faaac9e31164f61322905 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,10 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= -github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -72,16 +74,13 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/swaggo/echo-swagger v1.4.1 h1:Yf0uPaJWp1uRtDloZALyLnvdBeoEL5Kc7DtnjzO/TUk= github.com/swaggo/echo-swagger v1.4.1/go.mod h1:C8bSi+9yH2FLZsnhqMZLIZddpUxZdBYuNHbtaS1Hljc= github.com/swaggo/files/v2 v2.0.0 h1:hmAt8Dkynw7Ssz46F6pn8ok6YmGZqHSVLZ+HQM7i0kw= @@ -123,6 +122,8 @@ golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -165,10 +166,12 @@ gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/driver/postgres v1.5.7 h1:8ptbNJTDbEmhdr62uReG5BGkdQyeasu/FZHxI0IMGnM= -gorm.io/driver/postgres v1.5.7/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA= +gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= +gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= moul.io/zapgorm2 v1.3.0 h1:+CzUTMIcnafd0d/BvBce8T4uPn6DQnpIrz64cyixlkk= moul.io/zapgorm2 v1.3.0/go.mod h1:nPVy6U9goFKHR4s+zfSo1xVFaoU7Qgd5DoCdOfzoCqs= +steamshard.net/queue v0.0.1 h1:/SWhA7N/K4wz/A/2j8o0hoSzGO0PUIJOLZbBkV8AHKg= +steamshard.net/queue v0.0.1/go.mod h1:p19X4FFZ9OMa+3Jd5ovyeTm/bIKlGOi/KNqHQ+aOoL4= diff --git a/jobs/job_manager.go b/jobs/job_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..54b9142f4f07c7614ba2c193ac864026734def16 --- /dev/null +++ b/jobs/job_manager.go @@ -0,0 +1,52 @@ +package jobs + +import ( + "go.uber.org/dig" + + "steamshard.net/queue" +) + +type IManager interface { + Start() + Stop() + Enqueue(job *queue.Job) error +} + +type Manager struct { + QueueManager *queue.Manager + + SyncEpisodesJob ISyncEpisodesJob +} + +type NewManagerParams struct { + dig.In + + QueueManager *queue.Manager + SyncEpisodesJob ISyncEpisodesJob +} + +func NewManager(p NewManagerParams) IManager { + manager := &Manager{ + QueueManager: p.QueueManager, + SyncEpisodesJob: p.SyncEpisodesJob, + } + manager.route() + return manager +} + +func (jm *Manager) Start() { + jm.QueueManager.Start() +} + +func (jm *Manager) Stop() { + jm.QueueManager.Stop() +} + +func (jm *Manager) Enqueue(job *queue.Job) error { + return jm.QueueManager.Enqueue(job) +} + +func (jm *Manager) route() { + jm.QueueManager.AddQueue(jm.SyncEpisodesJob.Queue(), 1, 5) + jm.QueueManager.Handle(jm.SyncEpisodesJob.Queue(), jm.SyncEpisodesJob.Handler()) +} diff --git a/jobs/sync_episodes_job.go b/jobs/sync_episodes_job.go new file mode 100644 index 0000000000000000000000000000000000000000..912064718c957e0218ad3d8281ae94abd5c4d355 --- /dev/null +++ b/jobs/sync_episodes_job.go @@ -0,0 +1,108 @@ +package jobs + +import ( + "go.uber.org/dig" + "time" + + "steamshard.net/queue" + + "steamshard.net/oppai-api/mal" + "steamshard.net/oppai-api/models" +) + +const SyncEpisodesJobQueue = "sync_episodes" + +type SyncEpisodesJob struct { + queue string + showRepo models.IShowRepo + episodeRepo models.IEpisodeRepo + malClient mal.IClient +} + +type NewSyncEpisodesJobParams struct { + dig.In + + ShowRepo models.IShowRepo + EpisodeRepo models.IEpisodeRepo + MalClient mal.IClient +} + +type SyncEpisodesJobPayload struct { + ShowID string +} + +type ISyncEpisodesJob interface { + ToEnqueue(in time.Duration, payload SyncEpisodesJobPayload) (*queue.Job, error) + Handler() queue.HandlerFunc + Queue() string +} + +func NewSyncEpisodesJob(p NewSyncEpisodesJobParams) ISyncEpisodesJob { + return &SyncEpisodesJob{ + queue: SyncEpisodesJobQueue, + showRepo: p.ShowRepo, + episodeRepo: p.EpisodeRepo, + malClient: p.MalClient, + } +} + +func (j *SyncEpisodesJob) Queue() string { + return j.queue +} + +func (j *SyncEpisodesJob) ToEnqueue(in time.Duration, payload SyncEpisodesJobPayload) (*queue.Job, error) { + return queue.NewJob(SyncEpisodesJobQueue, payload, time.Now().Add(in)) +} + +func (j *SyncEpisodesJob) Handler() queue.HandlerFunc { + return func(job *queue.Job, manager queue.IManager) error { + return j.handle(job, manager) + } +} + +func (j *SyncEpisodesJob) handle(job *queue.Job, _ queue.IManager) error { + var payload SyncEpisodesJobPayload + err := job.UnmarshalPayload(&payload) + if err != nil { + return err + } + + show, err := j.showRepo.Find(payload.ShowID) + if err != nil { + return err + } + + if show.NeedsSync && show.LatestSync.Before(time.Now().Add(-24*time.Hour)) { + for i := 0; i < show.EpisodeCount; i++ { + malEpisode, err := j.malClient.GetEpisodeByAnimeID(show.MalID, i+1) + if err != nil { + return err + } + + episode := &models.Episode{ + Show: *show, + ShowID: show.ID, + MalID: malEpisode.MalID, + Title: malEpisode.Title, + TitleJapanese: malEpisode.TitleJapanese, + TitleRomanji: malEpisode.TitleRomanji, + Aired: malEpisode.Aired, + Synopsis: malEpisode.Synopsis, + } + err = j.episodeRepo.CreateEpisodeOf(show, episode) + if err != nil { + return err + } + } + + show.LatestSync = time.Now() + show.NeedsSync = false + + _, err = j.showRepo.Update(*show) + if err != nil { + return err + } + } + + return nil +} diff --git a/jobs/sync_episodes_job_test.go b/jobs/sync_episodes_job_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1a825949f3984bf2389ea8bd3ba2be3383daa8a9 --- /dev/null +++ b/jobs/sync_episodes_job_test.go @@ -0,0 +1,236 @@ +package jobs_test + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "testing" + "time" + + "steamshard.net/oppai-api/jobs" + "steamshard.net/oppai-api/mal" + "steamshard.net/oppai-api/mocks" + "steamshard.net/oppai-api/models" +) + +type SyncEpisodesJobSuite struct { + suite.Suite + + mockShowRepo mocks.MockShowRepo + mockEpisodeRepo mocks.MockEpisodeRepo + mockMalClient mocks.MockMalClient + + mockQueueManager mocks.MockQueueManager + + subject jobs.ISyncEpisodesJob +} + +func (suite *SyncEpisodesJobSuite) SetupTest() { + suite.mockShowRepo = mocks.MockShowRepo{} + suite.mockEpisodeRepo = mocks.MockEpisodeRepo{} + suite.mockMalClient = mocks.MockMalClient{} + + p := jobs.NewSyncEpisodesJobParams{ + ShowRepo: &suite.mockShowRepo, + EpisodeRepo: &suite.mockEpisodeRepo, + MalClient: &suite.mockMalClient, + } + suite.subject = jobs.NewSyncEpisodesJob(p) + + suite.mockQueueManager = mocks.MockQueueManager{} +} + +func (suite *SyncEpisodesJobSuite) TestQueue() { + suite.Equal(jobs.SyncEpisodesJobQueue, suite.subject.Queue()) +} + +func (suite *SyncEpisodesJobSuite) TestToEnqueue() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + bytes, _ := json.Marshal(payload) + + suite.NoError(err) + suite.Equal(jobs.SyncEpisodesJobQueue, job.Queue) + suite.Equal(bytes, job.Payload) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerUnmarshalPayloadError() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + job.Payload = []byte("invalid") + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.Error(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerShowRepoError() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + suite.mockShowRepo.On("Find", "1").Return(nil, assert.AnError) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.Error(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerMalClientError() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + show := &models.Show{ + MalID: 1, + EpisodeCount: 1, + NeedsSync: true, + LatestSync: time.Now().Add(-25 * time.Hour), + } + + suite.mockShowRepo.On("Find", "1").Return(show, nil) + suite.mockMalClient.On("GetEpisodeByAnimeID", 1, 1).Return(&mal.AnimeEpisode{}, assert.AnError) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.mockMalClient.AssertExpectations(suite.T()) + suite.Error(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerEpisodeRepoError() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + show := &models.Show{ + MalID: 1, + EpisodeCount: 1, + NeedsSync: true, + LatestSync: time.Now().Add(-25 * time.Hour), + } + + episode := mal.AnimeEpisode{ + MalID: 1, + Title: "My Dummy Episode", + TitleJapanese: "マイダミーエピソード", + TitleRomanji: "Mai DamÄ« EpisÅdo", + Aired: time.Time{}, + Synopsis: "Synopsis of my dummy episode", + } + + suite.mockShowRepo.On("Find", "1").Return(show, nil) + suite.mockMalClient.On("GetEpisodeByAnimeID", 1, 1).Return(&episode, nil) + suite.mockEpisodeRepo.On("CreateEpisodeOf", show, mock.AnythingOfType("*models.Episode")).Return(assert.AnError) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.mockMalClient.AssertExpectations(suite.T()) + suite.mockEpisodeRepo.AssertExpectations(suite.T()) + suite.Error(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerHappyPath() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + show := &models.Show{ + MalID: 1, + EpisodeCount: 1, + NeedsSync: true, + LatestSync: time.Now().Add(-25 * time.Hour), + } + + episode := mal.AnimeEpisode{ + MalID: 1, + Title: "My Dummy Episode", + TitleJapanese: "マイダミーエピソード", + TitleRomanji: "Mai DamÄ« EpisÅdo", + Aired: time.Time{}, + Synopsis: "Synopsis of my dummy episode", + } + + suite.mockShowRepo.On("Find", "1").Return(show, nil) + suite.mockMalClient.On("GetEpisodeByAnimeID", 1, 1).Return(&episode, nil) + suite.mockEpisodeRepo.On("CreateEpisodeOf", show, mock.AnythingOfType("*models.Episode")).Return(nil) + suite.mockShowRepo.On("Update", mock.AnythingOfType("models.Show")).Return(show, nil) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.mockMalClient.AssertExpectations(suite.T()) + suite.mockEpisodeRepo.AssertExpectations(suite.T()) + suite.NoError(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerHappyPathNoEpisodes() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + show := &models.Show{ + MalID: 1, + EpisodeCount: 0, + NeedsSync: true, + LatestSync: time.Now().Add(-25 * time.Hour), + } + + suite.mockShowRepo.On("Find", "1").Return(show, nil) + suite.mockShowRepo.On("Update", mock.AnythingOfType("models.Show")).Return(show, nil) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.mockMalClient.AssertExpectations(suite.T()) + suite.mockEpisodeRepo.AssertExpectations(suite.T()) + suite.NoError(err) +} + +func (suite *SyncEpisodesJobSuite) TestHandlerHappyPathNoSyncNeeded() { + payload := jobs.SyncEpisodesJobPayload{ + ShowID: "1", + } + + job, err := suite.subject.ToEnqueue(0, payload) + + show := &models.Show{ + MalID: 1, + EpisodeCount: 1, + NeedsSync: false, + LatestSync: time.Now().Add(-5 * time.Hour), + } + + suite.mockShowRepo.On("Find", "1").Return(show, nil) + + err = suite.subject.Handler()(job, &suite.mockQueueManager) + + suite.mockShowRepo.AssertExpectations(suite.T()) + suite.mockMalClient.AssertExpectations(suite.T()) + suite.mockEpisodeRepo.AssertExpectations(suite.T()) + suite.NoError(err) +} + +func TestSyncEpisodesJobSuite(t *testing.T) { + suite.Run(t, new(SyncEpisodesJobSuite)) +} diff --git a/mal/anime_episode.go b/mal/anime_episode.go new file mode 100644 index 0000000000000000000000000000000000000000..51da0290dc92c2c619aa81f8b886c9bc432d8bec --- /dev/null +++ b/mal/anime_episode.go @@ -0,0 +1,14 @@ +package mal + +import ( + "time" +) + +type AnimeEpisode struct { + MalID int `json:"mal_id"` + Title string `json:"title"` + TitleJapanese string `json:"title_japanese"` + TitleRomanji string `json:"title_romanji"` + Aired time.Time `json:"aired"` + Synopsis string `json:"synopsis"` +} diff --git a/mal/client.go b/mal/client.go index e79172b937803b1c6822536dd3098299264f5f57..749ff7ffcf5ade228fa74c584413273ef30cd99a 100644 --- a/mal/client.go +++ b/mal/client.go @@ -3,23 +3,33 @@ package mal import ( "github.com/darenliang/jikan-go" "net/url" + "sync" + "time" ) type IClient interface { SearchAnime(query string) ([]Anime, error) + GetEpisodeByAnimeID(animeID int, episode int) (*AnimeEpisode, error) } type Client struct { - JikanGetanimesearch func(query url.Values) (*jikan.AnimeSearch, error) + JikanGetanimesearch func(query url.Values) (*jikan.AnimeSearch, error) + JikanGetanimeepisodebyid func(id int, episode int) (*jikan.AnimeEpisodeById, error) + + lastRequestAt time.Time + mu sync.Mutex } func NewClient() IClient { return &Client{ - JikanGetanimesearch: jikan.GetAnimeSearch, + JikanGetanimesearch: jikan.GetAnimeSearch, + JikanGetanimeepisodebyid: jikan.GetAnimeEpisodeById, } } func (c *Client) SearchAnime(query string) ([]Anime, error) { + c.waitForNextRequestSlot() + jikanQuery := url.Values{} jikanQuery.Set("q", query) jikanQuery.Set("type", "tv") @@ -37,6 +47,32 @@ func (c *Client) SearchAnime(query string) ([]Anime, error) { return animes, nil } +func (c *Client) GetEpisodeByAnimeID(animeID int, episode int) (*AnimeEpisode, error) { + c.waitForNextRequestSlot() + + episodeResult, err := c.JikanGetanimeepisodebyid(animeID, episode) + if err != nil { + return nil, err + } + + return &AnimeEpisode{ + MalID: episodeResult.Data.MalId, + Title: episodeResult.Data.Title, + TitleJapanese: episodeResult.Data.TitleJapanese, + TitleRomanji: episodeResult.Data.TitleRomanji, + Aired: episodeResult.Data.Aired, + Synopsis: episodeResult.Data.Synopsis, + }, nil +} + +func (c *Client) waitForNextRequestSlot() { + c.mu.Lock() + time.Sleep(time.Until(c.lastRequestAt.Add(1 * time.Second))) + + c.lastRequestAt = time.Now() + c.mu.Unlock() +} + func animeBaseToAnime(base jikan.AnimeBase) Anime { return Anime{ MalID: base.MalId, diff --git a/mal/client_test.go b/mal/client_test.go index f8a0d4947ad4a774acb3c29f8f72a0796f9bc946..001056832acac7860d05aac7f02c47b372517db5 100644 --- a/mal/client_test.go +++ b/mal/client_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/suite" "net/url" "testing" + "time" "steamshard.net/oppai-api/mal" ) @@ -80,6 +81,60 @@ func (suite *MalClientTestSuite) TestSearchAnimeError() { suite.Nil(animes) } +type MockJikanData struct { + MalId int `json:"mal_id"` + Url string `json:"url"` + Title string `json:"title"` + TitleJapanese string `json:"title_japanese"` + TitleRomanji string `json:"title_romanji"` + Duration int `json:"duration"` + Aired time.Time `json:"aired"` + Filler bool `json:"filler"` + Recap bool `json:"recap"` + Synopsis string `json:"synopsis"` +} + +func (suite *MalClientTestSuite) TestGetEpisodeByAnimeID() { + suite.subject.(*mal.Client).JikanGetanimeepisodebyid = func(id int, episode int) (*jikan.AnimeEpisodeById, error) { + return &jikan.AnimeEpisodeById{ + Data: MockJikanData{ + MalId: 48736, + Title: "Episode 1", + TitleJapanese: "エピソード1", + TitleRomanji: "Episode 1", + Aired: time.Date(2022, 1, 8, 0, 0, 0, 0, time.UTC), + Synopsis: "Episode 1", + Url: "", + Duration: 0, + Filler: false, + Recap: false, + }, + }, nil + } + + episode, err := suite.subject.GetEpisodeByAnimeID(48736, 1) + + suite.NoError(err) + suite.Equal(48736, episode.MalID) + suite.Equal("Episode 1", episode.Title) + suite.Equal("エピソード1", episode.TitleJapanese) + suite.Equal("Episode 1", episode.TitleRomanji) + suite.Equal("2022-01-08 00:00:00 +0000 UTC", episode.Aired.String()) + suite.Equal("Episode 1", episode.Synopsis) + +} + +func (suite *MalClientTestSuite) TestGetEpisodeByAnimeIDError() { + suite.subject.(*mal.Client).JikanGetanimeepisodebyid = func(id int, episode int) (*jikan.AnimeEpisodeById, error) { + return nil, fmt.Errorf("error") + } + + episode, err := suite.subject.GetEpisodeByAnimeID(48736, 1) + + suite.Error(err) + suite.Nil(episode) +} + func TestMalClientTestSuite(t *testing.T) { suite.Run(t, new(MalClientTestSuite)) } diff --git a/mocks/jobs_manager.go b/mocks/jobs_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..3bf77c063a8afaba88b1db5c6bd22db3d4b79535 --- /dev/null +++ b/mocks/jobs_manager.go @@ -0,0 +1,25 @@ +package mocks + +import ( + "github.com/stretchr/testify/mock" + + "steamshard.net/queue" +) + +type MockJobsManager struct { + mock.Mock +} + +func (m *MockJobsManager) Start() { + m.Called() +} + +func (m *MockJobsManager) Stop() { + m.Called() +} + +func (m *MockJobsManager) Enqueue(job *queue.Job) error { + args := m.Called(job) + + return args.Error(0) +} diff --git a/mocks/jobs_sync_episodes_job.go b/mocks/jobs_sync_episodes_job.go new file mode 100644 index 0000000000000000000000000000000000000000..efed953c575c10e4728664acfc27b961b38463c6 --- /dev/null +++ b/mocks/jobs_sync_episodes_job.go @@ -0,0 +1,51 @@ +package mocks + +import ( + "github.com/stretchr/testify/mock" + "time" + + "steamshard.net/queue" + + "steamshard.net/oppai-api/jobs" +) + +type MockJobsSyncEpisodesJob struct { + mock.Mock +} + +func (m *MockJobsSyncEpisodesJob) ToEnqueue(in time.Duration, payload jobs.SyncEpisodesJobPayload) (*queue.Job, error) { + ret := m.Called(in, payload) + + var r0 *queue.Job + if ret.Get(0) != nil { + r0 = ret.Get(0).(*queue.Job) + } + var r1 error + if ret.Get(1) != nil { + r1 = ret.Get(1).(error) + } + + return r0, r1 +} + +func (m *MockJobsSyncEpisodesJob) Handler() queue.HandlerFunc { + ret := m.Called() + + var r0 queue.HandlerFunc + if ret.Get(0) != nil { + r0 = ret.Get(0).(queue.HandlerFunc) + } + + return r0 +} + +func (m *MockJobsSyncEpisodesJob) Queue() string { + ret := m.Called() + + var r0 string + if ret.Get(0) != nil { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/mocks/mal_client.go b/mocks/mal_client.go index 0d8800b889cb4da56978b75bed91cda7d2c0ec3f..bc846b10b0737318599f165f7e8d13aa5407a55a 100644 --- a/mocks/mal_client.go +++ b/mocks/mal_client.go @@ -14,3 +14,8 @@ func (m *MockMalClient) SearchAnime(query string) ([]mal.Anime, error) { args := m.Called(query) return args.Get(0).([]mal.Anime), args.Error(1) } + +func (m *MockMalClient) GetEpisodeByAnimeID(animeID int, episode int) (*mal.AnimeEpisode, error) { + args := m.Called(animeID, episode) + return args.Get(0).(*mal.AnimeEpisode), args.Error(1) +} diff --git a/mocks/models_episode.go b/mocks/models_episode.go new file mode 100644 index 0000000000000000000000000000000000000000..b8d57a1b9b18b8fedb3122ec7305222b870f5fae --- /dev/null +++ b/mocks/models_episode.go @@ -0,0 +1,35 @@ +package mocks + +import ( + "github.com/stretchr/testify/mock" + + "steamshard.net/oppai-api/models" +) + +type MockEpisodeRepo struct { + mock.Mock +} + +func (m *MockEpisodeRepo) EpisodesOf(p0 *models.Show) ([]*models.Episode, error) { + ret := m.Called(p0) + + var r0 []*models.Episode + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*models.Episode) + } + var r1 error + if ret.Get(1) != nil { + r1 = ret.Error(1) + } + return r0, r1 +} + +func (m *MockEpisodeRepo) CreateEpisodeOf(p0 *models.Show, p1 *models.Episode) error { + ret := m.Called(p0, p1) + + var r0 error + if ret.Get(0) != nil { + r0 = ret.Error(0) + } + return r0 +} diff --git a/mocks/models_show.go b/mocks/models_show.go index a1d40a4fd3deda9c19aeb96245efa725c314af95..312c40094f4a801011f30c5e59a3a1e2c4628327 100644 --- a/mocks/models_show.go +++ b/mocks/models_show.go @@ -35,3 +35,9 @@ func (m *MockShowRepo) Search(query string) ([]models.Show, error) { } return r0, ret.Error(1) } + +func (m *MockShowRepo) Update(show models.Show) (*models.Show, error) { + ret := m.Called(show) + + return ret.Get(0).(*models.Show), ret.Error(1) +} diff --git a/mocks/queue_manager.go b/mocks/queue_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..7ea11c76d2038277589c4ab36eb2e8410cdb0cee --- /dev/null +++ b/mocks/queue_manager.go @@ -0,0 +1,52 @@ +package mocks + +import ( + "github.com/stretchr/testify/mock" + "go.uber.org/zap" + + "steamshard.net/queue" +) + +type MockQueueManager struct { + mock.Mock +} + +// Start() +// Stop() +// Enqueue(job *Job) error +// JobLogger(job *Job) *zap.Logger +// AddQueue(name string, priority uint, maxRetries uint) +// Handle(queueName string, handler HandlerFunc) + +func (m *MockQueueManager) Start() { + m.Called() +} + +func (m *MockQueueManager) Stop() { + m.Called() +} + +func (m *MockQueueManager) Enqueue(job *queue.Job) error { + ret := m.Called(job) + + var r0 error + if ret.Get(0) != nil { + r0 = ret.Error(0) + } + + return r0 +} + +func (m *MockQueueManager) JobLogger(job *queue.Job) *zap.Logger { + args := m.Called(job) + + return args.Get(0).(*zap.Logger) +} + +func (m *MockQueueManager) AddQueue(name string, priority uint, maxRetries uint) { + m.Called(name, priority, maxRetries) +} + +func (m *MockQueueManager) Handle(queueName string, handler queue.HandlerFunc) { + m.Called(queueName, handler) +} diff --git a/models/episode.go b/models/episode.go new file mode 100644 index 0000000000000000000000000000000000000000..4c7febe82a4ba02d9a6ff886869959bef11948bc --- /dev/null +++ b/models/episode.go @@ -0,0 +1,64 @@ +package models + +import ( + "gorm.io/gorm" + "time" +) + +type Episode struct { + ID int `json:"id"` + CreatedAt time.Time `json:"-"` + UpdatedAt time.Time `json:"-"` + DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"index"` + + Show Show `json:"-" gorm:"foreignKey:ShowID"` + ShowID uint `json:"show_id" gorm:"index:idx_show_id_and_mal_id"` + MalID int `json:"mal_id" gorm:"index:idx_mal_id_and_show_id"` + Title string `json:"title"` + TitleJapanese string `json:"title_japanese"` + TitleRomanji string `json:"title_romanji"` + Aired time.Time `json:"aired"` + Synopsis string `json:"synopsis"` +} + +type IEpisodeRepo interface { + EpisodesOf(*Show) ([]*Episode, error) + CreateEpisodeOf(*Show, *Episode) error +} + +type EpisodeRepo struct { + db *gorm.DB +} + +func NewEpisodeRepo(db *gorm.DB) IEpisodeRepo { + return EpisodeRepo{db} +} + +func (e EpisodeRepo) EpisodesOf(show *Show) ([]*Episode, error) { + var episodes []*Episode + result := e.db.Where("show_id = ?", show.ID).Order("mal_id ASC").Find(&episodes) + if result.Error != nil { + return nil, result.Error + } + + return episodes, nil +} + +func (e EpisodeRepo) CreateEpisodeOf(show *Show, episode *Episode) error { + episode.Show = *show + episode.ShowID = show.ID + + // check if episode already exists + var existing Episode + result := e.db.Where("mal_id = ? AND show_id = ?", episode.MalID, show.ID).First(&existing) + if result.Error == nil && existing.ID != 0 { + return nil + } + + result = e.db.Create(episode) + if result.Error != nil { + return result.Error + } + + return nil +} diff --git a/models/show.go b/models/show.go index 9b7a934ffc4a26ae1a6963ddc88660f6946c7237..e4f78f5bc72022c0f8c10a262150bdedee717885 100644 --- a/models/show.go +++ b/models/show.go @@ -16,19 +16,23 @@ type Show struct { Title string `gorm:"index" json:"title"` TitleEnglish string `gorm:"index" json:"title_english"` TitleJapanese string `gorm:"index" json:"title_japanese"` - TitleSynonyms pq.StringArray `gorm:"type:text[];index" json:"title_synonyms"` + TitleSynonyms pq.StringArray `gorm:"type:text[];index" json:"title_synonyms" swaggertype:"array,string"` Synopsis string `json:"synopsis"` EpisodeCount int `json:"episode_count"` Airing bool `json:"airing"` AiringStart time.Time `gorm:"default:null" json:"airing_start"` AiringEnd time.Time `gorm:"default:null" json:"airing_end"` ImageUrl string `json:"image_url"` + Episodes []Episode `json:"-"` + NeedsSync bool `json:"-" gorm:"default:true"` + LatestSync time.Time `json:"-" gorm:"default:null"` } type IShowRepo interface { Upsert(toUpsert Show) (*Show, error) Find(id string) (*Show, error) Search(query string) ([]Show, error) + Update(show Show) (*Show, error) } type ShowRepo struct { @@ -97,3 +101,13 @@ func (s ShowRepo) Search(query string) ([]Show, error) { return shows, nil } + +func (s ShowRepo) Update(show Show) (*Show, error) { + result := s.db.Save(&show) + + if result.Error != nil { + return nil, result.Error + } + + return &show, nil +} diff --git a/models/show_test.go b/models/show_test.go index da7d78ede42ce230f96e405bbb7db6d7917f110c..79227329e53dffed4c4426250b93e7e572bfd870 100644 --- a/models/show_test.go +++ b/models/show_test.go @@ -23,17 +23,20 @@ type ShowRepoSuite struct { func (suite *ShowRepoSuite) SetupTest() { _ = os.Chdir("..") cfg := config.NewConfig() + if cfg.Env != "test" { + panic("Tests must be run in test environment") + } suite.conn = models.NewConnection(cfg, zap.NewNop()) suite.subject = models.NewShowRepo(suite.conn) } func (suite *ShowRepoSuite) TearDownTest() { - _ = suite.conn.Exec("TRUNCATE TABLE shows") + _ = suite.conn.Exec("TRUNCATE TABLE shows CASCADE ") } func (suite *ShowRepoSuite) TestUpsert() { show := models.Show{ - MalID: 1, + MalID: 19, } _, err := suite.subject.Upsert(show) @@ -44,7 +47,7 @@ func (suite *ShowRepoSuite) TestUpsert() { func (suite *ShowRepoSuite) TestUpsertDuplicate() { show := models.Show{ - MalID: 1, + MalID: 18, } _, _ = suite.subject.Upsert(show) @@ -58,7 +61,7 @@ func (suite *ShowRepoSuite) TestUpsertDuplicate() { func (suite *ShowRepoSuite) TestFind() { show := &models.Show{ - MalID: 1, + MalID: 17, } show, err := suite.subject.Upsert(*show) @@ -72,12 +75,11 @@ func (suite *ShowRepoSuite) TestFind() { func (suite *ShowRepoSuite) TestSearchByTitle() { show := &models.Show{ - MalID: 1, + MalID: 16, Title: "Test", } - show, err := suite.subject.Upsert(*show) - suite.NoError(err) + suite.conn.Create(show) found, err := suite.subject.Search("Test") @@ -88,7 +90,7 @@ func (suite *ShowRepoSuite) TestSearchByTitle() { func (suite *ShowRepoSuite) TestSearchByTitleEnglish() { show := &models.Show{ - MalID: 1, + MalID: 15, TitleEnglish: "Test", } @@ -104,7 +106,7 @@ func (suite *ShowRepoSuite) TestSearchByTitleEnglish() { func (suite *ShowRepoSuite) TestSearchByTitleJapanese() { show := &models.Show{ - MalID: 1, + MalID: 14, TitleJapanese: "テスト", } @@ -120,7 +122,7 @@ func (suite *ShowRepoSuite) TestSearchByTitleJapanese() { func (suite *ShowRepoSuite) TestSearchByTitleSynonyms_FirstSynonym() { show := &models.Show{ - MalID: 1, + MalID: 13, TitleSynonyms: []string{"Test", "Another"}, } @@ -136,7 +138,7 @@ func (suite *ShowRepoSuite) TestSearchByTitleSynonyms_FirstSynonym() { func (suite *ShowRepoSuite) TestSearchByTitleSynonyms_SecondSynonym() { show := &models.Show{ - MalID: 1, + MalID: 11, TitleSynonyms: []string{"Test", "Another"}, } @@ -150,6 +152,23 @@ func (suite *ShowRepoSuite) TestSearchByTitleSynonyms_SecondSynonym() { suite.Equal(show.ID, found[0].ID) } +func (suite *ShowRepoSuite) TestUpdate() { + show := &models.Show{ + MalID: 12, + Title: "Definitely Not Test", + } + + show, err := suite.subject.Upsert(*show) + suite.NoError(err) + + show.Title = "Test" + + updated, err := suite.subject.Update(*show) + + suite.NoError(err) + suite.Equal("Test", updated.Title) +} + func TestShowRepoSuite(t *testing.T) { suite.Run(t, new(ShowRepoSuite)) } diff --git a/tasks/job.go b/tasks/job.go deleted file mode 100644 index 89657a2417bb426032414b86ea35054fc81ac209..0000000000000000000000000000000000000000 --- a/tasks/job.go +++ /dev/null @@ -1,44 +0,0 @@ -package tasks - -import ( - "encoding/json" - "gorm.io/gorm" - "time" -) - -type Job struct { - gorm.Model - Queue string - Payload []byte - At time.Time - Retries uint -} - -func NewJob(queue string, payload interface{}, at time.Time) (*Job, error) { - job := &Job{ - Queue: queue, - At: at, - } - err := job.MarshalPayload(payload) - if err != nil { - return nil, err - } - return job, nil -} - -func (j *Job) MarshalPayload(v interface{}) error { - b, err := json.Marshal(v) - if err != nil { - return err - } - j.Payload = b - return nil -} - -func (j *Job) UnmarshalPayload(v interface{}) error { - err := json.Unmarshal(j.Payload, v) - if err != nil { - return err - } - return nil -} diff --git a/tasks/job_test.go b/tasks/job_test.go deleted file mode 100644 index ae29939fe990f20c8b0f334e22ff7d44cf968a69..0000000000000000000000000000000000000000 --- a/tasks/job_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package tasks_test - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" - - "steamshard.net/oppai-api/tasks" -) - -type FakePayload struct { - FakeField string `json:"fake_field"` -} - -func TestNewJob(t *testing.T) { - payload := FakePayload{ - FakeField: "fake", - } - - at := time.Now() - - job, err := tasks.NewJob("test", payload, at) - assert.Nil(t, err) - assert.Equal(t, "test", job.Queue) - assert.Equal(t, at, job.At) - - var p FakePayload - err = job.UnmarshalPayload(&p) - assert.Nil(t, err) - assert.Equal(t, "fake", p.FakeField) -} diff --git a/tasks/queue.go b/tasks/queue.go deleted file mode 100644 index 61d0bc3d99563fdbbf4dc35c6807e73a4d25bd8b..0000000000000000000000000000000000000000 --- a/tasks/queue.go +++ /dev/null @@ -1,116 +0,0 @@ -package tasks - -import ( - "errors" - "github.com/cenkalti/backoff/v4" - "go.uber.org/zap" - "gorm.io/gorm" - "gorm.io/gorm/clause" - "time" -) - -// This is heavily inspired by https://github.com/taylorchu/work, -// adapted to use PostgreSQL locks as a way to guarantee exactly-once processing. -// work/v2 is a fantastic library, but I don't need half of it, and I don't like at-least-once processing for this use case. -// -// Hey Karol, I actually used that interview question for something :D - -type Queue struct { - Name string - Priority uint - MaxRetries uint - - manager *QueueManager -} - -func newQueue(name string, priority uint, maxRetries uint) *Queue { - return &Queue{ - Name: name, - Priority: priority, - MaxRetries: maxRetries, - } -} - -func (q *Queue) setManager(m *QueueManager) { - q.manager = m -} - -func (q *Queue) enqueue(job *Job) error { - result := q.manager.conn.Create(&job) - if result.Error != nil { - return result.Error - } - - return nil -} - -func (q *Queue) dequeueAndProcess(workerPid int64) (bool, error) { - var job *Job - - q.manager.logger.Debug("Dequeueing job", zap.String("queue", q.Name), zap.Int64("worker_pid", workerPid)) - - processedAnything := false - - err := q.manager.conn.Transaction(func(tx *gorm.DB) error { - result := tx. - Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}). - Order("at asc"). - First(&job, "queue = ? AND at < ? AND retries <= ?", q.Name, time.Now(), q.MaxRetries) - - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil - } - - return result.Error - } - - handler, ok := q.manager.handlers[job.Queue] - if !ok { - return errors.New("no handler for queue " + job.Queue) - } - - defer func() { - if r := recover(); r != nil { - q.manager.logger.Error("Recovering from panic", zap.Any("recovered", r), zap.String("queue", q.Name), zap.Int64("worker_pid", workerPid), zap.Any("jid", job.ID)) - job.At = time.Now().Add(jobBackoff(job)) - job.Retries++ - tx.Save(&job) - } - }() - - q.manager.logger.Debug("Processing job", zap.String("queue", q.Name), zap.Int64("worker_pid", workerPid), zap.Any("jid", job.ID)) - err := handler(job, q.manager) - if err != nil { - q.manager.logger.Error("Error processing job", zap.Any("jid", job.ID), zap.Error(err), zap.String("queue", q.Name), zap.Int64("worker_pid", workerPid)) - job.At = time.Now().Add(jobBackoff(job)) - job.Retries++ - tx.Save(&job) - return nil - } - - q.manager.logger.Debug("Processed job", zap.String("queue", q.Name), zap.Int64("worker_pid", workerPid), zap.Any("jid", job.ID)) - tx.Unscoped().Delete(&job) // We don't want to keep processed jobs in the database - - processedAnything = true - return nil - }) - - return processedAnything, err -} - -func jobBackoff(job *Job) time.Duration { - b := backoff.NewExponentialBackOff() - b.InitialInterval = time.Second - b.RandomizationFactor = 0.2 - b.Multiplier = 1.6 - b.MaxInterval = time.Hour - b.MaxElapsedTime = 0 - b.Reset() - - var next time.Duration - for i := uint(0); i < job.Retries; i++ { - next = b.NextBackOff() - } - return next -} diff --git a/tasks/queue_manager.go b/tasks/queue_manager.go deleted file mode 100644 index 10ececf5f457e9eaff6980a0385b5a106575600f..0000000000000000000000000000000000000000 --- a/tasks/queue_manager.go +++ /dev/null @@ -1,198 +0,0 @@ -package tasks - -import ( - "context" - "fmt" - "go.uber.org/zap" - "gorm.io/gorm" - "os" - "os/signal" - "slices" - "sync" - "syscall" - "time" - - "steamshard.net/oppai-api/config" -) - -type QueueManager struct { - conn *gorm.DB - queues map[string]*Queue - queuesByPriority map[uint][]string - priorities []uint - handlers map[string]HandlerFunc - logger *zap.Logger - cancel context.CancelFunc - wg sync.WaitGroup - config *config.Config -} - -type HandlerFunc func(*Job, *QueueManager) error - -func NewQueueManager(conn *gorm.DB, logger *zap.Logger, cfg *config.Config) *QueueManager { - logger.Info("Creating new queue manager") - return &QueueManager{ - conn: conn, - queues: make(map[string]*Queue), - queuesByPriority: make(map[uint][]string), - priorities: make([]uint, 0), - handlers: make(map[string]HandlerFunc), - logger: logger.Named("queue_manager"), - wg: sync.WaitGroup{}, - config: cfg, - } -} - -func (m *QueueManager) JobLogger(job *Job) *zap.Logger { - return m.logger.Named("job").With(zap.String("queue", job.Queue), zap.Uint("jid", job.ID)) -} - -func (m *QueueManager) Enqueue(job *Job) error { - if _, ok := m.queues[job.Queue]; !ok { - return fmt.Errorf("no queue %s", job.Queue) - } - - if _, ok := m.handlers[job.Queue]; !ok { - return fmt.Errorf("no handler for queue %s", job.Queue) - } - - return m.queues[job.Queue].enqueue(job) -} - -// AddQueue registers a queue with the manager. -// -// Example: -// -// manager.AddQueue("default", 0, 10) -// -// manager.Handle("default", func(job *tasks.Job, manager *tasks.QueueManager) error { -// payload := struct { -// Name string `json:"name"` -// }{} -// err := job.UnmarshalPayload(&payload) -// if err != nil { -// return err -// } -// -// manager.JobLogger(job).Info("Processing job", zap.String("name", payload.Name)) -// return nil -// }) -func (m *QueueManager) AddQueue(name string, priority uint, maxRetries uint) { - if _, ok := m.queues[name]; ok { - m.logger.Warn("Queue already registered", zap.String("name", name)) - return - } - - queue := newQueue(name, priority, maxRetries) - m.registerQueue(queue) -} - -func (m *QueueManager) Handle(queueName string, handler HandlerFunc) { - m.handlers[queueName] = handler -} - -func (m *QueueManager) Start() { - ctx, cancel := context.WithCancel(context.Background()) - m.cancel = cancel - for i := int64(0); i < m.config.BackgroundProcessingConcurrency; i++ { - m.wg.Add(1) - m.logger.Info("Starting worker", zap.Int64("pid", i)) - go func(w *QueueManager) { - defer w.wg.Done() - - for { - select { - case <-ctx.Done(): - return - default: - } - - ok, err := w.dequeueAndProcess(i) - if err != nil { - w.logger.Error("Error dequeuing and processing", zap.Error(err), zap.Int64("pid", i)) - } - - if !ok { - w.logger.Debug("No jobs to process", zap.Int64("pid", i), zap.Duration("interval", w.config.BackgroundProcessingInterval)) - time.Sleep(m.config.BackgroundProcessingInterval) - } - } - }(m) - } - - go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - <-c - m.logger.Info("Received interrupt signal, stopping workers") - m.Stop() - os.Exit(0) - }() -} - -func (m *QueueManager) Stop() { - m.logger.Info("Sending stop signal to workers") - if m.cancel != nil { - m.cancel() - } - m.logger.Info("Waiting for workers to finish") - m.wg.Wait() - m.logger.Info("All workers finished, stopping queue manager") -} - -func (m *QueueManager) dequeueAndProcess(workerPid int64) (bool, error) { - for _, priority := range m.priorities { - m.logger.Debug("Checking priority", zap.Uint("priority", priority)) - if len(m.queuesByPriority[priority]) == 0 { - continue - } - - for _, queueName := range m.queuesByPriority[priority] { - m.logger.Debug("Checking queue", zap.String("name", queueName)) - ok, err := m.queues[queueName].dequeueAndProcess(workerPid) - if err != nil { - return false, err - } - - if ok { - return true, nil - } - } - } - - return false, nil -} - -func (m *QueueManager) registerQueue(queue *Queue) { - queue.setManager(m) - priority := queue.Priority - m.priorities = append(m.priorities, priority) - m.queuesByPriority[priority] = append(m.queuesByPriority[priority], queue.Name) - - m.priorities = uniqueUints(m.priorities) - slices.Sort(m.priorities) - slices.Reverse(m.priorities) - - m.logger.Info("Registered queue", zap.String("name", queue.Name), zap.Uint("priority", priority)) - m.logger.Debug("Priorities", zap.Any("priorities", m.priorities)) - m.logger.Debug("Queues by priority", zap.Any("queuesByPriority", m.queuesByPriority)) - - m.queues[queue.Name] = queue - m.logger.Info("Registered queues", zap.Any("queues", m.queues)) -} - -func uniqueUints(intSlice []uint) []uint { - // Create a map to track seen integers. - seen := make(map[uint]bool) - var result []uint - - for _, v := range intSlice { - // If the integer is not already in the map, add it to the result slice. - if !seen[v] { - seen[v] = true - result = append(result, v) - } - } - - return result -} diff --git a/tasks/queue_manager_integration_test.go b/tasks/queue_manager_integration_test.go deleted file mode 100644 index ac814888c45b0223e24860714603f43fc6f03327..0000000000000000000000000000000000000000 --- a/tasks/queue_manager_integration_test.go +++ /dev/null @@ -1,208 +0,0 @@ -package tasks_test - -import ( - "fmt" - "github.com/stretchr/testify/suite" - "go.uber.org/zap" - "gorm.io/gorm" - "os" - "testing" - "time" - - "steamshard.net/oppai-api/config" - "steamshard.net/oppai-api/models" - "steamshard.net/oppai-api/tasks" -) - -type QueueManagerIntegrationTestSuite struct { - suite.Suite - - conn *gorm.DB - logger *zap.Logger - cfg *config.Config -} - -func (s *QueueManagerIntegrationTestSuite) SetupTest() { - _ = os.Chdir("..") - cfg := config.NewConfig() - logger := zap.NewNop() - conn := models.NewConnection(cfg, logger) - - cfg.BackgroundProcessingConcurrency = 1 - cfg.BackgroundProcessingInterval = time.Second - - s.conn = conn - s.logger = logger - s.cfg = cfg -} - -func (s *QueueManagerIntegrationTestSuite) TearDownTest() { - s.conn.Exec("TRUNCATE TABLE jobs") -} - -func (s *QueueManagerIntegrationTestSuite) TestHappyPath() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - m.AddQueue("test", 1, 1) - - sideEffect := false - strSideEffect := "very real" - - m.Handle("test", func(job *tasks.Job, manager *tasks.QueueManager) error { - var p FakePayload - err := job.UnmarshalPayload(&p) - if err != nil { - return err - } - - strSideEffect = p.FakeField - - sideEffect = true - return nil - }) - - payload := FakePayload{FakeField: "fake"} - task, err := tasks.NewJob("test", payload, time.Time{}) - - s.NoError(err) - - err = m.Enqueue(task) - s.NoError(err) - - m.Start() - - time.Sleep(2 * time.Second) - s.True(sideEffect) - s.Equal("fake", strSideEffect) - - m.Stop() -} - -func (s *QueueManagerIntegrationTestSuite) TestError() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - m.AddQueue("test", 1, 1) - - m.Handle("test", func(job *tasks.Job, manager *tasks.QueueManager) error { - return fmt.Errorf("fake error") - }) - - task, err := tasks.NewJob("test", nil, time.Time{}) - s.NoError(err) - - err = m.Enqueue(task) - s.NoError(err) - - s.NotPanics(func() { - m.Start() - time.Sleep(2 * time.Second) - m.Stop() - }) - - s.conn.Find(&task, task.ID) - s.Greater(task.Retries, uint(0)) -} - -func (s *QueueManagerIntegrationTestSuite) TestPanic() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - m.AddQueue("test", 1, 1) - - m.Handle("test", func(job *tasks.Job, manager *tasks.QueueManager) error { - panic("I am very afraid") - }) - - task, err := tasks.NewJob("test", nil, time.Time{}) - s.NoError(err) - - err = m.Enqueue(task) - s.NoError(err) - - s.NotPanics(func() { - m.Start() - time.Sleep(2 * time.Second) - m.Stop() - }) - - s.conn.Find(&task, task.ID) - s.Greater(task.Retries, uint(0)) -} - -func (s *QueueManagerIntegrationTestSuite) TestNoHandler() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - m.AddQueue("test", 1, 1) - - task, err := tasks.NewJob("test", nil, time.Time{}) - s.NoError(err) - - err = m.Enqueue(task) - s.Error(err, "no handler for queue test") -} - -func (s *QueueManagerIntegrationTestSuite) TestNoQueue() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - task, err := tasks.NewJob("test", nil, time.Time{}) - s.NoError(err) - - err = m.Enqueue(task) - s.Error(err, "no handler for queue test") -} - -func (s *QueueManagerIntegrationTestSuite) TestDeadJob() { - m := tasks.NewQueueManager(s.conn, s.logger, s.cfg) - - s.NotNil(m) - - m.AddQueue("test", 1, 1) - - sideEffect := false - strSideEffect := "very real" - - m.Handle("test", func(job *tasks.Job, manager *tasks.QueueManager) error { - var p FakePayload - err := job.UnmarshalPayload(&p) - if err != nil { - return err - } - - strSideEffect = p.FakeField - - sideEffect = true - return nil - }) - - payload := FakePayload{FakeField: "fake"} - task, err := tasks.NewJob("test", payload, time.Time{}) - s.NoError(err) - - err = m.Enqueue(task) - s.NoError(err) - task.Retries = 2 - s.conn.Save(&task) - - m.Start() - - time.Sleep(2 * time.Second) - - // Job wasn't executed, so side effect readouts don't change - s.False(sideEffect) - s.Equal("very real", strSideEffect) - - m.Stop() -} - -func TestQueueManagerTestSuite(t *testing.T) { - suite.Run(t, new(QueueManagerIntegrationTestSuite)) -}