Projects
Eulaceura:Mainline:GA
distributed-beget
_service:obs_scm:0004-refactor-using-the-reacto...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:0004-refactor-using-the-reactor-framework.patch of Package distributed-beget
diff --git b/services/include/list.h b/services/include/list.h new file mode 100644 index 0000000..f45bdf5 --- /dev/null +++ b/services/include/list.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef BASE_STARTUP_INITLITE_LIST_H +#define BASE_STARTUP_INITLITE_LIST_H +#include <stddef.h> + +#ifdef __cplusplus +#if __cplusplus +extern "C" { +#endif +#endif + +typedef struct ListNode { + struct ListNode *next; + struct ListNode *prev; +} ListNode, ListHead; + +#define ListEmpty(node) \ + do { \ + node.next = &node; \ + node.prev = &node; \ + } while (0) \ + +#define ListEntry(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member))) + + +void OH_ListAddTail(struct ListNode *head, struct ListNode *item); +void OH_ListRemove(struct ListNode *item); + +#ifdef __cplusplus +#if __cplusplus +} +#endif +#endif + +#endif // BASE_STARTUP_INITLITE_LIST_H diff --git a/services/param/base/BUILD.gn b/services/param/base/BUILD.gn index 178ac87..b253055 100644 --- a/services/param/base/BUILD.gn +++ b/services/param/base/BUILD.gn @@ -11,7 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import("//build/ohos.gni") -import("//build/config/sysroot.gni") config("exported_header_files") { visibility = [ ":*" ] @@ -19,7 +18,7 @@ config("exported_header_files") { "//base/startup/init/interfaces/innerkits/include", "//base/startup/init/services/include/param", "//base/startup/init/services/include", - "${sysroot}/usr/include/hilog", + "//base/hiviewdfx/hilog/interfaces/native/innerkits/include", ] } @@ -30,7 +29,7 @@ comm_sources = [ base_include_dirs = [ "//base/startup/init/services/param/include", "//base/startup/init/services/param/base", - "${sysroot}/usr/include/hilog", + "//base/hiviewdfx/hilog/interfaces/native/innerkits/include", ] source_set("parameterbase") { diff --git a/services/param/linux/param_request.c b/services/param/linux/param_request.c index afd95fe..76947f2 100644 --- a/services/param/linux/param_request.c +++ b/services/param/linux/param_request.c @@ -29,9 +29,8 @@ #include <stdio.h> #include "beget_ext.h" -#include "param_manager.h" -static void ClearEnv(ParamRequestMsg* pmsg, ParamRespMsg* respmsg, int fd) +static void ClearEnv(ParamReqMsg* pmsg, ParamRespMsg* respmsg, int fd) { if (pmsg != NULL) free(pmsg); @@ -49,9 +48,8 @@ static int GetClientSocket() struct sockaddr_un serverAddr; bzero(&serverAddr, sizeof(serverAddr)); serverAddr.sun_family = PF_UNIX; - strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME) + 1); + strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME)); if (connect(cfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) { - close(cfd); perror("Failed to connect"); return -1; } @@ -59,33 +57,33 @@ static int GetClientSocket() return cfd; } -static struct ParamRequestMsg* GetRequestMsg(uint32_t type, uint32_t size) +static struct ParamReqMsg* GetRequestMsg(uint32_t type, uint32_t size) { uint32_t data_alloc_size = size; if (data_alloc_size > PARAM_VALUE_LEN_MAX || data_alloc_size == 0) data_alloc_size = PARAM_VALUE_LEN_MAX; - struct ParamRequestMsg *pmsg; + struct ParamReqMsg *pmsg; if (type == GET_PARAMETER) { - pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg)); - BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg"); - bzero(pmsg, sizeof(struct ParamRequestMsg)); + pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg)); + BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg"); + bzero(pmsg, sizeof(struct ParamReqMsg)); } else if (type == SET_PARAMETER) { - pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size); - BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg"); - bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size); + pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size); + BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg"); + bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size); } else if (type == WAIT_PARAMETER) { - pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size); - BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg"); - bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size); + pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size); + BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg"); + bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size); } pmsg->datasize = data_alloc_size; pmsg->type = type; return pmsg; } -static struct ParamRespMsg* StartRequest(int fd, struct ParamRequestMsg* pmsg) +static struct ParamRespMsg* StartRequest(int fd, struct ParamReqMsg* pmsg) { - int ret = send(fd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0); + int ret = send(fd, pmsg, sizeof(struct ParamReqMsg) + pmsg->datasize, 0); BEGET_ERROR_CHECK(ret > 0, return NULL, "Failed to send msg"); struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX); @@ -106,13 +104,13 @@ int SystemSetParameter(const char *name, const char *value) int fd = GetClientSocket(); if (fd < 0) return -1; - struct ParamRequestMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value)); + struct ParamReqMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value)); if (pmsg == NULL) { close(fd); return -1; } - strncpy(pmsg->key, name, sizeof(pmsg->key) - 1); + strncpy(pmsg->key, name, sizeof(pmsg->key)); strncpy(pmsg->data, value, pmsg->datasize); int ret; struct ParamRespMsg* respmsg = StartRequest(fd, pmsg); @@ -138,10 +136,10 @@ int SystemReadParam(const char *name, char *value, uint32_t *len) int fd = GetClientSocket(); if (fd < 0) return -1; - struct ParamRequestMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len); + struct ParamReqMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len); BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg"); - strncpy(pmsg->key, name, sizeof(pmsg->key) - 1); + strncpy(pmsg->key, name, sizeof(pmsg->key)); int ret; struct ParamRespMsg* respmsg = StartRequest(fd, pmsg); if (respmsg == NULL) { @@ -175,11 +173,14 @@ int SystemWaitParameter(const char *name, const char *value, int32_t timeout) if (fd < 0) return -1; - struct ParamRequestMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1); + struct ParamReqMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1); BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg"); + if (timeout < 0) { + timeout = 30; + } pmsg->timeout = timeout; - strncpy(pmsg->key, name, sizeof(pmsg->key) - 1); + strncpy(pmsg->key, name, sizeof(pmsg->key)); strncpy(pmsg->data, value, sizeof(pmsg->datasize)); struct ParamRespMsg* respmsg = StartRequest(fd, pmsg); if (respmsg == NULL) { diff --git a/services/param/linux/param_request.h b/services/param/linux/param_request.h index dd95f1b..8264ed4 100644 --- a/services/param/linux/param_request.h +++ b/services/param/linux/param_request.h @@ -3,13 +3,13 @@ #include "parameter.h" -typedef struct ParamRequestMsg { +typedef struct ParamReqMsg { uint32_t type; uint32_t datasize; uint32_t timeout; char key[PARAM_NAME_LEN_MAX]; char data[0]; -} ParamRequestMsg; +} ParamReqMsg; typedef struct ParamRespMsg { uint32_t flag; diff --git a/services/param_service/BUILD.gn b/services/param_service/BUILD.gn index 84f429f..cfcabce 100644 --- a/services/param_service/BUILD.gn +++ b/services/param_service/BUILD.gn @@ -21,6 +21,8 @@ ohos_executable("param_service") { "src/param_server.c", "src/le_utils.c", "src/trie_comm.c", + "src/hash.c", + "src/base_task.c" ] include_dirs = [ @@ -28,10 +30,14 @@ ohos_executable("param_service") { "//base/startup/init/interfaces/innerkits/include/syspara", "//base/startup/init/interfaces/innerkits/include", "//base/startup/init/services/param/include", - "//base/startup/init/services/param/linux/", + "//base/startup/init/services/param/linux", + "//base/startup/init/services/include", ] deps = [ "//base/startup/init/services/utils:libinit_utils" ] + + cflags = [ "-Wno-incompatible-pointer-types" ] + external_deps = [ "c_utils:utils", ] diff --git b/services/param_service/include/base_task.h b/services/param_service/include/base_task.h new file mode 100644 index 0000000..372c33e --- /dev/null +++ b/services/param_service/include/base_task.h @@ -0,0 +1,79 @@ +#ifndef BSAE_TASK_H +#define BSAE_TASK_H +#include <stdint.h> + +#include "list.h" +#include "hash.h" +#include "base_task.h" +#include "param_request.h" + +#define DEFAULT_MAX_EVENTS 1024 + +typedef void* LoopHandle; + +typedef enum : uint32_t { + Event_Read, + Event_Write, +} EventType; + +typedef enum : uint32_t { + NORMAL_TYPE, + WAIT_TYPE, +} ClientType; + +typedef struct EventBuffer_ { + uint32_t datasize; + uint8_t data[0]; +} EventBuffer; + +typedef struct Content_ { + HashNode hashNode; + ParamRespMsg *respmsg; +} Content; + +typedef struct BaseTask_ { + int taskId; + HashNode hashNode; + void (*close)(LoopHandle, struct BaseTask_*); + void (*handleEvent)(LoopHandle, struct BaseTask_*, EventType); +} BaseTask; + +typedef struct WaitInfo_ { + ListNode anchor; + int32_t timeout; + int32_t taskId; + char condition[0]; +} WaitInfo; + +typedef struct ClientTask_ { + BaseTask base; + uint32_t type; + void (*recvMessage)(LoopHandle, BaseTask*); + void (*sendMessage)(LoopHandle, BaseTask*); + void (*disconnect)(LoopHandle, BaseTask*); + union { + EventBuffer *content; + void *extraInfo; + } info; +} ClientTask; + +typedef struct ServerTask_ { + BaseTask base; + void (*incommingConnect)(LoopHandle, BaseTask*); +} ServerTask; + +typedef struct EventLoop_ { + int epollFd; + int maxevents; + void (*Run)(struct EventLoop_*); + void (*AddEvent)(struct EventLoop_*, BaseTask*, EventType); + void (*ModEvent)(struct EventLoop_*, BaseTask*, EventType); + void (*DelEvent)(struct EventLoop_*, BaseTask*); + HashTab *tab; +} EventLoop; + +EventLoop* GetDefaultLoop(); +BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size); +void RunLoop(EventLoop *loop); + +#endif // BSAE_TASK_H \ No newline at end of file diff --git b/services/param_service/include/hash.h b/services/param_service/include/hash.h new file mode 100644 index 0000000..d898e30 --- /dev/null +++ b/services/param_service/include/hash.h @@ -0,0 +1,37 @@ +#ifndef HASH_H +#define HASH_H + +#include <stddef.h> + +#define HASHNODE_ENTRY(ptr, type, member) ((type*)((char*)(ptr) - offsetof(type, member))) + +typedef struct HashNode { + struct HashNode *next; +} HashNode; + +typedef struct HashTab { + int (*nodeHash)(HashNode*); + int (*keyHash)(const void *key); + int (*nodeCompare)(HashNode*, HashNode*); + int (*keyCompare)(HashNode *node, const void *key); + void (*nodeFree)(HashNode*); + int maxBucket; + HashNode *buckets[0]; +} HashTab; + +typedef struct { + int (*nodeHash)(HashNode*); + int (*keyHash)(const void *key); + int (*nodeCompare)(HashNode*, HashNode*); + int (*keyCompare)(HashNode *node, const void *key); + void (*nodeFree)(HashNode*); + int maxBucket; +} HashInfo; + +int HashTabCreate(HashTab **tab, HashInfo *info); +int HashNodeAdd(HashTab *tab, HashNode *node); +void HashNodeRemove(HashTab *tab, HashNode *node); // only remove, don't free +int HashTabDestroy(HashTab *tab); +HashNode* GetHashNode(HashTab *tab, const void* key); + +#endif // HASH_H \ No newline at end of file diff --git a/services/param_service/include/param_server.h b/services/param_service/include/param_server.h index 7bca45f..91668a9 100644 --- a/services/param_service/include/param_server.h +++ b/services/param_service/include/param_server.h @@ -1,22 +1,13 @@ -#ifndef LE_SOCKET_H -#define LE_SOCKET_H -#include <stdint.h> +#ifndef PARAM_SERVER_H +#define PARAM_SERVER_H +#include <pthread.h> + #include "param_utils.h" #include "parameter.h" +#include "list.h" +#include "base_task.h" -#define LOOP_MAX_CLIENT 1024 -#define LOOP_MAX_SOCKET 1024 - -struct EventArgs { - int epollFd; - int clientFd; -}; - -enum { - SOCK_UNKNOWN = -1, - SOCK_DISCONNECTED, - SOCK_CONNECTED, -}; +#define MAX_CLIENT 1024 -void ParamServerStart(); -#endif // LE_SOCKET_H +int ParamServerInit(EventLoop*); +#endif // PARAM_SERVER_H diff --git a/services/param_service/include/trie_comm.h b/services/param_service/include/trie_comm.h index dfd08ec..df1181a 100644 --- a/services/param_service/include/trie_comm.h +++ b/services/param_service/include/trie_comm.h @@ -7,10 +7,10 @@ #define WORKSPACE_NAME WORKSPACE_DIR "/param.tmp" #define WORKSPACE_SIZE (1024*1000) -typedef struct ListNode { +typedef struct TrieListNode { uint32_t prev; uint32_t next; -} ListNode; +} TrieListNode; typedef struct ParamNode { uint8_t keyLen; @@ -19,7 +19,7 @@ typedef struct ParamNode { } ParamNode; typedef struct TrieNode { - ListNode node; + TrieListNode node; uint32_t child; uint32_t left; uint32_t right; @@ -39,5 +39,4 @@ int ParamWorkSpaceInit(); int SetParamtoMem(const char* key, const char* value); int GetParamFromMem(const char* key, char* value, uint32_t len); int WaitParam(const char* key, const char* value, uint32_t timeout); -void DumpParam(); #endif // TRIE_UTILS_H \ No newline at end of file diff --git b/services/param_service/include/trie_queue.h b/services/param_service/include/trie_queue.h new file mode 100644 index 0000000..6c96f96 --- /dev/null +++ b/services/param_service/include/trie_queue.h @@ -0,0 +1,84 @@ +#ifndef TRIE_QUEUE_H +#define TRIE_QUEUE_H +#include <stdlib.h> + +#include "trie_comm.h" +#include <stdio.h> + +typedef struct QueueItem { + struct QueueItem* prev; + struct QueueItem* next; + TrieNode* node; +} QueueItem; + +typedef struct TrieNodeQueue { + int size; + int ready; + QueueItem queue; + void (*push)(struct TrieNodeQueue*, TrieNode*); + TrieNode* (*pop)(struct TrieNodeQueue*); +} TrieNodeQueue; + +void TrieNodePush(TrieNodeQueue* tq, TrieNode* node); +TrieNode* TrieNodePop(TrieNodeQueue* tq); + +inline void TrieQueueFirstStageInit(TrieNodeQueue* tq) +{ + if (tq == NULL) { + return; + } + tq->size = 0; + tq->ready = 0; + tq->queue.next = &(tq->queue); + tq->queue.prev = &(tq->queue); + tq->push = TrieNodePush; + tq->pop = TrieNodePop; +} + +inline void TrieQueueSecondStageInit(TrieNodeQueue* tq) +{ + if (tq == NULL) { + return; + } + tq->ready = 1; +} + +inline void TrieNodePush(TrieNodeQueue* tq, TrieNode* node) +{ + if (tq == NULL || node == NULL) { + return; + } + if (!tq->ready) { + return; + } + QueueItem* item = (QueueItem*)malloc(sizeof(QueueItem)); + if (!item) { + return; + } + + QueueItem* queue = &tq->queue; + item->node = node; + item->next = queue; + item->prev = queue->prev; + queue->prev->next = item; + queue->prev = item; + tq->size++; +} + +inline TrieNode* TrieNodePop(TrieNodeQueue* tq) +{ + if (tq == NULL || tq->size == 0) { + return NULL; + } + + QueueItem* queue = &tq->queue; + QueueItem* item = queue->prev; + queue->prev = queue->prev->prev; + queue->prev->next = queue; + tq->size--; + TrieNode* node = item->node; + free(item); + return node; +} + +#endif // TRIE_QUEUE_H \ No newline at end of file diff --git b/services/param_service/src/base_task.c b/services/param_service/src/base_task.c new file mode 100644 index 0000000..e97d7dc --- /dev/null +++ b/services/param_service/src/base_task.c @@ -0,0 +1,118 @@ +#include <errno.h> +#include <stdlib.h> +#include <sys/epoll.h> + +#include "beget_ext.h" +#include "base_task.h" + +static EventLoop *staticloop = NULL; + +void AddEvent_(EventLoop *loop, BaseTask *task, EventType event) +{ + struct epoll_event ev = {}; + ev.data.fd = task->taskId; + if (event == Event_Read) { + ev.events = EPOLLIN; + } else if (event == Event_Write) { + ev.events = EPOLLOUT; + } + + (void)epoll_ctl(loop->epollFd, EPOLL_CTL_ADD, task->taskId, &ev); +} + +void ModEvent_(EventLoop *loop, BaseTask *task, EventType event) +{ + struct epoll_event ev = {}; + ev.data.fd = task->taskId; + if (event == Event_Read) { + ev.events = EPOLLIN; + } else if (event == Event_Write) { + ev.events = EPOLLOUT; + } + + (void)epoll_ctl(loop->epollFd, EPOLL_CTL_MOD, task->taskId, &ev); +} + +void DelEvent_(EventLoop *loop, BaseTask *task) +{ + (void)epoll_ctl(loop->epollFd, EPOLL_CTL_DEL, task->taskId, NULL); +} + +void ProcessEvent(EventLoop *loop, int fd, EventType type) +{ + BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return, "%s, invalid param", __func__); + HashNode *node = GetHashNode(loop->tab, &fd); + if (node == NULL) { + return; + } + BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode); + task->handleEvent(loop, task, type); +} + +void Run_(EventLoop *loop) +{ + BEGET_ERROR_CHECK(loop != NULL, return, "invalid param"); + struct epoll_event *events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * loop->maxevents); + BEGET_ERROR_CHECK(events!= NULL, return, "fail to allocate space to epoll event"); + + while (1) { + int num = epoll_wait(loop->epollFd, events, loop->maxevents, -1); + for (int i = 0; i < num; ++i) { + if (events[i].events & EPOLLIN) { + ProcessEvent(loop, events[i].data.fd, Event_Read); + } else if (events[i].events & EPOLLOUT) { + ProcessEvent(loop, events[i].data.fd, Event_Write); + } + } + } +} + +static int CreateLoop(EventLoop **loop) +{ + if (loop == NULL) { + return -1; + } + EventLoop *handle= (EventLoop*)calloc(1, sizeof(EventLoop)); + BEGET_ERROR_CHECK(handle != NULL, return -1, "fail to allocate space for EventLoop"); + handle->epollFd = epoll_create(DEFAULT_MAX_EVENTS); + BEGET_ERROR_CHECK(handle->epollFd > 0, free(handle);return -1, "failed to create epoll. errno [%d]", errno); + handle->Run = Run_; + handle->AddEvent = AddEvent_; + handle->ModEvent = ModEvent_; + handle->DelEvent = DelEvent_; + handle->maxevents = DEFAULT_MAX_EVENTS; + handle->tab == NULL; + *loop = handle; + return 0; +} + + +BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size) +{ + BaseTask *task = (BaseTask*)calloc(1, size); + BEGET_ERROR_CHECK(task != NULL, return NULL, "fail to create base task"); + task->hashNode.next = NULL; + return task; +} + +EventLoop* GetDefaultLoop() +{ + if (staticloop != NULL) { + return staticloop; + } + int ret = CreateLoop(&staticloop); + BEGET_ERROR_CHECK(ret == 0, return NULL, "fail to create default loop"); + return staticloop; +} + +void RunLoop(EventLoop *loop) +{ + if (loop != NULL && loop->Run != NULL) { + loop->Run(loop); + } +} + +void StopLoop() +{ + +} \ No newline at end of file diff --git b/services/param_service/src/hash.c b/services/param_service/src/hash.c new file mode 100644 index 0000000..ad3021c --- /dev/null +++ b/services/param_service/src/hash.c @@ -0,0 +1,106 @@ +#include "hash.h" +#include "beget_ext.h" + +#include <stdlib.h> + +int HashTabCreate(HashTab **tab, HashInfo *info) +{ + BEGET_ERROR_CHECK(tab != NULL && info != NULL, return -1, "%s : invalid arguments", __func__); + *tab = (HashTab*)calloc(1, sizeof(HashTab) + sizeof(HashNode) * info->maxBucket); + BEGET_ERROR_CHECK(*tab != NULL, return -1, "fail to calloc hash tab"); + (*tab)->nodeHash = info->nodeHash; + (*tab)->keyHash = info->keyHash; + (*tab)->nodeCompare = info->nodeCompare; + (*tab)->keyCompare = info->keyCompare; + (*tab)->nodeFree = info->nodeFree; + (*tab)->maxBucket = info->maxBucket; + return 0; +} + +static HashNode* CheckHashNodeIsExist(HashTab *tab, HashNode* head, HashNode *target) +{ + int ret; + HashNode *tmp = head; + while (tmp) { + ret = tab->nodeCompare(tmp, target); + if (ret == 0) { + return tmp; + } + tmp = tmp->next; + } + return NULL; +} + +int HashNodeAdd(HashTab *tab, HashNode *node) +{ + BEGET_ERROR_CHECK(tab != NULL && node != NULL, return -1, "%s : invalid param", __func__); + int hashCode = tab->nodeHash(node); + hashCode = hashCode > 0 ? hashCode : -hashCode; + hashCode = hashCode % tab->maxBucket; + HashNode *tmp = CheckHashNodeIsExist(tab, tab->buckets[hashCode], node); + if (tmp != NULL) { + BEGET_LOGE("node was exist"); + return -1; + } + node->next = tab->buckets[hashCode]; + tab->buckets[hashCode] = node; + return 0; +} + +void HashNodeRemove(HashTab *tab, HashNode *node) +{ + BEGET_ERROR_CHECK(tab != NULL && node != NULL, return, "%s : invalid param", __func__); + int hashCode = tab->nodeHash(node); + hashCode = hashCode > 0 ? hashCode : -hashCode; + hashCode = hashCode % tab->maxBucket; + HashNode *current = tab->buckets[hashCode]; + HashNode *prepare = NULL; + + while (current) { + int ret = tab->nodeCompare(current, node); + if (ret == 0) { + if (current == tab->buckets[hashCode]) { + tab->buckets[hashCode] = current->next; + } else { + prepare->next = current->next; + } + return; + } + prepare = current; + current = current->next; + } +} + +int HashTabDestroy(HashTab *tab) +{ + BEGET_ERROR_CHECK(tab != NULL, return -1, "%s : invalid arguments", __func__); + if (tab->nodeFree == NULL) { + BEGET_LOGE("%s : can not find node free func", __func__); + return -1; + } + for (int i = 0; i < tab->maxBucket; ++i) { + while(tab->buckets[i]) { + HashNode *next = tab->buckets[i]->next; + tab->nodeFree(tab->buckets[i]); + tab->buckets[i] = next; + } + } + return 0; +} + +HashNode* GetHashNode(HashTab *tab, const void *key) +{ + BEGET_ERROR_CHECK(tab != NULL && key != NULL, return NULL, "%s : invalid param", __func__); + int hashCode = tab->keyHash(key); + hashCode = hashCode > 0 ? hashCode : -hashCode; + hashCode = hashCode % tab->maxBucket; + HashNode *tmp = tab->buckets[hashCode]; + while (tmp != NULL) { + int ret = tab->keyCompare(tmp, key); + if (ret == 0) { + return tmp; + } + tmp = tmp->next; + } + return NULL; +} \ No newline at end of file diff --git a/services/param_service/src/le_utils.c b/services/param_service/src/le_utils.c index 8fa0401..c4f5b69 100644 --- a/services/param_service/src/le_utils.c +++ b/services/param_service/src/le_utils.c @@ -3,8 +3,6 @@ #include <fcntl.h> #include <unistd.h> #include <string.h> -#include <sys/stat.h> -#include "securec.h" #define MAX_BUF 1024 diff --git a/services/param_service/src/main.c b/services/param_service/src/main.c index 1daa246..58ef853 100644 --- a/services/param_service/src/main.c +++ b/services/param_service/src/main.c @@ -6,8 +6,7 @@ #include <sys/prctl.h> #include <string.h> #include "beget_ext.h" -#include "trie_comm.h" -#include "param_server.h" +#include "base_task.h"; int main(int argc, char* argv[]) { @@ -17,11 +16,18 @@ int main(int argc, char* argv[]) return -1; } - int ret = ParamWorkSpaceInit(); + EventLoop *defaultLoop = GetDefaultLoop(); + int ret = ParamServerInit(defaultLoop); + if (ret != 0) { + BEGET_LOGE("ParamServerInit failed\n"); + } + + ret = ParamWorkSpaceInit(); if (ret != 0) { BEGET_LOGE("ParamWorkSpaceInit failed\n"); exit(EXIT_FAILURE); } - ParamServerStart(); + + RunLoop(defaultLoop); return 0; } diff --git a/services/param_service/src/param_server.c b/services/param_service/src/param_server.c index 8b38d0f..9ae06a3 100644 --- a/services/param_service/src/param_server.c +++ b/services/param_service/src/param_server.c @@ -3,13 +3,13 @@ #include <sys/un.h> #include <sys/stat.h> #include <sys/epoll.h> +#include <sys/timerfd.h> #include <errno.h> #include <unistd.h> #include <stddef.h> #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <pthread.h> #include "beget_ext.h" #include "param_server.h" @@ -19,153 +19,445 @@ #include "securec.h" #include "le_utils.h" -void HandleEvent(struct EventArgs* args) -{ - int clientFd = args->clientFd; - int epollFd = args->epollFd; - struct ParamRequestMsg* pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX); - BEGET_ERROR_CHECK(pmsg != NULL, return, "failed to malloc ParamRequestMsg"); - bzero(pmsg, sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX); - pmsg->datasize = PARAM_VALUE_LEN_MAX; - int status = SOCK_CONNECTED; - while (1) { - int ret = recv(clientFd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0); - if (ret == 0) { - status = SOCK_DISCONNECTED; - break; - } else if (ret < 0) { - if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { - continue; - } - status = SOCK_UNKNOWN; - break; - } else { - break; - } +static int ParamTimerCreate(EventLoop *loop); +struct ParamTimer { + int8_t isCreate; + BaseTask *task; +}; + +static struct ParamTimer ptimer = {0, NULL}; + +static ListNode* GetAwaitHead() +{ + static ListNode *awaitHead = NULL; + if (awaitHead == NULL) { + ListNode *head = (ListNode*)malloc(sizeof(ListNode)); + BEGET_ERROR_CHECK(head != NULL, return NULL, "%s, failed to allocate space", __func__); + head->next = head; + head->prev = head; + awaitHead = head; } + return awaitHead; +} - if (status != SOCK_CONNECTED) { - epoll_ctl(epollFd, EPOLL_CTL_DEL, clientFd, NULL); - free(pmsg); +static void CloseTask(LoopHandle handle, BaseTask *task) +{ + if (handle == NULL || task == NULL) { return; } + EventLoop *loop = (EventLoop*)handle; + loop->DelEvent(loop, task); + close(task->taskId); + HashNodeRemove(loop->tab, &task->hashNode); + free(task); +} + +int32_t SocketRecv(LoopHandle handle, BaseTask *task, EventBuffer *buf, size_t length) +{ + BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__); + int32_t readlen = (int32_t)recv(task->taskId, buf->data + buf->datasize, length, 0); + if (readlen > 0) { + buf->datasize += readlen; + } + return readlen; +} - int ret; - struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX); - BEGET_ERROR_CHECK(respmsg != NULL, free(pmsg);return, "Failed to malloc ParamRespMsg"); - bzero(respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX); - switch(pmsg->type) { +int32_t SocketSend(LoopHandle handle, BaseTask *task, EventBuffer *buf) +{ + BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__); + int32_t writelen = (int32_t)send(task->taskId, buf->data, buf->datasize, 0); + return writelen; +} + +static void CheckAndTriggerWait(LoopHandle handle, char *key, char *value) +{ + BEGET_ERROR_CHECK(handle != NULL && key != NULL && value != NULL, return, "%s, invalid value", __func__); + EventLoop *loop = (EventLoop*)handle; + char fullStr[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX] = {0}; + sprintf(fullStr, "%s=%s", key, value); + ListNode *head = GetAwaitHead(); + ListNode *tmp = head->next; + BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__); + while (tmp != head) { + WaitInfo *info = ListEntry(tmp, WaitInfo, anchor); + if (strcmp(info->condition, fullStr) == 0) { + HashNode *hashNode = GetHashNode(loop->tab, &info->taskId); + BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode); + loop->AddEvent(loop, task, Event_Write); + OH_ListRemove(&info->anchor); + } + tmp = tmp->next; + } +} + +static void HandleMessageInner_(LoopHandle handle, ClientTask *task, ParamReqMsg *msg) +{ + BEGET_ERROR_CHECK(handle != NULL && task != NULL && msg != NULL, return, "%s : invalid param", __func__); + EventLoop *loop = (EventLoop*)handle; + uint32_t flag; + char data[PARAM_VALUE_LEN_MAX] = {0}; + switch(msg->type) { case SET_PARAMETER: { - ret = SetParamtoMem(pmsg->key, pmsg->data); - respmsg->flag = ret; + task->type = NORMAL_TYPE; + flag = SetParamtoMem(msg->key, msg->data); + if (flag == 0) { + CheckAndTriggerWait(handle, msg->key, msg->data); + } break; } case GET_PARAMETER: { - if (pmsg->datasize > PARAM_VALUE_LEN_MAX) { - pmsg->datasize = PARAM_VALUE_LEN_MAX; - } - ret = GetParamFromMem(pmsg->key, respmsg->data, pmsg->datasize); - respmsg->flag = ret; - if (ret == 0) { - respmsg->datasize = strlen(respmsg->data); - } + task->type = NORMAL_TYPE; + flag = GetParamFromMem(msg->key, data, PARAM_VALUE_LEN_MAX); break; } case WAIT_PARAMETER: { - ret = WaitParam(pmsg->key, pmsg->data, pmsg->timeout); - respmsg->flag = ret; + task->type = NORMAL_TYPE; + flag = WaitParam(msg->key, msg->data, msg->timeout); + if (flag != 0) { + task->type = WAIT_TYPE; + WaitInfo *info = (WaitInfo*)malloc(sizeof(WaitInfo) + msg->datasize); + BEGET_ERROR_CHECK(info != NULL, break, "%s, failed to allocate wait info space", __func__); + sprintf(info->condition, "%s=%s", msg->key, msg->data); + info->timeout = msg->timeout; + info->taskId = task->base.taskId; + task->info.extraInfo = (void*)info; + OH_ListAddTail(GetAwaitHead(), &info->anchor); + loop->DelEvent(loop, (BaseTask*)task); + ParamTimerCreate(loop); + return; + } break; } default: - respmsg->flag = -1; + task->type = NORMAL_TYPE; + flag = -1; break; } - ret = send(clientFd, respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX, 0); - if (ret < 0) { - BEGET_LOGE("Failed to send data to : %d\n", clientFd); + EventBuffer *buf; + if (strlen(data) > 0) { + buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg) + strlen(data) + 1); + BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__); + buf->datasize = sizeof(ParamRespMsg) + strlen(data) + 1; + ParamRespMsg *respmsg = (ParamRespMsg*)(buf->data); + respmsg->flag = flag; + respmsg->datasize = strlen(data); + (void)memcpy_s(respmsg->data, strlen(data) + 1, data, strlen(data) + 1); + } else { + buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg)); + BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__); + buf->datasize = sizeof(ParamRespMsg); + ParamRespMsg *respmsg = buf->data; + respmsg->flag = flag; } - free(pmsg); - free(respmsg); + + task->info.content = buf; + loop->ModEvent(loop, (BaseTask*)task, Event_Write); } -int CtlAdd(int epollfd, int fd, uint32_t event) +static void OnSendMessage(LoopHandle handle, BaseTask *task) { - struct epoll_event ev = { - .data.fd = fd, - .events = event, - }; - int ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev); - BEGET_ERROR_CHECK(ret == 0, return -1, "failed to add epoll_ctl fd %d. errno [%d]", fd, errno); - return 0; + BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__); + EventLoop *loop = (EventLoop*)handle; + ClientTask *clienttask = (ClientTask*)task; + + EventBuffer *buf = NULL; + if (clienttask->type == NORMAL_TYPE) { + BEGET_ERROR_CHECK(clienttask->info.content != NULL, return, "no message to send"); + buf = clienttask->info.content; + } else if (clienttask->type == WAIT_TYPE) { + BEGET_ERROR_CHECK(clienttask->info.extraInfo != NULL, return, "no message to send"); + WaitInfo *info = (WaitInfo*)clienttask->info.extraInfo; + buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg)); + BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__); + buf->datasize = sizeof(ParamRespMsg); + ParamRespMsg *respmsg = buf->data; + respmsg->datasize = 0; + if (info->timeout > 0) { + respmsg->flag = 0; + } else { + respmsg->flag = -1; + } + free(info); + } + + int32_t ret = SocketSend(handle, task, buf); + if (ret < 0) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + BEGET_LOGE("%s, resource busy, try again", __func__); + return; + } + BEGET_LOGE("%s, SocketSend fail, errno : %d", __func__, errno); + } + + free(buf); + loop->ModEvent(loop, task, Event_Read); } -void StartEpoll(int listenfd) -{ - int epollfd = epoll_create(LOOP_MAX_SOCKET); - BEGET_ERROR_CHECK(epollfd > 0, return, "failed to create epoll. errno [%d]", errno); - - int ret = CtlAdd(epollfd, listenfd, EPOLLIN); - BEGET_ERROR_CHECK(ret == 0, close(epollfd); return, "failed to CtlAdd"); - - struct epoll_event *events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * LOOP_MAX_SOCKET); - BEGET_ERROR_CHECK(events != NULL, close(epollfd); return, "failed to alloc memory for epoll_event"); - - while(1) { - int number = epoll_wait(epollfd, events, LOOP_MAX_SOCKET, -1); - for (int index = 0; index < number; ++index) { - int fd_ = events[index].data.fd; - if (fd_ == listenfd) { - struct sockaddr_un clientAddr; - socklen_t addrlen = sizeof(clientAddr); - bzero(&clientAddr, addrlen); - int clientfd = accept(listenfd, (struct sockaddr*)&clientAddr, &addrlen); - BEGET_ERROR_CHECK(clientfd >= 0, close(epollfd); return, "failed to accept socket"); - SetNoBlock(clientfd); - SetCloseExec(clientfd); - ret = CtlAdd(epollfd, clientfd, EPOLLIN | EPOLLET | EPOLLONESHOT); - BEGET_ERROR_CHECK(ret == 0, continue, "failed to CtlAdd"); - } else { - pthread_t threadId; - struct EventArgs args = {epollfd, fd_}; - ret = pthread_create(&threadId, NULL, (void*)HandleEvent, (void*)&args); - BEGET_ERROR_CHECK(ret == 0, continue, "faild to create pthread to handle parameter event"); +static void OnRecvMessage(LoopHandle handle, BaseTask *task) +{ + BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__); + EventLoop *loop = (EventLoop*)handle; + uint32_t payload = (uint32_t)sizeof(ParamReqMsg); + EventBuffer *buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload); + BEGET_ERROR_CHECK(buf != NULL, return, "%s, fail to allocate recv buf", __func__); + + int32_t recvlen = payload; + while (buf->datasize != payload) { + int32_t ret = SocketRecv(handle, task, buf, recvlen); + if (ret < 0) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + continue; + } + BEGET_LOGE("Process SocketRecv fail, errno : ", errno); + goto CLOSE; + } else if (ret == 0) { + BEGET_LOGI("%d, client normal exist", task->taskId); + task->close(handle, task); + goto CLOSE; + } + recvlen = payload - buf->datasize; + } + + ParamReqMsg *reqmsg = buf->data; + if (reqmsg->datasize > 0) { + EventBuffer *tmp = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload + reqmsg->datasize); + (void)memcpy_s(tmp, payload, buf, payload); + free(buf); + buf = tmp; + reqmsg = buf->data; + recvlen = reqmsg->datasize; + payload += reqmsg->datasize; + while (buf->datasize != payload) { + int32_t ret = SocketRecv(handle, task, buf, recvlen); + if (ret < 0) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + continue; + } + BEGET_LOGE("Process SocketRecv fail, errno : ", errno); + goto CLOSE; + } else if (ret == 0) { + BEGET_LOGI("%d, client normal exist", task->taskId); + task->close(handle, task); + goto CLOSE; } + recvlen = payload - buf->datasize; } } - close(epollfd); - free(events); + + HandleMessageInner_(handle, (ClientTask*)task, reqmsg); +CLOSE: + free(buf); } -int CreateSocket() +static void HandleClientEvent(LoopHandle handle, BaseTask *task, EventType type) { - unlink(PIPE_NAME); - int listenfd = socket(PF_UNIX, SOCK_STREAM, 0); - BEGET_ERROR_CHECK(listenfd > 0, return -1, "failed to create socket. errno [%d]", errno); + BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__); + ClientTask *clienttask = (ClientTask*)task; + if (type == Event_Read) { + clienttask->recvMessage(handle ,task); + } else if (type == Event_Write) { + clienttask->sendMessage(handle ,task); + } else { + BEGET_LOGE("%s, invalid type", __func__); + } +} + +static void ServerInCommingConnect(LoopHandle handle, BaseTask *task) +{ + BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__); + EventLoop *loop = (EventLoop*)handle; + + struct sockaddr_un clientAddr; + socklen_t addrlen = sizeof(clientAddr); + int clientfd = accept(task->taskId, (struct sockaddr*)&clientAddr, &addrlen); + BEGET_ERROR_CHECK(clientfd >= 0, return, "%s : failed to accept socket, %d", __func__, errno); + BEGET_LOGV("client fd = %d", clientfd); + SetNoBlock(clientfd); + SetCloseExec(clientfd); + ClientTask *clienttask = (ClientTask*)CreateBaseTask(loop, sizeof(ClientTask)); + BEGET_ERROR_CHECK(clienttask != NULL, close(clientfd); return, "%s : failed to create client task. errno [%d]", __func__, errno); + clienttask->base.taskId = clientfd; + clienttask->base.close = CloseTask; + clienttask->base.handleEvent = HandleClientEvent; + clienttask->recvMessage = OnRecvMessage; + clienttask->sendMessage = OnSendMessage; + clienttask->disconnect = NULL; + HashNodeAdd(loop->tab, &clienttask->base.hashNode); + loop->AddEvent(loop, (BaseTask*)clienttask, Event_Read); +} + +static void HandleServerEvent(LoopHandle handle, BaseTask *task, EventType type) +{ + (void)type; + BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__); + ServerTask *servertask = (ServerTask*)task; + servertask->incommingConnect(handle, task); +} + +static void CheckWaitParamTimeout(LoopHandle handle, uint64_t expire) +{ + BEGET_ERROR_CHECK(handle != NULL, return, "%s, invalid valud", __func__); + EventLoop *loop = (EventLoop*)handle; + ListNode *head = GetAwaitHead(); + ListNode *tmp = head->next; + BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__); + while (tmp != head) { + WaitInfo *info = ListEntry(tmp, WaitInfo, anchor); + if (info->timeout > 0) { + info->timeout -= expire; + } else { + HashNode *hashNode = GetHashNode(loop->tab, &info->taskId); + BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode); + loop->AddEvent(loop, task, Event_Write); + OH_ListRemove(&info->anchor); + } + tmp = tmp->next; + } + if (head->next == head) { + ptimer.task->close(handle, ptimer.task); + ptimer.isCreate = 0; + } +} +static void HandleTimerEvent(LoopHandle handle, BaseTask *task, EventType type) +{ + (void)type; + uint64_t exp; + read(task->taskId, &exp, sizeof(uint64_t)); + CheckWaitParamTimeout(handle, exp); + BEGET_LOGI("Entry timer task, exp : %ld", exp); +} + +static int ParamServerCreate(EventLoop *loop) +{ + BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__); + int server = socket(PF_UNIX, SOCK_STREAM, 0); + BEGET_ERROR_CHECK(server > 0, return -1, "failed to create socket. errno [%d]", errno); struct sockaddr_un serverAddr; - (void)memset_s(&serverAddr, sizeof(serverAddr), 0, sizeof(serverAddr)); serverAddr.sun_family = AF_UNIX; strncpy(serverAddr.sun_path, PIPE_NAME, sizeof(serverAddr.sun_path)); uint32_t size = offsetof(struct sockaddr_un, sun_path) + strlen(PIPE_NAME); - int ret = bind(listenfd, (struct sockaddr*)&serverAddr, size); - BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to bind socket. errno [%d]", errno); + int ret = bind(server, (struct sockaddr*)&serverAddr, size); + BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to bind socket. errno [%d]", errno); + + SetNoBlock(server); + SetCloseExec(server); + ret = listen(server, MAX_CLIENT); + BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to listen socket. errno [%d]", errno); + + ServerTask *servertask = (ServerTask*)CreateBaseTask(loop, sizeof(ServerTask)); + BEGET_ERROR_CHECK(servertask != NULL, close(server); return -1, "failed to create server task. errno [%d]", errno); + servertask->base.taskId = server; + servertask->base.close = CloseTask; + servertask->base.handleEvent = HandleServerEvent; + servertask->incommingConnect = ServerInCommingConnect; + HashNodeAdd(loop->tab, &servertask->base.hashNode); + loop->AddEvent(loop, (BaseTask*)servertask, Event_Read); + return 0; +} + +static int ParamTimerCreate(EventLoop *loop) +{ + if (ptimer.isCreate) { + return 0; + } + BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__); + int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); + BEGET_ERROR_CHECK(timerfd > 0, return -1, "failed to create timerfd. errno [%d]", errno); + struct itimerspec timespec = { + .it_interval.tv_sec = 1, + .it_interval.tv_nsec = 0, + .it_value.tv_sec = 1, + .it_value.tv_nsec = 0, + }; + int ret = timerfd_settime(timerfd, 0, ×pec, NULL); + BEGET_ERROR_CHECK(ret == 0, return -1, "failed to set timerfd. errno [%d]", errno); + ptimer.task = (BaseTask*)CreateBaseTask(loop, sizeof(BaseTask)); + BEGET_ERROR_CHECK(ptimer.task != NULL, close(timerfd); return -1, "failed to create timer task. errno [%d]", errno); + ptimer.task->taskId = timerfd; + ptimer.task->handleEvent = HandleTimerEvent; + ptimer.task->close = CloseTask; + HashNodeAdd(loop->tab, &ptimer.task->hashNode); + loop->AddEvent(loop, ptimer.task, Event_Read); + ptimer.isCreate = 1; + return 0; +} + +static int NodeHash(HashNode *node) +{ + BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode); + return task->taskId; +} + +static int KeyHash(const void *key) +{ + int id = *(int*)key; + return id; +} + +static int NodeCompare(HashNode *node_1, HashNode *node_2) +{ + BaseTask *task_1 = HASHNODE_ENTRY(node_1, BaseTask, hashNode); + BaseTask *task_2 = HASHNODE_ENTRY(node_2, BaseTask, hashNode); + return (task_1->taskId - task_2->taskId); +} - SetNoBlock(listenfd); - SetCloseExec(listenfd); - ret = listen(listenfd, LOOP_MAX_CLIENT); - BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to listen socket. errno [%d]", errno); - ret = chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); - BEGET_ERROR_CHECK(ret == 0, close(listenfd); return -1, "failed to chmod %s. errno [%d]", PIPE_NAME, errno); +static int KeyCompare(HashNode *node, const void *key) +{ + BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode); + if (task == NULL) { + BEGET_LOGE("%s, invalid task", __func__); + return -1; + } + return (task->taskId - *((int*)key)); +} - return listenfd; +static void NodeFree(HashNode *node) +{ + BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode); + if (task->close != NULL) { + task->close(GetDefaultLoop(), task); + } + free(task); } -void ParamServerStart() +static void ResourceInit() { + unlink(PIPE_NAME); + chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); MakeDirRecursive(PIPE_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH); - int listenfd = CreateSocket(); - StartEpoll(listenfd); +} + +static int InitDefaultLoopTab(EventLoop *defaultLoop) +{ + BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s : invalid loop", __func__); + HashInfo info = { + .nodeHash = NodeHash, + .keyHash = KeyHash, + .nodeCompare = NodeCompare, + .keyCompare = KeyCompare, + .nodeFree = NodeFree, + .maxBucket = 128, + }; + HashTab *tab = NULL; + int ret = HashTabCreate(&tab, &info); + BEGET_ERROR_CHECK(ret == 0, return -1, "failed to create hash tab. errno [%d]", ret); + defaultLoop->tab = tab; + return 0; +} + +int ParamServerInit(EventLoop *defaultLoop) +{ + BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s, invalid event loop", __func__); + ResourceInit(); + + int ret; + ret = InitDefaultLoopTab(defaultLoop); + BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to init loop", __func__); + ret = ParamServerCreate(defaultLoop); + BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to create param server.", __func__); + return 0; } diff --git a/services/param_service/src/trie_comm.c b/services/param_service/src/trie_comm.c index fd8184a..ac31243 100644 --- a/services/param_service/src/trie_comm.c +++ b/services/param_service/src/trie_comm.c @@ -14,20 +14,23 @@ #include <sys/mman.h> #include <time.h> #include <signal.h> +#include <semaphore.h> #include "trie_comm.h" +#include "trie_queue.h" #include "le_utils.h" #include "param_utils.h" #include "parameter.h" #include "securec.h" static TrieHeader* paramWorkSpace; +static TrieNodeQueue trieQueue = {0}; +static sem_t dump_sem; static pthread_rwlock_t rwlock; -static pthread_mutex_t mtlock; -static atomic_bool cnt; -static atomic_bool waitCnt; +static pthread_mutex_t queuelock; +static atomic_int updateCnt = 0; -uint32_t trie_alloc(char* name) +static uint32_t trie_alloc(char* name) { BEGET_ERROR_CHECK(name != NULL, return 0, "invalid name"); uint32_t keySize = strlen(name) + 1; @@ -48,7 +51,7 @@ uint32_t trie_alloc(char* name) return nowOffset; } -uint32_t param_alloc(uint32_t size) +static uint32_t param_alloc(uint32_t size) { uint32_t allocSize = PARAM_ALIGN(sizeof(ParamNode) + size); uint32_t nowOffset = paramWorkSpace->currOffest; @@ -61,41 +64,41 @@ uint32_t param_alloc(uint32_t size) return nowOffset; } -TrieNode* GetRootNode() +static TrieNode* GetRootNode() { BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return NULL, "failed"); return (paramWorkSpace->shareAddr + paramWorkSpace->rootOffest); } -TrieNode* GetTrieEntry(uint32_t index) +static TrieNode* GetTrieEntry(uint32_t index) { BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index"); TrieNode* entry = paramWorkSpace->shareAddr + index; return entry; } -ParamNode* GetParamEntry(uint32_t index) +static ParamNode* GetParamEntry(uint32_t index) { BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index"); ParamNode* entry = paramWorkSpace->shareAddr + index; return entry; } -ListNode* GetListNodeEntry(uint32_t index) +static TrieListNode* GetTrieListNodeEntry(uint32_t index) { BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index"); - ListNode* entry = paramWorkSpace->shareAddr + index;; + TrieListNode* entry = paramWorkSpace->shareAddr + index;; return entry; } -TrieNode* ListNodeGetTrieEntry(ListNode* node) +static TrieNode* TrieListNodeGetTrieEntry(TrieListNode* node) { BEGET_ERROR_CHECK(node != NULL, return NULL, "invalid node"); TrieNode* entry = (TrieNode*)((char*)node - offsetof(TrieNode, node)); return entry; } -void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen) +static void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen) { BEGET_ERROR_CHECK(remainKey != NULL, return, "invalid remainKey"); BEGET_ERROR_CHECK(subKey != NULL, return, "invalid subKey"); @@ -107,7 +110,7 @@ void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen) } } -int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen) +static int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen) { uint32_t nodeKeyLen = strlen(nodeKey); if (nodeKeyLen > prefixKeyLen) { @@ -118,7 +121,7 @@ int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen return strncmp(nodeKey, prefixKey, prefixKeyLen); } -TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen) +static TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen) { if (current == NULL || remainKey == NULL) return NULL; @@ -138,7 +141,7 @@ TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pre return FindSubTrieNode(subTrieNode, remainKey, prefixLen); } -TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen) +static TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen) { if (current == NULL || remainKey == NULL) return NULL; @@ -170,10 +173,10 @@ TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pref return AddSubTrieNode(subTrieNode, remainKey, prefixLen); } -int CheckParamName(const char* name) +static int CheckParamName(const char* name) { BEGET_ERROR_CHECK(name != NULL, return -1, "invalid parameter name"); - size_t nameLen = strlen(name); + int nameLen = (int)strlen(name); if (name[0] == '.' || name[nameLen - 1] == '.') return -1; for (int i = 0; i < nameLen; ++i) { @@ -191,145 +194,7 @@ int CheckParamName(const char* name) return 0; } -int SetParamtoMem(const char* key, const char* value) -{ - BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); - BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); - BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len"); - BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len"); - BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name"); - - TrieNode* root = GetRootNode(); - TrieNode* current = GetRootNode(); - if (root == NULL || current == NULL) - return -1; - - char* remainKey = (char *)key; - pthread_rwlock_wrlock(&rwlock); - while(1) { - char* subKey; - uint32_t prefixLen; - GetSubKey(remainKey, &subKey, &prefixLen); - if (current->child != 0) { - current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen); - BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode"); - } else { - char prefixKey[PARAM_NAME_LEN_MAX] = {0}; - (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen); - current->child = trie_alloc(prefixKey); - BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node"); - current = GetTrieEntry(current->child); - BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry"); - } - if (subKey == NULL) { - if (current->dataIndex) { - int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ; - BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter"); - ParamNode* saveParam = GetParamEntry(current->dataIndex); - BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry"); - (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX, value, strlen(value)); - saveParam->valueLen = strlen(value); - break; - } - uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2; - current->dataIndex = param_alloc(allocSize); - ParamNode* saveParam = GetParamEntry(current->dataIndex); - BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry"); - sprintf(saveParam->data, "%s=%s", key, value); - saveParam->keyLen = strlen(key); - saveParam->valueLen = strlen(value); - - current->node.prev = root->node.prev; - current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr; - ListNode* rootPrevListNode = GetListNodeEntry(root->node.prev); - TrieNode* rootPrevTrieNode = ListNodeGetTrieEntry(rootPrevListNode); - BEGET_ERROR_CHECK((rootPrevListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry"); - rootPrevTrieNode->node.next = (void*)(¤t->node) - paramWorkSpace->shareAddr; - root->node.prev = (void*)(¤t->node) - paramWorkSpace->shareAddr; - break; - } - remainKey = subKey + 1; - } - atomic_store(&cnt, 1); - atomic_store(&waitCnt, 1); - pthread_rwlock_unlock(&rwlock); - return 0; -} - -int GetParamFromMem(const char* key, char* value, uint32_t len) -{ - BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); - BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); - - TrieNode* current = GetRootNode(); - if (current == NULL) - return -1; - - ParamNode* paramData; - char* remainKey = (char *)key; - pthread_rwlock_rdlock(&rwlock); - while (1) { - char* subKey; - uint32_t prefixLen; - GetSubKey(remainKey, &subKey, &prefixLen); - current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen); - BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key); - if (subKey == NULL) { - paramData = GetParamEntry(current->dataIndex); - BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry"); - break; - } - remainKey = subKey + 1; - } - - if (len > paramData->valueLen) { - (void)memcpy_s(value, PARAM_VALUE_LEN_MAX, paramData->data + paramData->keyLen + 1, paramData->valueLen); - value[paramData->valueLen] = '\0'; - } else { - (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, len); - value[len] = '\0'; - } - pthread_rwlock_unlock(&rwlock); - return 0; -} - -int WaitParam(const char* key, const char* value, uint32_t timeout) -{ - BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); - BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); - int ret; - char tmp[PARAM_VALUE_LEN_MAX] = {0}; - ret = GetParamFromMem(key, tmp, sizeof(tmp)); - if (ret == 0) { - if (strncmp(value, "*", strlen(value)) == 0) { - return 0; - } - if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) { - return 0; - } - bzero(tmp, sizeof(tmp)); - } - while (timeout != 0) { - if (atomic_load(&waitCnt)) { - atomic_store(&waitCnt, 0); - ret = GetParamFromMem(key, tmp, sizeof(tmp)); - if (ret == 0) { - if (strlen(tmp) == 1 && strncmp(value, "*", 1) == 0) { - return 0; - } - if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) { - return 0; - } - bzero(tmp, sizeof(tmp)); - } - } - --timeout; - sleep(1); - } - return -1; -} - -void WritetoDisk(TrieNode* node, FILE* fp) +static void WritetoDisk(TrieNode* node, FILE* fp) { BEGET_ERROR_CHECK(node != NULL, return, "invalid node"); BEGET_ERROR_CHECK(fp != NULL, return, "invalid file descriptor "); @@ -344,57 +209,125 @@ void WritetoDisk(TrieNode* node, FILE* fp) fputs(buf, fp); } -void DumpParam() +static void FullWrite() +{ + // clean up trieQueue + pthread_mutex_lock(&queuelock); + while (trieQueue.size > 0) { + trieQueue.pop(&trieQueue); + } + pthread_mutex_unlock(&queuelock); + unlink(USER_PARAM_FILE); + FILE* fp = fopen(USER_PARAM_FILE, "w+"); + TrieNode* root = GetRootNode(); + TrieListNode* current = GetTrieListNodeEntry(root->node.next); + BEGET_ERROR_CHECK((root != NULL) && (current != NULL), fclose(fp); return, "can not get root node or get list entry"); + while (current != &root->node) { + TrieNode* trienode = TrieListNodeGetTrieEntry(current); + WritetoDisk(trienode, fp); + current = GetTrieListNodeEntry(current->next); + } + fclose(fp); +} + +static void AppendWrite() { - if (atomic_load(&cnt)) { - pthread_mutex_lock(&mtlock); - atomic_store(&cnt, 0); - unlink(USER_PARAM_FILE); - FILE* fp = fopen(USER_PARAM_FILE, "a+"); - TrieNode* root = GetRootNode(); - ListNode* current = GetListNodeEntry(root->node.next); - BEGET_ERROR_CHECK((root != NULL) && (current != NULL), pthread_mutex_unlock(&mtlock); fclose(fp); return, "can not get root node or get list entry"); - while (current != &root->node) { - TrieNode* trienode = ListNodeGetTrieEntry(current); + FILE* fp = fopen(USER_PARAM_FILE, "a+"); + pthread_mutex_lock(&queuelock); + while (trieQueue.size > 0) { + TrieNode* trienode = trieQueue.pop(&trieQueue); + if (trienode != NULL) { WritetoDisk(trienode, fp); - current = GetListNodeEntry(current->next); } - fclose(fp); - pthread_mutex_unlock(&mtlock); } + pthread_mutex_unlock(&queuelock); + fclose(fp); } -void ProcessParamFile(char* fileName) +static void DumpParam() +{ + FullWrite(); // Init阶段,相同key值的字段仅有一份 + while (1) { + sem_wait(&dump_sem); + sem_init(&dump_sem, 0, 0); + /* + persist属性的字段为持久化保存,当含有persist字段的key更新或新增次数未达到累积次数时(此处临界值为50),从队列中获取更改或新增的字段,对file进行追加型写入,保存方式如下: + 第一次设置时,file 内容: + persist.openeuler.version=22.03 + 第二次设置时,file 内容: + persist.openeuler.version=22.03 + persist.openeuler.version=23.03 + 当达到累积次数时,不再从队列中获取更改或新增的字段,遍历整个workspace,重新对file写入, 此时相同key值的字段仅有一份,保存方式如下: + 第一次设置时,file 内容: + persist.openeuler.version=22.03 + 第二次设置时,file 内容: + persist.openeuler.version=23.03 + */ + if (atomic_load(&updateCnt) >= 50) { + atomic_store(&updateCnt, 0); + FullWrite(); + } else { + AppendWrite(); + } + } +} + +static void ProcessParamFile(char* fileName) { BEGET_ERROR_CHECK(access(fileName, F_OK) == 0, perror("error"); return, "failed to access %s", fileName); FILE* fp = fopen(fileName, "r"); BEGET_ERROR_CHECK(fp != NULL, return, "failed to open %s", fileName); - char buf[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX]; - bzero(buf, sizeof(buf)); - while (fgets(buf, sizeof(buf), fp) != NULL) { - buf[strlen(buf) - 1] = '\0'; - if (*buf == '#') + + char *line = (char*)calloc(1, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX); + while (fgets(line, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX, fp) != NULL) { + line[strlen(line) - 1] = '\0'; + if (*line == '#') continue; - char *sep = buf; - char *key = NULL; + + // Skip the line beginning spaces + while (isspace(*line) && (*line != '\0')) { + line++; + } + + // Skip the spaces at the end of line + int len = strlen(line); + while (len > 0 && isspace(line[len - 1])) { + line[len - 1] = '\0'; + len--; + } + + if (*line == '\0') + continue; + + char *sep = line; + char *key = sep; char *value = NULL; while (*sep != '\0') { + if (isspace(*sep)) { + *sep = '\0'; + } if (*sep == '=') { *sep = '\0'; value = sep + 1; - key = buf; break; } - ++sep; + sep++; + } + + if (value) { + // Skip the value beginning spaces + while (isspace(*value) && (*value != '\0')) { + value++; + } + if (*value == '\0') + continue; + SetParamtoMem(key, value); } - if (key) { - SetParamtoMem(key, value); - } } fclose(fp); } -void ReadFileInDir(char* dir, char* postfix) +static void ReadFileInDir(char* dir, char* postfix) { BEGET_ERROR_CHECK((dir != NULL) && (postfix != NULL), return, "invalid directory"); DIR* pDir = opendir(dir); @@ -417,7 +350,7 @@ void ReadFileInDir(char* dir, char* postfix) closedir(pDir); } -void LoadParam(char* dir) +static void LoadParam(char* dir) { BEGET_ERROR_CHECK(dir != NULL, return, "invalid directory"); struct stat st; @@ -428,35 +361,16 @@ void LoadParam(char* dir) } // 定时持久化数据 -void CreateParamListener() +static int CreateParamListener() { - atomic_init(&cnt, 0); - struct sigevent sigev; - bzero(&sigev, sizeof(struct sigevent)); - sigev.sigev_notify = SIGEV_THREAD; - sigev.sigev_notify_function = DumpParam; - sigev.sigev_notify_attributes = NULL; - - timer_t timerId; - if (timer_create(CLOCK_REALTIME, &sigev, &timerId) != 0) { - perror("timer_create:"); - exit(EXIT_FAILURE); - } - - struct itimerspec value; - bzero(&value, sizeof(struct itimerspec)); - value.it_value.tv_sec = 1; - value.it_value.tv_nsec = 0; - value.it_interval.tv_sec = 1; - value.it_interval.tv_nsec = 0; - - if (timer_settime(timerId, 0, &value, NULL) != 0) { - perror("timer_settime:"); - exit(EXIT_FAILURE); - } + pthread_t dp; + int ret = pthread_create(&dp, NULL, (void*)DumpParam, NULL); + BEGET_ERROR_CHECK(ret == 0, return ret, "failed to create param listener"); + pthread_detach(dp); + return 0; } -void InitRootNode() +static void InitRootNode() { BEGET_ERROR_CHECK(paramWorkSpace != NULL, return, "invalid paramWorkSpace"); TrieNode* rootNode = paramWorkSpace->shareAddr + trie_alloc("#"); @@ -469,16 +383,16 @@ int ParamWorkSpaceInit() MakeDirRecursive(SYSTEM_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH); MakeDirRecursive(USER_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH); MakeDirRecursive(WORKSPACE_DIR, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH); + + sem_init(&dump_sem, 0, 0); pthread_rwlock_init(&rwlock, NULL); - pthread_mutex_init(&mtlock, NULL); - atomic_init(&waitCnt, 0); + pthread_mutex_init(&queuelock, NULL); paramWorkSpace = (TrieHeader*)malloc(sizeof(TrieHeader)); BEGET_ERROR_CHECK(paramWorkSpace != NULL, return -1, "failed to malloc for param workspace"); int fd = open(WORKSPACE_NAME, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); BEGET_ERROR_CHECK(fd > 0, return -1, "failed to open %s", WORKSPACE_NAME); - int ret = ftruncate(fd, WORKSPACE_SIZE); - (void)ret; + ftruncate(fd, WORKSPACE_SIZE); paramWorkSpace->shareAddr = mmap(NULL, WORKSPACE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); BEGET_ERROR_CHECK(paramWorkSpace->shareAddr != MAP_FAILED, return -1, "failed to create mmap"); paramWorkSpace->rootOffest = 0; @@ -486,8 +400,142 @@ int ParamWorkSpaceInit() paramWorkSpace->trieSize = 0; paramWorkSpace->paramSize = 0; InitRootNode(); + + // before LoadParam + TrieQueueFirstStageInit(&trieQueue); LoadParam(SYSTEM_PARAM_PATH); LoadParam(USER_PARAM_PATH); - CreateParamListener(); + // behind LoadParam + TrieQueueSecondStageInit(&trieQueue); + + return CreateParamListener(); +} + +int SetParamtoMem(const char* key, const char* value) +{ + BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); + BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); + BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len"); + BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len"); + BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name"); + + TrieNode* root = GetRootNode(); + TrieNode* current = GetRootNode(); + if (root == NULL || current == NULL) + return -1; + + char* remainKey = key; + pthread_rwlock_wrlock(&rwlock); + while(1) { + char* subKey; + uint32_t prefixLen; + GetSubKey(remainKey, &subKey, &prefixLen); + if (current->child != 0) { + current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen); + BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode"); + } else { + char prefixKey[PARAM_NAME_LEN_MAX] = {0}; + (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen); + current->child = trie_alloc(prefixKey); + BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node"); + current = GetTrieEntry(current->child); + BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry"); + } + if (subKey == NULL) { + if (current->dataIndex) { // Param update + int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ; + BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter"); + ParamNode* saveParam = GetParamEntry(current->dataIndex); + BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry"); + (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX, value, strlen(value)); + saveParam->valueLen = strlen(value); + atomic_fetch_add_explicit(&updateCnt, 1, memory_order_relaxed); + break; + } else { // Param add + uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2; + current->dataIndex = param_alloc(allocSize); + ParamNode* saveParam = GetParamEntry(current->dataIndex); + BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry"); + sprintf(saveParam->data, "%s=%s", key, value); + saveParam->keyLen = strlen(key); + saveParam->valueLen = strlen(value); + + current->node.prev = root->node.prev; + current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr; + TrieListNode* rootPrevTrieListNode = GetTrieListNodeEntry(root->node.prev); + TrieNode* rootPrevTrieNode = TrieListNodeGetTrieEntry(rootPrevTrieListNode); + BEGET_ERROR_CHECK((rootPrevTrieListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry"); + rootPrevTrieNode->node.next = (void*)(¤t->node) - paramWorkSpace->shareAddr; + root->node.prev = (void*)(¤t->node) - paramWorkSpace->shareAddr; + break; + } + } + remainKey = subKey + 1; + } + + if (strncmp(key, PARAM_PERSIST_PREFIX, strlen(PARAM_PERSIST_PREFIX)) == 0) { + pthread_mutex_lock(&queuelock); + trieQueue.push(&trieQueue, current); + pthread_mutex_unlock(&queuelock); + sem_post(&dump_sem); + } + pthread_rwlock_unlock(&rwlock); return 0; } + +int GetParamFromMem(const char* key, char* value, uint32_t len) +{ + BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); + BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); + + TrieNode* current = GetRootNode(); + if (current == NULL) + return -1; + + ParamNode* paramData; + char* remainKey = key; + pthread_rwlock_rdlock(&rwlock); + while (1) { + char* subKey; + uint32_t prefixLen; + GetSubKey(remainKey, &subKey, &prefixLen); + current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen); + BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key); + if (subKey == NULL) { + paramData = GetParamEntry(current->dataIndex); + BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry"); + break; + } + remainKey = subKey + 1; + } + + if (len > paramData->valueLen) { + (void)memcpy_s(value, PARAM_VALUE_LEN_MAX, paramData->data + paramData->keyLen + 1, paramData->valueLen); + value[paramData->valueLen] = '\0'; + } else { + (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, len); + value[len] = '\0'; + } + pthread_rwlock_unlock(&rwlock); + return 0; +} + +int WaitParam(const char* key, const char* value, uint32_t timeout) +{ + BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace"); + BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value"); + int ret; + char tmp[PARAM_VALUE_LEN_MAX] = {0}; + ret = GetParamFromMem(key, tmp, sizeof(tmp)); + if (ret == 0) { + if (strncmp(value, "*", strlen(value)) == 0) { + return 0; + } + if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) { + return 0; + } + bzero(tmp, sizeof(tmp)); + } + + return -1; +} diff --git a/services/utils/BUILD.gn b/services/utils/BUILD.gn index e5f6a96..30529be 100755 --- a/services/utils/BUILD.gn +++ b/services/utils/BUILD.gn @@ -10,24 +10,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import("//build/config/sysroot.gni") config("exported_header_files") { visibility = [ ":*" ] include_dirs = [ "//base/startup/init/services/include", - "${sysroot}/usr/include/hilog", + "//base/hiviewdfx/hilog/interfaces/native/innerkits/include", ] } import("//build/ohos.gni") ohos_static_library("libinit_utils") { - sources = [ "init_utils.c" ] + sources = [ + "init_utils.c", + "list.c" + ] public_configs = [ ":exported_header_files" ] include_dirs = [ "//base/startup/init/interfaces/innerkits/include", "//third_party/bounds_checking_function/include", - "${sysroot}/usr/include/hilog", + "//base/hiviewdfx/hilog/interfaces/native/innerkits/include", + "//base/startup/init/services/include" ] deps = [ "//base/hiviewdfx/hilog/interfaces/native/innerkits:libhilog", diff --git a/services/utils/init_utils.c b/services/utils/init_utils.c index 8b4b2e0..733f863 100644 --- a/services/utils/init_utils.c +++ b/services/utils/init_utils.c @@ -49,7 +49,7 @@ float ConvertMicrosecondToSecond(int x) } #ifndef __LITEOS_M__ -__attribute__((unused)) static bool CheckDigit(const char *name) +static bool CheckDigit(const char *name) { size_t nameLen = strlen(name); for (size_t i = 0; i < nameLen; ++i) { diff --git b/services/utils/list.c b/services/utils/list.c new file mode 100644 index 0000000..2ef1ad5 --- /dev/null +++ b/services/utils/list.c @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "list.h" + +#include <stddef.h> +#include <stdlib.h> + +void OH_ListAddTail(struct ListNode *head, struct ListNode *item) +{ + if (head == NULL || item == NULL) { + return; + } + item->next = head; + item->prev = head->prev; + head->prev->next = item; + head->prev = item; +} + +void OH_ListRemove(struct ListNode *item) +{ + if (item == NULL) { + return; + } + item->next->prev = item->prev; + item->prev->next = item->next; +} \ No newline at end of file
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2