rpc.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package rpc
import (
	"encoding/json"
	"fmt"
	"log/syslog"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/gorilla/mux"
	negronilogrus "github.com/meatballhat/negroni-logrus"
	"github.com/rs/cors"
	log "github.com/sirupsen/logrus"
	lSyslog "github.com/sirupsen/logrus/hooks/syslog"
	"github.com/urfave/negroni"

	"jrpcserver/infrastructure/serviceregistry"
	"jrpcserver/infrastructure/serviceregistry/consulagent"
	"jrpcserver/infrastructure/serviceregistry/none"
)
// dispatcher is the package-wide Dispatcher. StartAPI assigns it, and
// rpcHandler and AddWorkerVersion read it while serving requests.
var dispatcher *Dispatcher
  22. func StartAPI(config interface{}) {
  23. // Create dispatcher for later use
  24. dispatcher = NewDispatcher(config)
  25. consulEnabled := len(dispatcher.serverConfig.Consul) != 0
  26. if len(dispatcher.serverConfig.Hostname) == 0 {
  27. n, _ := os.Hostname()
  28. dispatcher.serverConfig.Hostname = n
  29. }
  30. // Set up registry
  31. var registryConnector serviceregistry.ServiceRegistry // Default to none
  32. if consulEnabled {
  33. registryConnector = consulagent.NewConsulServiceRegistry(dispatcher.serverConfig.Consul, dispatcher.serverConfig.ServiceName, dispatcher.serverConfig.Hostname, dispatcher.serverConfig.Port, dispatcher.serverConfig.BaseURI, "/health")
  34. } else {
  35. registryConnector = none.NewDefaultServiceRegistry() // Default to none
  36. }
  37. // Define endpoint
  38. router := mux.NewRouter()
  39. // add middleware for a specific route and get params from route
  40. router.HandleFunc(fmt.Sprintf("%s/{method}", dispatcher.serverConfig.BaseURI), rpcHandler)
  41. router.HandleFunc("/health", healthHandler)
  42. // Includes some default middlewares to all routes
  43. negroniServer := negroni.New()
  44. negroniServer.Use(negroni.NewRecovery())
  45. // add log
  46. hook, err := lSyslog.NewSyslogHook("", "", syslog.LOG_INFO, "")
  47. log.StandardLogger().SetFormatter(&log.TextFormatter{
  48. DisableColors: true,
  49. FullTimestamp: true,
  50. })
  51. // log.StandardLogger().
  52. if err == nil {
  53. log.StandardLogger().Hooks.Add(hook)
  54. }
  55. negroniServer.Use(negronilogrus.NewMiddlewareFromLogger(log.StandardLogger(), dispatcher.serverConfig.ServiceName))
  56. // Add some useful handlers Add some headers
  57. negroniServer.UseFunc(AddWorkerHeader) // Add which instance
  58. negroniServer.UseFunc(AddWorkerVersion) // Which version
  59. // If there are any handlers in the config add them
  60. for _, handler := range dispatcher.serverConfig.Middleware {
  61. negroniServer.UseFunc(handler)
  62. }
  63. // Coors stuff
  64. handler := cors.New(
  65. cors.Options{
  66. AllowedOrigins: []string{"*"},
  67. AllowedMethods: []string{"HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"},
  68. AllowedHeaders: []string{"*"},
  69. AllowCredentials: true,
  70. }).Handler(router) // Add coors
  71. negroniServer.UseHandler(handler)
  72. // Add Server Metrics
  73. log.Println("Starting JSON RPC Server Version", dispatcher.serverConfig.Version, dispatcher.serverConfig.BaseURI, "on port:", dispatcher.serverConfig.Port)
  74. // Set up shutdown resister
  75. c := make(chan os.Signal, 2)
  76. signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  77. // Set up a way to cleanly shutdown / deregister
  78. go func() {
  79. <-c
  80. registryConnector.Deregister()
  81. log.Println("Shutting Down")
  82. os.Exit(0)
  83. }()
  84. // Register (if required with consul or other registry)
  85. registryConnector.Register()
  86. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", dispatcher.serverConfig.Port), negroniServer))
  87. }
  88. func rpcHandler(w http.ResponseWriter, r *http.Request) {
  89. vars := mux.Vars(r)
  90. method := vars["method"]
  91. defer r.Body.Close()
  92. // Get Headers
  93. headers := make(map[string]string)
  94. // Loop through headers - we want the last
  95. for name, values := range r.Header {
  96. name = strings.ToLower(name)
  97. for _, h := range values {
  98. headers[name] = h
  99. }
  100. }
  101. // Make the call
  102. dipatcherResponse := dispatcher.Execute(method, headers, r.Body)
  103. var b []byte
  104. if dipatcherResponse.RawResponse { // non command response
  105. b, _ = json.MarshalIndent(dipatcherResponse.Value, "", " ")
  106. } else { // Encode a command response
  107. b, _ = json.MarshalIndent(dipatcherResponse, "", " ")
  108. }
  109. // TODO Append schema type
  110. w.Header().Set("Content-Type", "application/json")
  111. if dipatcherResponse.Code == 0 {
  112. w.WriteHeader(http.StatusOK)
  113. w.Write(b)
  114. } else {
  115. w.WriteHeader(dipatcherResponse.Code)
  116. w.Write(b)
  117. }
  118. }
  119. func healthHandler(w http.ResponseWriter, req *http.Request) {
  120. w.Header().Set("Content-Type", "application/json")
  121. w.Write([]byte("{ \"status\":\"up\"}"))
  122. }
  123. // AddWorkerHeader - adds header of which node actually processed request
  124. func AddWorkerHeader(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) {
  125. host, err := os.Hostname()
  126. if err != nil {
  127. host = "Unknown"
  128. }
  129. rw.Header().Add("X-Worker", host)
  130. if next != nil {
  131. next(rw, req)
  132. }
  133. }
  134. // AddWorkerVersion - adds header of which version is installed
  135. func AddWorkerVersion(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) {
  136. if len(dispatcher.serverConfig.Version) == 0 {
  137. dispatcher.serverConfig.Version = "UNKNOWN"
  138. }
  139. rw.Header().Add("X-Worker-Version", dispatcher.serverConfig.Version)
  140. if next != nil {
  141. next(rw, req)
  142. }
  143. }