Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

RPCSocketServer.ts 6.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. import http = require('http');
  2. import bsock = require('bsock');
  3. import * as uuid from "uuid/v4"
  4. import { Socket } from "./RPCSocketServer"
  5. type rpcType = 'hook' | 'unhook' | 'call'
  6. export type Outcome = "Success" | "Error"
  7. export type Visibility = "127.0.0.1" | "0.0.0.0"
  8. /* Responses */
  9. export class Response{
  10. constructor(
  11. public message?:string
  12. ){}
  13. }
  14. export class SuccessResponse extends Response{
  15. result:Outcome = "Success"
  16. constructor(
  17. message?:string
  18. ){
  19. super(message)
  20. }
  21. }
  22. export class ErrorResponse extends Response{
  23. result:Outcome = "Error"
  24. constructor(
  25. message: string = "Unknown error"
  26. ){
  27. super(message)
  28. }
  29. }
  30. export class SubscriptionResponse extends SuccessResponse{
  31. constructor(
  32. public uid: string,
  33. message?:string
  34. ){
  35. super(message)
  36. }
  37. }
  38. export type UnhookFunction = (uid:string) => Promise<SuccessResponse | ErrorResponse>
  39. export type callbackFunction = (...args) => Promise<SubscriptionResponse | ErrorResponse>
  40. export type AsyncFunction = (...args) => Promise<any>
  41. export interface RPCExporter{
  42. name: string
  43. exportRPCs() : socketioRPC[]
  44. exportPublicRPCs() : socketioRPC[]
  45. }
  46. type baseRPC = {
  47. type: rpcType
  48. name: string
  49. }
  50. type hookRPC = baseRPC & {
  51. type: 'hook'
  52. func: callbackFunction
  53. unhook: UnhookFunction
  54. }
  55. type unhookRPC = baseRPC & {
  56. type: 'unhook'
  57. func: UnhookFunction
  58. }
  59. type callRPC = baseRPC & {
  60. type: 'call'
  61. func: (...args) => Promise<any>
  62. }
  63. export type socketioRPC = callRPC | unhookRPC | hookRPC
  64. export type baseInfo = {
  65. owner: string,
  66. argNames: string[],
  67. }
  68. type HookInfo = baseRPC & baseInfo & {
  69. type: 'hook',
  70. generator: (socket) => callbackFunction
  71. unhook: UnhookFunction
  72. }
  73. type UnhookInfo = baseRPC & baseInfo & {
  74. type: 'unhook',
  75. func: UnhookFunction
  76. }
  77. type CallInfo = baseRPC & baseInfo & {
  78. type: 'call',
  79. func: AsyncFunction
  80. }
  81. type RpcInfo = HookInfo | UnhookInfo | CallInfo
  82. export type ExtendedRpcInfo = RpcInfo & { uniqueName: string }
  83. export const rpcToRpcinfo = (rpc : socketioRPC, owner: string):RpcInfo => {
  84. switch(rpc.type){
  85. case "call" :
  86. return {
  87. owner: owner,
  88. argNames: extractArgs(rpc.func),
  89. type: rpc.type,
  90. name: rpc.name,
  91. func: rpc.func,
  92. }
  93. case "unhook" :
  94. return {
  95. owner: owner,
  96. argNames: extractArgs(rpc.func),
  97. type: rpc.type,
  98. name: rpc.name,
  99. func: rpc.func,
  100. }
  101. case "hook" :
  102. const generator = hookGenerator(rpc)
  103. return {
  104. owner: owner,
  105. argNames: extractArgs(generator(undefined)),
  106. type: rpc.type,
  107. name: rpc.name,
  108. unhook: rpc.unhook,
  109. generator: generator,
  110. }
  111. }
  112. }
  113. function rpcHooker(socket: Socket, exporter:RPCExporter, makeUnique = true):ExtendedRpcInfo[]{
  114. const owner = exporter.name
  115. const RPCs = [...exporter.exportPublicRPCs(), ...exporter.exportRPCs()]
  116. const suffix = makeUnique?"-"+uuid().substr(0,4):""
  117. return RPCs.map(rpc => rpcToRpcinfo(rpc, owner))
  118. .map(info => {
  119. const ret:any = info
  120. ret.uniqueName = info.name+suffix
  121. switch(info.type){
  122. case "hook":
  123. socket.hook(ret.uniqueName, info.generator(socket))
  124. break;
  125. default:
  126. socket.hook(ret.uniqueName, info.func)
  127. }
  128. socket.on('close', () => socket.unhook(info.name))
  129. return ret
  130. })
  131. }
  132. const hookGenerator = (rpc:hookRPC): HookInfo['generator'] => {
  133. const argsArr = extractArgs(rpc.func)
  134. argsArr.pop()
  135. const args = argsArr.join(',')
  136. return eval(`(socket) => async (`+args+`) => {
  137. const res = await rpc.func(`+args+(args.length!==0?',':'')+` (x) => {
  138. socket.call(res.uid, x)
  139. })
  140. if(res.result == 'Success'){
  141. socket.on('close', async () => {
  142. const unhookRes = await rpc.unhook(res.uid)
  143. console.log("Specific close handler for", rpc.name, res.uid, unhookRes)
  144. })
  145. }
  146. return res
  147. }`)
  148. }
  149. const extractArgs = (f:Function):string[] => {
  150. let fn = String(f)
  151. let args = fn.substr(0, fn.indexOf(")"))
  152. args = args.substr(fn.indexOf("(")+1)
  153. let ret = args.split(",")
  154. return ret
  155. }
  156. type OnFunction = (type: 'error' | 'close', f: (e?:any)=>void) => Socket
  157. export interface Socket {
  158. port: number
  159. hook: (rpcname: string, ...args: any[]) => Socket
  160. unhook: (rpcname:string) => Socket
  161. call: (rpcname:string, ...args: any[]) => Promise<any>
  162. fire: (rpcname:string, ...args: any[]) => Promise<any>
  163. on: OnFunction
  164. destroy: ()=>void
  165. close: ()=>void
  166. }
  167. export type RPCSocketConf = {
  168. connectionHandler: (socket:Socket) => void
  169. errorHandler: (socket:Socket) => (error:any) => void
  170. closeHandler: (socket:Socket) => () => void
  171. }
  172. export class RPCSocketServer{
  173. private io = bsock.createServer()
  174. private wsServer = http.createServer()
  175. constructor(
  176. private port:number,
  177. private rpcExporters: RPCExporter[] = [],
  178. private visibility: Visibility = "127.0.0.1",
  179. private conf: RPCSocketConf = {
  180. errorHandler: (socket:Socket) => (error:any) => { socket.destroy(); console.error(error) },
  181. closeHandler: (socket:Socket) => () => { console.log("Socket closing") },
  182. connectionHandler: (socket:Socket) => { console.log("New websocket connection in port "+socket.port) }
  183. }
  184. ){
  185. this.startWebsocket()
  186. }
  187. private startWebsocket(){
  188. try{
  189. this.io.attach(this.wsServer)
  190. this.io.on('socket', (socket:Socket) => {
  191. socket.on('error', this.conf.errorHandler(socket))
  192. socket.on('close', this.conf.closeHandler(socket))
  193. if(this.visibility === "127.0.0.1")
  194. this.initRPCs(socket)
  195. else
  196. this.initPublicRPCs(socket)
  197. })
  198. this.wsServer.listen(this.port, this.visibility)
  199. }catch(e){
  200. //@ts-ignore
  201. this.errorHandler(undefined)("Unable to connect to socket")
  202. }
  203. }
  204. protected initRPCs(socket:Socket){
  205. socket.hook('info', () => rpcInfos)
  206. const rpcInfos:ExtendedRpcInfo[] = [
  207. ...this.rpcExporters.flatMap(exporter => rpcHooker(socket, exporter))
  208. ]
  209. }
  210. protected initPublicRPCs(socket:Socket){
  211. socket.hook('info', () => rpcInfos)
  212. const rpcInfos:ExtendedRpcInfo[] = [
  213. ...this.rpcExporters.flatMap(exporter => rpcHooker(socket, exporter))
  214. ]
  215. }
  216. }