add message_bus service dependency (#25)

This commit is contained in:
link 2022-12-01 17:17:44 +08:00 committed by GitHub
parent f6ad3aa89a
commit 98a35cb107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 37 deletions

View File

@ -74,7 +74,27 @@ paths:
"500":
description: Internal server error
$ref: "#/components/responses/BadResponse"
/event/local_storage/{serial}:
delete:
summary: Delete an event
description: Delete an event
operationId: deleteEventBySerial
tags:
- event
parameters:
- $ref: "#/components/parameters/serial"
responses:
"200":
description: Event deleted
$ref: "#/components/responses/OKResponse"
"401":
description: Event not found
"404":
description: Event not found
$ref: "#/components/responses/ResponseNotFound"
"500":
$ref: "#/components/responses/BadResponse"
description: Internal server error
components:
securitySchemes:
access_token:
@ -82,6 +102,14 @@ components:
in: header
name: Authorization
parameters:
serial:
name: serial
in: path
description: Serial of the local storage
required: true
schema:
type: string
example: "1234567890"
event_uuid:
name: event_uuid
in: path
@ -92,6 +120,13 @@ components:
format: uuid
example: 123e4567-e89b-12d3-a456-426655440000
responses:
OKResponse:
description: OK
content:
application/json:
schema:
type: object
$ref: "#/components/schemas/BaseResponse"
DeleteEventOK:
description: Event deleted
content:

View File

@ -1,5 +1,7 @@
[Unit]
After=casaos-gateway.service
After=casaos-message-bus.service
After=casaos-local-storage.service
ConditionFileNotEmpty=/etc/casaos/user-service.conf
Description=CasaOS User Service

View File

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"strings"
"time"
"github.com/IceWhaleTech/CasaOS-Common/external"
"github.com/IceWhaleTech/CasaOS-Common/utils/logger"
@ -17,49 +18,53 @@ import (
)
func EventListen() {
for i := 0; i < 100; i++ {
messageBusUrl, err := external.GetMessageBusAddress(config.CommonInfo.RuntimePath)
if err != nil {
logger.Error("get message bus url error", zap.Any("err", err))
return
}
wsURL := fmt.Sprintf("ws://%s/event/%s", strings.ReplaceAll(messageBusUrl, "http://", ""), "local-storage")
ws, err := websocket.Dial(wsURL, "", "http://localhost")
if err != nil {
logger.Error("connect websocket err", zap.Any("error", err))
}
defer ws.Close()
log.Println("subscribed to", wsURL)
for {
msg := make([]byte, 1024)
n, err := ws.Read(msg)
messageBusUrl, err := external.GetMessageBusAddress(config.CommonInfo.RuntimePath)
if err != nil {
log.Fatalln(err.Error())
logger.Error("get message bus url error", zap.Any("err", err))
return
}
var event message_bus.Event
if err := json.Unmarshal(msg[:n], &event); err != nil {
log.Println(err.Error())
}
propertiesStr, err := json.Marshal(event.Properties)
wsURL := fmt.Sprintf("ws://%s/event/%s", strings.ReplaceAll(messageBusUrl, "http://", ""), "local-storage")
ws, err := websocket.Dial(wsURL, "", "http://localhost")
if err != nil {
logger.Error("connect websocket err", zap.Any("error", err))
time.Sleep(time.Second * 5)
continue
}
model := model.EventModel{
SourceID: event.SourceID,
Name: event.Name,
Properties: string(propertiesStr),
UUID: *event.Uuid,
defer ws.Close()
log.Println("subscribed to", wsURL)
for {
msg := make([]byte, 1024)
n, err := ws.Read(msg)
if err != nil {
log.Fatalln(err.Error())
}
var event message_bus.Event
if err := json.Unmarshal(msg[:n], &event); err != nil {
log.Println(err.Error())
}
propertiesStr, err := json.Marshal(event.Properties)
if err != nil {
continue
}
model := model.EventModel{
SourceID: event.SourceID,
Name: event.Name,
Properties: string(propertiesStr),
UUID: *event.Uuid,
}
service.MyService.Event().CreateEvemt(model)
output, err := json.MarshalIndent(event, "", " ")
if err != nil {
log.Println(err.Error())
}
log.Println(string(output))
}
service.MyService.Event().CreateEvemt(model)
output, err := json.MarshalIndent(event, "", " ")
if err != nil {
log.Println(err.Error())
}
log.Println(string(output))
}
}

View File

@ -18,3 +18,7 @@ func (s *UserService) GetEvents(ctx echo.Context, params codegen.GetEventsParams
list := service.MyService.Event().GetEvents()
return ctx.JSON(http.StatusOK, list)
}
func (s *UserService) DeleteEventBySerial(ctx echo.Context, serial codegen.Serial) error {
service.MyService.Event().DeleteEventBySerial(serial)
return ctx.JSON(http.StatusOK, serial)
}

View File

@ -1,6 +1,8 @@
package service
import (
"encoding/json"
"github.com/IceWhaleTech/CasaOS-UserService/model"
"gorm.io/gorm"
)
@ -10,6 +12,7 @@ type EventService interface {
GetEvents() (list []model.EventModel)
GetEventByUUID(uuid string) (m model.EventModel)
DeleteEvent(uuid string)
DeleteEventBySerial(serial string)
}
type eventService struct {
@ -31,7 +34,23 @@ func (e *eventService) GetEventByUUID(uuid string) (m model.EventModel) {
func (e *eventService) DeleteEvent(uuid string) {
e.db.Where("uuid = ?", uuid).Delete(&model.EventModel{})
}
func (e *eventService) DeleteEventBySerial(serial string) {
list := []model.EventModel{}
e.db.Find(&list)
for _, v := range list {
if v.SourceID == "local-storage" {
properties := make(map[string]string)
err := json.Unmarshal([]byte(v.Properties), &properties)
if err != nil {
continue
}
if properties["serial"] == serial {
e.db.Delete(&v)
}
}
}
}
func NewEventService(db *gorm.DB) EventService {
return &eventService{db: db}
}