LCOV - code coverage report
Current view: top level - test/cpp/end2end - thread_stress_test.cc (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 56 96 58.3 %
Date: 2015-10-10 Functions: 19 26 73.1 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <mutex>
      35             : #include <thread>
      36             : 
      37             : #include <grpc/grpc.h>
      38             : #include <grpc/support/thd.h>
      39             : #include <grpc/support/time.h>
      40             : #include <grpc++/channel.h>
      41             : #include <grpc++/client_context.h>
      42             : #include <grpc++/create_channel.h>
      43             : #include <grpc++/server.h>
      44             : #include <grpc++/server_builder.h>
      45             : #include <grpc++/server_context.h>
      46             : #include <gtest/gtest.h>
      47             : 
      48             : #include "test/core/util/port.h"
      49             : #include "test/core/util/test_config.h"
      50             : #include "test/cpp/util/echo_duplicate.grpc.pb.h"
      51             : #include "test/cpp/util/echo.grpc.pb.h"
      52             : 
      53             : using grpc::cpp::test::util::EchoRequest;
      54             : using grpc::cpp::test::util::EchoResponse;
      55             : using std::chrono::system_clock;
      56             : 
      57             : namespace grpc {
      58             : namespace testing {
      59             : 
      60             : namespace {
      61             : 
      62             : // When echo_deadline is requested, deadline seen in the ServerContext is set in
      63             : // the response in seconds.
      64      100000 : void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
      65             :                        EchoResponse* response) {
      66      100000 :   if (request->has_param() && request->param().echo_deadline()) {
      67           0 :     gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
      68           0 :     if (context->deadline() != system_clock::time_point::max()) {
      69           0 :       Timepoint2Timespec(context->deadline(), &deadline);
      70             :     }
      71           0 :     response->mutable_param()->set_request_deadline(deadline.tv_sec);
      72             :   }
      73       99980 : }
      74             : 
      75             : }  // namespace
      76             : 
      77           1 : class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
      78             :  public:
      79           1 :   TestServiceImpl() : signal_client_(false) {}
      80             : 
      81       99998 :   Status Echo(ServerContext* context, const EchoRequest* request,
      82             :               EchoResponse* response) GRPC_OVERRIDE {
      83       99998 :     response->set_message(request->message());
      84      100000 :     MaybeEchoDeadline(context, request, response);
      85      100000 :     if (request->has_param() && request->param().client_cancel_after_us()) {
      86             :       {
      87           0 :         std::unique_lock<std::mutex> lock(mu_);
      88           0 :         signal_client_ = true;
      89             :       }
      90           0 :       while (!context->IsCancelled()) {
      91             :         gpr_sleep_until(gpr_time_add(
      92             :             gpr_now(GPR_CLOCK_REALTIME),
      93           0 :             gpr_time_from_micros(request->param().client_cancel_after_us(),
      94           0 :                                  GPR_TIMESPAN)));
      95             :       }
      96           0 :       return Status::CANCELLED;
      97       99999 :     } else if (request->has_param() &&
      98           0 :                request->param().server_cancel_after_us()) {
      99             :       gpr_sleep_until(gpr_time_add(
     100             :           gpr_now(GPR_CLOCK_REALTIME),
     101           0 :           gpr_time_from_micros(request->param().server_cancel_after_us(),
     102           0 :                                GPR_TIMESPAN)));
     103           0 :       return Status::CANCELLED;
     104             :     } else {
     105      100000 :       EXPECT_FALSE(context->IsCancelled());
     106             :     }
     107       99992 :     return Status::OK;
     108             :   }
     109             : 
     110             :   // Unimplemented is left unimplemented to test the returned error.
     111             : 
     112           0 :   Status RequestStream(ServerContext* context,
     113             :                        ServerReader<EchoRequest>* reader,
     114             :                        EchoResponse* response) GRPC_OVERRIDE {
     115           0 :     EchoRequest request;
     116           0 :     response->set_message("");
     117           0 :     while (reader->Read(&request)) {
     118           0 :       response->mutable_message()->append(request.message());
     119             :     }
     120           0 :     return Status::OK;
     121             :   }
     122             : 
     123             :   // Return 3 messages.
     124             :   // TODO(yangg) make it generic by adding a parameter into EchoRequest
     125           0 :   Status ResponseStream(ServerContext* context, const EchoRequest* request,
     126             :                         ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
     127           0 :     EchoResponse response;
     128           0 :     response.set_message(request->message() + "0");
     129           0 :     writer->Write(response);
     130           0 :     response.set_message(request->message() + "1");
     131           0 :     writer->Write(response);
     132           0 :     response.set_message(request->message() + "2");
     133           0 :     writer->Write(response);
     134             : 
     135           0 :     return Status::OK;
     136             :   }
     137             : 
     138           0 :   Status BidiStream(ServerContext* context,
     139             :                     ServerReaderWriter<EchoResponse, EchoRequest>* stream)
     140             :       GRPC_OVERRIDE {
     141           0 :     EchoRequest request;
     142           0 :     EchoResponse response;
     143           0 :     while (stream->Read(&request)) {
     144           0 :       gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
     145           0 :       response.set_message(request.message());
     146           0 :       stream->Write(response);
     147             :     }
     148           0 :     return Status::OK;
     149             :   }
     150             : 
     151             :   bool signal_client() {
     152             :     std::unique_lock<std::mutex> lock(mu_);
     153             :     return signal_client_;
     154             :   }
     155             : 
     156             :  private:
     157             :   bool signal_client_;
     158             :   std::mutex mu_;
     159             : };
     160             : 
     161           2 : class TestServiceImplDupPkg
     162             :     : public ::grpc::cpp::test::util::duplicate::TestService::Service {
     163             :  public:
     164           0 :   Status Echo(ServerContext* context, const EchoRequest* request,
     165             :               EchoResponse* response) GRPC_OVERRIDE {
     166           0 :     response->set_message("no package");
     167           0 :     return Status::OK;
     168             :   }
     169             : };
     170             : 
     171           1 : class End2endTest : public ::testing::Test {
     172             :  protected:
     173           1 :   End2endTest() : kMaxMessageSize_(8192) {}
     174             : 
     175           1 :   void SetUp() GRPC_OVERRIDE {
     176           1 :     int port = grpc_pick_unused_port_or_die();
     177           1 :     server_address_ << "localhost:" << port;
     178             :     // Setup server
     179           1 :     ServerBuilder builder;
     180             :     builder.AddListeningPort(server_address_.str(),
     181           1 :                              InsecureServerCredentials());
     182           1 :     builder.RegisterService(&service_);
     183             :     builder.SetMaxMessageSize(
     184           1 :         kMaxMessageSize_);  // For testing max message size.
     185           1 :     builder.RegisterService(&dup_pkg_service_);
     186           1 :     server_ = builder.BuildAndStart();
     187           1 :   }
     188             : 
     189           1 :   void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
     190             : 
     191           1 :   void ResetStub() {
     192             :     std::shared_ptr<Channel> channel =
     193           1 :         CreateChannel(server_address_.str(), InsecureCredentials());
     194           1 :     stub_ = grpc::cpp::test::util::TestService::NewStub(channel);
     195           1 :   }
     196             : 
     197             :   std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
     198             :   std::unique_ptr<Server> server_;
     199             :   std::ostringstream server_address_;
     200             :   const int kMaxMessageSize_;
     201             :   TestServiceImpl service_;
     202             :   TestServiceImplDupPkg dup_pkg_service_;
     203             : };
     204             : 
     205         100 : static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
     206             :                     int num_rpcs) {
     207         100 :   EchoRequest request;
     208         200 :   EchoResponse response;
     209         100 :   request.set_message("Hello");
     210             : 
     211      100097 :   for (int i = 0; i < num_rpcs; ++i) {
     212       99997 :     ClientContext context;
     213      199988 :     Status s = stub->Echo(&context, request, &response);
     214      100000 :     EXPECT_EQ(response.message(), request.message());
     215       99997 :     EXPECT_TRUE(s.ok());
     216      100093 :   }
     217         100 : }
     218             : 
     219           5 : TEST_F(End2endTest, ThreadStress) {
     220           1 :   ResetStub();
     221           1 :   std::vector<std::thread*> threads;
     222         101 :   for (int i = 0; i < 100; ++i) {
     223         100 :     threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
     224             :   }
     225         101 :   for (int i = 0; i < 100; ++i) {
     226         100 :     threads[i]->join();
     227         100 :     delete threads[i];
     228           1 :   }
     229           1 : }
     230             : 
     231             : }  // namespace testing
     232             : }  // namespace grpc
     233             : 
     234           1 : int main(int argc, char** argv) {
     235           1 :   grpc_test_init(argc, argv);
     236           1 :   ::testing::InitGoogleTest(&argc, argv);
     237           1 :   return RUN_ALL_TESTS();
     238           3 : }

Generated by: LCOV version 1.10